我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bottle.request.json()。
def new_comment(): data = request.json comments_dir = app._config['henet']['comments_dir'] article_uuid = data['source_path'] article_thread = ArticleThread(comments_dir, article_uuid) article_thread.add_comment(text=data['text'], author=data['author']) article_thread.save() notifs = app._config['notifications'] moderator = notifs.get('moderate_comment') if moderator is not None: app.send_email([moderator], u'Nouveau commentaire', MODERATE_BODY) emit(EVENT_CREATED_COMMENT, article_uuid=article_uuid) return {'result': 'OK'}
def _request(request, request_fallback=None): ''' Extract request fields wherever they may come from: GET, POST, forms, fallback ''' # Use lambdas to avoid evaluating bottle.request.* which may throw an Error all_dicts = [ lambda: request.json, lambda: request.forms, lambda: request.query, lambda: request.files, #lambda: request.POST, lambda: request_fallback ] request_dict = dict() for req_dict_ in all_dicts: try: req_dict = req_dict_() except KeyError: continue if req_dict is not None and hasattr(req_dict, 'items'): for req_key, req_val in req_dict.items(): request_dict[req_key] = req_val return request_dict
def bot_hook(): """Entry point for the Telegram connection.""" bot = telegram.Bot(botdata['BotToken']) dispatcher = Dispatcher(bot, None, workers=0) dispatcher.add_handler(CommandHandler('Abfahrten', abfahrten, pass_args=True)) dispatcher.add_handler(CommandHandler('abfahrten', abfahrten, pass_args=True)) dispatcher.add_handler(CommandHandler('Abfahrt', abfahrten, pass_args=True)) dispatcher.add_handler(CommandHandler('abfahrt', abfahrten, pass_args=True)) dispatcher.add_handler(CommandHandler('A', abfahrten, pass_args=True)) dispatcher.add_handler(CommandHandler('a', abfahrten, pass_args=True)) dispatcher.add_handler(CommandHandler('Hilfe', hilfe)) dispatcher.add_handler(CommandHandler('hilfe', hilfe)) dispatcher.add_handler(CommandHandler('help', hilfe)) dispatcher.add_handler(MessageHandler(Filters.location, nearest_stations)) update = telegram.update.Update.de_json(request.json, bot) dispatcher.process_update(update) return 'OK'
def load_config(config_file): """ Load configuration from file (output: dict) """ if os.path.isfile(config_file): try: with open(config_file, 'rU') as f: config = json.load(f) except ValueError: print('Wrong JSON format in {} file'.format(config_file)) sys.exit(3) except IOError as e: print('Error while reading from file, {}'.format(e)) sys.exit(2) else: return config else: print('Configuration file {} not found'.format(config_file)) sys.exit(1)
def add_woman(): new_woman = {} try: max_id_woman = max(EXTRAORDINARY_WOMEN, key=lambda x:x['id']) max_id = max_id_woman['id'] + 1 data = request.json new_woman = { "id": max_id, "name": data['name'], "origin": data['origin'], "occupation": data['occupation'] } EXTRAORDINARY_WOMEN.append(new_woman) return HTTPResponse( status=200, body=json.dumps({"extraordinary_woman": new_woman})) except: return HTTPResponse( status=400, body=json.dumps({'error': 'error adding a woman'}))
def network(): global network_graph try: graph_json = request.json except IndexError: pass # some network nodes could be removed from the graph to avoid confusing the user # the graph contains network_json = loads(graph_json) G = json_graph.node_link_graph(network_json) fig = plt.figure() plt.axis('off') networkx.draw_networkx(G, node_size=80, node_color='c', font_size=8) network_graph = BytesIO() fig.savefig(network_graph, format='png') # redirect any attempts to non-existing pages to the main page
def dtos_get_films_with_actors(): try: queryObject = QueryObject( filter = "ReleaseYear='{0}'".format(request.query['releaseYear']), expand = ['FilmActors.Actor', 'FilmCategories'] ) resultSerialData = dataService.dataViewDto.getItems("Film", queryObject) return json.dumps(resultSerialData, cls=CustomEncoder, indent=2) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # POST: POST: api/datasource/crud/operations/dtos/TestAction # with: Content-Type: application/json and body - {"param1":1}
def post_batch_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleInsertEntityBatch(entitySetName, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') ## DELETE: api/datasource/crud/batch/:entitySetName #@delete('/api/datasource/crud/batch/<entitySetName>') #def delete_batch_entityset(entitySetName): # try: # result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.json, dataService) # response.content_type = "application/json; charset=utf-8" # return json.dumps(result, cls=CustomEncoder) # except dalUtils.StatusCodeError as err: # response.status = err.value # except: # abort(400, 'Bad Request') # DELETE: api/datasource/crud/batch/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
def entities_get_films_with_actors(): try: queryObject = QueryObject( filter = "ReleaseYear='{0}'".format(request.query['releaseYear']), expand = ['FilmActors.Actor', 'FilmCategories'] ) resultSerialData = dataService.from_.remote.dtoView.Films.getItems(queryObject) return json.dumps(resultSerialData, cls=CustomEncoder, indent=2) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # POST: api/datasource/crud/operations/entities/TestAction # with: Content-Type: application/json and body - {"param1":1}
def apply(self, callback, route): def wrapper(*a, **ka): try: rv = callback(*a, **ka) except HTTPResponse as resp: rv = resp if isinstance(rv, dict): json_response = dumps(rv) response.content_type = 'application/json' return json_response elif isinstance(rv, HTTPResponse) and isinstance(rv.body, dict): rv.body = dumps(rv.body) rv.content_type = 'application/json' return rv return wrapper
def post_index(): event_type = request.get_header('X-GitHub-Event') if not is_request_from_github(): abort(403, "Forbidden for IP %s, it's not GitHub's address" % remote_ip()) if request.content_type.split(';')[0] != 'application/json': abort(415, "Expected application/json, but got %s" % request.content_type) if event_type == 'ping': return handle_ping() elif event_type == 'push': return handle_push() else: abort(400, "Unsupported event type: %s" % event_type)
def build_preview(): if request.content_type == 'application/json': rst = request.json['rst'] else: rst = request.POST['rst'] key = md5(rst) if key in _CACHE: res, warnings = _CACHE[key] else: res, warnings = _CACHE[key] = rst2html(rst, theme='acr') return {'result': res, 'warnings': warnings, 'valid': len(warnings) == 0}
def __call__(self, *args, **kwargs): headers = {} if self._access_token: headers['Authorization'] = 'Token ' + self._access_token resp = requests.post( self._url, json=kwargs, headers=headers ) if resp.ok: data = resp.json() if data.get('status') == 'failed': raise Error(resp.status_code, data.get('retcode')) return data.get('data') raise Error(resp.status_code)
def scale_down(k8s_host, **kwargs): """ Scale down number of replicas to 0 """ pass_headers = {} if 'k8s_api_headers' in kwargs: headers = kwargs.pop('k8s_api_headers') pass_headers.update(headers) pass_headers.update({ 'Content-Type': 'application/strategic-merge-patch+json' }) payload = { 'spec': { 'replicas': 0 } } api_path = K8S_API['deployments'] namespace = kwargs['namespace'] specs = kwargs['objects']['deployments']['specification'] if specs['kind'] == 'List': deployments = specs['items'] else: deployments = [specs] for deployment in deployments: deployment_name = deployment['metadata']['name'] url = '{}/{}/namespaces/{}/deployments/{}'.format( k8s_host, api_path, namespace, deployment_name ) req('PATCH', url, pass_headers, payload)
def get_kv(consul_host, key, list_keys=False): """ Retrieve value for specified key from Consul (output: dict or list) """ url = '{}/{}/{}'.format(consul_host, CONSUL_KV_API, key) if list_keys: value = req('GET', url + '/?keys') else: try: value = json.loads(b64decode(req('GET', url)[0]['Value'])) except ValueError as e: abort(422, 'Bad JSON: {}'.format(e)) return value
def list_women(): return HTTPResponse( status=200, body=json.dumps({"extraordinary_women": EXTRAORDINARY_WOMEN}))
def get_woman(woman_id): for woman in EXTRAORDINARY_WOMEN: if woman['id'] == int(woman_id): return HTTPResponse( status=200, body=json.dumps({'extraordinary_woman': woman})) else: return HTTPResponse( status=404, body=json.dumps({'error': 'id not found'}))
def delete_woman(woman_id): for woman in EXTRAORDINARY_WOMEN: if woman['id'] == int(woman_id): EXTRAORDINARY_WOMEN.remove(woman) return HTTPResponse(status=204) else: return HTTPResponse( status=204, body=json.dumps({'error': 'id not found'}))
def get_json_profiles(): """Get all profiles (JSON)""" results = db.get_profiles() return json.dumps(results)
def get_json_profile(item): """Get one profile info""" results = db.get_profile(item) return json.dumps(results)
def update_profile(name): """Update profile info (port & autostart)""" response.set_cookie("indiserver_profile", name, None, max_age=3600000, path='/') data = request.json port = data.get('port', args.indi_port) autostart = bool(data.get('autostart', 0)) db.update_profile(name, port, autostart)
def save_profile_drivers(name): """Add drivers to existing profile""" data = request.json db.save_profile_drivers(name, data)
def get_json_profile_labels(item): """Get driver labels of specific profile""" results = db.get_profile_drivers_labels(item) return json.dumps(results)
def get_server_status(): """Server status""" status = [{'status': str(indi_server.is_running()), 'active_profile': active_profile}] return json.dumps(status)
def get_server_drivers(): """List server drivers""" status = [] for driver in indi_server.get_running_drivers(): status.append({'driver': driver}) return json.dumps(status)
def get_json_groups(): """Get all driver families (JSON)""" response.content_type = 'application/json' families = collection.get_families() return json.dumps(sorted(families.keys()))
def get_json_drivers(): """Get all drivers (JSON)""" response.content_type = 'application/json' return json.dumps([ob.__dict__ for ob in collection.drivers])
def post_data(): global global_cnt global global_src_ip global global_dst_ip global global_proto try: post = request.json except IndexError: pass # overall received packets global_cnt += 1 data = loads(post) # src ip distribution if data[u'ip_src'] in global_src_ip.keys(): global_src_ip[data[u'ip_src']] += 1 else: global_src_ip[data[u'ip_src']] = 1 # dst ip distribution if data[u'ip_dst'] in global_dst_ip.keys(): global_dst_ip[data[u'ip_dst']] += 1 else: global_dst_ip[data[u'ip_dst']] = 1 # proto distribution if data[u'protocol'] in global_proto.keys(): global_proto[data[u'protocol']] += 1 else: global_proto[data[u'protocol']] = 1
def dtos_test_action(): try: param1 = request.json['param1'] # TODO: Add some actions in here except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request')
def get_metadata(): try: response.content_type = "application/json; charset=utf-8" metadataClient = databaseInfo.getMetadataClient() return json.dumps(metadataClient, cls=MetadataEncoder, indent=2) except: abort(500, 'Internal server error') # GET: api/datasource/crud/:entitySetName?skip=20&top=10
def get_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleGet(entitySetName, request.query, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder, indent=2) except: abort(400, 'Bad Request') # GET: api/datasource/crud/single/:entitySetName?keys=key1:{key1}
def get_single_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleGetSingle(entitySetName, request.query, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder, indent=2) except: abort(400, 'Bad Request') # GET: api/datasource/crud/many/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
def put_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleUpdateEntity(entitySetName, request.query, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # PATCH: api/datasource/crud/:entitySetName?keys=key1:{key1} #@patch('/api/datasource/crud/<entitySetName>')
def patch_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleUpdateEntity(entitySetName, request.query, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # POST: api/datasource/crud/:entitySetName
def post_entityset(entitySetName): # test1 = json.loads(request.body.read()) try: result = dataProviderDto.apiProvider.handleInsertEntity(entitySetName, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # DELETE: api/datasource/crud/:entitySetName?keys=key1:{key1}
def delete_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleDeleteEntity(entitySetName, request.query, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # PUT: api/datasource/crud/batch/:entitySetName
def put_batch_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleUpdateEntityBatch(entitySetName, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # PATCH: api/datasource/crud/batch/:entitySetName #@patch('/api/datasource/crud/batch/<entitySetName>')
def delete_batch_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.query, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request')
def entities_test_action(): try: param1 = request.json['param1'] # TODO: Add some actions in here except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request')
def setup(self, app): def default_error_handler(res): if res.content_type == "application/json": return res.body res.content_type = "application/json" return dumps({'message': str(res.exception if res.exception else res.body)}) app.default_error_handler = default_error_handler
def _handle_auth(self): request_data = request.json csr = x509.load_pem_x509_csr(data=request_data['csr'].encode(), backend=default_backend()) # pylint: disable=unsubscriptable-object if not csr.is_signature_valid: raise HTTPResponse( status=400, body={'message': 'The certificate signing request signature is invalid.'} ) host = csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value csr_file = os.path.join(self.csr_path, "%s.csr" % (host)) crt_file = os.path.join(self.crt_path, "%s.crt" % (host)) if os.path.isfile(crt_file): crt = load_certificate(crt_file) if crt.public_key().public_numbers() == csr.public_key().public_numbers(): return { 'status': 'authorized', 'crt': dump_pem(crt).decode() } else: raise HTTPResponse( status=409, body={'message': 'Mismatch between the certificate signing request and the certificate.'} ) else: # Save CSR with open(csr_file, 'w') as f: f.write(csr.public_bytes(serialization.Encoding.PEM).decode()) response.status = 202 return { 'status': 'pending' }
def account_status(account_id): result = controller.db.iter_resources(account_id) response.content_type = "application/json" return json.dumps(result, indent=2, cls=Encoder)
def lock(account_id, resource_id): request_data = request.json for rp in ('region',): if not request_data or rp not in request_data: abort(400, "Missing required parameter %s" % rp) return controller.lock(account_id, resource_id, request_data['region'])
def info(account_id, resource_id): request_data = request.query if resource_id.startswith('sg-') and 'parent_id' not in request_data: abort(400, "Missing required parameter parent_id") result = controller.info( account_id, resource_id, request_data.get('parent_id', resource_id)) response.content_type = "application/json" return json.dumps(result, indent=2, cls=Encoder) # this set to post to restrict permissions, perhaps another url space.
def delta(account_id): request_data = request.json for rp in ('region',): if not request_data or rp not in request_data: abort(400, "Missing required parameter %s" % rp) result = controller.get_account_delta( account_id, request_data['region'], api_url()) response.content_type = "application/json" return json.dumps(result, indent=2, cls=Encoder)
def on_config_message(records): for r in records: json.loads(r['Sns'].get('Message'))
def error400(error): _add_cors_headers(response) response.set_header('Content-Type', 'application/json') return json.dumps({'detail': error.body})
def error404(error): _add_cors_headers(response) response.set_header('Content-Type', 'application/json') return json.dumps({'detail': 'Page not found'})
def validate(): try: data = request.json except JSONDecodeError: abort(400, 'Request data is not valid JSON') # Validate if data is None: abort(400, 'JSON payload missing') if 'data' not in data: abort(400, 'Payload does not contain a "data" field') try: data = json.loads(data['data']) except JSONDecodeError: return invalid_payload('Data is not valid JSON') if 'api' not in data: return invalid_payload('Data does not contain an "api" field') version = data['api'] if version not in SCHEMATA: return invalid_payload('Unknown api version: "%s"' % version) # Do validation of submitted endpoint try: valid, message = validation.validate(schema_path=SCHEMATA[version], data=data) except SchemaError: abort(500, 'Invalid schema on server! Please contact one of the admins.') return { 'valid': valid, 'message': message, }
def create(resources): """create a new resource""" if resources not in app.resources: abort(404, "Not Found") new_data = dict.fromkeys(app.resources[resources]["model"].keys()) for key in new_data: if key in request.json: new_data[key] = request.json.get(key) new_data['id'] = uuid.uuid4().__str__() app.data[resources][new_data['id']] = new_data return new_data