344 lines
9.0 KiB
Python
344 lines
9.0 KiB
Python
import re
|
|
|
|
from datetime import (
|
|
date,
|
|
datetime,
|
|
)
|
|
|
|
from collections import namedtuple
|
|
|
|
from webob.byterange import (
|
|
ContentRange,
|
|
Range,
|
|
)
|
|
|
|
from webob.compat import (
|
|
PY2,
|
|
text_type,
|
|
)
|
|
|
|
from webob.datetime_utils import (
|
|
parse_date,
|
|
serialize_date,
|
|
)
|
|
|
|
from webob.util import (
|
|
header_docstring,
|
|
warn_deprecation,
|
|
)
|
|
|
|
|
|
CHARSET_RE = re.compile(r';\s*charset=([^;]*)', re.I)
|
|
SCHEME_RE = re.compile(r'^[a-z]+:', re.I)
|
|
|
|
|
|
_not_given = object()
|
|
|
|
def environ_getter(key, default=_not_given, rfc_section=None):
|
|
if rfc_section:
|
|
doc = header_docstring(key, rfc_section)
|
|
else:
|
|
doc = "Gets and sets the ``%s`` key in the environment." % key
|
|
if default is _not_given:
|
|
def fget(req):
|
|
return req.environ[key]
|
|
def fset(req, val):
|
|
req.environ[key] = val
|
|
fdel = None
|
|
else:
|
|
def fget(req):
|
|
return req.environ.get(key, default)
|
|
def fset(req, val):
|
|
if val is None:
|
|
if key in req.environ:
|
|
del req.environ[key]
|
|
else:
|
|
req.environ[key] = val
|
|
def fdel(req):
|
|
del req.environ[key]
|
|
return property(fget, fset, fdel, doc=doc)
|
|
|
|
|
|
def environ_decoder(key, default=_not_given, rfc_section=None,
|
|
encattr=None):
|
|
if rfc_section:
|
|
doc = header_docstring(key, rfc_section)
|
|
else:
|
|
doc = "Gets and sets the ``%s`` key in the environment." % key
|
|
if default is _not_given:
|
|
def fget(req):
|
|
return req.encget(key, encattr=encattr)
|
|
def fset(req, val):
|
|
return req.encset(key, val, encattr=encattr)
|
|
fdel = None
|
|
else:
|
|
def fget(req):
|
|
return req.encget(key, default, encattr=encattr)
|
|
def fset(req, val):
|
|
if val is None:
|
|
if key in req.environ:
|
|
del req.environ[key]
|
|
else:
|
|
return req.encset(key, val, encattr=encattr)
|
|
def fdel(req):
|
|
del req.environ[key]
|
|
return property(fget, fset, fdel, doc=doc)
|
|
|
|
def upath_property(key):
|
|
if PY2:
|
|
def fget(req):
|
|
encoding = req.url_encoding
|
|
return req.environ.get(key, '').decode(encoding)
|
|
def fset(req, val):
|
|
encoding = req.url_encoding
|
|
if isinstance(val, text_type):
|
|
val = val.encode(encoding)
|
|
req.environ[key] = val
|
|
else:
|
|
def fget(req):
|
|
encoding = req.url_encoding
|
|
return req.environ.get(key, '').encode('latin-1').decode(encoding)
|
|
def fset(req, val):
|
|
encoding = req.url_encoding
|
|
req.environ[key] = val.encode(encoding).decode('latin-1')
|
|
|
|
return property(fget, fset, doc='upath_property(%r)' % key)
|
|
|
|
|
|
def deprecated_property(attr, name, text, version): # pragma: no cover
|
|
"""
|
|
Wraps a descriptor, with a deprecation warning or error
|
|
"""
|
|
def warn():
|
|
warn_deprecation('The attribute %s is deprecated: %s'
|
|
% (name, text),
|
|
version,
|
|
3
|
|
)
|
|
def fget(self):
|
|
warn()
|
|
return attr.__get__(self, type(self))
|
|
def fset(self, val):
|
|
warn()
|
|
attr.__set__(self, val)
|
|
def fdel(self):
|
|
warn()
|
|
attr.__delete__(self)
|
|
return property(fget, fset, fdel,
|
|
'<Deprecated attribute %s>' % name
|
|
)
|
|
|
|
|
|
def header_getter(header, rfc_section):
|
|
doc = header_docstring(header, rfc_section)
|
|
key = header.lower()
|
|
|
|
def fget(r):
|
|
for k, v in r._headerlist:
|
|
if k.lower() == key:
|
|
return v
|
|
|
|
def fset(r, value):
|
|
fdel(r)
|
|
if value is not None:
|
|
if '\n' in value or '\r' in value:
|
|
raise ValueError('Header value may not contain control characters')
|
|
|
|
if isinstance(value, text_type) and PY2:
|
|
value = value.encode('latin-1')
|
|
r._headerlist.append((header, value))
|
|
|
|
def fdel(r):
|
|
r._headerlist[:] = [(k, v) for (k, v) in r._headerlist if k.lower() != key]
|
|
|
|
return property(fget, fset, fdel, doc)
|
|
|
|
|
|
|
|
|
|
def converter(prop, parse, serialize, convert_name=None):
|
|
assert isinstance(prop, property)
|
|
convert_name = convert_name or "``%s`` and ``%s``" % (parse.__name__,
|
|
serialize.__name__)
|
|
doc = prop.__doc__ or ''
|
|
doc += " Converts it using %s." % convert_name
|
|
hget, hset = prop.fget, prop.fset
|
|
def fget(r):
|
|
return parse(hget(r))
|
|
def fset(r, val):
|
|
if val is not None:
|
|
val = serialize(val)
|
|
hset(r, val)
|
|
return property(fget, fset, prop.fdel, doc)
|
|
|
|
|
|
|
|
def list_header(header, rfc_section):
|
|
prop = header_getter(header, rfc_section)
|
|
return converter(prop, parse_list, serialize_list, 'list')
|
|
|
|
def parse_list(value):
|
|
if not value:
|
|
return None
|
|
return tuple(filter(None, [v.strip() for v in value.split(',')]))
|
|
|
|
def serialize_list(value):
|
|
if isinstance(value, (text_type, bytes)):
|
|
return str(value)
|
|
else:
|
|
return ', '.join(map(str, value))
|
|
|
|
|
|
|
|
|
|
def converter_date(prop):
|
|
return converter(prop, parse_date, serialize_date, 'HTTP date')
|
|
|
|
def date_header(header, rfc_section):
|
|
return converter_date(header_getter(header, rfc_section))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
########################
|
|
## Converter functions
|
|
########################
|
|
|
|
|
|
_rx_etag = re.compile(r'(?:^|\s)(W/)?"((?:\\"|.)*?)"')
|
|
|
|
def parse_etag_response(value, strong=False):
|
|
"""
|
|
Parse a response ETag.
|
|
See:
|
|
* http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.19
|
|
* http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.11
|
|
"""
|
|
if not value:
|
|
return None
|
|
m = _rx_etag.match(value)
|
|
if not m:
|
|
# this etag is invalid, but we'll just return it anyway
|
|
return value
|
|
elif strong and m.group(1):
|
|
# this is a weak etag and we want only strong ones
|
|
return None
|
|
else:
|
|
return m.group(2).replace('\\"', '"')
|
|
|
|
def serialize_etag_response(value): #return '"%s"' % value.replace('"', '\\"')
|
|
strong = True
|
|
if isinstance(value, tuple):
|
|
value, strong = value
|
|
elif _rx_etag.match(value):
|
|
# this is a valid etag already
|
|
return value
|
|
# let's quote the value
|
|
r = '"%s"' % value.replace('"', '\\"')
|
|
if not strong:
|
|
r = 'W/' + r
|
|
return r
|
|
|
|
def serialize_if_range(value):
|
|
if isinstance(value, (datetime, date)):
|
|
return serialize_date(value)
|
|
value = str(value)
|
|
return value or None
|
|
|
|
def parse_range(value):
|
|
if not value:
|
|
return None
|
|
# Might return None too:
|
|
return Range.parse(value)
|
|
|
|
def serialize_range(value):
|
|
if not value:
|
|
return None
|
|
elif isinstance(value, (list, tuple)):
|
|
return str(Range(*value))
|
|
else:
|
|
assert isinstance(value, str)
|
|
return value
|
|
|
|
def parse_int(value):
|
|
if value is None or value == '':
|
|
return None
|
|
return int(value)
|
|
|
|
def parse_int_safe(value):
|
|
if value is None or value == '':
|
|
return None
|
|
try:
|
|
return int(value)
|
|
except ValueError:
|
|
return None
|
|
|
|
serialize_int = str
|
|
|
|
def parse_content_range(value):
|
|
if not value or not value.strip():
|
|
return None
|
|
# May still return None
|
|
return ContentRange.parse(value)
|
|
|
|
def serialize_content_range(value):
|
|
if isinstance(value, (tuple, list)):
|
|
if len(value) not in (2, 3):
|
|
raise ValueError(
|
|
"When setting content_range to a list/tuple, it must "
|
|
"be length 2 or 3 (not %r)" % value)
|
|
if len(value) == 2:
|
|
begin, end = value
|
|
length = None
|
|
else:
|
|
begin, end, length = value
|
|
value = ContentRange(begin, end, length)
|
|
value = str(value).strip()
|
|
if not value:
|
|
return None
|
|
return value
|
|
|
|
|
|
|
|
|
|
_rx_auth_param = re.compile(r'([a-z]+)[ \t]*=[ \t]*(".*?"|[^,]*?)[ \t]*(?:\Z|, *)')
|
|
|
|
def parse_auth_params(params):
|
|
r = {}
|
|
for k, v in _rx_auth_param.findall(params):
|
|
r[k] = v.strip('"')
|
|
return r
|
|
|
|
# see http://lists.w3.org/Archives/Public/ietf-http-wg/2009OctDec/0297.html
|
|
known_auth_schemes = ['Basic', 'Digest', 'WSSE', 'HMACDigest', 'GoogleLogin',
|
|
'Cookie', 'OpenID']
|
|
known_auth_schemes = dict.fromkeys(known_auth_schemes, None)
|
|
|
|
_authorization = namedtuple('Authorization', ['authtype', 'params'])
|
|
|
|
def parse_auth(val):
|
|
if val is not None:
|
|
authtype, sep, params = val.partition(' ')
|
|
if authtype in known_auth_schemes:
|
|
if authtype == 'Basic' and '"' not in params:
|
|
# this is the "Authentication: Basic XXXXX==" case
|
|
pass
|
|
else:
|
|
params = parse_auth_params(params)
|
|
return _authorization(authtype, params)
|
|
return val
|
|
|
|
def serialize_auth(val):
|
|
if isinstance(val, (tuple, list)):
|
|
authtype, params = val
|
|
if isinstance(params, dict):
|
|
params = ', '.join(map('%s="%s"'.__mod__, params.items()))
|
|
assert isinstance(params, str)
|
|
return '%s %s' % (authtype, params)
|
|
return val
|