# Copyright (c) 2015 Ian Stapleton Cordasco
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module containing the urlparse compatibility logic."""
from collections import namedtuple

from . import compat
from . import exceptions
from . import misc
from . import normalizers
from . import uri

__all__ = ("ParseResult", "ParseResultBytes")

# Field names, in order, shared by both namedtuple-based result classes.
PARSED_COMPONENTS = (
    "scheme",
    "userinfo",
    "host",
    "port",
    "path",
    "query",
    "fragment",
)


class ParseResultMixin:
    """Behaviour shared by ``ParseResult`` and ``ParseResultBytes``.

    Concrete classes are expected to provide ``userinfo``, ``host``, ``port``
    and ``query`` fields, an ``authority`` property, an ``encoding``
    attribute and an ``unsplit`` method.
    """

    def _generate_authority(self, attributes):
        """Return the authority, as text, implied by ``attributes``.

        :param dict attributes: Mapping holding at least the ``userinfo``,
            ``host`` and ``port`` values of the result being built.
        :returns: A freshly normalized authority when any of the three
            parts differs from this instance's value; otherwise this
            instance's current authority, decoded to text if it is bytes
            (a ``None`` authority is returned unchanged).
        """
        # I swear I did not align the comparisons below. That's just how they
        # happened to align based on pep8 and attribute lengths.
        userinfo, host, port = (
            attributes[p] for p in ("userinfo", "host", "port")
        )
        if self.userinfo != userinfo or self.host != host or self.port != port:
            if port:
                # ParseResultBytes.copy_with encodes text values before
                # calling us, so the port may arrive as bytes. Formatting
                # bytes directly would embed their repr (``b'8080'``).
                if isinstance(port, bytes):
                    port = port.decode(self.encoding)
                port = f"{port}"
            return normalizers.normalize_authority(
                (
                    compat.to_str(userinfo, self.encoding),
                    compat.to_str(host, self.encoding),
                    port,
                )
            )
        authority = self.authority
        if isinstance(authority, bytes):
            # Decode with the same encoding the bytes variant used to encode.
            return authority.decode(self.encoding)
        return authority

    def geturl(self):
        """Shim to match the standard library method."""
        return self.unsplit()

    @property
    def hostname(self):
        """Shim to match the standard library."""
        return self.host

    @property
    def netloc(self):
        """Shim to match the standard library."""
        return self.authority

    @property
    def params(self):
        """Shim to match the standard library."""
        return self.query


class ParseResult(
    namedtuple("ParseResult", PARSED_COMPONENTS), ParseResultMixin
):
    """Implementation of urlparse compatibility class.

    This uses the URIReference logic to handle compatibility with the
    urlparse.ParseResult class.
    """

    # NOTE(review): this is a plain class attribute, not ``__slots__``.
    # Confirm intent before changing it; ``__new__`` below sets ``encoding``
    # and ``reference`` as instance attributes.
    slots = ()

    def __new__(
        cls,
        scheme,
        userinfo,
        host,
        port,
        path,
        query,
        fragment,
        uri_ref,
        encoding="utf-8",
    ):
        """Create a new ParseResult.

        Falsy ``scheme``, ``userinfo``, ``port`` and ``path`` values are
        stored as ``None``; ``host``, ``query`` and ``fragment`` are stored
        as given. ``uri_ref`` and ``encoding`` are kept as the ``reference``
        and ``encoding`` attributes of the new instance.
        """
        parse_result = super().__new__(
            cls,
            scheme or None,
            userinfo or None,
            host,
            port or None,
            path or None,
            query,
            fragment,
        )
        parse_result.encoding = encoding
        parse_result.reference = uri_ref
        return parse_result

    @classmethod
    def from_parts(
        cls,
        scheme=None,
        userinfo=None,
        host=None,
        port=None,
        path=None,
        query=None,
        fragment=None,
        encoding="utf-8",
    ):
        """Create a ParseResult instance from its parts.

        The authority is assembled from ``userinfo``, ``host`` and ``port``,
        the resulting reference is normalized, and the authority is then
        re-parsed strictly, so an invalid authority raises.

        :param str encoding: The encoding recorded on the result.
        :returns: :class:`ParseResult` or subclass thereof
        """
        authority = ""
        if userinfo is not None:
            authority += userinfo + "@"
        if host is not None:
            authority += host
        if port is not None:
            authority += f":{port}"
        uri_ref = uri.URIReference(
            scheme=scheme,
            authority=authority,
            path=path,
            query=query,
            fragment=fragment,
            encoding=encoding,
        ).normalize()
        userinfo, host, port = authority_from(uri_ref, strict=True)

        return cls(
            scheme=uri_ref.scheme,
            userinfo=userinfo,
            host=host,
            port=port,
            path=uri_ref.path,
            query=uri_ref.query,
            fragment=uri_ref.fragment,
            uri_ref=uri_ref,
            encoding=encoding,
        )

    @classmethod
    def from_string(
        cls, uri_string, encoding="utf-8", strict=True, lazy_normalize=True
    ):
        """Parse a URI from the given unicode URI string.

        :param str uri_string: Unicode URI to be parsed into a reference.
        :param str encoding: The encoding of the string provided
        :param bool strict: Parse strictly according to :rfc:`3986` if True.
            If False, parse similarly to the standard library's urlparse
            function.
        :param bool lazy_normalize: If False, normalize the parsed reference
            before its components are extracted.
        :returns: :class:`ParseResult` or subclass thereof
        """
        reference = uri.URIReference.from_string(uri_string, encoding)
        if not lazy_normalize:
            reference = reference.normalize()
        userinfo, host, port = authority_from(reference, strict)

        return cls(
            scheme=reference.scheme,
            userinfo=userinfo,
            host=host,
            port=port,
            path=reference.path,
            query=reference.query,
            fragment=reference.fragment,
            uri_ref=reference,
            encoding=encoding,
        )

    @property
    def authority(self):
        """Return the normalized authority."""
        return self.reference.authority

    def copy_with(
        self,
        scheme=misc.UseExisting,
        userinfo=misc.UseExisting,
        host=misc.UseExisting,
        port=misc.UseExisting,
        path=misc.UseExisting,
        query=misc.UseExisting,
        fragment=misc.UseExisting,
    ):
        """Create a copy of this instance replacing with specified parts.

        Any component left as ``misc.UseExisting`` keeps this instance's
        value. The authority of the underlying reference is regenerated
        when ``userinfo``, ``host`` or ``port`` changes.

        :returns: A new :class:`ParseResult`.
        """
        attributes = zip(
            PARSED_COMPONENTS,
            (scheme, userinfo, host, port, path, query, fragment),
        )
        attrs_dict = {}
        for name, value in attributes:
            if value is misc.UseExisting:
                value = getattr(self, name)
            attrs_dict[name] = value
        authority = self._generate_authority(attrs_dict)
        ref = self.reference.copy_with(
            scheme=attrs_dict["scheme"],
            authority=authority,
            path=attrs_dict["path"],
            query=attrs_dict["query"],
            fragment=attrs_dict["fragment"],
        )
        return ParseResult(uri_ref=ref, encoding=self.encoding, **attrs_dict)

    def encode(self, encoding=None):
        """Convert to an instance of ParseResultBytes.

        :param str encoding: Encoding to use; defaults to this instance's
            ``encoding``.
        :returns: :class:`ParseResultBytes` sharing this instance's
            reference.
        """
        encoding = encoding or self.encoding
        # Only values that can be encoded are converted; ``None`` and an
        # integer port are passed through unchanged.
        attrs = dict(
            zip(
                PARSED_COMPONENTS,
                (
                    attr.encode(encoding) if hasattr(attr, "encode") else attr
                    for attr in self
                ),
            )
        )
        return ParseResultBytes(
            uri_ref=self.reference, encoding=encoding, **attrs
        )

    def unsplit(self, use_idna=False):
        """Create a URI string from the components.

        :param bool use_idna: If True and a host is present, IDNA-encode the
            host before rebuilding the URI.
        :returns: The parsed URI reconstituted as a string.
        :rtype: str
        """
        parse_result = self
        if use_idna and self.host:
            hostbytes = self.host.encode("idna")
            # IDNA output is always ASCII. Decoding it with ``self.encoding``
            # would misbehave for encodings that are not ASCII-compatible.
            host = hostbytes.decode("ascii")
            parse_result = self.copy_with(host=host)
        return parse_result.reference.unsplit()


class ParseResultBytes(
    namedtuple("ParseResultBytes", PARSED_COMPONENTS), ParseResultMixin
):
    """Compatibility shim for the urlparse.ParseResultBytes object."""

    def __new__(
        cls,
        scheme,
        userinfo,
        host,
        port,
        path,
        query,
        fragment,
        uri_ref,
        encoding="utf-8",
        lazy_normalize=True,
    ):
        """Create a new ParseResultBytes instance.

        Every falsy component except ``host`` is stored as ``None``.
        ``uri_ref``, ``encoding`` and ``lazy_normalize`` are kept as the
        ``reference``, ``encoding`` and ``lazy_normalize`` attributes.
        """
        parse_result = super().__new__(
            cls,
            scheme or None,
            userinfo or None,
            host,
            port or None,
            path or None,
            query or None,
            fragment or None,
        )
        parse_result.encoding = encoding
        parse_result.reference = uri_ref
        parse_result.lazy_normalize = lazy_normalize
        return parse_result

    @classmethod
    def from_parts(
        cls,
        scheme=None,
        userinfo=None,
        host=None,
        port=None,
        path=None,
        query=None,
        fragment=None,
        encoding="utf-8",
        lazy_normalize=True,
    ):
        """Create a ParseResultBytes instance from its parts.

        The authority is assembled from ``userinfo``, ``host`` and ``port``
        (the port is passed through ``int``) and re-parsed strictly, so an
        invalid authority raises.

        :param str encoding: The encoding used to produce the bytes fields.
        :param bool lazy_normalize: If False, normalize the underlying
            reference immediately.
        :returns: :class:`ParseResultBytes` or subclass thereof
        """
        authority = ""
        if userinfo is not None:
            authority += userinfo + "@"
        if host is not None:
            authority += host
        if port is not None:
            authority += f":{int(port)}"
        uri_ref = uri.URIReference(
            scheme=scheme,
            authority=authority,
            path=path,
            query=query,
            fragment=fragment,
            encoding=encoding,
        )
        if not lazy_normalize:
            uri_ref = uri_ref.normalize()
        to_bytes = compat.to_bytes
        userinfo, host, port = authority_from(uri_ref, strict=True)
        return cls(
            scheme=to_bytes(scheme, encoding),
            userinfo=to_bytes(userinfo, encoding),
            host=to_bytes(host, encoding),
            port=port,
            path=to_bytes(path, encoding),
            query=to_bytes(query, encoding),
            fragment=to_bytes(fragment, encoding),
            uri_ref=uri_ref,
            encoding=encoding,
            lazy_normalize=lazy_normalize,
        )

    @classmethod
    def from_string(
        cls, uri_string, encoding="utf-8", strict=True, lazy_normalize=True
    ):
        """Parse a URI from the given unicode URI string.

        :param str uri_string: Unicode URI to be parsed into a reference.
        :param str encoding: The encoding of the string provided
        :param bool strict: Parse strictly according to :rfc:`3986` if True.
            If False, parse similarly to the standard library's urlparse
            function.
        :param bool lazy_normalize: If False, normalize the parsed reference
            before its components are extracted.
        :returns: :class:`ParseResultBytes` or subclass thereof
        """
        reference = uri.URIReference.from_string(uri_string, encoding)
        if not lazy_normalize:
            reference = reference.normalize()
        userinfo, host, port = authority_from(reference, strict)

        to_bytes = compat.to_bytes
        return cls(
            scheme=to_bytes(reference.scheme, encoding),
            userinfo=to_bytes(userinfo, encoding),
            host=to_bytes(host, encoding),
            port=port,
            path=to_bytes(reference.path, encoding),
            query=to_bytes(reference.query, encoding),
            fragment=to_bytes(reference.fragment, encoding),
            uri_ref=reference,
            encoding=encoding,
            lazy_normalize=lazy_normalize,
        )

    @property
    def authority(self):
        """Return the normalized authority as bytes, or None if absent."""
        authority = self.reference.authority
        # Guard against a reference without an authority: calling
        # ``.encode`` on None would raise AttributeError here and in every
        # caller (``netloc``, ``copy_with``, ``unsplit``).
        if authority is None:
            return None
        return authority.encode(self.encoding)

    def copy_with(
        self,
        scheme=misc.UseExisting,
        userinfo=misc.UseExisting,
        host=misc.UseExisting,
        port=misc.UseExisting,
        path=misc.UseExisting,
        query=misc.UseExisting,
        fragment=misc.UseExisting,
        lazy_normalize=True,
    ):
        """Create a copy of this instance replacing with specified parts.

        Any component left as ``misc.UseExisting`` keeps this instance's
        value. Text values are encoded to bytes with this instance's
        encoding before being stored.

        :param bool lazy_normalize: If False, normalize the copied
            reference before building the result.
        :returns: A new :class:`ParseResultBytes`.
        """
        attributes = zip(
            PARSED_COMPONENTS,
            (scheme, userinfo, host, port, path, query, fragment),
        )
        attrs_dict = {}
        for name, value in attributes:
            if value is misc.UseExisting:
                value = getattr(self, name)
            if not isinstance(value, bytes) and hasattr(value, "encode"):
                value = value.encode(self.encoding)
            attrs_dict[name] = value
        authority = self._generate_authority(attrs_dict)
        # The underlying reference works on text, so decode on the way in.
        to_str = compat.to_str
        ref = self.reference.copy_with(
            scheme=to_str(attrs_dict["scheme"], self.encoding),
            authority=to_str(authority, self.encoding),
            path=to_str(attrs_dict["path"], self.encoding),
            query=to_str(attrs_dict["query"], self.encoding),
            fragment=to_str(attrs_dict["fragment"], self.encoding),
        )
        if not lazy_normalize:
            ref = ref.normalize()
        return ParseResultBytes(
            uri_ref=ref,
            encoding=self.encoding,
            lazy_normalize=lazy_normalize,
            **attrs_dict,
        )

    def unsplit(self, use_idna=False):
        """Create a URI bytes object from the components.

        :param bool use_idna: If True and a host is present, IDNA-encode the
            host before rebuilding the URI.
        :returns: The parsed URI reconstituted as bytes.
        :rtype: bytes
        """
        parse_result = self
        if use_idna and self.host:
            # self.host is bytes, to encode to idna, we need to decode it
            # first
            host = self.host.decode(self.encoding)
            hostbytes = host.encode("idna")
            parse_result = self.copy_with(host=hostbytes)
        if self.lazy_normalize:
            # Normalization was deferred at construction; apply it now.
            parse_result = parse_result.copy_with(lazy_normalize=False)
        # Named ``uri_text`` so the imported ``uri`` module is not shadowed.
        uri_text = parse_result.reference.unsplit()
        return uri_text.encode(self.encoding)


def split_authority(authority):
    """Leniently split an authority string into its three parts.

    :param str authority: The authority component, e.g.
        ``"user@example.com:80"``.
    :returns: ``(userinfo, host, port)``; a part that is not found is
        ``None`` and the port is returned as text, unvalidated.
    :rtype: tuple
    """
    # Initialize our expected return values
    userinfo = host = port = None
    # Initialize an extra var we may need to use
    extra_host = None
    # Set-up rest in case there is no userinfo portion
    rest = authority

    if "@" in authority:
        # Split on the last "@" so earlier ones stay inside the userinfo.
        userinfo, rest = authority.rsplit("@", 1)

    # Handle IPv6 host addresses
    if rest.startswith("["):
        host, rest = rest.split("]", 1)
        host += "]"

    if ":" in rest:
        extra_host, port = rest.split(":", 1)
    elif not host and rest:
        host = rest

    if extra_host and not host:
        host = extra_host

    return userinfo, host, port


def authority_from(reference, strict):
    """Extract ``(userinfo, host, port)`` from a reference's authority.

    :param reference: Object providing ``authority_info()`` and
        ``authority``.
    :param bool strict: If True, re-raise ``InvalidAuthority`` from
        ``authority_info()``; if False, fall back to
        :func:`split_authority`.
    :returns: ``(userinfo, host, port)`` with a truthy port converted to
        ``int``.
    :raises exceptions.InvalidAuthority: In strict mode, when the authority
        cannot be parsed.
    :raises exceptions.InvalidPort: When the port is not an integer.
    """
    try:
        subauthority = reference.authority_info()
    except exceptions.InvalidAuthority:
        if strict:
            raise
        userinfo, host, port = split_authority(reference.authority)
    else:
        # Thanks to Richard Barrell for this idea:
        # https://twitter.com/0x2ba22e11/status/617338811975139328
        userinfo, host, port = (
            subauthority.get(p) for p in ("userinfo", "host", "port")
        )

    if port:
        try:
            port = int(port)
        except ValueError as exc:
            # Chain the cause so the original conversion error is visible.
            raise exceptions.InvalidPort(port) from exc
    return userinfo, host, port