Python werkzeug.datastructures 模块,Headers() 实例源码


def download_xlsx(filename, info):
    '''Çreate xlsx file for given info and download it '''
    output = StringIO()
    workbook = xlsxwriter.Workbook(output, {'in_memory': True})
    worksheet = workbook.add_worksheet()
    row_num = 0
    col_num = 0

    for row in info:
        for grid in row:
            worksheet.write(row_num, col_num, grid)
            col_num += 1
        col_num = 0
        row_num += 1
    headers = Headers()
    headers.set('Content-Disposition', 'attachment', filename=filename)
    return Response(, mimetype='application/vnd.openxmlformats-'
                    'officedocument.spreadsheetml.sheet', headers=headers)
def __init__(self, response=None, **kwargs):
        if 'mimetype' not in kwargs and 'contenttype' not in kwargs:
            if response.startswith('<?xml'):
                kwargs['mimetype'] = 'application/xml'
        kwargs['headers'] = ''
        headers = kwargs.get('headers')
        # ???? 
        origin = ('Access-Control-Allow-Origin', '*')
        methods = ('Access-Control-Allow-Methods', 'HEAD, OPTIONS, GET, POST, DELETE, PUT')
        if headers:
            headers = Headers([origin, methods])
        kwargs['headers'] = headers
        return super(MyResponse, self).__init__(response, **kwargs)
def parse_multipart_headers(iterable):
    """Parses multipart headers from an iterable that yields lines (including
    the trailing newline symbol).  The iterable has to be newline terminated.

    The iterable will stop at the line where the headers ended so it can be
    further consumed.

    :param iterable: iterable of strings that are newline terminated
    result = []
    for line in iterable:
        line = to_native(line)
        line, line_terminated = _line_parse(line)
        if not line_terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not line:
        elif line[0] in ' \t' and result:
            key, value = result[-1]
            result[-1] = (key, value + '\n ' + line[1:])
            parts = line.split(':', 1)
            if len(parts) == 2:
                result.append((parts[0].strip(), parts[1].strip()))

    # we link the list to the headers, no need to create a copy, the
    # list was not shared anyways.
    return Headers(result)
def run_fixed(self, environ, start_response):
        def fixing_start_response(status, headers, exc_info=None):
            headers = Headers(headers)
            self.fix_headers(environ, headers, status)
            return start_response(status, headers.to_wsgi_list(), exc_info)
        return, fixing_start_response)
def check_start_response(self, status, headers, exc_info):
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation'), stacklevel=3)
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must tuple 2-item tuples'),
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)

        return status_code, headers
def type_convert(self, obj):
        if obj is None:
            return None
        if isinstance(obj, (dict, list)) and not isinstance(obj, MultiDict):
            return obj
        if isinstance(obj, Headers):
            obj = MultiDict(obj)
        result = dict()

        convert_funs = {
            'integer': lambda v: int(v[0]),
            'boolean': lambda v: v[0].lower() not in ['n', 'no',
                                                      'false', '', '0'],
            'null': lambda v: None,
            'number': lambda v: float(v[0]),
            'string': lambda v: v[0]

        def convert_array(type_, v):
            func = convert_funs.get(type_, lambda v: v[0])
            return [func([i]) for i in v]

        for k, values in obj.lists():
            prop = self.validator.schema['properties'].get(k, {})
            type_ = prop.get('type')
            fun = convert_funs.get(type_, lambda v: v[0])
            if type_ == 'array':
                item_type = prop.get('items', {}).get('type')
                result[k] = convert_array(item_type, values)
                result[k] = fun(values)
        return result
def check_start_response(self, status, headers, exc_info):
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation'), stacklevel=3)
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must tuple 2-item tuples'),
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)

        return status_code, headers
def test_slicing(self):
        # there's nothing wrong with these being native strings
        # Headers doesn't care about the data types
        h = self.storage_class()
        h.set('X-Foo-Poo', 'bleh')
        h.set('Content-Type', 'application/whocares')
        h.set('X-Forwarded-For', '')
        h[:] = [(k, v) for k, v in h if k.startswith(u'X-')]
        self.assert_equal(list(h), [
            ('X-Foo-Poo', 'bleh'),
            ('X-Forwarded-For', '')
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_remove_entity_headers(self):
        now = http.http_date()
        headers1 = [('Date', now), ('Content-Type', 'text/html'), ('Content-Length', '0')]
        headers2 = datastructures.Headers(headers1)

        assert headers1 == [('Date', now)]

        self.assert_equal(headers2, datastructures.Headers([(u'Date', now)]))
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_remove_hop_by_hop_headers(self):
        headers1 = [('Connection', 'closed'), ('Foo', 'bar'),
                    ('Keep-Alive', 'wtf')]
        headers2 = datastructures.Headers(headers1)

        assert headers1 == [('Foo', 'bar')]

        assert headers2 == datastructures.Headers([('Foo', 'bar')])
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_cookie_unicode_dumping(self):
        val = http.dump_cookie('foo', u'\N{SNOWMAN}')
        h = datastructures.Headers()
        h.add('Set-Cookie', val)
        self.assert_equal(h['Set-Cookie'], 'foo="\\342\\230\\203"; Path=/')

        cookies = http.parse_cookie(h['Set-Cookie'])
        self.assert_equal(cookies['foo'], u'\N{SNOWMAN}')