# Plain text  |  339 lines  |  8.83 KB

from datetime import (
    date,
    datetime,
    )

import re

from webob.byterange import (
    ContentRange,
    Range,
    )

from webob.compat import (
    PY3,
    text_type,
    )

from webob.datetime_utils import (
    parse_date,
    serialize_date,
    )

from webob.util import (
    header_docstring,
    warn_deprecation,
    )


# Captures the value of a ``charset=`` parameter (case-insensitive).
CHARSET_RE = re.compile(r';\s*charset=([^;]*)', re.IGNORECASE)
# Matches a leading run of letters followed by a colon (case-insensitive).
SCHEME_RE = re.compile(r'^[a-z]+:', re.IGNORECASE)


# Unique sentinel used to tell "no default supplied" apart from ``None``.
_not_given = object()

def environ_getter(key, default=_not_given, rfc_section=None):
    """
    Build a property that reads and writes ``req.environ[key]``.

    Without ``default`` the key is looked up directly (a missing key raises
    ``KeyError``) and the property has no deleter.  With ``default``, a
    missing key reads as ``default``, assigning ``None`` removes the key if
    present, and ``del`` removes the key.

    When ``rfc_section`` is given the docstring comes from
    ``header_docstring``; otherwise a generic one is used.
    """
    doc = (header_docstring(key, rfc_section) if rfc_section
           else "Gets and sets the ``%s`` key in the environment." % key)

    if default is _not_given:
        def read_required(req):
            return req.environ[key]

        def write_required(req, val):
            req.environ[key] = val

        return property(read_required, write_required, None, doc=doc)

    def read_optional(req):
        return req.environ.get(key, default)

    def write_optional(req, val):
        if val is not None:
            req.environ[key] = val
        elif key in req.environ:
            # Assigning None means "unset"
            del req.environ[key]

    def remove(req):
        del req.environ[key]

    return property(read_optional, write_optional, remove, doc=doc)


def environ_decoder(key, default=_not_given, rfc_section=None,
                    encattr=None):
    """
    Build a property that accesses environ ``key`` through the request's
    ``encget`` / ``encset`` methods, passing ``encattr`` along.

    Without ``default`` the property has no deleter.  With ``default``,
    ``default`` is forwarded to ``encget``, assigning ``None`` removes the
    key from ``req.environ`` if present, and ``del`` removes the key.
    """
    doc = (header_docstring(key, rfc_section) if rfc_section
           else "Gets and sets the ``%s`` key in the environment." % key)

    if default is _not_given:
        def fetch(req):
            return req.encget(key, encattr=encattr)

        def store(req, val):
            return req.encset(key, val, encattr=encattr)

        return property(fetch, store, None, doc=doc)

    def fetch_or_default(req):
        return req.encget(key, default, encattr=encattr)

    def store_or_remove(req, val):
        if val is not None:
            return req.encset(key, val, encattr=encattr)
        # Assigning None means "unset"
        if key in req.environ:
            del req.environ[key]

    def remove(req):
        del req.environ[key]

    return property(fetch_or_default, store_or_remove, remove, doc=doc)

def upath_property(key):
    """
    Build a property exposing environ ``key`` as text decoded with the
    request's ``url_encoding``.

    On Python 3 the stored value is round-tripped through latin-1; on
    Python 2 the stored value is decoded directly and text values are
    encoded before being stored.
    """
    if PY3: # pragma: no cover
        def fget(req):
            charset = req.url_encoding
            stored = req.environ.get(key, '')
            return stored.encode('latin-1').decode(charset)

        def fset(req, val):
            charset = req.url_encoding
            req.environ[key] = val.encode(charset).decode('latin-1')
    else:
        def fget(req):
            charset = req.url_encoding
            stored = req.environ.get(key, '')
            return stored.decode(charset)

        def fset(req, val):
            charset = req.url_encoding
            # Only text needs encoding; anything else is stored untouched
            req.environ[key] = (val.encode(charset)
                                if isinstance(val, text_type) else val)

    return property(fget, fset, doc='upath_property(%r)' % key)


def deprecated_property(attr, name, text, version): # pragma: no cover
    """
    Wrap the descriptor ``attr`` in a property whose get, set and delete
    each call ``warn_deprecation`` before delegating to ``attr``.

    ``name`` and ``text`` are interpolated into the warning message and
    ``version`` is passed through to ``warn_deprecation``.
    """
    def warn():
        message = 'The attribute %s is deprecated: %s' % (name, text)
        # NOTE(review): the trailing 3 is presumably a stack level pointing
        # at the code touching the attribute -- keep this helper's call depth.
        warn_deprecation(message, version, 3)

    def getter(self):
        warn()
        return attr.__get__(self, type(self))

    def setter(self, val):
        warn()
        attr.__set__(self, val)

    def deleter(self):
        warn()
        attr.__delete__(self)

    description = '<Deprecated attribute %s>' % name
    return property(getter, setter, deleter, description)


def header_getter(header, rfc_section):
    """
    Build a property for the ``header`` entry of an object's ``_headerlist``
    (a list of ``(name, value)`` pairs); names are compared lower-cased.

    Reading returns the first matching value, or ``None`` if there is none.
    Assigning removes every matching entry and, unless the new value is
    ``None``, appends a single ``(header, value)`` pair.  Deleting removes
    every matching entry.
    """
    doc = header_docstring(header, rfc_section)
    key = header.lower()

    def fget(r):
        matches = (v for k, v in r._headerlist if k.lower() == key)
        return next(matches, None)

    def fdel(r):
        entries = r._headerlist
        # Walk backwards so deletions do not shift the indexes still to visit
        for idx in reversed(range(len(entries))):
            if entries[idx][0].lower() == key:
                del entries[idx]

    def fset(r, value):
        fdel(r)
        if value is None:
            return
        if not PY3 and isinstance(value, text_type):
            value = value.encode('latin-1')
        r._headerlist.append((header, value))

    return property(fget, fset, fdel, doc)




def converter(prop, parse, serialize, convert_name=None):
    """
    Wrap the property ``prop`` so values pass through ``parse`` when read
    and through ``serialize`` when written.

    ``None`` is written through without being serialized.  The deleter of
    ``prop`` is reused as-is.  ``convert_name`` is the label used in the
    generated docstring; it defaults to the names of the two callables.
    """
    assert isinstance(prop, property)
    if not convert_name:
        convert_name = "``%s`` and ``%s``" % (parse.__name__,
                                              serialize.__name__)
    suffix = "  Converts it using %s." % convert_name
    doc = (prop.__doc__ or '') + suffix
    raw_get = prop.fget
    raw_set = prop.fset

    def fget(r):
        raw = raw_get(r)
        return parse(raw)

    def fset(r, val):
        raw_set(r, None if val is None else serialize(val))

    return property(fget, fset, prop.fdel, doc)



def list_header(header, rfc_section):
    """
    Build a property for ``header`` whose value is converted with
    ``parse_list`` on read and ``serialize_list`` on write.
    """
    return converter(
        header_getter(header, rfc_section),
        parse_list,
        serialize_list,
        'list',
    )

def parse_list(value):
    """
    Split a comma-separated string into a tuple of stripped, non-empty items.

    Returns ``None`` when ``value`` is empty or ``None``.
    """
    if not value:
        return None
    stripped = (piece.strip() for piece in value.split(','))
    return tuple(piece for piece in stripped if piece)

def serialize_list(value):
    """
    Serialize a list-valued header.

    A single string (text or bytes) is returned as a native string; any
    other iterable has its items converted to native strings and joined
    with ``', '``.

    On Python 3, ``str()`` of a bytes object yields its repr (``"b'...'"``),
    which would corrupt the header, so bytes are decoded as latin-1 instead.
    """
    def as_native(item):
        # bytes that are not the native str type only exist on Python 3
        if isinstance(item, bytes) and not isinstance(item, str):
            return item.decode('latin-1')
        return str(item)

    if isinstance(value, bytes) or isinstance(value, text_type):
        return as_native(value)
    return ', '.join(map(as_native, value))




def converter_date(prop):
    """
    Wrap ``prop`` so its value is converted with ``parse_date`` on read and
    ``serialize_date`` on write.
    """
    return converter(
        prop, parse_date, serialize_date, convert_name='HTTP date')

def date_header(header, rfc_section):
    """
    Build a property for ``header`` whose value goes through
    ``converter_date``.
    """
    prop = header_getter(header, rfc_section)
    return converter_date(prop)









########################
## Converter functions
########################


# Optional weak marker (``W/``) followed by a double-quoted tag that may
# contain backslash-escaped quotes.
_rx_etag = re.compile(r'(?:^|\s)(W/)?"((?:\\"|.)*?)"')

def parse_etag_response(value, strong=False):
    """
    Parse a response ETag and return the unquoted tag.

    Returns ``None`` for an empty value, and also for a weak tag when
    ``strong`` is true.  A value that does not look like an ETag is
    returned unchanged.

    See:
        * http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.19
        * http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.11
    """
    if not value:
        return None
    found = _rx_etag.match(value)
    if found is None:
        # Not a well-formed etag; hand it back untouched
        return value
    weak_marker, quoted = found.groups()
    if strong and weak_marker:
        # Weak etag, but the caller only accepts strong ones
        return None
    return quoted.replace('\\"', '"')

def serialize_etag_response(value):
    """
    Serialize an ETag for a response header.

    ``value`` is either a string or a ``(tag, strong)`` tuple.  A string
    that already matches the ETag pattern is returned unchanged; otherwise
    the tag is double-quoted (escaping embedded quotes) and prefixed with
    ``W/`` when it is not strong.
    """
    if isinstance(value, tuple):
        value, strong = value
    else:
        if _rx_etag.match(value):
            # Already a valid etag
            return value
        strong = True
    quoted = '"%s"' % value.replace('"', '\\"')
    return quoted if strong else 'W/' + quoted

def serialize_if_range(value):
    """
    Serialize an If-Range value.

    ``date`` / ``datetime`` objects go through ``serialize_date``; anything
    else is converted with ``str``, and an empty result becomes ``None``.
    """
    if not isinstance(value, (datetime, date)):
        text = str(value)
        return text if text else None
    return serialize_date(value)

def parse_range(value):
    """
    Parse a Range header with ``Range.parse``.

    Returns ``None`` for an empty value; ``Range.parse`` may itself
    return ``None``.
    """
    return Range.parse(value) if value else None

def serialize_range(value):
    """
    Serialize a Range header value.

    A falsy value becomes ``None``, a list/tuple is expanded into ``Range``
    and stringified, and anything else must already be a ``str`` and is
    returned unchanged.
    """
    if not value:
        return None
    if not isinstance(value, (list, tuple)):
        assert isinstance(value, str)
        return value
    return str(Range(*value))

def parse_int(value):
    """
    Convert ``value`` with ``int``; ``None`` and ``''`` map to ``None``.

    Whatever ``int`` raises for an unparsable value propagates.
    """
    missing = value is None or value == ''
    return None if missing else int(value)

def parse_int_safe(value):
    """
    Like ``parse_int``, but a value that makes ``int`` raise ``ValueError``
    yields ``None`` instead of propagating the error.
    """
    if value is None or value == '':
        return None
    try:
        number = int(value)
    except ValueError:
        number = None
    return number

# Integers are serialized with the builtin ``str``.
serialize_int = str

def parse_content_range(value):
    """
    Parse a Content-Range header with ``ContentRange.parse``.

    Returns ``None`` for an empty or whitespace-only value;
    ``ContentRange.parse`` may itself return ``None``.
    """
    if value and value.strip():
        return ContentRange.parse(value)
    return None

def serialize_content_range(value):
    """
    Serialize a Content-Range header value.

    ``value`` may be a ``(begin, end)`` or ``(begin, end, length)``
    list/tuple, which is turned into a ``ContentRange``, or any other
    object, which is stringified.  The result is stripped; an empty result
    becomes ``None``.

    Raises ``ValueError`` when a list/tuple has a length other than 2 or 3.
    """
    if isinstance(value, (tuple, list)):
        if len(value) not in (2, 3):
            # Wrap in a 1-tuple: ``"%r" % some_tuple`` would spread the
            # tuple's items as format arguments and raise TypeError.
            raise ValueError(
                "When setting content_range to a list/tuple, it must "
                "be length 2 or 3 (not %r)" % (value,))
        if len(value) == 2:
            begin, end = value
            length = None
        else:
            begin, end, length = value
        value = ContentRange(begin, end, length)
    value = str(value).strip()
    if not value:
        return None
    return value




# One ``name=value`` pair: lower-case name, value either double-quoted or
# running up to the next comma, terminated by end of input or a comma.
_rx_auth_param = re.compile(r'([a-z]+)[ \t]*=[ \t]*(".*?"|[^,]*?)[ \t]*(?:\Z|, *)')

def parse_auth_params(params):
    """
    Parse comma-separated ``name=value`` auth parameters into a dict.

    Double quotes surrounding a value are stripped.  When a name repeats,
    the last occurrence wins.
    """
    pairs = _rx_auth_param.findall(params)
    return dict((name, raw.strip('"')) for name, raw in pairs)

# see http://lists.w3.org/Archives/Public/ietf-http-wg/2009OctDec/0297.html
known_auth_schemes = ['Basic', 'Digest', 'WSSE', 'HMACDigest', 'GoogleLogin',
                      'Cookie', 'OpenID']
known_auth_schemes = dict.fromkeys(known_auth_schemes, None)

def parse_auth(val):
    """
    Parse an authorization-style header into ``(authtype, params)``.

    ``None`` is returned unchanged.  For schemes listed in
    ``known_auth_schemes`` the parameters are parsed into a dict with
    ``parse_auth_params``, except for ``Basic`` credentials without quotes,
    which stay a plain string.  Unknown schemes keep their parameters as
    the raw string.

    A header that consists of a scheme only (no space) yields an empty
    parameter string instead of raising ``ValueError``.
    """
    if val is None:
        return val
    # partition() tolerates a missing separator, unlike split(' ', 1)
    # followed by two-name unpacking.
    authtype, _sep, params = val.partition(' ')
    if authtype in known_auth_schemes:
        if authtype == 'Basic' and '"' not in params:
            # this is the "Authentication: Basic XXXXX==" case
            pass
        else:
            params = parse_auth_params(params)
    return authtype, params

def serialize_auth(val):
    """
    Serialize an ``(authtype, params)`` pair into a header string.

    A dict of params is rendered as comma-separated ``name="value"`` pairs;
    otherwise params must already be a ``str``.  Values that are not a
    list/tuple are returned unchanged.
    """
    if not isinstance(val, (tuple, list)):
        return val
    authtype, params = val
    if isinstance(params, dict):
        params = ', '.join('%s="%s"' % item for item in params.items())
    assert isinstance(params, str)
    return '%s %s' % (authtype, params)