Python bottle 模块,HTTPResponse() 实例源码

我们从Python开源项目中,提取了以下39个代码示例,用于说明如何使用bottle.HTTPResponse()

项目:extraordinary-women-api    作者:yamila-moreno    | 项目源码 | 文件源码
def add_woman():
    """Create a new record in EXTRAORDINARY_WOMEN from the JSON request body.

    The new id is one more than the highest id currently stored, or 1 when
    the collection is empty. The body must provide ``name``, ``origin`` and
    ``occupation``.

    Returns:
        HTTPResponse: status 200 with the created record under
        ``extraordinary_woman``, or status 400 with an ``error`` message when
        the record cannot be built from the request.
    """
    try:
        # ``or [0]`` keeps max() from raising on an empty collection, so the
        # very first record can be added.
        max_id = max([woman['id'] for woman in EXTRAORDINARY_WOMEN] or [0])
        data = request.json
        new_woman = {
            "id": max_id + 1,
            "name": data['name'],
            "origin": data['origin'],
            "occupation": data['occupation']
        }
    except Exception:
        # Missing body, missing field or malformed stored data: report a
        # client error, but let SystemExit/KeyboardInterrupt propagate.
        return HTTPResponse(
                status=400,
                body=json.dumps({'error': 'error adding a woman'}))

    EXTRAORDINARY_WOMEN.append(new_woman)
    return HTTPResponse(
            status=200,
            body=json.dumps({"extraordinary_woman": new_woman}))
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def apply(self, callback, route):
    """Return a wrapper around ``callback`` that JSON-encodes dict results.

    A dict returned by the callback is serialised with ``dumps`` and the
    global ``response`` content type is set to JSON. An ``HTTPResponse``
    (returned or raised) whose body is a dict gets its body serialised and
    its own content type set. Anything else is passed through unchanged.
    """
    def wrapper(*args, **kwargs):
        try:
            result = callback(*args, **kwargs)
        except HTTPResponse as raised:
            # A raised HTTPResponse is handled like a returned one.
            result = raised

        if isinstance(result, dict):
            encoded = dumps(result)
            response.content_type = 'application/json'
            return encoded

        if isinstance(result, HTTPResponse) and isinstance(result.body, dict):
            result.body = dumps(result.body)
            result.content_type = 'application/json'
        return result

    return wrapper
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _assert_admin(self, cert):
    """Raise unless ``cert`` belongs to a host listed in ``self.admin_hosts``.

    ``cert`` may be ``None``, raw certificate bytes, or an
    ``x509.Certificate``. The host is the certificate's common name.

    Raises:
        HTTPResponse: 401 when no certificate is given, 403 when the host is
            not an admin host.
        TypeError: when ``cert`` is neither bytes nor an x509.Certificate.
    """
    if cert is None:
        logger.warning('Admin command received by unauthentified host.')
        raise HTTPResponse(
            status=401,
            body={'message': 'Authentication required'}
        )

    if isinstance(cert, bytes):
        cert = load_certificate(cert_bytes=cert)

    if not isinstance(cert, x509.Certificate):
        raise TypeError('cert must be a raw certificate in PEM or DER format or an x509.Certificate instance.')

    host = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value

    if host not in self.admin_hosts:
        logger.warning('Host %s unauthorized for admin commands.', host)
        raise HTTPResponse(
            status=403,
            body={'message': 'Host {} unauthorized for admin commands'.format(host)}
        )
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def assertRedirect(self, target, result, query=None, status=303, **args):
    """Assert that ``bottle.redirect(target)`` redirects to ``result``.

    Extra keyword arguments are merged into the WSGI environ. Names starting
    with ``wsgi`` have their first underscore replaced by a dot
    (``wsgi_url_scheme`` becomes ``wsgi.url_scheme``).
    """
    environ = {'SERVER_PROTOCOL':'HTTP/1.1'}
    wsgi_names = [name for name in list(args) if name.startswith('wsgi')]
    for name in wsgi_names:
        args[name.replace('_', '.', 1)] = args[name]
        del args[name]
    environ.update(args)
    request.bind(environ)
    bottle.response.bind()
    try:
        bottle.redirect(target, **(query or {}))
    except bottle.HTTPResponse:
        raised = _e()
        self.assertEqual(status, raised.status_code)
        self.assertTrue(raised.headers)
        self.assertEqual(result, raised.headers['Location'])
项目:bottlecap    作者:foxx    | 项目源码 | 文件源码
def test_mismatch_accept_header_true(self, app):
    """
    The Accept header matches no renderer, but the view declares a
    mismatch renderer, which is expected to be used for the response.
    """
    @app.routecv
    class ExampleView(JSONEchoView):
        mismatch_renderer_class = ExampleRenderer

        def dispatch(self):
            return HTTPResponse("wtf")

    accept = {'Accept': 'vnd/invalid'}
    reply = app.webtest.get('/echo', headers=accept)
    assert reply.status == '200 OK'
    assert reply.headers['Content-Type'] == 'vnd/example'
    assert reply.body == b'wtf'
    assert request.negotiation_context.renderer == ExampleRenderer
项目:bottlecap    作者:foxx    | 项目源码 | 文件源码
def __call__(self):
    """Run the parent view with content negotiation set up on ``request``.

    A fresh negotiation context and negotiator are attached to the request
    before the parent ``__call__`` runs. If a renderer was selected, the
    response body is rendered with it and the negotiated content type is set.
    A raised ``HTTPResponse`` is re-raised unless ``self.render_errors`` is
    set and a renderer is available.
    """
    context = ContentNegotiationContext()
    request.negotiation_context = context
    request.negotiator = self.content_negotiation_class()
    request.body_parsed = None

    try:
        response = super(ContentNegotiationViewMixin, self).__call__()
    except HTTPResponse:
        # Only convert the error into a renderable response when asked to
        # and when there is a renderer to render it with.
        if not self.render_errors or not context.renderer:
            raise
        response = HTTPResponse()
        get_exception().apply(response)

    if not context.renderer:
        return response

    if not isinstance(response, HTTPResponse):
        response = HTTPResponse(response)
    response.body = context.renderer.render(response.body)
    response.content_type = context.response_content_type
    return response
项目:auto-tagging-image-search    作者:OsamaJBR    | 项目源码 | 文件源码
def upload_image():
    """Store an uploaded image and queue it for processing.

    Expects form field ``name`` and file field ``data``. The file is saved
    under the ``imagesdir`` directory of the ``storage`` config section and
    its path is pushed onto the ``images`` queue.

    Returns:
        HTTPResponse: 200 when stored and queued, 400 when a field is missing
        or the content is not detected as an image, 500 when queuing fails.
    """
    name = request.forms.get('name')
    data = request.files.get('data')
    if not (name and data and data.file):
        return HTTPResponse(status=400,body=json.dumps({'error' : 'missing fields'}))

    raw = data.file.read()
    # Reject non-images before touching the filesystem.
    if 'image' not in magic.from_buffer(raw):
        return HTTPResponse(status=400,body=json.dumps({'error' : 'file type is not allowed'}))

    images_dir = config.get('storage','imagesdir')
    save_path = "{path}/{file}".format(path=images_dir, file=data.filename)
    # Create the storage directory itself, not a directory named like the file.
    if not os.path.exists(images_dir):
        os.makedirs(images_dir)
    # The upload is raw bytes, so the file must be opened in binary mode.
    with open(save_path, 'wb') as open_file:
        open_file.write(raw)

    if queue.add_to_queue(queue_name='images',image=save_path):
        return HTTPResponse(status=200,body=json.dumps({'status' : 'Image Stored'}))
    return HTTPResponse(status=500,body=json.dumps({'error' : 'Internal Server Error'}))
项目:maltego_tds_transforms    作者:passivetotal    | 项目源码 | 文件源码
def maltego_response(trx, status_code=200, override=None):
    """Build the HTTPResponse for a Maltego transform.

    The body is ``trx.returnOutput()`` unless ``override`` is truthy. If
    producing the output raises, ``override`` is replaced by a custom
    exception payload with code 681.
    """
    try:
        payload = trx.returnOutput()
    except Exception:
        override = custom_exception(
            681, "Encountered errors in Maltegos Python library.")

    if override:
        payload = override
    return HTTPResponse(status=status_code, body=payload)
项目:extraordinary-women-api    作者:yamila-moreno    | 项目源码 | 文件源码
def list_women():
    """Return all stored records as JSON under ``extraordinary_women`` (status 200)."""
    payload = {"extraordinary_women": EXTRAORDINARY_WOMEN}
    return HTTPResponse(status=200, body=json.dumps(payload))
项目:extraordinary-women-api    作者:yamila-moreno    | 项目源码 | 文件源码
def get_woman(woman_id):
    """Return the record whose id equals ``int(woman_id)``.

    Returns:
        HTTPResponse: 200 with the record under ``extraordinary_woman``, or
        404 with an ``error`` message when no record matches.
    """
    for candidate in EXTRAORDINARY_WOMEN:
        if candidate['id'] != int(woman_id):
            continue
        found = json.dumps({'extraordinary_woman': candidate})
        return HTTPResponse(status=200, body=found)

    # The loop ended without returning, so nothing matched.
    missing = json.dumps({'error': 'id not found'})
    return HTTPResponse(status=404, body=missing)
项目:extraordinary-women-api    作者:yamila-moreno    | 项目源码 | 文件源码
def delete_woman(woman_id):
    """Remove the record whose id equals ``int(woman_id)``.

    Returns:
        HTTPResponse: 204 (no body) when a record was removed, or 404 with an
        ``error`` message when no record matches.
    """
    for woman in EXTRAORDINARY_WOMEN:
        if woman['id'] == int(woman_id):
            # Returning right away keeps the removal from disturbing the loop.
            EXTRAORDINARY_WOMEN.remove(woman)
            return HTTPResponse(status=204)

    # 204 means "No Content" and cannot carry the error body, so an unknown
    # id is reported as 404, the same status get_woman uses.
    return HTTPResponse(
        status=404,
        body=json.dumps({'error': 'id not found'}))
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def json_api_handler(handler):
    """Bottle handler decorator for JSON REST API handlers.

    The wrapped handler must return a dict. It is sent as JSON with
    ``ok: True`` added. A raised ``bottle.HTTPResponse`` (which must not be a
    2xx response) or any other exception is sent as JSON of the form
    ``{'ok': False, 'error': <body>}``. Unexpected exceptions are logged and
    reported as a 500 error.

    Args:
        handler: A Bottle handler function.

    Returns:
        A wrapped Bottle handler function.
    """
    @functools.wraps(handler)
    def wrapped_handler(*args, **kwargs):
        try:
            handler_result = handler(*args, **kwargs)
        except bottle.HTTPResponse as raised:
            # Python 3 unbinds the ``as`` target when the except block ends,
            # so the exception is copied to a name that outlives it.
            handler_result = raised
        except Exception:
            logging.exception('Uncaught exception')
            handler_result = bottle.HTTPError(500, 'Internal Server Error')
        if isinstance(handler_result, bottle.HTTPResponse):
            # For now, we do not support raising successful HTTPResponse.
            assert handler_result.status_code // 100 != 2
            # Forcibly convert HTTPError to HTTPResponse to avoid formatting.
            response = handler_result.copy(cls=bottle.HTTPResponse)
            response_data = {'ok': False, 'error': handler_result.body}
        else:
            assert isinstance(handler_result, dict)
            response = bottle.response.copy(cls=bottle.HTTPResponse)
            response_data = handler_result
            response_data['ok'] = True
        response.body = ujson.dumps(response_data, double_precision=6)
        response.content_type = 'application/json'
        return response
    return wrapped_handler
项目:experiment-manager    作者:softfire-eu    | 项目源码 | 文件源码
def get_certificate():
    """Generate a certificate and return the OpenVPN config as a download.

    Reads ``username``, ``password`` and ``days`` via ``post_get``.

    Raises:
        bottle.HTTPError: 500 when ``username`` is missing or empty.
    """
    username = post_get('username')
    if not username:
        raise bottle.HTTPError(500, "Username missing")

    password = post_get('password', default=None)
    # NOTE(review): int() raises if 'days' is absent (default None) or
    # non-numeric - confirm callers always send it.
    days = int(post_get('days', default=None))

    generator = CertificateGenerator()
    generator.generate(password, username, days)
    config_text = generator.get_openvpn_config()

    download_headers = {
        'Content-Type': 'text/plain;charset=UTF-8',
        'Content-Disposition': 'attachment; filename="softfire-vpn_%s.ovpn"' % username,
        "Content-Length": len(config_text)
    }
    return bottle.HTTPResponse(config_text, 200, **download_headers)
项目:home-automation    作者:danionescu0    | 项目源码 | 文件源码
def __text_to_speech(self):
    """Speak the ``text`` query parameter ``times`` times, then return an empty 200."""
    self.__check_auth()
    phrase = request.query.text
    repeat = int(request.query.times)
    self.__tts.say(phrase, repeat)

    return HTTPResponse(body='', status=200)
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _get_cert_config_if_allowed(self, domain, cert):
    """Return the certificate config for ``domain`` if the client may use it.

    ``cert`` may be ``None``, raw certificate bytes, or an
    ``x509.Certificate``. The client host is the certificate's common name.
    Access is granted to admin hosts and to hosts in the matching config's
    ``allowed_hosts``.

    Raises:
        HTTPResponse: 401 without a certificate, 404 when no config matches
            the domain, 403 when the host is not allowed.
        TypeError: when ``cert`` is neither bytes nor an x509.Certificate.
    """
    if cert is None:
        logger.warning('Request received for domain %s by unauthentified host.', domain)
        raise HTTPResponse(
            status=401,
            body={'message': 'Authentication required'}
        )

    if isinstance(cert, bytes):
        cert = load_certificate(cert_bytes=cert)

    if not isinstance(cert, x509.Certificate):
        raise TypeError('cert must be a raw certificate in PEM or DER format or an x509.Certificate instance.')

    host = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value

    certconfig = self.certificates_config.match(domain)
    if not certconfig:
        logger.warning('No config matching domain %s found.', domain)
        raise HTTPResponse(
            status=404,
            body={'message': 'No configuration found for domain {}'.format(domain)}
        )

    logger.debug('Domain %s matches pattern %s', domain, certconfig.pattern)
    if host in self.admin_hosts or host in certconfig.allowed_hosts:
        return certconfig

    logger.warning('Host %s unauthorized for domain %s.', host, domain)
    raise HTTPResponse(
        status=403,
        body={'message': 'Host {} unauthorized for domain {}'.format(host, domain)}
    )
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _handle_auth(self):
    """Handle a client authorization request carrying a PEM CSR.

    The JSON body must contain ``csr``. The CSR's common name identifies the
    host and names its ``.csr`` / ``.crt`` files under ``self.csr_path`` and
    ``self.crt_path``.

    Returns:
        dict: ``{'status': 'authorized', 'crt': <pem>}`` when a certificate
        with the same public key already exists, otherwise
        ``{'status': 'pending'}`` after the CSR has been saved and the
        response status set to 202.

    Raises:
        HTTPResponse: 400 for an invalid CSR signature or an unusable common
            name, 409 when the stored certificate does not match the CSR.
    """
    request_data = request.json

    csr = x509.load_pem_x509_csr(data=request_data['csr'].encode(), backend=default_backend())  # pylint: disable=unsubscriptable-object

    if not csr.is_signature_valid:
        raise HTTPResponse(
            status=400,
            body={'message': 'The certificate signing request signature is invalid.'}
        )

    host = csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value

    # The common name is chosen by the client and becomes part of a file
    # name below. Refuse values that could point outside csr_path/crt_path.
    if not host or host in ('.', '..') or any(ch in host for ch in ('/', '\\', '\x00')):
        raise HTTPResponse(
            status=400,
            body={'message': 'The certificate signing request common name is invalid.'}
        )

    csr_file = os.path.join(self.csr_path, "%s.csr" % (host))
    crt_file = os.path.join(self.crt_path, "%s.crt" % (host))

    if os.path.isfile(crt_file):
        crt = load_certificate(crt_file)

        if crt.public_key().public_numbers() == csr.public_key().public_numbers():
            return {
                'status': 'authorized',
                'crt': dump_pem(crt).decode()
            }
        raise HTTPResponse(
            status=409,
            body={'message': 'Mismatch between the certificate signing request and the certificate.'}
        )

    # No certificate yet: save the CSR and report the request as pending.
    with open(csr_file, 'w') as f:
        f.write(csr.public_bytes(serialization.Encoding.PEM).decode())
    response.status = 202
    return {
        'status': 'pending'
    }
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_redirect_preserve_cookies(self):
    """A cookie set before ``bottle.redirect`` must appear on the redirect response."""
    request.bind({'SERVER_PROTOCOL':'HTTP/1.1'})
    bottle.response.bind()
    try:
        bottle.response.set_cookie('xxx', 'yyy')
        bottle.redirect('...')
    except bottle.HTTPResponse:
        cookies = []
        for header, value in _e().headerlist:
            if header == 'Set-Cookie':
                cookies.append(value)
        self.assertEqual(cookies, ['xxx=yyy'])
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_json_HTTPResponse(self):
    """An HTTPResponse with a dict body is expected to be sent as JSON."""
    register = self.app.route('/')
    register(lambda: bottle.HTTPResponse({'a': 1}, 500))
    try:
        expected_body = bottle.json_dumps({'a': 1})
        self.assertBody(expected_body)
        self.assertHeader('Content-Type','application/json')
    except ImportError:
        warn("Skipping JSON tests.")
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_httpresponse_in_generator_callback(self):
    """A generator callback that yields an HTTPResponse must produce its body."""
    def test():
        yield bottle.HTTPResponse('test')

    self.app.route('/')(test)
    self.assertBody('test')
项目:bottlecap    作者:foxx    | 项目源码 | 文件源码
def dispatch(self):
    """Return an HTTPResponse whose body is the list ``[1, 2, 3]``."""
    body = [1, 2, 3]
    return HTTPResponse(body)
项目:ricedb    作者:TheReverend403    | 项目源码 | 文件源码
def __http_index(self, user=None, key=None):
    """Return the whole store, one user's data, or one key of it, as JSON.

    The response content type is always set to JSON. An unknown ``user`` or
    ``key`` yields a 404 response whose body is JSON ``null``.
    """
    bottle.response.content_type = 'application/json'

    if not user:
        return json.dumps(self)
    if user not in self:
        return bottle.HTTPResponse(status=404, body=json.dumps(None))

    if not key:
        return json.dumps(self.get(user))
    if key not in self.get(user):
        return bottle.HTTPResponse(status=404, body=json.dumps(None))

    return json.dumps(self.get(user).get(key))
项目:ocspresponder    作者:threema-ch    | 项目源码 | 文件源码
def _build_http_response(self, request_der: bytes) -> HTTPResponse:
    """Build the OCSP response for ``request_der`` and wrap it in a 200 HTTPResponse."""
    ocsp_response = self._build_ocsp_response(request_der)
    encoded = ocsp_response.dump()
    return HTTPResponse(
        status=200,
        body=encoded,
        content_type='application/ocsp-response',
    )
项目:auto-tagging-image-search    作者:OsamaJBR    | 项目源码 | 文件源码
def get_images():
    """Return the image file named by the ``path`` query parameter.

    Returns:
        bytes: the file content, with the response content type set to
        ``image/jpeg``, when ``path`` names an existing file.
        HTTPResponse: 404 with an ``error`` message otherwise.
    """
    image_path = request.query.get('path',None)
    # NOTE(review): ``path`` is client-supplied and read verbatim, so any
    # file readable by the server can be fetched - consider restricting it
    # to the configured image directory.
    if image_path and os.path.isfile(image_path):
        # ``with`` closes the handle even if the read fails.
        with open(image_path,'rb') as image_file:
            data = image_file.read()
        response.set_header('Content-type', 'image/jpeg')
        return data
    # The original built this response without returning it.
    return HTTPResponse(status=404,body=json.dumps({'error' : 'image not found'}))

#------------------
# MAIN
#------------------
项目:web-nlp-interface    作者:kanjirz50    | 项目源码 | 文件源码
def return_analysis_result():
    """
    Example of an analyzer API endpoint (GET).

    Lower-cases the ``input_text`` query parameter with ``your_tool`` and
    returns the result as a JSON response with status 200.
    """
    submitted = request.query.input_text
    analysed = your_tool.lowercase(submitted)

    # Build the JSON response.
    reply = HTTPResponse(status=200, body=json.dumps(analysed))
    reply.set_header('Content-Type', 'application/json')
    return reply
项目:memex-dossier-open    作者:dossier    | 项目源码 | 文件源码
def enable_cors(self):
    '''Enable Cross Origin Resource Sharing for this application.

    Registers an ``after_request`` hook that sets the CORS headers on every
    response, and a 405 error handler that answers ``OPTIONS`` requests by
    echoing the requested method and headers.

    :rtype: :class:`WebBuilder`
    '''
    def access_control_headers():
        headers = bottle.response.headers
        headers['Access-Control-Allow-Origin'] = '*'
        headers['Access-Control-Allow-Methods'] = \
            'GET, POST, PUT, DELETE, OPTIONS'
        headers['Access-Control-Allow-Headers'] = \
            'Origin, X-Requested-With, Content-Type, Accept, Authorization'

    def options_response(res):
        if bottle.request.method != 'OPTIONS':
            # Other 405s: advertise OPTIONS and fall back to the default handler.
            res.headers['Allow'] += ', OPTIONS'
            return bottle.request.app.default_error_handler(res)

        preflight = bottle.HTTPResponse()
        preflight.headers['Access-Control-Allow-Origin'] = '*'
        preflight.headers['Access-Control-Allow-Methods'] = \
            bottle.request.headers.get('Access-Control-Request-Method', '')
        preflight.headers['Access-Control-Allow-Headers'] = \
            bottle.request.headers.get('Access-Control-Request-Headers', '')
        return preflight

    self.app.add_hook('after_request', access_control_headers)
    self.app.error_handler[405] = options_response
    return self
项目:raspberry_exp_board    作者:Moisesbr    | 项目源码 | 文件源码
def transistor():
    """Switch one of the three transistors high or low.

    Reads ``transistorname``, ``state`` and ``token`` from the form.

    Returns:
        HTTPResponse: 200 with the transistor name on success, 412 for an
        unknown transistor or state, 401 when the token is not valid.
    """
    transistor_name = request.forms.get('transistorname')
    state = request.forms.get('state')
    token = request.forms.get('token')
    valid_token = verify_auth_token(token)

    if not (valid_token == True):
        return HTTPResponse(status=401, body=json.dumps({'401': 'Unauthorized'}))

    known_name = transistor_name in ('transistor1', 'transistor2', 'transistor3')
    if not known_name or state not in ("high", "low"):
        return HTTPResponse(status=412, body=json.dumps({'412': "Precondition Failed"}))

    if state == "high":
        print("aqui")
        high_rele(dic_pins[transistor_name],"b")
        print(dic_pins[transistor_name])
    else:
        low_rele(dic_pins[transistor_name],"b")

    return HTTPResponse(status=200, body=json.dumps({'200': transistor_name}))
项目:raspberry_exp_board    作者:Moisesbr    | 项目源码 | 文件源码
def toogle():
    """Toggle the relay named by the ``relayname`` form field.

    relay1-relay8 are toggled with group "a", relay9-relay10 with group "b".
    Any other name toggles nothing but still gets a 200.

    Returns:
        HTTPResponse: 200 with the relay name, or 401 when the token is not valid.
    """
    relay_name = request.forms.get('relayname')
    token = request.forms.get('token')
    valid_token = verify_auth_token(token)

    if not (valid_token == True):
        return HTTPResponse(status=401, body=json.dumps({'401': 'Unauthorized'}))

    if relay_name in ('relay1', 'relay2', 'relay3', 'relay4',
                      'relay5', 'relay6', 'relay7', 'relay8'):
        tooglerelay_func(dic_pins[relay_name],"a")
    if relay_name in ('relay9', 'relay10'):
        tooglerelay_func(dic_pins[relay_name],"b")

    return HTTPResponse(status=200, body=json.dumps({'200': relay_name}))
项目:raspberry_exp_board    作者:Moisesbr    | 项目源码 | 文件源码
def on_off():
    """Set the relay named by ``relayname`` to the requested ``state``.

    relay1-relay8 use group "a", relay9-relay10 use group "b". Any other
    name changes nothing but still gets a 200.

    Returns:
        HTTPResponse: 200 with the relay name, 412 when a known relay is
        given a state other than "high"/"low", 401 when the token is not valid.
    """
    relay_name = request.forms.get('relayname')
    token = request.forms.get('token')
    state = request.forms.get('state')
    valid_token = verify_auth_token(token)

    if not (valid_token == True):
        return HTTPResponse(status=401, body=json.dumps({'401': 'Unauthorized'}))

    if relay_name in ('relay1', 'relay2', 'relay3', 'relay4',
                      'relay5', 'relay6', 'relay7', 'relay8'):
        group = "a"
    elif relay_name in ('relay9', 'relay10'):
        group = "b"
    else:
        group = None

    if group is not None:
        if state == "high":
            high_rele(dic_pins[relay_name], group)
        elif state == "low":
            low_rele(dic_pins[relay_name], group)
        else:
            return HTTPResponse(status=412, body=json.dumps({'412': "Precondition Failed"}))

    return HTTPResponse(status=200, body=json.dumps({'200': relay_name}))
项目:raspberry_exp_board    作者:Moisesbr    | 项目源码 | 文件源码
def estado_dos_reles():
    """Return the relay states from ``come_back_reles()``.

    Returns:
        HTTPResponse: 200 with the relay states, or 401 when the token is not valid.
    """
    token = request.forms.get('token')
    valid_token = verify_auth_token(token)

    if not (valid_token == True):
        return HTTPResponse(status=401, body=json.dumps({'401': 'Unauthorized'}))

    relay_states = come_back_reles()
    return HTTPResponse(status=200, body=relay_states)
项目:raspberry_exp_board    作者:Moisesbr    | 项目源码 | 文件源码
def sensores_staus():
    """Return the sensor input states from ``return_state_inputs()``.

    Returns:
        HTTPResponse: 200 with the sensor states, 500 when reading them
        fails, 401 when the token is not valid.
    """
    token = request.forms.get('token')
    valid_token = verify_auth_token(token)
    if not (valid_token == True):
        theBody = json.dumps({'401': 'Unauthorized'})
        return HTTPResponse(status=401, body=theBody)

    try:
        sensores = return_state_inputs()
    except Exception:
        # Use a real 500 so the HTTP status agrees with the error body, as
        # the 401 branch of this handler already does.
        theBody = json.dumps({'500': 'Error'})
        return HTTPResponse(status=500, body=theBody)
    return HTTPResponse(status=200, body=sensores)
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def method_not_allowed(res):
    """405 handler: answer OPTIONS requests with CORS headers.

    Other requests get ``OPTIONS`` appended to the ``Allow`` header and are
    passed to the application's default error handler.
    """
    if request.method != 'OPTIONS':
        res.headers['Allow'] += ', OPTIONS'
        return request.app.default_error_handler(res)

    preflight = bottle.HTTPResponse()
    cors_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'GET, POST, DELETE, PUT'),
        ('Access-Control-Allow-Headers', 'X-Session-Token, X-Impersonate-Username, Content-Type'),
    )
    for header_name, header_value in cors_headers:
        preflight.set_header(header_name, header_value)
    return preflight
项目:besserberg    作者:Vnet-as    | 项目源码 | 文件源码
def render_pdf_from_html():
    """Render the submitted template to PDF, optionally adding a QR code.

    Form fields read: ``data`` or ``template`` (source), ``backend``
    (default ``pdfkit``), ``qr`` (code content), ``qr_x`` / ``qr_y``
    (defaults 545 / 20) and ``version``.

    Returns:
        The rendered PDF with the response content type set to PDF, or an
        HTTPResponse with status 400 (non-integer QR values, unsupported
        backend) or 422 (QR code could not be appended).
    """
    forms = bottle.request.forms
    template = forms.data or forms.template or ''
    backend = forms.backend or 'pdfkit'
    code = forms.qr or None
    options = forms.decode()

    try:
        qr_x = int(forms.qr_x or 545)
        qr_y = int(forms.qr_y or 20)
        version = int(forms.version) if forms.version else None
    except ValueError:
        return bottle.HTTPResponse(
            status=400,
            body='Invalid value passed to QR code coordinates.',
        )

    try:
        renderer = backends_registry.get(backend)
        pdf_file = renderer.render(template, options)
    except AttributeError:
        return bottle.HTTPResponse(
            status=400,
            body='Provided backend (%s) is not supported.' % backend,
        )

    if code is not None:
        try:
            pdf_file = postprocess_pdf(pdf_file, code, qr_x, qr_y, version)
        except ValueError:
            logger.error('Failed to append QR code', exc_info=True)
            return bottle.HTTPResponse(
                status=422,
                body='Unable to append QR code to rendered template.',
            )

    bottle.response.headers['Content-Type'] = 'application/pdf; charset=UTF-8'
    return pdf_file
项目:RaspberryPiControllerQtPython    作者:take-iwiw    | 项目源码 | 文件源码
def buttonIf():
    """Report the state of the button selected by ``index`` in the JSON body."""
    payload = request.json
    pressed = button(int(payload["index"]))
    reply = HTTPResponse(
        status=200,
        body={
            "ret": "ok",
            "onoff": "on" if pressed == True else "off",
        },
    )
    reply.set_header('Content-Type', 'application/json')
    return reply
项目:RaspberryPiControllerQtPython    作者:take-iwiw    | 项目源码 | 文件源码
def rotaryEncoderIf():
    """Report the value returned by ``rotaryEncoder()`` as ``rotate``."""
    # The body is not used, but reading it is kept as in the original.
    _ = request.json
    reply = HTTPResponse(
        status=200,
        body={"ret": "ok", "rotate": rotaryEncoder()},
    )
    reply.set_header('Content-Type', 'application/json')
    return reply
项目:RaspberryPiControllerQtPython    作者:take-iwiw    | 项目源码 | 文件源码
def tapIf():
    """Report the value returned by ``tap()`` as ``tapNum``."""
    # The body is not used, but reading it is kept as in the original.
    _ = request.json
    reply = HTTPResponse(
        status=200,
        body={"ret": "ok", "tapNum": tap()},
    )
    reply.set_header('Content-Type', 'application/json')
    return reply
项目:RaspberryPiControllerQtPython    作者:take-iwiw    | 项目源码 | 文件源码
def GSensorIf():
    """Report the first three values returned by ``GSensor()`` as ``x``, ``y``, ``z``."""
    # The body is not used, but reading it is kept as in the original.
    _ = request.json
    reading = GSensor()
    reply = HTTPResponse(
        status=200,
        body={
            "ret": "ok",
            "x": reading[0],
            "y": reading[1],
            "z": reading[2],
        },
    )
    reply.set_header('Content-Type', 'application/json')
    return reply
项目:lightbus    作者:adamcharnock    | 项目源码 | 文件源码
def method2(self) -> HTTPResponse:
    """Return a new, default-constructed HTTPResponse."""
    empty_response = HTTPResponse()
    return empty_response
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def apply(self, callback, route):
        """Wrap ``callback`` so each call receives a database session.

        Per-route settings (``keyword``, ``create``, ``commit``,
        ``use_kwargs``) are read from the route config under ``sqlalchemy``
        and fall back to the plugin's own attributes.

        The callback is returned unchanged unless it names the session
        keyword as a parameter, or ``use_kwargs`` is set and it accepts
        ``**kwargs``. Otherwise the returned wrapper creates a session bound
        to ``self.engine``, passes it in under the keyword, commits or rolls
        back as described inline, and always releases the session.
        """
        # hack to support bottle v0.9.x
        if bottle.__version__.startswith('0.9'):
            config = route['config']
            _callback = route['callback']
        else:
            config = route.config
            _callback = route.callback

        if "sqlalchemy" in config:  # support for configuration before `ConfigDict` namespaces
            g = lambda key, default: config.get('sqlalchemy', {}).get(key, default)
        else:
            g = lambda key, default: config.get('sqlalchemy.' + key, default)

        keyword = g('keyword', self.keyword)
        create = g('create', self.create)
        commit = g('commit', self.commit)
        use_kwargs = g('use_kwargs', self.use_kwargs)

        # Inspect the undecorated route callback to see whether it can take
        # the session argument.
        try:
            # check if inspect.signature exists
            inspect.signature
        except AttributeError:
            argspec = inspect.getargspec(_callback)
            parameters = argspec.args
            accept_kwargs = argspec.keywords
        else:
            parameters = inspect.signature(_callback).parameters
            accept_kwargs = any(p.kind == inspect.Parameter.VAR_KEYWORD
                                for p in parameters.values())

        # Nothing to inject: leave the callback as it is.
        if not ((use_kwargs and accept_kwargs) or keyword in parameters):
            return callback

        if create:
            self.metadata.create_all(self.engine)

        def wrapper(*args, **kwargs):
            kwargs[keyword] = session = self.create_session(bind=self.engine)
            try:
                rv = callback(*args, **kwargs)
                if commit:
                    session.commit()
            # HTTPError is caught here, ahead of the HTTPResponse clause
            # below, so it rolls back instead of committing.
            except (SQLAlchemyError, bottle.HTTPError):
                session.rollback()
                raise
            # Any other raised HTTPResponse still commits before re-raising.
            except bottle.HTTPResponse:
                if commit:
                    session.commit()
                raise
            finally:
                # Release the session on every path.
                if isinstance(self.create_session, ScopedSession):
                    self.create_session.remove()
                else:
                    session.close()
            return rv

        return wrapper
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def apply(self, callback, route):
        """Wrap ``callback`` so each call receives a database session.

        Per-route settings (``keyword``, ``create``, ``commit``,
        ``use_kwargs``) are read from the route config under ``sqlalchemy``
        and fall back to the plugin's own attributes.

        The callback is returned unchanged unless it names the session
        keyword as a parameter, or ``use_kwargs`` is set and it accepts
        ``**kwargs``. Otherwise the returned wrapper creates a session bound
        to ``self.engine``, passes it in under the keyword, commits or rolls
        back as described inline, and always releases the session.
        """
        # hack to support bottle v0.9.x
        if bottle.__version__.startswith('0.9'):
            config = route['config']
            _callback = route['callback']
        else:
            config = route.config
            _callback = route.callback

        if "sqlalchemy" in config:  # support for configuration before `ConfigDict` namespaces
            g = lambda key, default: config.get('sqlalchemy', {}).get(key, default)
        else:
            g = lambda key, default: config.get('sqlalchemy.' + key, default)

        keyword = g('keyword', self.keyword)
        create = g('create', self.create)
        commit = g('commit', self.commit)
        use_kwargs = g('use_kwargs', self.use_kwargs)

        # Inspect the undecorated route callback to see whether it can take
        # the session argument.
        try:
            # check if inspect.signature exists
            inspect.signature
        except AttributeError:
            argspec = inspect.getargspec(_callback)
            parameters = argspec.args
            accept_kwargs = argspec.keywords
        else:
            parameters = inspect.signature(_callback).parameters
            accept_kwargs = any(p.kind == inspect.Parameter.VAR_KEYWORD
                                for p in parameters.values())

        # Nothing to inject: leave the callback as it is.
        if not ((use_kwargs and accept_kwargs) or keyword in parameters):
            return callback

        if create:
            self.metadata.create_all(self.engine)

        def wrapper(*args, **kwargs):
            kwargs[keyword] = session = self.create_session(bind=self.engine)
            try:
                rv = callback(*args, **kwargs)
                if commit:
                    session.commit()
            # HTTPError is caught here, ahead of the HTTPResponse clause
            # below, so it rolls back instead of committing.
            except (SQLAlchemyError, bottle.HTTPError):
                session.rollback()
                raise
            # Any other raised HTTPResponse still commits before re-raising.
            except bottle.HTTPResponse:
                if commit:
                    session.commit()
                raise
            finally:
                # Release the session on every path.
                if isinstance(self.create_session, ScopedSession):
                    self.create_session.remove()
                else:
                    session.close()
            return rv

        return wrapper