我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 ujson.dumps()。
def setUp(self):
    """Enable httpretty and register the canned JSON GET endpoints."""
    httpretty.enable()

    # (URL, payload) pairs, registered in the listed order.
    canned_responses = (
        ("http://test.gf/users", [sam_profile, jack_profile]),
        ("http://test.gf/blog/sam", sams_articles),
        ("http://test.gf/blog/jack", jacks_articles),
    )
    for url, payload in canned_responses:
        httpretty.register_uri(
            httpretty.GET,
            url,
            body=ujson.dumps(payload),
            content_type="application/json",
        )
def process_response(self, req, resp, resource, req_succeeded):
    """
    Replace the response body with its JSON serialization on success.

    The body is left untouched when the request did not succeed.

    :param req: Falcon request
    :type req: falcon.request.Request
    :param resp: Falcon response
    :type resp: falcon.response.Response
    :param resource:
    :type resource: falcon_dbapi.resources.base.BaseResource
    :param req_succeeded:
    :type req_succeeded: bool
    """
    if not req_succeeded:
        return
    serialized = json.dumps(resp.body)
    resp.body = serialized
async def _exec_hmset_dict(self, insts):
    """Write the given instances to their Redis hashes and drop stale keys.

    Declared ``async`` because the body awaits; ``await`` inside a plain
    ``def`` does not compile.

    For every instance whose model has ``__use_redis__`` set, and for every
    filters-names entry returned by ``self._get_filters_names_set``, the
    instance is serialized with ``ujson.dumps(inst.todict())`` and stored
    under the model key. If the instance carries an ``old_redis_key`` that
    differs from its current key, the old key is deleted from that hash.

    :param insts: iterable of model instances to store
    """
    serialized_by_model_key = defaultdict(dict)
    stale_keys_by_model_key = defaultdict(set)

    for inst in insts:
        model = type(inst)
        if not model.__use_redis__:
            continue

        filters_names_set = await self._get_filters_names_set(inst)
        for filters_names in filters_names_set:
            model_redis_key = model.get_key(filters_names.decode())
            inst_redis_key = model.get_instance_key(inst)
            inst_old_redis_key = getattr(inst, 'old_redis_key', None)

            # The instance key changed: remember the old one for deletion.
            if inst_old_redis_key is not None and inst_old_redis_key != inst_redis_key:
                stale_keys_by_model_key[model_redis_key].add(inst_old_redis_key)

            serialized_by_model_key[model_redis_key][inst_redis_key] = ujson.dumps(inst.todict())

    for model_key, insts_by_key in serialized_by_model_key.items():
        await self.redis_bind.hmset_dict(model_key, insts_by_key)

    for model_key, stale_keys in stale_keys_by_model_key.items():
        await self.redis_bind.hdel(model_key, *stale_keys)
async def test_integrity_error_handling_with_foreign_key(self, post_method, stream, session):
    """Posting a row with an unknown ``m2_id`` yields a 400 with the DB error.

    Declared ``async`` because the body awaits ``post_method``.
    """
    stream.feed_data(ujson.dumps([{'m2_id': 1}]).encode())
    stream.feed_eof()
    request = SwaggerRequest('/model1/', 'post', body=stream,
                             headers={'content-type': 'application/json'})

    resp = await post_method(request, session)

    assert resp.status_code == 400
    assert ujson.loads(resp.body) == {
        'params': {'m2_id': 1},
        'database message': {
            'message': 'Cannot add or update a child row: '
                       'a foreign key constraint fails '
                       '(`swaggerit_test`.`model1_swagger`, '
                       'CONSTRAINT `model1_swagger_ibfk_1` FOREIGN '
                       'KEY (`m2_id`) REFERENCES `model2_swagger` '
                       '(`id`))',
            'code': 1452
        }
    }
async def test_model_base_error_handling_with_patch_and_with_nested_delete(self, patch_method, post_method, stream, session):
    """A nested ``delete`` operation sent through PATCH is rejected with 400.

    Declared ``async`` because the body awaits the request handlers.
    """
    # First create the parent row with an empty object.
    stream.feed_data(b'[{}]')
    stream.feed_eof()
    request = SwaggerRequest('/model1/1/', 'patch', body=stream,
                             headers={'content-type': 'application/json'})
    await post_method(request, session)

    # A fresh stream is needed: the first one has already been fed EOF.
    stream = asyncio.StreamReader(loop=session.loop)
    body = {'model2_': {'id': 1, '_operation': 'delete'}}
    stream.feed_data(ujson.dumps(body).encode())
    stream.feed_eof()
    request = SwaggerRequest('/model1/1/', 'patch', path_params={'id': 1}, body=stream,
                             headers={'content-type': 'application/json'})

    resp = await patch_method(request, session)

    assert resp.status_code == 400
    assert ujson.loads(resp.body) == {
        'instance': [body],
        'message': "Can't execute nested 'delete' operation"
    }
async def test_model_base_error_handling_with_patch_and_with_nested_remove(self, patch_method, post_method, stream, session):
    """A nested ``remove`` operation sent through PATCH is rejected with 400.

    Declared ``async`` because the body awaits the request handlers.
    """
    # First create the parent row with an empty object.
    stream.feed_data(b'[{}]')
    stream.feed_eof()
    request = SwaggerRequest('/model1/1/', 'patch', body=stream,
                             headers={'content-type': 'application/json'})
    await post_method(request, session)

    # A fresh stream is needed: the first one has already been fed EOF.
    stream = asyncio.StreamReader(loop=session.loop)
    body = {'model2_': {'id': 1, '_operation': 'remove'}}
    stream.feed_data(ujson.dumps(body).encode())
    stream.feed_eof()
    request = SwaggerRequest('/model1/1/', 'patch', path_params={'id': 1}, body=stream,
                             headers={'content-type': 'application/json'})

    resp = await patch_method(request, session)

    assert resp.status_code == 400
    assert ujson.loads(resp.body) == {
        'instance': [body],
        'message': "Can't execute nested 'remove' operation"
    }
async def test_model_base_error_handling_with_patch_and_with_nested_update(self, patch_method, post_method, stream, session):
    """A nested ``update`` operation sent through PATCH is rejected with 400.

    Declared ``async`` because the body awaits the request handlers.
    """
    # First create the parent row with an empty object.
    stream.feed_data(b'[{}]')
    stream.feed_eof()
    request = SwaggerRequest('/model1/1/', 'patch', body=stream,
                             headers={'content-type': 'application/json'})
    await post_method(request, session)

    # A fresh stream is needed: the first one has already been fed EOF.
    stream = asyncio.StreamReader(loop=session.loop)
    body = {'model2_': {'id': 1, '_operation': 'update'}}
    stream.feed_data(ujson.dumps(body).encode())
    stream.feed_eof()
    request = SwaggerRequest('/model1/1/', 'patch', path_params={'id': 1}, body=stream,
                             headers={'content-type': 'application/json'})

    resp = await patch_method(request, session)

    assert resp.status_code == 400
    assert ujson.loads(resp.body) == {
        'instance': [body],
        'message': "Can't execute nested 'update' operation"
    }
async def test_if_istances_are_seted_on_redis_with_two_models_correctly(
        self, session, redis):
    """Committing instances of two models stores each under its own hash.

    Declared ``async`` because the body awaits the session and Redis calls.
    """
    session.add(await Model10.new(session, id=1))
    session.add(await Model11.new(session, id=1))
    session.add(await Model10.new(session, id=2))
    session.add(await Model11.new(session, id=2))
    await session.commit()

    assert await redis.hgetall(Model10.__key__) == {
        b'1': ujson.dumps({b'id': 1}).encode(),
        b'2': ujson.dumps({b'id': 2}).encode()
    }
    assert await redis.hgetall(Model11.__key__) == {
        b'1': ujson.dumps({b'id': 1}).encode(),
        b'2': ujson.dumps({b'id': 2}).encode()
    }
async def test_if_two_commits_sets_redis_with_two_models_correctly(
        self, session, redis):
    """Two successive commits accumulate instances in both model hashes.

    Declared ``async`` because the body awaits the session and Redis calls.
    """
    session.add(await Model10.new(session, id=1))
    session.add(await Model11.new(session, id=1))
    await session.commit()

    # After the first commit only id=1 is present for each model.
    assert await redis.hgetall(Model10.__key__) == {
        b'1': ujson.dumps({b'id': 1}).encode()
    }
    assert await redis.hgetall(Model11.__key__) == {
        b'1': ujson.dumps({b'id': 1}).encode()
    }

    session.add(await Model10.new(session, id=2))
    session.add(await Model11.new(session, id=2))
    await session.commit()

    # After the second commit both ids are present for each model.
    assert await redis.hgetall(Model10.__key__) == {
        b'1': ujson.dumps({b'id': 1}).encode(),
        b'2': ujson.dumps({b'id': 2}).encode()
    }
    assert await redis.hgetall(Model11.__key__) == {
        b'1': ujson.dumps({b'id': 1}).encode(),
        b'2': ujson.dumps({b'id': 2}).encode()
    }
async def test_if_two_instance_are_deleted_from_redis(self, session, redis):
    """Deleting committed instances empties the model's Redis hash.

    Declared ``async`` because the body awaits the session and Redis calls.
    """
    inst1 = await Model10.new(session, id=1)
    inst2 = await Model10.new(session, id=2)
    session.add_all([inst1, inst2])
    await session.commit()

    assert await redis.hgetall(Model10.__key__) == {
        b'1': ujson.dumps({b'id': 1}).encode(),
        b'2': ujson.dumps({b'id': 2}).encode()
    }

    session.delete(inst1)
    session.delete(inst2)
    await session.commit()

    assert await redis.hgetall(Model10.__key__) == {}
def _post(self, uri, data):
    """POST one or more records as a JSON array and return the decoded reply.

    :param uri: target URL; ``?nowait=1`` is appended when ``self.nowait``
        is truthy
    :param data: a dict (wrapped into a one-element list) or a sequence
        whose first element is a dict
    :return: the decoded JSON response body, or ``None`` when the status
        code is above 299
    :raises RuntimeError: if the first submitted element is not a dict
    """
    # isinstance (rather than an exact type comparison) also accepts dict
    # subclasses such as OrderedDict.
    if isinstance(data, dict):
        data = [data]

    if not isinstance(data[0], dict):
        raise RuntimeError('submitted data must be a dictionary')

    payload = json.dumps(data)

    if self.nowait:
        uri = "{0}?nowait=1".format(uri)

    logger.debug('uri: %s', uri)

    response = self.session.post(uri, data=payload, verify=self.verify_ssl)
    logger.debug('status code: %s', response.status_code)

    if response.status_code > 299:
        logger.error('request failed: %s', response.status_code)
        logger.error(json.loads(response.text).get('message'))
        return None

    return json.loads(response.text)
def test_can_access_resource_params(self):
    """Test that params can be accessed from within process_resource"""
    global context

    class Resource:
        # Echoes the route parameters back as the JSON response body.
        def on_get(self, req, resp, **params):
            resp.body = json.dumps(params)

    api = falcon.API(middleware=AccessParamsMiddleware())
    api.add_route('/path/{id}', Resource())
    result = testing.TestClient(api).simulate_request(path='/path/22')

    assert 'params' in context
    captured = context['params']
    assert captured
    assert captured['id'] == '22'
    assert result.json == {'added': True, 'id': '22'}
def __init__(self, status=None, body=None, json=None, headers=None):
    """Store default response values and reset the captured-call slots.

    ``json`` and ``body`` are mutually exclusive. When ``json`` is given it
    is serialized with ``json_dumps(..., ensure_ascii=False)`` and used as
    the default body.

    Raises:
        ValueError: if both ``json`` and ``body`` are provided.
    """
    self._default_status = status
    self._default_headers = headers

    if json is None:
        self._default_body = body
    elif body is None:
        self._default_body = json_dumps(json, ensure_ascii=False)
    else:
        raise ValueError('Either json or body may be specified, but not both')

    self.captured_req = self.captured_resp = self.captured_kwargs = None
def send_message(self, method, body, headers):
    """Wrap ``body`` in a rewarder envelope, log it and send it as UTF-8 JSON.

    The envelope headers always carry a fresh ``message_id`` (taken from, and
    then incrementing, ``self._message_id``) and a ``sent_at`` timestamp.
    Entries in ``headers`` are merged on top of those.
    """
    message_id = self._message_id
    self._message_id = message_id + 1

    envelope_headers = {
        'message_id': message_id,
        'sent_at': time.time(),
    }
    if headers:
        envelope_headers.update(headers)

    payload = {
        'method': method,
        'body': body,
        'headers': envelope_headers,
    }

    # This is a bit ugly, but decide how much we care: replies (other than
    # ping replies) and connection closes are logged at INFO, the rest at
    # DEBUG.
    is_reply = method != 'v0.reply.control.ping' and 'parent_message_id' in envelope_headers
    log = logger.info if (is_reply or method == 'v0.connection.close') else logger.debug
    log('Sending rewarder message: %s', payload)

    encoded = ujson.dumps(payload).encode('utf-8')
    self.sendMessage(encoded, False)
def test_double_performance_float_precision(benchmark):
    """Benchmark rapidjson dumps/loads on the ``doubles`` fixture and print timings."""
    print("\nArray with 256 doubles:")

    label = 'rapidjson (precise)'
    ser_data, des_data = benchmark(
        run_client_test,
        label,
        rapidjson.dumps,
        rapidjson.loads,
        data=doubles,
        iterations=50000,
    )

    total = ser_data + des_data
    print("%-11s serialize: %0.3f deserialize: %0.3f total: %0.3f" % (
        label, ser_data, des_data, total
    ))
def from_tweets(cls, tweets, metadata=None, **kwargs):
    """
    Build and save an instance holding a JSON snapshot of ``tweets``.

    :param tweets: a iterable of tweets
    :param metadata: stored as-is under the ``metadata`` key of the snapshot
    :param kwargs: extra attributes to be considered for inclusion. should be
        json serializable. Each value must have one entry per tweet;
        attributes whose length does not match are skipped.
    :return: the saved instance
    """
    tl = cls()
    json_tweets = json.loads(serializers.serialize("json", tweets))

    for key, values in kwargs.items():
        if len(values) == len(json_tweets):
            for tweet, value in zip(json_tweets, values):
                tweet['fields'][key] = value

    # First save: the snapshot below reads tl.pk and tl.datetime.
    tl.save()

    tl.json = json.dumps({
        'metadata': metadata,
        'tweets': json_tweets,
        'pk': tl.pk,
        'created_at': tl.datetime.isoformat(),
    })
    tl.save()
    return tl
def test_should_fail_missing_timestamp_in_body(self, bulk_processor):
    """Posting the template events without a timestamp must give HTTP 422."""
    events_resource = _init_resource(self)
    events_resource._processor = bulk_processor

    template_path = os.path.join(
        os.path.dirname(__file__),
        'event_template_json/req_simple_event.json',
    )
    with open(template_path, 'r') as template_file:
        events = json.load(template_file)['events']

    request_headers = {
        'Content-Type': 'application/json',
        'X_ROLES': 'monasca'
    }
    self.simulate_request(
        path=ENDPOINT,
        method='POST',
        headers=request_headers,
        body=json.dumps({'events': [events]})
    )

    self.assertEqual(falcon.HTTP_422, self.srmock.status)
def doAudit(user, path, kwargs, responsetime, statuscode, result, tags):
    """Record one audit entry for a call through the ``system`` client.

    :param user: stored as ``audit.user``
    :param path: stored as ``audit.call``
    :param kwargs: call keyword arguments; stored as JSON without ``ctx``
    :param responsetime: stored as ``audit.responsetime``
    :param statuscode: stored as ``audit.statuscode``
    :param result: call result; stored as JSON, with a placeholder string
        for generators or values that cannot be serialized
    :param tags: stored as ``audit.tags``
    """
    client = getClient('system')
    audit = client.audit.new()
    audit.user = user
    audit.call = path
    audit.statuscode = statuscode
    audit.tags = tags
    audit.args = json.dumps([])  # we dont want to log self

    # Work on a copy so the caller's kwargs keep their 'ctx' entry.
    auditkwargs = kwargs.copy()
    auditkwargs.pop('ctx', None)
    audit.kwargs = json.dumps(auditkwargs)

    try:
        if isinstance(result, types.GeneratorType):
            audit.result = json.dumps('Result of type generator')
        else:
            audit.result = json.dumps(result)
    except Exception:
        # Best effort: a result that cannot be serialized must not break
        # auditing. Exception (not a bare except) keeps KeyboardInterrupt
        # and SystemExit propagating.
        audit.result = json.dumps('binary data')

    audit.responsetime = responsetime
    client.audit.set(audit)
def authenticate(self, type='', **kwargs):
    """Start an OAuth login and answer with a 302 redirect to the provider.

    The provider instance is ``type`` if given, else the portal's
    ``force_oauth_instance`` if set, else ``'github'``. The chosen instance
    and the post-login redirect target are cached under the client's state
    for 600 seconds.
    """
    cache = j.clients.redis.getByInstance('system')

    forced_instance = j.core.portal.active.force_oauth_instance
    type = type or forced_instance or 'github'

    ctx = kwargs['ctx']
    # Fall back to the referring page (or '/') when no redirect was given.
    redirect = kwargs.get('redirect', ctx.env.get('HTTP_REFERER', '/'))

    client = j.clients.oauth.get(instance=type)
    cache.set(client.state, json.dumps({'type': type, 'redirect': redirect}), ex=600)

    ctx.start_response('302 Found', [('Location', client.url)])
    return 'OK'
async def test_delete_valid(self, init_db, client, headers, headers_without_content_type):
    """A created placement can be fetched (200), deleted (204) and is then gone (404).

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'store_id': 1,
        'name': 'Placement Test',
        'variations': [{
            'name': 'Var 1',
            '_operation': 'insert',
            'slots': [{'id': 1}]
        }]
    }]
    resp = await client.post('/placements/', headers=headers, data=ujson.dumps(body))
    obj = (await resp.json())[0]
    placement_url = '/placements/{}/'.format(obj['small_hash'])

    resp = await client.get(placement_url, headers=headers_without_content_type)
    assert resp.status == 200

    resp = await client.delete(placement_url, headers=headers_without_content_type)
    assert resp.status == 204

    resp = await client.get(placement_url, headers=headers_without_content_type)
    assert resp.status == 404
async def test_get_items_not_found(self, init_db, client, headers, headers_without_content_type):
    """Requesting items of a freshly created placement answers 404.

    Declared ``async`` because the body awaits the test client.
    """
    body = [{
        'store_id': 1,
        'name': 'Placement Test',
        'variations': [{
            '_operation': 'insert',
            'name': 'Var 1',
            'slots': [{'id': 2}]
        }]
    }]
    client = await client
    resp = await client.post('/placements/', headers=headers, data=ujson.dumps(body))
    obj = (await resp.json())[0]

    resp = await client.get('/placements/{}/items'.format(obj['small_hash']),
                            headers=headers_without_content_type)
    assert resp.status == 404
async def test_patch(self, init_db, client, headers):
    """Patching an engine strategy's name returns 200 and the updated object.

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'name': 'test',
        'class_module': 'tests.integration.fixtures',
        'class_name': 'EngineStrategyTest'
    }]
    resp = await client.post('/engine_strategies/', headers=headers, data=ujson.dumps(body))
    obj = (await resp.json())[0]

    body = {
        'name': 'test2'
    }
    resp = await client.patch('/engine_strategies/1/', headers=headers, data=ujson.dumps(body))
    obj['name'] = 'test2'

    assert resp.status == 200
    assert (await resp.json()) == obj
async def test_delete_valid(self, init_db, client, headers, headers_without_content_type):
    """A created engine strategy can be fetched (200), deleted (204) and is then gone (404).

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'name': 'test',
        'class_module': 'tests.integration.fixtures',
        'class_name': 'EngineStrategyTest'
    }]
    # The creation response itself is not inspected.
    await client.post('/engine_strategies/', headers=headers, data=ujson.dumps(body))

    resp = await client.get('/engine_strategies/1/', headers=headers_without_content_type)
    assert resp.status == 200

    resp = await client.delete('/engine_strategies/1/', headers=headers_without_content_type)
    assert resp.status == 204

    resp = await client.get('/engine_strategies/1/', headers=headers_without_content_type)
    assert resp.status == 404
async def test_get(self, init_db, headers, headers_without_content_type, client):
    """Listing item types filtered by store returns the created item type.

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'name': 'test',
        'stores': [{'id': 1}],
        'schema': {'properties': {'test': {'type': 'string'}}, 'type': 'object', 'id_names': ['test']}
    }]
    await client.post('/item_types/', headers=headers, data=ujson.dumps(body))

    # Extend the posted body with the fields the listing is expected to add.
    body[0]['id'] = 1
    body[0]['available_filters'] = [{'name': 'test', 'schema': {'type': 'string'}}]
    body[0]['stores'] = [{
        'configuration': {'data_path': '/test'},
        'country': 'test',
        'id': 1,
        'name': 'test'
    }]
    body[0]['store_items_class'] = None

    resp = await client.get('/item_types/?stores=id:1', headers=headers_without_content_type)
    assert resp.status == 200
    assert await resp.json() == body
async def test_delete(self, init_db, headers, headers_without_content_type, client):
    """A created item type can be fetched (200), deleted (204) and is then gone (404).

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'name': 'test',
        'stores': [{'id': 1}],
        'schema': {'properties': {'test': {'type': 'string'}}, 'type': 'object', 'id_names': ['test']}
    }]
    # The creation response itself is not inspected.
    await client.post('/item_types/', headers=headers, data=ujson.dumps(body))

    resp = await client.get('/item_types/1/', headers=headers_without_content_type)
    assert resp.status == 200

    resp = await client.delete('/item_types/1/', headers=headers_without_content_type)
    assert resp.status == 204

    resp = await client.get('/item_types/1/', headers=headers_without_content_type)
    assert resp.status == 404
async def test_get(self, init_db, headers, headers_without_content_type, client):
    """Fetching a single item type by id returns the created object.

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'name': 'test',
        'stores': [{'id': 1}],
        'schema': {'properties': {'test': {'type': 'string'}}, 'type': 'object', 'id_names': ['test']}
    }]
    await client.post('/item_types/', headers=headers, data=ujson.dumps(body))
    resp = await client.get('/item_types/1/', headers=headers_without_content_type)

    # Extend the posted body with the fields the response is expected to add.
    body[0]['id'] = 1
    body[0]['available_filters'] = [{'name': 'test', 'schema': {'type': 'string'}}]
    body[0]['stores'] = [{
        'configuration': {'data_path': '/test'},
        'country': 'test',
        'id': 1,
        'name': 'test'
    }]
    body[0]['store_items_class'] = None

    assert resp.status == 200
    assert await resp.json() == body[0]
async def test_items_post_invalid(self, init_db, headers, client):
    """Posting an item whose ``id`` violates the schema type yields a 400.

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'name': 'test',
        'stores': [{'id': 1}],
        'schema': {'properties': {'id': {'type': 'string'}}, 'type': 'object', 'id_names': ['id']}
    }]
    await client.post('/item_types/', headers=headers, data=ujson.dumps(body))

    # The schema requires a string id; an integer must be rejected.
    body = [{'id': 1}]
    resp = await client.post('/item_types/1/items?store_id=1', headers=headers, data=ujson.dumps(body))

    assert resp.status == 400
    assert await resp.json() == {
        'instance': 1,
        'message': "1 is not of type 'string'. "
                   "Failed validating instance['0']['id'] "
                   "for schema['items']['properties']['id']['type']",
        'schema': {'type': 'string'}
    }
async def test_items_patch_invalid(self, init_db, headers, client):
    """Sending an item whose ``id`` violates the schema type yields a 400.

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'name': 'test',
        'stores': [{'id': 1}],
        'schema': {
            'properties': {
                'id': {'type': 'string'}
            },
            'type': 'object',
            'id_names': ['id']
        }
    }]
    await client.post('/item_types/', headers=headers, data=ujson.dumps(body))

    body = [{'id': 1}]
    # NOTE(review): the test name says "patch" but the request is a POST;
    # confirm whether client.patch was intended. Left unchanged here.
    resp = await client.post('/item_types/1/items?store_id=1', headers=headers, data=ujson.dumps(body))

    assert resp.status == 400
    assert await resp.json() == {
        'instance': 1,
        'message': "1 is not of type 'string'. "
                   "Failed validating instance['0']['id'] "
                   "for schema['items']['properties']['id']['type']",
        'schema': {'type': 'string'}
    }
async def test_if_items_patch_updates_stock_filter(self, init_db, headers, redis, session, client, api):
    """Deleting an item through PATCH flips its stock-filter flag to False.

    Declared ``async`` because the body awaits the client, the indices map
    and Redis.
    """
    body = [{
        'name': 'test',
        'stores': [{'id': 1}],
        'schema': {'properties': {'id': {'type': 'string'}}, 'type': 'object', 'id_names': ['id']}
    }]
    client = await client
    await client.post('/item_types/', headers=headers, data=ujson.dumps(body))

    body = [{'id': 'test'}]
    resp = await client.post('/item_types/1/items?store_id=1', headers=headers, data=ujson.dumps(body))
    assert resp.status == 201

    test_model = _all_models['store_items_test_1']
    await ItemsIndicesMap(test_model).update(session)

    body = [{'id': 'test', '_operation': 'delete'}]
    await client.patch('/item_types/1/items?store_id=1', headers=headers, data=ujson.dumps(body))

    # np.frombuffer replaces the deprecated binary mode of np.fromstring,
    # and np.bool_ replaces the removed np.bool alias.
    raw_filter = await redis.get('store_items_test_1_stock_filter')
    stock_filter = np.frombuffer(raw_filter, dtype=np.bool_).tolist()
    assert stock_filter == [False]
async def test_post(self, init_db, client, headers, headers_without_content_type):
    """Creating an engine object returns 201 and the stored representation.

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'name': 'Top Seller Object Test',
        'type': 'top_seller_array',
        'configuration': {'days_interval': 7},
        'store_id': 1,
        'item_type_id': 1,
        'strategy_id': 1
    }]
    resp = await client.post('/engine_objects/', headers=headers, data=ujson.dumps(body))
    resp_json = (await resp.json())

    # The nested relations are copied from the response; only the scalar
    # fields and the assigned id are checked against fixed values.
    body[0]['id'] = 2
    body[0]['store'] = resp_json[0]['store']
    body[0]['strategy'] = resp_json[0]['strategy']
    body[0]['item_type'] = resp_json[0]['item_type']

    assert resp.status == 201
    assert resp_json == body
async def test_patch(self, init_db, client, headers, headers_without_content_type):
    """Patching an engine object's name returns 200 and the updated object.

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'name': 'Top Seller Object Test',
        'type': 'top_seller_array',
        'configuration': {'days_interval': 7},
        'store_id': 1,
        'item_type_id': 1,
        'strategy_id': 1
    }]
    resp = await client.post('/engine_objects/', headers=headers, data=ujson.dumps(body))
    obj = (await resp.json())[0]

    body = {
        'name': 'test2'
    }
    resp = await client.patch('/engine_objects/2/', headers=headers, data=ujson.dumps(body))
    obj['name'] = 'test2'

    assert resp.status == 200
    assert (await resp.json()) == obj
async def test_exporter_get_running(self, init_db, headers_without_content_type, headers, client, monkeypatch, loop):
    """Polling an export job right after starting it reports ``running``.

    Declared ``async`` because the body awaits the client and job helpers.
    """
    set_patches(monkeypatch)

    # 100 newline-separated JSON records served by the patched readers.
    prods = [ujson.dumps({'value': i, 'item_key': 'test{}'.format(i)}).encode() for i in range(100)]
    set_readers_builders_patch(monkeypatch, [[b'\n'.join(prods)]])

    client = await client
    products = [{'sku': 'test{}'.format(i)} for i in range(10)]
    await _post_products(client, headers, headers_without_content_type, products)

    await client.post('/engine_objects/1/export', headers=headers_without_content_type)

    resp = await client.get(
        '/engine_objects/1/export?job_hash=6342e10bd7dca3240c698aa79c98362e',
        headers=headers_without_content_type)
    assert await resp.json() == {'status': 'running'}

    await _wait_job_finish(client, headers_without_content_type)
async def test_delete_one(self, init_db, client, headers, headers_without_content_type):
    """A created user can be fetched (200), deleted (204) and is then gone (404).

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    user = [{
        'name': 'test2',
        'email': 'test2',
        'password': 'test'
    }]
    resp = await client.post('/users', data=ujson.dumps(user), headers=headers)
    assert resp.status == 201

    resp = await client.get('/users/test2', headers=headers_without_content_type)
    assert resp.status == 200

    resp = await client.delete('/users/test2', headers=headers_without_content_type)
    assert resp.status == 204

    resp = await client.get('/users/test2', headers=headers_without_content_type)
    assert resp.status == 404
async def test_patch(self, init_db, client, headers, headers_without_content_type):
    """Patching a URI returns 200 and the updated object.

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'uri': 'test'
    }]
    resp = await client.post('/uris/', headers=headers, data=ujson.dumps(body))
    obj = (await resp.json())[0]

    body = {
        'uri': 'test2'
    }
    resp = await client.patch('/uris/5/', headers=headers, data=ujson.dumps(body))
    obj['uri'] = 'test2'

    assert resp.status == 200
    assert (await resp.json()) == obj
async def test_post_with_invalid_grant(self, client):
    """Posting a slot with a bad ``Authorization`` header is rejected with 401.

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'max_items': 10,
        'name': 'test',
        'store_id': 1,
        'engine_id': 1,
        'slot_variables': [{
            '_operation': 'insert',
            'external_variable_id': 1,
            'engine_variable_name': 'item_id'
        }]
    }]
    resp = await client.post('/slots/', headers={'Authorization': 'invalid'}, data=ujson.dumps(body))

    assert resp.status == 401
    assert await resp.json() == {'message': 'Invalid authorization'}
async def test_patch_with_invalid_fallback_id(self, init_db, client, headers, headers_without_content_type):
    """A slot cannot list itself as a fallback; the PATCH answers 400.

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'max_items': 10,
        'name': 'test',
        'store_id': 1,
        'engine_id': 1,
        'slot_variables': [{
            '_operation': 'insert',
            'external_variable_id': 1,
            'engine_variable_name': 'filter_test'
        }]
    }]
    # The creation response itself is not inspected.
    await client.post('/slots/', headers=headers, data=ujson.dumps(body))

    body = {
        'fallbacks': [{'id': 1}]
    }
    resp = await client.patch('/slots/1/', headers=headers, data=ujson.dumps(body))

    assert resp.status == 400
    assert await resp.json() == {
        'instance': [{'fallbacks': [{'id': 1}]}],
        'message': "a Engine Manager can't fallback itself"
    }
async def test_delete(self, init_db, client, headers, headers_without_content_type):
    """A created slot can be fetched (200), deleted (204) and is then gone (404).

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'max_items': 10,
        'name': 'test',
        'store_id': 1,
        'engine_id': 1,
        'slot_variables': [{
            '_operation': 'insert',
            'external_variable_id': 1,
            'engine_variable_name': 'filter_test'
        }]
    }]
    await client.post('/slots/', headers=headers, data=ujson.dumps(body))

    resp = await client.get('/slots/1/', headers=headers_without_content_type)
    assert resp.status == 200

    resp = await client.delete('/slots/1/', headers=headers_without_content_type)
    assert resp.status == 204

    resp = await client.get('/slots/1/', headers=headers_without_content_type)
    assert resp.status == 404
async def test_patch(self, init_db, client, headers, headers_without_content_type):
    """Patching a store's name returns 200 and the updated object.

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'name': 'test',
        'country': 'test',
        'configuration': {'data_path': '/test'}
    }]
    resp = await client.post('/stores/', headers=headers, data=ujson.dumps(body))
    obj = (await resp.json())[0]

    body = {
        'name': 'test2'
    }
    resp = await client.patch('/stores/1/', headers=headers, data=ujson.dumps(body))
    obj['name'] = 'test2'

    assert resp.status == 200
    assert (await resp.json()) == obj
async def test_delete(self, init_db, client, headers, headers_without_content_type):
    """A created store can be fetched (200), deleted (204) and is then gone (404).

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'name': 'test',
        'country': 'test',
        'configuration': {'data_path': '/test'}
    }]
    await client.post('/stores/', headers=headers, data=ujson.dumps(body))

    resp = await client.get('/stores/1/', headers=headers_without_content_type)
    assert resp.status == 200

    resp = await client.delete('/stores/1/', headers=headers_without_content_type)
    assert resp.status == 204

    resp = await client.get('/stores/1/', headers=headers_without_content_type)
    assert resp.status == 404
async def test_patch(self, init_db, client, headers):
    """Patching an external variable's name returns 200 and the updated object.

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'name': 'test',
        'store_id': 1
    }]
    resp = await client.post('/external_variables/', headers=headers, data=ujson.dumps(body))
    obj = (await resp.json())[0]

    body = {
        'name': 'test2'
    }
    resp = await client.patch('/external_variables/1/', headers=headers, data=ujson.dumps(body))
    obj['name'] = 'test2'
    obj['id'] = 1

    assert resp.status == 200
    assert (await resp.json()) == obj
async def test_delete_valid(self, init_db, client, headers, headers_without_content_type):
    """A created external variable can be fetched (200), deleted (204) and is then gone (404).

    Declared ``async`` because the body awaits the test client.
    """
    client = await client
    body = [{
        'name': 'test',
        'store_id': 1
    }]
    # The creation response itself is not inspected.
    await client.post('/external_variables/', headers=headers, data=ujson.dumps(body))

    resp = await client.get('/external_variables/1/', headers=headers_without_content_type)
    assert resp.status == 200

    resp = await client.delete('/external_variables/1/', headers=headers_without_content_type)
    assert resp.status == 204

    resp = await client.get('/external_variables/1/', headers=headers_without_content_type)
    assert resp.status == 404
def _insert_statement(table, value_tuples):
    """Render one multi-row INSERT for ``table`` from pre-formatted tuples."""
    return "INSERT INTO {} (data) VALUES {} ".format(table, ", ".join(value_tuples))


def yield_num(result, queue, table, batch_size=2048):
    """Queue multi-row INSERT statements for the rows of ``result``.

    Each row is serialized with ``json.dumps``; single quotes in the JSON
    text are replaced by backticks so the value can sit inside a
    single-quoted SQL literal. Rows are grouped into statements of at most
    ``batch_size`` rows, and each statement is handed to
    ``queue.put_nowait``.

    The final partial batch is flushed when iteration ends, so it no longer
    depends on ``result.rowcount`` matching the number of rows iterated.

    :param result: iterable of JSON-serializable rows
    :param queue: object exposing ``put_nowait(sql)``
    :param table: target table name, interpolated verbatim into the SQL
    :param batch_size: maximum number of rows per statement
    """
    # NOTE(review): the SQL is built by string formatting, and the backtick
    # substitution alters any data containing single quotes. Consumers of
    # the queue receive plain SQL text, so this is kept as is.
    pending = []
    for row in result:
        pending.append("('{}')".format(json.dumps(row).replace("'", "`")))
        if len(pending) >= batch_size:
            queue.put_nowait(_insert_statement(table, pending))
            pending = []

    if pending:
        queue.put_nowait(_insert_statement(table, pending))
async def create_random_vote(self):
    """Create secrets for voting.

    This is done to prevent automatic or robotic voting. Declared ``async``
    because the body awaits the database calls.

    A sticker row is sampled from the ``stickers`` table, a token is derived
    as the MD5 of a random number, and a JSON array of
    ``[token, current_time, sticker[0], sticker[2]]`` is inserted into
    ``secret``.

    :return: tuple ``(sticker[2], token)``
    """
    # NOTE(review): the token is the MD5 of an integer below 100000 drawn
    # from ``random``, so the token space is small and predictable;
    # consider the ``secrets`` module if this must resist guessing.
    random_number = random.randrange(0, 100000)
    current_time = int(time.time())

    result = await self.db.fetchrow(
        # 'select * from stickers order by random() limit 1;'
        'SELECT * FROM stickers TABLESAMPLE SYSTEM_ROWS(1);'
    )
    # The second column of the row holds the sticker as JSON text.
    random_sticker = json.loads(result[1])

    token = await self.db.fetchval(
        "select md5('{}');".format(random_number))

    # NOTE(review): the JSON payload is interpolated into the SQL text; a
    # single quote inside the sticker data would break the statement.
    await self.db.fetch(
        "insert into secret (data) values"
        "('{}')".format(json.dumps([
            token,
            current_time,
            random_sticker[0],
            random_sticker[2]
        ])))

    return (random_sticker[2], token)
def save_ygdy8_data(sid, url, data):
    """Store one scraped ygdy8 movie page as an entity.

    The first image in ``data['images']`` (if any) is uploaded as the
    poster; an upload failure is logged and an empty poster is used.
    ``data`` is mutated: ``date`` and ``url`` keys are set before it is
    serialized. Any exception is logged and swallowed.

    Args:
        sid (string): identifier passed through to ``Entity.add``
        url (string): page URL; also used to look up the post date
        data (dict): scraped fields (``title``, ``images``, ...)
    """
    celery_logger.info('save_ygdy8_data sid: {} url: {}'.format(sid, url))
    try:
        post_date = ygdy8.get_post_date(url)
        title = data.get('title', '')

        poster = ''
        images = data.get('images')
        if images:
            poster_url = images[0]
            try:
                poster = qiniu_upload_img(poster_url)
            except Exception as exc:
                # A failed poster upload must not block saving the record.
                celery_logger.error('upload image: {} failed'.format(poster_url))
                celery_logger.exception(exc)
                poster = ''

        data['date'] = post_date
        data['url'] = url
        collected_at = trans_date(post_date)

        serialized = ujson.dumps(data, ensure_ascii=False)
        record = Entity.add(sid, title, poster, serialized, ENTITY_YGDY_MOVIE, collected_at)
        add_ygdy_movie(record.sid, data)
    except Exception as exc:
        celery_logger.exception(exc)
def json(body, status_code=200, cookies=None, encoding="utf-8", headers=None):
    """Build an ``application/json`` HTTPResponse carrying ``body`` as JSON.

    ``cookies`` replaces the response cookies when truthy, and each entry of
    ``headers`` is appended to the response headers.
    """
    content_type = 'application/json; charset=' + encoding
    resp = HTTPResponse(content_type, encoding, status_code=status_code)

    if cookies:
        resp.cookies = cookies

    for header in headers or ():
        resp.headers.append(header)

    payload = json_dumps(body).encode(encoding)
    resp.write_bytes(payload)
    return resp
def format(self, record):
    """Default json formatter.

    Renders the record as a JSON object with ``log_time``,
    ``log_location``, ``log_level`` and ``message`` keys. The location is
    ``<logger name>.<function>`` cut to 32 characters, followed by
    ``:<line number>``.
    """
    qualified_name = "{}.{}".format(record.name, record.funcName)
    location = "{}:{}".format(qualified_name[:32], record.lineno)

    fields = {}
    fields['log_time'] = self.formatTime(record, self.datefmt)
    fields['log_location'] = location
    fields['log_level'] = record.levelname
    fields['message'] = record.getMessage()
    return json.dumps(fields)
def serialize(value):
    """Return ``value`` encoded by the module's ``dumps`` function."""
    encoded = dumps(value)
    return encoded
def serialize(value):
    """Return ``value`` encoded by ``dumps``, using ``json_util.default`` as fallback encoder."""
    fallback_encoder = json_util.default
    return dumps(value, default=fallback_encoder)
def post(self, url, data, timeout=None):
    """Request an URL.

    Input-file payloads are sent as a form built by ``InputFile``; anything
    else is sent as a JSON body.

    Args:
        url (str): The web location we want to retrieve.
        data (dict[str, str]): A dict of key/value pairs. Note: On py2.7 value is unicode.
        timeout (Optional[int|float]): If this value is specified, use it as the read
            timeout from the server (instead of the one specified during creation of
            the connection pool).

    Returns:
        A JSON object.

    """
    urlopen_kwargs = {}
    if timeout is not None:
        urlopen_kwargs['timeout'] = Timeout(read=timeout, connect=self._connect_timeout)

    if InputFile.is_inputfile(data):
        form = InputFile(data)
        body, headers = form.to_form(), form.headers
    else:
        body = json.dumps(data).encode()
        headers = {'Content-Type': 'application/json'}

    result = self._request_wrapper('POST', url, body=body, headers=headers, **urlopen_kwargs)
    return self._parse(result)
def to_json(self):
    """Serialize the dict form of this object to JSON.

    Returns:
        str: JSON text built from ``self.to_dict()``.
    """
    as_dict = self.to_dict()
    return json.dumps(as_dict)