我们从Python开源项目中，提取了以下39个代码示例，用于说明如何使用bottle.HTTPResponse()。
def add_woman():
    """Create a new entry in EXTRAORDINARY_WOMEN from the JSON request body.

    The body must provide ``name``, ``origin`` and ``occupation``. The new
    entry receives the highest existing id plus one, or 1 when the list is
    still empty.

    Returns:
        HTTPResponse: status 200 with the created entry, or status 400 with
        an error message when the entry could not be built.
    """
    try:
        # An empty list must not be treated as a client error: max() raises
        # on an empty sequence, so start numbering at 1 in that case.
        if EXTRAORDINARY_WOMEN:
            max_id = max(woman['id'] for woman in EXTRAORDINARY_WOMEN) + 1
        else:
            max_id = 1
        data = request.json
        new_woman = {
            "id": max_id,
            "name": data['name'],
            "origin": data['origin'],
            "occupation": data['occupation'],
        }
        EXTRAORDINARY_WOMEN.append(new_woman)
        return HTTPResponse(
            status=200,
            body=json.dumps({"extraordinary_woman": new_woman}))
    except Exception:
        # Deliberately broad (missing body, missing keys, malformed JSON),
        # but no longer swallows SystemExit / KeyboardInterrupt.
        return HTTPResponse(
            status=400,
            body=json.dumps({'error': 'error adding a woman'}))
def apply(self, callback, route):
    """Wrap ``callback`` so that dict results are serialised with ``dumps``.

    A dict returned by the callback is encoded and returned as a string,
    with the global response content type set to JSON. An HTTPResponse
    (returned or raised) whose body is a dict has its body encoded in place.
    Anything else passes through untouched.
    """
    def wrapper(*a, **ka):
        try:
            result = callback(*a, **ka)
        except HTTPResponse as raised:
            # A raised response is handled exactly like a returned one.
            result = raised

        if isinstance(result, dict):
            encoded = dumps(result)
            response.content_type = 'application/json'
            return encoded

        if isinstance(result, HTTPResponse) and isinstance(result.body, dict):
            result.body = dumps(result.body)
            result.content_type = 'application/json'
        return result

    return wrapper
def _assert_admin(self, cert):
    """Ensure the certificate's common name is listed in ``self.admin_hosts``.

    Args:
        cert: ``None``, raw certificate bytes, or an ``x509.Certificate``.

    Raises:
        HTTPResponse: 401 when no certificate is given, 403 when the
            certificate's host is not an admin host.
        TypeError: when ``cert`` is neither bytes nor an x509.Certificate.
    """
    if cert is None:
        logger.warning('Admin command received by unauthentified host.')
        raise HTTPResponse(
            status=401,
            body={'message': 'Authentication required'}
        )

    if isinstance(cert, bytes):
        cert = load_certificate(cert_bytes=cert)
    if not isinstance(cert, x509.Certificate):
        raise TypeError('cert must be a raw certificate in PEM or DER format or an x509.Certificate instance.')

    common_names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    host = common_names[0].value

    if host in self.admin_hosts:
        return
    logger.warning('Host %s unauthorized for admin commands.', host)
    raise HTTPResponse(
        status=403,
        body={'message': 'Host {} unauthorized for admin commands'.format(host)}
    )
def assertRedirect(self, target, result, query=None, status=303, **args):
    """Check the response raised by ``bottle.redirect(target)``.

    Extra keyword arguments are merged into the WSGI environ; names that
    start with ``wsgi`` have their first underscore turned into a dot
    (``wsgi_url_scheme`` becomes ``wsgi.url_scheme``).
    """
    environ = {'SERVER_PROTOCOL': 'HTTP/1.1'}
    wsgi_names = [name for name in args if name.startswith('wsgi')]
    for name in wsgi_names:
        args[name.replace('_', '.', 1)] = args[name]
        del args[name]
    environ.update(args)

    request.bind(environ)
    bottle.response.bind()
    redirect_kwargs = query or {}
    try:
        bottle.redirect(target, **redirect_kwargs)
    except bottle.HTTPResponse:
        raised = _e()
        self.assertEqual(status, raised.status_code)
        self.assertTrue(raised.headers)
        self.assertEqual(result, raised.headers['Location'])
def test_mismatch_accept_header_true(self, app):
    """An unmatched Accept header falls back to the view's mismatch renderer."""

    class ExampleView(JSONEchoView):
        mismatch_renderer_class = ExampleRenderer

        def dispatch(self):
            return HTTPResponse("wtf")

    ExampleView = app.routecv(ExampleView)

    unmatched_accept = {'Accept': 'vnd/invalid'}
    resp = app.webtest.get('/echo', headers=unmatched_accept)

    assert resp.status == '200 OK'
    assert resp.headers['Content-Type'] == 'vnd/example'
    assert resp.body == b'wtf'
    assert request.negotiation_context.renderer == ExampleRenderer
def __call__(self):
    """Run the parent view with content negotiation state attached to the request.

    Sets ``request.negotiation_context``, ``request.negotiator`` and
    ``request.body_parsed`` before delegating to the parent ``__call__``.
    When a renderer was selected on the negotiation context, the response
    body is passed through it and the response content type is set from the
    context.

    Returns:
        The parent's result; wrapped in an HTTPResponse when a renderer is
        selected.

    Raises:
        HTTPResponse: re-raised from the parent call when ``render_errors``
            is falsy or no renderer was selected.
    """
    # NOTE(review): block nesting was reconstructed from a flattened
    # source line - confirm against the upstream project.
    request.negotiation_context = cneg = ContentNegotiationContext()
    request.negotiator = self.content_negotiation_class()
    request.body_parsed = None
    try:
        response = super(ContentNegotiationViewMixin, self).__call__()
    except HTTPResponse:
        # Error responses are only rendered when the view opts in ...
        if not self.render_errors:
            raise
        # ... and a renderer is available to render them with.
        if self.render_errors:
            if not cneg.renderer:
                raise
        # Copy the in-flight exception onto a fresh response so it goes
        # through the rendering step below.
        response = HTTPResponse()
        get_exception().apply(response)
    if cneg.renderer:
        if not isinstance(response, HTTPResponse):
            response = HTTPResponse(response)
        response.body = cneg.renderer.render(response.body)
        response.content_type = cneg.response_content_type
    return response
def upload_image():
    """Store an uploaded image in the configured directory and enqueue it.

    Reads the ``name`` form field and the ``data`` file upload. The upload is
    accepted only when ``magic`` describes its content as an image.

    Returns:
        HTTPResponse: 200 when stored and queued, 400 for missing fields or
        a non-image payload, 500 when queuing fails.
    """
    name = request.forms.get('name')
    data = request.files.get('data')
    if not (name and data and data.file):
        return HTTPResponse(status=400, body=json.dumps({'error': 'missing fields'}))

    raw = data.file.read()
    if 'image' not in magic.from_buffer(raw):
        return HTTPResponse(status=400, body=json.dumps({'error': 'file type is not allowed'}))

    # The filename comes from the client: keep only its last path component
    # so it cannot point outside the images directory.
    filename = os.path.basename(data.filename.replace('\\', '/'))
    if not filename:
        return HTTPResponse(status=400, body=json.dumps({'error': 'missing fields'}))

    images_dir = config.get('storage', 'imagesdir')
    # Create the directory itself, not a directory named like the file.
    if not os.path.exists(images_dir):
        os.makedirs(images_dir)
    save_path = os.path.join(images_dir, filename)

    # Binary mode: the payload is raw image bytes.
    with open(save_path, 'wb') as open_file:
        open_file.write(raw)

    if queue.add_to_queue(queue_name='images', image=save_path):
        return HTTPResponse(status=200, body=json.dumps({'status': 'Image Stored'}))
    return HTTPResponse(status=500, body=json.dumps({'error': 'Internal Server Error'}))
def maltego_response(trx, status_code=200, override=None):
    """Build the HTTP response for a Maltego transform.

    The body is ``trx.returnOutput()`` unless ``override`` is truthy. When
    producing the output raises, the override is replaced by the result of
    ``custom_exception(681, ...)``.
    """
    try:
        body = trx.returnOutput()
    except Exception:
        failure_text = "Encountered errors in Maltegos Python library."
        override = custom_exception(681, failure_text)

    if override:
        body = override
    return HTTPResponse(status=status_code, body=body)
def list_women():
    """Return every entry of EXTRAORDINARY_WOMEN as a JSON body (status 200)."""
    payload = json.dumps({"extraordinary_women": EXTRAORDINARY_WOMEN})
    return HTTPResponse(status=200, body=payload)
def get_woman(woman_id):
    """Look up one entry of EXTRAORDINARY_WOMEN by id.

    Returns:
        HTTPResponse: 200 with the first matching entry, 404 when none matches.
    """
    matches = (
        woman for woman in EXTRAORDINARY_WOMEN
        if woman['id'] == int(woman_id)
    )
    found = next(matches, None)
    if found is None:
        return HTTPResponse(
            status=404,
            body=json.dumps({'error': 'id not found'}))
    return HTTPResponse(
        status=200,
        body=json.dumps({'extraordinary_woman': found}))
def delete_woman(woman_id):
    """Remove the entry with the given id from EXTRAORDINARY_WOMEN.

    Returns:
        HTTPResponse: 204 (no body) when an entry was removed, 404 with an
        error message when no entry has that id.
    """
    for woman in EXTRAORDINARY_WOMEN:
        if woman['id'] == int(woman_id):
            # Safe to mutate while iterating: we return right after.
            EXTRAORDINARY_WOMEN.remove(woman)
            return HTTPResponse(status=204)
    # A 204 response must not carry a body and would report success;
    # a missing id is a 404, matching get_woman().
    return HTTPResponse(
        status=404,
        body=json.dumps({'error': 'id not found'}))
def json_api_handler(handler):
    """Bottle handler decorator for JSON REST API handlers.

    A dict returned by the handler is sent as JSON with ``ok`` set to True.
    A raised or returned HTTPResponse is turned into a JSON error body with
    ``ok`` set to False. Any other exception is logged and reported as a
    500 error.

    Args:
        handler: A Bottle handler function.

    Returns:
        A wrapped Bottle handler function.
    """
    @functools.wraps(handler)
    def wrapped_handler(*args, **kwargs):
        try:
            handler_result = handler(*args, **kwargs)
        except bottle.HTTPResponse as raised:
            # Python 3 unbinds the ``as`` target when the except block ends,
            # so the response must be copied to another name to survive it.
            handler_result = raised
        except Exception:
            logging.exception('Uncaught exception')
            handler_result = bottle.HTTPError(500, 'Internal Server Error')
        if isinstance(handler_result, bottle.HTTPResponse):
            # For now, we do not support raising successful HTTPResponse.
            assert handler_result.status_code // 100 != 2
            # Forcibly convert HTTPError to HTTPResponse to avoid formatting.
            response = handler_result.copy(cls=bottle.HTTPResponse)
            response_data = {'ok': False, 'error': handler_result.body}
        else:
            assert isinstance(handler_result, dict)
            response = bottle.response.copy(cls=bottle.HTTPResponse)
            response_data = handler_result
            response_data['ok'] = True
        response.body = ujson.dumps(response_data, double_precision=6)
        response.content_type = 'application/json'
        return response
    return wrapped_handler
def get_certificate():
    """Generate a certificate and return the OpenVPN config as a download.

    Reads ``username``, ``password`` and ``days`` through ``post_get``.

    Returns:
        bottle.HTTPResponse: status 200 with the OpenVPN configuration as a
        plain-text attachment.

    Raises:
        bottle.HTTPError: 500 when ``username`` is missing, or when ``days``
            is missing or not an integer.
    """
    username = post_get('username')
    if not username:
        raise bottle.HTTPError(500, "Username missing")
    password = post_get('password', default=None)
    try:
        days = int(post_get('days', default=None))
    except (TypeError, ValueError):
        # Previously an uncaught TypeError/ValueError; same status code as
        # the username check, but with a message the client can act on.
        raise bottle.HTTPError(500, "Days missing or invalid")

    cert_gen = CertificateGenerator()
    cert_gen.generate(password, username, days)
    openvpn_config = cert_gen.get_openvpn_config()
    # NOTE(review): Content-Length counts characters if the config is text;
    # confirm it is ASCII-only or already bytes.
    headers = {
        'Content-Type': 'text/plain;charset=UTF-8',
        'Content-Disposition': 'attachment; filename="softfire-vpn_%s.ovpn"' % username,
        "Content-Length": len(openvpn_config),
    }
    return bottle.HTTPResponse(openvpn_config, 200, **headers)
def __text_to_speech(self):
    """Speak the ``text`` query parameter ``times`` times; reply with an empty 200."""
    self.__check_auth()
    phrase = request.query.text
    repetitions = int(request.query.times)
    self.__tts.say(phrase, repetitions)
    return HTTPResponse(status=200, body='')
def _get_cert_config_if_allowed(self, domain, cert):
    """Return the certificate config matching ``domain`` if the caller may use it.

    Args:
        domain: Domain name matched against ``self.certificates_config``.
        cert: ``None``, raw certificate bytes, or an ``x509.Certificate``.

    Returns:
        The matching certificate configuration.

    Raises:
        HTTPResponse: 401 without a certificate, 404 when no configuration
            matches, 403 when the host is neither an admin host nor listed
            in the configuration's allowed hosts.
        TypeError: when ``cert`` is neither bytes nor an x509.Certificate.
    """
    if cert is None:
        logger.warning('Request received for domain %s by unauthentified host.', domain)
        raise HTTPResponse(
            status=401,
            body={'message': 'Authentication required'}
        )

    if isinstance(cert, bytes):
        cert = load_certificate(cert_bytes=cert)
    if not isinstance(cert, x509.Certificate):
        raise TypeError('cert must be a raw certificate in PEM or DER format or an x509.Certificate instance.')
    host = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value

    certconfig = self.certificates_config.match(domain)
    if not certconfig:
        logger.warning('No config matching domain %s found.', domain)
        raise HTTPResponse(
            status=404,
            body={'message': 'No configuration found for domain {}'.format(domain)}
        )

    logger.debug('Domain %s matches pattern %s', domain, certconfig.pattern)
    authorized = host in self.admin_hosts or host in certconfig.allowed_hosts
    if not authorized:
        logger.warning('Host %s unauthorized for domain %s.', host, domain)
        raise HTTPResponse(
            status=403,
            body={'message': 'Host {} unauthorized for domain {}'.format(host, domain)}
        )
    return certconfig
def _handle_auth(self):
    """Process a certificate signing request sent as JSON.

    Reads the PEM CSR from the ``csr`` key of the request body. If a
    certificate file already exists for the CSR's common name and its public
    key matches, the certificate is returned. Otherwise the CSR is saved and
    the request is reported as pending with status 202.

    Returns:
        dict: ``{'status': 'authorized', 'crt': ...}`` or
        ``{'status': 'pending'}``.

    Raises:
        HTTPResponse: 400 for an invalid CSR signature or an unusable common
            name, 409 when the stored certificate does not match the CSR.
    """
    request_data = request.json
    csr = x509.load_pem_x509_csr(data=request_data['csr'].encode(), backend=default_backend())  # pylint: disable=unsubscriptable-object

    if not csr.is_signature_valid:
        raise HTTPResponse(
            status=400,
            body={'message': 'The certificate signing request signature is invalid.'}
        )

    host = csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
    # The common name is client-controlled and becomes part of file names:
    # reject anything that is not a single plain path component.
    if host in ('', '.', '..') or os.path.basename(host) != host:
        raise HTTPResponse(
            status=400,
            body={'message': 'The certificate signing request common name is invalid.'}
        )
    csr_file = os.path.join(self.csr_path, "%s.csr" % (host))
    crt_file = os.path.join(self.crt_path, "%s.crt" % (host))

    if not os.path.isfile(crt_file):
        # No certificate yet: save the CSR and report it as pending.
        with open(csr_file, 'w') as f:
            f.write(csr.public_bytes(serialization.Encoding.PEM).decode())
        response.status = 202
        return {
            'status': 'pending'
        }

    crt = load_certificate(crt_file)
    if crt.public_key().public_numbers() == csr.public_key().public_numbers():
        return {
            'status': 'authorized',
            'crt': dump_pem(crt).decode()
        }
    raise HTTPResponse(
        status=409,
        body={'message': 'Mismatch between the certificate signing request and the certificate.'}
    )
def test_redirect_preserve_cookies(self):
    """A cookie set before ``redirect()`` is carried by the raised response."""
    request.bind({'SERVER_PROTOCOL': 'HTTP/1.1'})
    bottle.response.bind()
    try:
        bottle.response.set_cookie('xxx', 'yyy')
        bottle.redirect('...')
    except bottle.HTTPResponse:
        cookies = []
        for header_name, header_value in _e().headerlist:
            if header_name == 'Set-Cookie':
                cookies.append(header_value)
        self.assertEqual(cookies, ['xxx=yyy'])
def test_json_HTTPResponse(self):
    """A dict body inside an HTTPResponse is served as JSON."""
    def handler():
        return bottle.HTTPResponse({'a': 1}, 500)

    self.app.route('/')(handler)
    try:
        expected_body = bottle.json_dumps({'a': 1})
        self.assertBody(expected_body)
        self.assertHeader('Content-Type', 'application/json')
    except ImportError:
        warn("Skipping JSON tests.")
def test_httpresponse_in_generator_callback(self):
    """An HTTPResponse yielded by a generator callback supplies the body."""
    def test():
        yield bottle.HTTPResponse('test')

    self.app.route('/')(test)
    self.assertBody('test')
def dispatch(self):
    """Return an HTTPResponse whose body is the list ``[1, 2, 3]``."""
    body = [1, 2, 3]
    return HTTPResponse(body)
def __http_index(self, user=None, key=None):
    """Serve this mapping, one user's entry, or one key of it, as JSON.

    Returns:
        A JSON string, or a 404 HTTPResponse with a JSON ``null`` body when
        the user or the key is unknown.
    """
    bottle.response.content_type = 'application/json'

    if not user:
        return json.dumps(self)
    if user not in self:
        return bottle.HTTPResponse(status=404, body=json.dumps(None))
    if not key:
        return json.dumps(self.get(user))
    if key not in self.get(user):
        return bottle.HTTPResponse(status=404, body=json.dumps(None))
    return json.dumps(self.get(user).get(key))
def _build_http_response(self, request_der: bytes) -> HTTPResponse:
    """Wrap the dumped OCSP response for ``request_der`` in a 200 HTTPResponse."""
    ocsp_response = self._build_ocsp_response(request_der)
    encoded = ocsp_response.dump()
    return HTTPResponse(
        status=200,
        body=encoded,
        content_type='application/ocsp-response',
    )
def get_images():
    """Serve the file named by the ``path`` query parameter as a JPEG.

    Returns:
        The raw file bytes (with a JPEG content type set on the response),
        or a 404 HTTPResponse when ``path`` is missing or is not a file.
    """
    image_path = request.query.get('path', None)
    # NOTE(review): 'path' is client-controlled and is opened as-is, so any
    # file readable by the server can be fetched - restrict it to the image
    # storage directory.
    if image_path and os.path.isfile(image_path):
        with open(image_path, 'rb') as image_file:
            data = image_file.read()
        response.set_header('Content-type', 'image/jpeg')
        return data
    # The response has to be returned; building it alone sends nothing.
    return HTTPResponse(status=404, body=json.dumps({'error': 'image not found'}))


#------------------
# MAIN
#------------------
def return_analysis_result():
    """Example analyzer API (GET): run the tool on ``input_text``, reply with JSON."""
    # Submitted contents.
    submitted = request.query.input_text

    # Analysis step.
    analyzed = your_tool.lowercase(submitted)

    # Build the JSON reply.
    reply = HTTPResponse(status=200, body=json.dumps(analyzed))
    reply.set_header('Content-Type', 'application/json')
    return reply
def enable_cors(self):
    '''Turn on Cross Origin Resource Sharing for the application.

    Registers an ``after_request`` hook that adds the CORS headers to every
    response, and a 405 error handler that answers ``OPTIONS`` requests
    with the headers the browser asked for.

    :rtype: :class:`WebBuilder`
    '''
    def access_control_headers():
        allowed_methods = 'GET, POST, PUT, DELETE, OPTIONS'
        allowed_headers = \
            'Origin, X-Requested-With, Content-Type, Accept, Authorization'
        bottle.response.headers['Access-Control-Allow-Origin'] = '*'
        bottle.response.headers['Access-Control-Allow-Methods'] = allowed_methods
        bottle.response.headers['Access-Control-Allow-Headers'] = allowed_headers

    def options_response(res):
        if bottle.request.method != 'OPTIONS':
            res.headers['Allow'] += ', OPTIONS'
            return bottle.request.app.default_error_handler(res)

        # Echo back whatever the request asked permission for.
        new_res = bottle.HTTPResponse()
        new_res.headers['Access-Control-Allow-Origin'] = '*'
        requested_method = bottle.request.headers.get(
            'Access-Control-Request-Method', '')
        new_res.headers['Access-Control-Allow-Methods'] = requested_method
        requested_headers = bottle.request.headers.get(
            'Access-Control-Request-Headers', '')
        new_res.headers['Access-Control-Allow-Headers'] = requested_headers
        return new_res

    self.app.add_hook('after_request', access_control_headers)
    self.app.error_handler[405] = options_response
    return self
def transistor():
    """Drive one of the three transistors high or low.

    Reads ``transistorname``, ``state`` and ``token`` from the submitted
    form.

    Returns:
        HTTPResponse: 200 naming the transistor, 412 for an unknown
        transistor or state, 401 when the token is rejected.
    """
    transistor_name = request.forms.get('transistorname')
    state = request.forms.get('state')
    token = request.forms.get('token')

    # Explicit comparison kept on purpose: only a result equal to True
    # authorises the request.
    if not (verify_auth_token(token) == True):  # noqa: E712
        return HTTPResponse(status=401, body=json.dumps({'401': 'Unauthorized'}))

    if transistor_name not in ('transistor1', 'transistor2', 'transistor3'):
        return HTTPResponse(status=412, body=json.dumps({'412': "Precondition Failed"}))

    if state == "high":
        # print(...) with one argument behaves the same on Python 2 and 3;
        # the former print statements were a syntax error on Python 3.
        print("aqui")
        high_rele(dic_pins[transistor_name], "b")
        print(dic_pins[transistor_name])
    elif state == "low":
        low_rele(dic_pins[transistor_name], "b")
    else:
        return HTTPResponse(status=412, body=json.dumps({'412': "Precondition Failed"}))

    return HTTPResponse(status=200, body=json.dumps({'200': transistor_name}))
def toogle():
    """Toggle the relay named in the submitted form.

    Reads ``relayname`` and ``token`` from the form. ``relay1`` to
    ``relay8`` are toggled with bank argument "a", ``relay9`` and
    ``relay10`` with "b".

    Returns:
        HTTPResponse: 200 naming the toggled relay, 412 when the relay name
        is not recognised, 401 when the token is rejected.
    """
    bank_a = ('relay1', 'relay2', 'relay3', 'relay4',
              'relay5', 'relay6', 'relay7', 'relay8')
    bank_b = ('relay9', 'relay10')

    relay_name = request.forms.get('relayname')
    token = request.forms.get('token')

    # Explicit comparison kept on purpose: only a result equal to True
    # authorises the request.
    if not (verify_auth_token(token) == True):  # noqa: E712
        return HTTPResponse(status=401, body=json.dumps({'401': 'Unauthorized'}))

    if relay_name in bank_a:
        bank = "a"
    elif relay_name in bank_b:
        bank = "b"
    else:
        # Previously reported as a 200 success although nothing was toggled.
        return HTTPResponse(status=412, body=json.dumps({'412': "Precondition Failed"}))

    tooglerelay_func(dic_pins[relay_name], bank)
    return HTTPResponse(status=200, body=json.dumps({'200': relay_name}))
def on_off():
    """Switch the relay named in the submitted form high or low.

    Reads ``relayname``, ``token`` and ``state`` from the form. ``relay1``
    to ``relay8`` use bank argument "a", ``relay9`` and ``relay10`` use "b".

    Returns:
        HTTPResponse: 200 naming the relay, 412 for an unknown relay or
        state, 401 when the token is rejected.
    """
    bank_a = ('relay1', 'relay2', 'relay3', 'relay4',
              'relay5', 'relay6', 'relay7', 'relay8')
    bank_b = ('relay9', 'relay10')

    relay_name = request.forms.get('relayname')
    token = request.forms.get('token')
    state = request.forms.get('state')

    # Explicit comparison kept on purpose: only a result equal to True
    # authorises the request.
    if not (verify_auth_token(token) == True):  # noqa: E712
        return HTTPResponse(status=401, body=json.dumps({'401': 'Unauthorized'}))

    if relay_name in bank_a:
        bank = "a"
    elif relay_name in bank_b:
        bank = "b"
    else:
        # Previously reported as a 200 success although nothing was switched.
        return HTTPResponse(status=412, body=json.dumps({'412': "Precondition Failed"}))

    if state == "high":
        high_rele(dic_pins[relay_name], bank)
    elif state == "low":
        low_rele(dic_pins[relay_name], bank)
    else:
        return HTTPResponse(status=412, body=json.dumps({'412': "Precondition Failed"}))

    return HTTPResponse(status=200, body=json.dumps({'200': relay_name}))
def estado_dos_reles():
    """Return the output of ``come_back_reles()`` to an authorised caller.

    Returns:
        HTTPResponse: 200 with that output, or 401 when the form token is
        rejected.
    """
    submitted_token = request.forms.get('token')
    authorised = verify_auth_token(submitted_token)
    if not (authorised == True):
        denied = json.dumps({'401': 'Unauthorized'})
        return HTTPResponse(status=401, body=denied)
    return HTTPResponse(status=200, body=come_back_reles())
def sensores_staus():
    """Return the output of ``return_state_inputs()`` to an authorised caller.

    Returns:
        HTTPResponse: 200 with that output, 500 when reading it raises,
        401 when the form token is rejected.
    """
    token = request.forms.get('token')
    # Explicit comparison kept on purpose: only a result equal to True
    # authorises the request.
    if not (verify_auth_token(token) == True):  # noqa: E712
        return HTTPResponse(status=401, body=json.dumps({'401': 'Unauthorized'}))

    try:
        sensores = return_state_inputs()
    except Exception:
        # The status line now matches the error body; this used to be sent
        # with status 200.
        return HTTPResponse(status=500, body=json.dumps({'500': 'Error'}))
    return HTTPResponse(status=200, body=sensores)
def method_not_allowed(res):
    """405 handler: answer ``OPTIONS`` with CORS headers, defer everything else.

    Non-OPTIONS requests get ``OPTIONS`` appended to the ``Allow`` header and
    are passed to the application's default error handler.
    """
    if request.method != 'OPTIONS':
        res.headers['Allow'] += ', OPTIONS'
        return request.app.default_error_handler(res)

    cors_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'GET, POST, DELETE, PUT'),
        ('Access-Control-Allow-Headers', 'X-Session-Token, X-Impersonate-Username, Content-Type'),
    )
    new_res = bottle.HTTPResponse()
    for header_name, header_value in cors_headers:
        new_res.set_header(header_name, header_value)
    return new_res
def render_pdf_from_html():
    """Render the posted template to PDF, optionally post-processing with a QR code.

    Form fields read: ``data`` / ``template``, ``backend`` (default
    ``pdfkit``), ``qr``, ``qr_x`` (default 545), ``qr_y`` (default 20) and
    ``version``.

    Returns:
        The rendered PDF, or an HTTPResponse with status 400 (non-integer
        QR values, unsupported backend) or 422 (QR post-processing failed).
    """
    forms = bottle.request.forms
    template = forms.data or forms.template or ''
    backend = forms.backend or 'pdfkit'
    code = forms.qr or None
    options = forms.decode()

    try:
        qr_x = int(forms.qr_x or 545)
        qr_y = int(forms.qr_y or 20)
        version = int(forms.version) if forms.version else None
    except ValueError:
        return bottle.HTTPResponse(
            status=400,
            body='Invalid value passed to QR code coordinates.',
        )

    try:
        renderer = backends_registry.get(backend)
        pdf_file = renderer.render(template, options)
    except AttributeError:
        return bottle.HTTPResponse(
            status=400,
            body='Provided backend (%s) is not supported.' % backend,
        )

    if code is not None:
        try:
            pdf_file = postprocess_pdf(pdf_file, code, qr_x, qr_y, version)
        except ValueError:
            logger.error('Failed to append QR code', exc_info=True)
            return bottle.HTTPResponse(
                status=422,
                body='Unable to append QR code to rendered template.',
            )

    bottle.response.headers['Content-Type'] = 'application/pdf; charset=UTF-8'
    return pdf_file
def buttonIf():
    """Report the state returned by ``button()`` for the posted ``index``."""
    payload = request.json
    button_index = int(payload["index"])
    pressed = button(button_index)

    if pressed == True:
        onoff_label = "on"
    else:
        onoff_label = "off"

    reply = HTTPResponse(status=200, body={"ret": "ok", "onoff": onoff_label})
    reply.set_header('Content-Type', 'application/json')
    return reply
def rotaryEncoderIf():
    """Report the value returned by ``rotaryEncoder()``."""
    # The request body is still read, although its content is not used.
    _payload = request.json
    reading = rotaryEncoder()

    reply = HTTPResponse(status=200, body={"ret": "ok", "rotate": reading})
    reply.set_header('Content-Type', 'application/json')
    return reply
def tapIf():
    """Report the value returned by ``tap()``."""
    # The request body is still read, although its content is not used.
    _payload = request.json
    tap_count = tap()

    reply = HTTPResponse(status=200, body={"ret": "ok", "tapNum": tap_count})
    reply.set_header('Content-Type', 'application/json')
    return reply
def GSensorIf():
    """Report the first three values returned by ``GSensor()`` as x, y and z."""
    # The request body is still read, although its content is not used.
    _payload = request.json
    reading = GSensor()

    body = {"ret": "ok"}
    body["x"] = reading[0]
    body["y"] = reading[1]
    body["z"] = reading[2]

    reply = HTTPResponse(status=200, body=body)
    reply.set_header('Content-Type', 'application/json')
    return reply
def method2(self) -> HTTPResponse:
    """Return a default-constructed HTTPResponse."""
    empty_response = HTTPResponse()
    return empty_response
def apply(self, callback, route):
    """Wrap ``callback`` so it receives a session created by ``self.create_session``.

    The keyword name and the ``create`` / ``commit`` / ``use_kwargs`` options
    are read from the route configuration, falling back to the plugin's own
    attributes. The callback is returned unwrapped when it neither names the
    keyword as a parameter nor (with ``use_kwargs``) accepts ``**kwargs``.

    Returns:
        ``callback`` itself, or a wrapper that injects the session, commits
        or rolls back, and always releases the session.
    """
    # hack to support bottle v0.9.x
    if bottle.__version__.startswith('0.9'):
        config = route['config']
        _callback = route['callback']
    else:
        config = route.config
        _callback = route.callback

    if "sqlalchemy" in config:
        # support for configuration before `ConfigDict` namespaces
        g = lambda key, default: config.get('sqlalchemy', {}).get(key, default)
    else:
        g = lambda key, default: config.get('sqlalchemy.' + key, default)

    # Per-route settings win over the plugin-wide defaults.
    keyword = g('keyword', self.keyword)
    create = g('create', self.create)
    commit = g('commit', self.commit)
    use_kwargs = g('use_kwargs', self.use_kwargs)

    try:
        # check if inspect.signature exists
        inspect.signature
    except AttributeError:
        # Fallback for interpreters without inspect.signature.
        argspec = inspect.getargspec(_callback)
        parameters = argspec.args
        accept_kwargs = argspec.keywords
    else:
        parameters = inspect.signature(_callback).parameters
        accept_kwargs = any(p.kind == inspect.Parameter.VAR_KEYWORD
                            for p in parameters.values())

    # Leave callbacks alone when they cannot receive the session argument.
    if not ((use_kwargs and accept_kwargs) or keyword in parameters):
        return callback

    if create:
        self.metadata.create_all(self.engine)

    def wrapper(*args, **kwargs):
        kwargs[keyword] = session = self.create_session(bind=self.engine)
        try:
            rv = callback(*args, **kwargs)
            if commit:
                session.commit()
        except (SQLAlchemyError, bottle.HTTPError):
            # Listed before the HTTPResponse clause so that HTTPError takes
            # the rollback path.
            session.rollback()
            raise
        except bottle.HTTPResponse:
            # Any other raised response still commits before propagating.
            if commit:
                session.commit()
            raise
        finally:
            # Release the session whatever the outcome.
            if isinstance(self.create_session, ScopedSession):
                self.create_session.remove()
            else:
                session.close()
        return rv

    return wrapper