我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用simplejson.dumps()。
def DumpReadable(self, *args):
    """
    DumpReadable(Feature self)

    void OGR_F_DumpReadable(OGRFeatureH hFeat, FILE *fpOut)

    Dump this feature in a human readable form.

    This dumps the attributes and geometry; however, it doesn't report
    definition information (other than field types and names), nor does it
    report the geometry spatial reference system.

    This function is the same as the C++ method OGRFeature::DumpReadable().

    Parameters:
    -----------

    hFeat:  handle to the feature to dump.

    fpOut:  the stream to write to, such as stdout.
    """
    # Thin wrapper: forwards self and every positional argument unchanged
    # to the native binding and returns whatever it returns.
    return _ogr.Feature_DumpReadable(self, *args)
def _register_avro_schema(
    self, namespace, source, schema_json=None, **overrides
):
    """Register an Avro schema through the client and return the result.

    Args:
        namespace: namespace the schema is registered under.
        source: source name; also used as the record name of the default
            schema.
        schema_json: schema as a Python structure.  When falsy, a one-field
            ('foo', int) record schema is used instead.
        **overrides: extra request-body entries; they replace any default
            entry with the same key.

    Returns:
        The ``.result()`` of the ``register_schema`` call.
    """
    if not schema_json:
        schema_json = {
            'type': 'record',
            'name': source,
            'namespace': namespace,
            'doc': 'test',
            'fields': [{'type': 'int', 'doc': 'test', 'name': 'foo'}]
        }
    request_body = {
        'namespace': namespace,
        'source': source,
        'source_owner_email': self.source_owner_email,
        'schema': simplejson.dumps(schema_json),
        'contains_pii': False
    }
    # Caller-supplied entries win over the defaults built above.
    if overrides:
        request_body.update(**overrides)
    schemas_api = self._get_client().schemas
    return schemas_api.register_schema(body=request_body).result()
def pk_topic_resp(self, yelp_namespace_name, usr_src_name):
    """Register a two-field schema with a primary key and return its topic.

    The schema has int fields 'id' (marked ``pkey: 1``) and 'data', and a
    record-level ``pkey`` of ['id'].  The serialized schema is passed as the
    ``schema`` keyword of ``self._register_avro_schema``.
    """
    id_field = {'type': 'int', 'doc': 'test', 'name': 'id', 'pkey': 1}
    data_field = {'type': 'int', 'doc': 'test', 'name': 'data'}
    pk_schema_json = {
        'type': 'record',
        'name': usr_src_name,
        'namespace': yelp_namespace_name,
        'doc': 'test',
        'fields': [id_field, data_field],
        'pkey': ['id']
    }
    registered = self._register_avro_schema(
        yelp_namespace_name,
        usr_src_name,
        schema=simplejson.dumps(pk_schema_json)
    )
    return registered.topic
def _register_avro_schema(self, namespace, source, two_fields):
    """Register a record schema with one or two int fields.

    Args:
        namespace: namespace the schema is registered under.
        source: source name, also used as the record name.
        two_fields: when truthy the schema has fields 'foo' and 'bar',
            otherwise only 'foo'.

    Returns:
        Whatever ``get_schematizer().register_schema`` returns.
    """
    field_names = ['foo', 'bar'] if two_fields else ['foo']
    avro_schema = {
        'type': 'record',
        'name': source,
        'namespace': namespace,
        'doc': 'test',
        'fields': [
            {'type': 'int', 'doc': 'test', 'name': field_name}
            for field_name in field_names
        ]
    }
    return get_schematizer().register_schema(
        namespace=namespace,
        source=source,
        schema_str=simplejson.dumps(avro_schema),
        source_owner_email=self.source_owner_email,
        contains_pii=False,
        base_schema_id=None
    )
def _format_message(self, message):
    """Turn a message into its result dict, or into JSON text if requested.

    The result dict comes from ``self._get_message_result_dict``.  When
    ``self.options.iso_time`` is set the dict is first passed to
    ``self._iso_time``.  When ``self.options.json`` is set the dict is
    returned serialized; otherwise the dict itself is returned.
    """
    payload = self._get_message_result_dict(message)
    if self.options.iso_time:
        self._iso_time(payload)
    if not self.options.json:
        return payload
    return simplejson.dumps(
        obj=payload,
        sort_keys=True,
        # Objects exposing _asdict() are encoded as JSON objects.
        namedtuple_as_object=True,
        # Anything else that is not serializable is replaced by its repr()
        # instead of raising a TypeError.
        default=repr
    )
def get_product(org_id, cp_id):
    """
    Find and return the label of the given product ID

    Queries the products endpoint for the organization (up to 1000 per
    page) and returns the ``label`` of the first result whose ``cp_id``
    equals *cp_id*.  Returns None when no result matches.
    """
    products_url = helpers.KATELLO_API + "/products/"
    query = json.dumps(
        {
            "organization_id": org_id,
            "per_page": '1000',
        }
    )
    listing = helpers.get_p_json(products_url, query)
    matching_labels = (
        product['label']
        for product in listing['results']
        if product['cp_id'] == cp_id
    )
    return next(matching_labels, None)
def test_separators(self):
    """indent=0 output honours explicit separators and defaults to ','."""
    values = [1, 2, 3, 4]
    compact = '[\n1,\n2,\n3,\n4\n]'
    with_trailing_space = '[\n1, \n2, \n3, \n4\n]'

    # Ensure that separators still works
    spaced_output = json.dumps(values, indent=0, separators=(', ', ': '))
    self.assertEqual(with_trailing_space, spaced_output)

    # Force the new defaults
    forced_output = json.dumps(values, indent=0, separators=(',', ': '))
    self.assertEqual(compact, forced_output)

    # Added in 2.1.4
    default_output = json.dumps(values, indent=0)
    self.assertEqual(compact, default_output)
def test_iterable(self):
    """Check iterable_as_array handling for every encoder entry point.

    For several list sizes and option sets, an iterator must be rejected
    unless ``iterable_as_array=True`` or a ``default`` hook is supplied, and
    the hook must not run when the iterator is encoded as an array.
    """
    encoders = (json.dumps, iter_dumps, sio_dump)
    for seq in ([], [1], [1, 2], [1, 2, 3]):
        for kwargs in [{}, {'indent': 2}]:
            for encode in encoders:
                array_text = encode(seq, **kwargs)
                summed_text = encode(sum(seq), **kwargs)

                # Default is False
                self.assertRaises(TypeError, encode, iter(seq), **kwargs)
                self.assertRaises(
                    TypeError, encode, iter(seq),
                    iterable_as_array=False, **kwargs)

                self.assertEqual(
                    array_text,
                    encode(iter(seq), iterable_as_array=True, **kwargs))

                # Ensure that the "default" gets called
                self.assertEqual(
                    summed_text,
                    encode(iter(seq), default=sum, **kwargs))
                self.assertEqual(
                    summed_text,
                    encode(iter(seq), iterable_as_array=False,
                           default=sum, **kwargs))

                # Ensure that the "default" does not get called
                self.assertEqual(
                    array_text,
                    encode(iter(seq), iterable_as_array=True,
                           default=sum, **kwargs))
def test_tuple_array_dump(self):
    """json.dump encodes tuples as arrays unless tuple_as_array=False."""
    t = (1, 2, 3)
    expect = json.dumps(list(t))

    def dumped(**options):
        # Serialize ``t`` into a fresh buffer and hand back the text.
        buf = StringIO()
        json.dump(t, buf, **options)
        return buf.getvalue()

    # Default is True
    self.assertEqual(expect, dumped())
    self.assertEqual(expect, dumped(tuple_as_array=True))
    self.assertRaises(TypeError, json.dump, t, StringIO(),
                      tuple_as_array=False)

    # Ensure that the "default" does not get called
    self.assertEqual(expect, dumped(default=repr))
    self.assertEqual(expect, dumped(tuple_as_array=True, default=repr))

    # Ensure that the "default" gets called
    self.assertEqual(
        json.dumps(repr(t)),
        dumped(tuple_as_array=False, default=repr))
def test_listrecursion(self):
    """Self-referencing lists raise ValueError; shared references do not."""
    # A list that directly contains itself.
    loop = []
    loop.append(loop)
    try:
        json.dumps(loop)
    except ValueError:
        pass
    else:
        self.fail("didn't raise ValueError on list recursion")

    # Two lists that contain each other.
    outer = []
    inner = [outer]
    outer.append(inner)
    try:
        json.dumps(outer)
    except ValueError:
        pass
    else:
        self.fail("didn't raise ValueError on alternating list recursion")

    # The same (non-recursive) list appearing twice must still encode:
    # ensure that the marker is cleared
    shared = []
    json.dumps([shared, shared])
def run_command(session, args):
    """Run a whitelisted command and return its outcome as a JSON string.

    ``args['cmd']`` is a JSON-encoded argument list and the optional
    ``args['cmd_input']`` is JSON-encoded input (defaults to JSON null).

    Returns:
        JSON object text with keys 'returncode', 'out' and 'err'; falsy
        'out'/'err' values are normalised to the empty string.

    Raises:
        PluginError: when the command list is non-empty and its first
            element is not in ALLOWED_CMDS.
    """
    cmd = json.loads(args.get('cmd'))
    if cmd and cmd[0] not in ALLOWED_CMDS:
        raise PluginError(
            _("Dom0 execution of '%s' is not permitted") % cmd[0])

    cmd_input = json.loads(args.get('cmd_input', 'null'))
    returncode, out, err = _run_command(cmd, cmd_input)

    # This runs in Dom0, will return to neutron-ovs-agent in compute node
    return json.dumps({'returncode': returncode,
                       'out': out or "",
                       'err': err or ""})
def key_init(self, arg_dict): """Handles the Diffie-Hellman key exchange with the agent to establish the shared secret key used to encrypt/decrypt sensitive info to be passed, such as passwords. Returns the shared secret key value. """ timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT)) # WARNING: Some older Windows agents will crash if the public key isn't # a string pub = arg_dict["pub"] arg_dict["value"] = json.dumps({"name": "keyinit", "value": pub}) request_id = arg_dict["id"] arg_dict["path"] = "data/host/%s" % request_id xenstore.write_record(self, arg_dict) try: resp = _wait_for_agent(self, request_id, arg_dict, timeout) except TimeoutError, e: # noqa raise PluginError(e) return resp
def password(self, arg_dict): """Writes a request to xenstore that tells the agent to set the root password for the given VM. The password should be encrypted using the shared secret key that was returned by a previous call to key_init. The encrypted password value should be passed as the value for the 'enc_pass' key in arg_dict. """ timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT)) enc_pass = arg_dict["enc_pass"] arg_dict["value"] = json.dumps({"name": "password", "value": enc_pass}) request_id = arg_dict["id"] arg_dict["path"] = "data/host/%s" % request_id xenstore.write_record(self, arg_dict) try: resp = _wait_for_agent(self, request_id, arg_dict, timeout) except TimeoutError, e: # noqa raise PluginError(e) return resp
def agent_update(self, arg_dict): """Expects an URL and md5sum of the contents Then directs the agent to update itself. """ timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT)) request_id = arg_dict["id"] url = arg_dict["url"] md5sum = arg_dict["md5sum"] arg_dict["value"] = json.dumps({"name": "agentupdate", "value": "%s,%s" % (url, md5sum)}) arg_dict["path"] = "data/host/%s" % request_id xenstore.write_record(self, arg_dict) try: resp = _wait_for_agent(self, request_id, arg_dict, timeout) except TimeoutError, e: # noqa raise PluginError(e) return resp
def _get_agent_features(self, arg_dict): """Return an array of features that an agent supports.""" timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT)) tmp_id = commands.getoutput("uuidgen") dct = {} dct.update(arg_dict) dct["value"] = json.dumps({"name": "features", "value": ""}) dct["path"] = "data/host/%s" % tmp_id xenstore.write_record(self, dct) try: resp = _wait_for_agent(self, tmp_id, dct, timeout) except TimeoutError, e: # noqa raise PluginError(e) response = json.loads(resp) if response['returncode'] != 0: return response["message"].split(",") else: return {}
def test_iptables_config(self, mock_json_loads):
    """iptables_config runs 'iptables-save' and returns out/err as JSON."""
    # Arrange: stub the argument helpers, the command runner and json.loads.
    self.mock_patch_object(self.pluginlib, 'exists', 'fake_cmd_args')
    self.mock_patch_object(self.pluginlib, 'optional', 'fake_cmd_pro_input')
    self.host._run_command.return_value = 'fake_run_cmd_resule'
    mock_json_loads.return_value = ['iptables-save']
    expected_json = json.dumps(dict(out='fake_run_cmd_resule', err=''))

    # Act
    actual_json = self.host.iptables_config('fake_session', 'fake_args')

    # Assert
    self.pluginlib.exists.assert_called_once()
    self.pluginlib.optional.assert_called_once()
    mock_json_loads.assert_called_with('fake_cmd_args')
    self.assertEqual(actual_json, expected_json)
    self.host._run_command.assert_called_with(
        map(str, ['iptables-save']), 'fake_cmd_pro_input')
def test_version_timout_exception(self):
    """agent.version raises PluginError when waiting for the agent fails."""
    # NOTE: this is an alias of the module-level FAKE_ARG_DICT, not a copy,
    # so the assignments below are visible through FAKE_ARG_DICT too.
    expected_args = FAKE_ARG_DICT
    expected_args["value"] = json.dumps({"name": "version",
                                         "value": "agent"})
    expected_args["path"] = "data/host/%s" % expected_args["id"]

    self.agent.PluginError = FakeTimeoutException
    self.agent._wait_for_agent.side_effect = [
        FakeTimeoutException('TIME_OUT')]

    self.assertRaises(self.agent.PluginError,
                      self.agent.version,
                      self.agent, FAKE_ARG_DICT)
    self.agent._wait_for_agent.assert_called_once()
    self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                        expected_args)
def test_password_timout_exception(self):
    """agent.password raises PluginError when waiting for the agent fails."""
    # NOTE: this is an alias of the module-level FAKE_ARG_DICT, not a copy,
    # so the assignments below are visible through FAKE_ARG_DICT too.
    expected_args = FAKE_ARG_DICT
    expected_args["value"] = json.dumps(
        {"name": "password", "value": expected_args["enc_pass"]})
    expected_args["path"] = "data/host/%s" % expected_args["id"]

    self.agent.PluginError = FakeTimeoutException
    self.agent._wait_for_agent.side_effect = [
        FakeTimeoutException('TIME_OUT')]

    self.assertRaises(self.agent.PluginError,
                      self.agent.password,
                      self.agent, FAKE_ARG_DICT)
    self.agent._wait_for_agent.assert_called_once()
    self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                        expected_args)
def test_reset_network_timout_exception(self):
    """agent.resetnetwork raises PluginError when the agent wait fails."""
    # NOTE: this is an alias of the module-level FAKE_ARG_DICT, not a copy,
    # so the assignments below are visible through FAKE_ARG_DICT too.
    expected_args = FAKE_ARG_DICT
    expected_args['value'] = json.dumps({'name': 'resetnetwork',
                                         'value': ''})
    expected_args['path'] = "data/host/%s" % expected_args['id']

    self.agent.PluginError = FakeTimeoutException
    self.agent._wait_for_agent.side_effect = [
        FakeTimeoutException('TIME_OUT')]

    self.assertRaises(self.agent.PluginError,
                      self.agent.resetnetwork,
                      self.agent, FAKE_ARG_DICT)
    self.agent._wait_for_agent.assert_called_once()
    self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                        expected_args)
def test_inject_file_with_new_agent(self):
    """inject_file sends a file_inject request when the feature is listed."""
    # NOTE: this is an alias of the module-level FAKE_ARG_DICT, not a copy,
    # so the assignments below are visible through FAKE_ARG_DICT too.
    expected_args = FAKE_ARG_DICT
    req_id = expected_args["id"]
    file_payload = {"b64_path": expected_args["b64_path"],
                    "b64_file": expected_args["b64_contents"]}
    self.mock_patch_object(self.agent,
                           '_get_agent_features',
                           'file_inject')
    expected_args["value"] = json.dumps({"name": "file_inject",
                                         "value": file_payload})
    expected_args["path"] = "data/host/%s" % req_id

    self.agent.inject_file(self.agent, FAKE_ARG_DICT)

    self.agent._wait_for_agent.assert_called_once()
    self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                        expected_args)
    self.agent._get_agent_features.assert_called_once()
def test_inject_file_Timeout_exception(self):
    """inject_file raises PluginError when waiting for the agent fails."""
    # NOTE: this is an alias of the module-level FAKE_ARG_DICT, not a copy,
    # so the assignments below are visible through FAKE_ARG_DICT too.
    expected_args = FAKE_ARG_DICT
    req_id = expected_args["id"]
    file_payload = {"b64_path": expected_args["b64_path"],
                    "b64_file": expected_args["b64_contents"]}
    expected_args["value"] = json.dumps({"name": "file_inject",
                                         "value": file_payload})
    expected_args["path"] = "data/host/%s" % req_id
    self.mock_patch_object(self.agent,
                           '_get_agent_features',
                           'file_inject')

    self.agent.PluginError = FakeTimeoutException
    self.agent._wait_for_agent.side_effect = [
        FakeTimeoutException('TIME_OUT')]

    self.assertRaises(self.agent.PluginError,
                      self.agent.inject_file,
                      self.agent, FAKE_ARG_DICT)
    self.agent._wait_for_agent.assert_called_once()
    self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                        expected_args)
def test_agent_update_timout_exception(self):
    """agent_update raises PluginError when waiting for the agent fails."""
    # NOTE: this is an alias of the module-level FAKE_ARG_DICT, not a copy,
    # so the assignments below are visible through FAKE_ARG_DICT too.
    expected_args = FAKE_ARG_DICT
    req_id = expected_args["id"]
    update_spec = "%s,%s" % (expected_args["url"], expected_args["md5sum"])
    expected_args["value"] = json.dumps({"name": "agentupdate",
                                         "value": update_spec})
    expected_args["path"] = "data/host/%s" % req_id

    self.agent.PluginError = FakeTimeoutException
    self.agent._wait_for_agent.side_effect = [
        FakeTimeoutException('TIME_OUT')]

    self.assertRaises(self.agent.PluginError,
                      self.agent.agent_update,
                      self.agent, FAKE_ARG_DICT)
    self.agent._wait_for_agent.assert_called_once()
    # Only the call count is checked here, not the arguments.
    self.agent.xenstore.write_record.assert_called_once()
def generate_adapter(adapter, name='url_for', map_name='url_map'):
    """Generates the url building function for a map.

    Returns a JavaScript statement that assigns to ``name`` the result of
    calling ``map_name`` with the adapter's server name, script name,
    subdomain and URL scheme, each serialized with ``dumps``.
    """
    template = u'''\
var %(name)s = %(map_name)s(
    %(server_name)s,
    %(script_name)s,
    %(subdomain)s,
    %(url_scheme)s
);'''
    return template % {
        u'server_name': dumps(adapter.server_name),
        u'script_name': dumps(adapter.script_name),
        u'subdomain': dumps(adapter.subdomain),
        u'url_scheme': dumps(adapter.url_scheme),
        u'name': name,
        u'map_name': map_name
    }
def writerow(self, row):
    """Write row.

    Each cell is converted before being handed to the underlying writer:
    dict values (including dict subclasses such as OrderedDict) are
    serialized as JSON; everything else is converted with ``unicode`` and
    encoded as UTF-8.  The UTF-8 output buffered in ``self.queue`` is then
    re-encoded with ``self.encoder`` and written to ``self.stream``, and
    the queue is emptied.
    """
    line = []
    for s in row:
        # isinstance() rather than an exact type comparison, so dict
        # subclasses are JSON-encoded too instead of being stringified.
        if isinstance(s, dict):
            line.append(json.dumps(s))
        else:
            line.append(unicode(s).encode("utf-8"))
    self.writer.writerow(line)
    # Fetch UTF-8 output from the queue ...
    data = self.queue.getvalue()
    data = data.decode("utf-8")
    # ... and reencode it into the target encoding
    data = self.encoder.encode(data)
    # write to the target stream
    self.stream.write(data)
    # empty queue
    self.queue.truncate(0)
def get_disqus_sso_payload(user):
    """Return remote_auth_s3 and api_key for user.

    Returns:
        ``(message, timestamp, sig, public_key)`` where *message* is the
        base64-encoded JSON user payload (an empty object when *user* is
        falsy), *timestamp* is the current time in whole seconds and *sig*
        is the hex HMAC-SHA1 of "<message> <timestamp>" keyed with the
        secret key.  When either Disqus key is missing from the app
        configuration, four ``None`` values are returned instead.
    """
    public_key = current_app.config.get('DISQUS_PUBLIC_KEY')
    secret_key = current_app.config.get('DISQUS_SECRET_KEY')
    if not (public_key and secret_key):
        return None, None, None, None

    if user:
        profile = {
            'id': user.id,
            'username': user.name,
            'email': user.email_addr,
        }
    else:
        profile = {}
    # encode the data to base64
    message = base64.b64encode(simplejson.dumps(profile))
    # generate a timestamp for signing the message
    timestamp = int(time.time())
    # generate our hmac signature
    signed_text = '%s %s' % (message, timestamp)
    sig = hmac.HMAC(secret_key, signed_text, hashlib.sha1).hexdigest()
    return message, timestamp, sig, public_key
def test_rpc_call_with_positional_parameters(http_client, base_url):
    """'subtract' with positional params: 42-23 == 19 and 23-42 == -19."""
    endpoint = base_url + "/api"
    json_headers = {"content-type": "application/json"}

    first_request = json.dumps({"jsonrpc": "2.0", "method": "subtract",
                                "params": [42, 23], "id": 1})
    response = yield http_client.fetch(endpoint, method="POST",
                                       body=first_request,
                                       headers=json_headers)
    assert response.code == 200
    reply = json.loads(response.body)
    assert reply["jsonrpc"] == "2.0"
    assert reply["result"] == 19
    assert reply["id"] == 1

    second_request = json.dumps({"jsonrpc": "2.0", "method": "subtract",
                                 "params": [23, 42], "id": 2})
    response = yield http_client.fetch(endpoint, method="POST",
                                       body=second_request,
                                       headers=json_headers)
    assert response.code == 200
    reply = json.loads(response.body)
    assert reply["jsonrpc"] == "2.0"
    assert reply["result"] == -19
    assert reply["id"] == 2
def test_rpc_call_with_named_parameters(http_client, base_url):
    """'subtract' with named params gives 19, and -19 with a/b swapped."""
    endpoint = base_url + "/api"
    json_headers = {"content-type": "application/json"}

    first_request = json.dumps(
        {"jsonrpc": "2.0", "method": "subtract",
         "params": {"a": 42, "b": 23}, "id": 3})
    response = yield http_client.fetch(endpoint, method="POST",
                                       body=first_request,
                                       headers=json_headers)
    assert response.code == 200
    reply = json.loads(response.body)
    assert reply["jsonrpc"] == "2.0"
    assert reply["result"] == 19
    assert reply["id"] == 3

    second_request = json.dumps(
        {"jsonrpc": "2.0", "method": "subtract",
         "params": {"b": 42, "a": 23}, "id": 4})
    response = yield http_client.fetch(endpoint, method="POST",
                                       body=second_request,
                                       headers=json_headers)
    assert response.code == 200
    reply = json.loads(response.body)
    assert reply["jsonrpc"] == "2.0"
    assert reply["result"] == -19
    assert reply["id"] == 4
def test_simple_call_no_parameters(http_client, base_url):
    """A 'say_hello' call without params answers "hello" and no error."""
    endpoint = base_url + "/api"
    request = {
        "id": 1,
        "method": "say_hello",
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(request),
        headers={"content-type": "application/json"})
    assert response.code == 200

    reply = json.loads(response.body)
    assert reply["id"] == 1
    assert reply["error"] is None
    assert reply["result"] == "hello"
    assert reply["jsonrpc"] == "2.0"
def test_simple_call_keyword_arguments(http_client, base_url):
    """'subtract' with a=10, b=20 given as keywords answers -10."""
    endpoint = base_url + "/api"
    request = {
        "id": 1,
        "method": "subtract",
        "params": {"b": 20, "a": 10},
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(request),
        headers={"content-type": "application/json"})
    assert response.code == 200

    reply = json.loads(response.body)
    assert reply["id"] == 1
    assert reply["error"] is None
    assert reply["result"] == -10
    assert reply["jsonrpc"] == "2.0"
def test_simple_call_variable_length_positional_arguments(http_client, base_url):
    """'sum' over the integers 0..99 passed positionally answers 4950."""
    endpoint = base_url + "/api"
    request = {
        "id": 1,
        "method": "sum",
        "params": list(range(0, 100)),
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(request),
        headers={"content-type": "application/json"})
    assert response.code == 200

    reply = json.loads(response.body)
    assert reply["id"] == 1
    assert reply["error"] is None
    assert reply["result"] == 4950
    assert reply["jsonrpc"] == "2.0"
def test_simple_call_raise_exception(http_client, base_url):
    """Dividing by zero is reported as an Internal error (-32603).

    The error data must name ZeroDivisionError and carry its message.
    """
    endpoint = base_url + "/api"
    request = {
        "id": 1,
        "method": "divide",
        "params": {"a": 10, "b": 0},
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(request),
        headers={"content-type": "application/json"})
    assert response.code == 200

    reply = json.loads(response.body)
    assert reply["id"] == 1
    assert reply["result"] is None
    assert reply["error"] is not None
    error = reply["error"]
    assert error["code"] == -32603
    assert error["message"] == "Internal error"
    assert error["data"]["class"] == "ZeroDivisionError"
    assert error["data"]["info"] == "division by zero"
    assert reply["jsonrpc"] == "2.0"
def test_simple_call_wrong_arguments_too_few_positional_arguments(http_client, base_url):
    """'subtract' with a single positional param yields Invalid params."""
    endpoint = base_url + "/api"
    request = {
        "id": 1,
        "method": "subtract",
        "params": [10],
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(request),
        headers={"content-type": "application/json"})
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 1
    assert reply["result"] is None
    assert reply["error"] is not None
    error = reply["error"]
    assert error["code"] == -32602
    assert error["message"] == "Invalid params"
    assert reply["jsonrpc"] == "2.0"
def test_simple_call_wrong_arguments_too_many_positional_arguments(http_client, base_url):
    """'subtract' with three positional params yields Invalid params."""
    endpoint = base_url + "/api"
    request = {
        "id": 1,
        "method": "subtract",
        "params": [10, 20, 30],
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(request),
        headers={"content-type": "application/json"})
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 1
    assert reply["result"] is None
    assert reply["error"] is not None
    error = reply["error"]
    assert error["code"] == -32602
    assert error["message"] == "Invalid params"
    assert reply["jsonrpc"] == "2.0"
def test_simple_call_wrong_arguments_too_many_keyword_arguments(http_client, base_url):
    """'subtract' with an extra keyword param 'c' yields Invalid params."""
    endpoint = base_url + "/api"
    request = {
        "id": 1,
        "method": "subtract",
        "params": {"a": 10, "b": 20, "c": 30},
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(request),
        headers={"content-type": "application/json"})
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 1
    assert reply["result"] is None
    assert reply["error"] is not None
    error = reply["error"]
    assert error["code"] == -32602
    assert error["message"] == "Invalid params"
    assert reply["jsonrpc"] == "2.0"
def test_simple_call_wrong_arguments_no_arguments(http_client, base_url):
    """'subtract' without any params yields Invalid params (-32602)."""
    endpoint = base_url + "/api"
    request = {
        "id": 1,
        "method": "subtract",
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(request),
        headers={"content-type": "application/json"})
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 1
    assert reply["result"] is None
    assert reply["error"] is not None
    error = reply["error"]
    assert error["code"] == -32602
    assert error["message"] == "Invalid params"
    assert reply["jsonrpc"] == "2.0"
def test_notification_valid(http_client, base_url):
    """A valid request without "id" gets a reply with no id/result/error."""
    endpoint = base_url + "/api"
    notification = {
        "method": "subtract",
        "params": [20, 10],
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(notification),
        headers={"content-type": "application/json"})
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert "id" not in reply
    assert reply["result"] is None
    assert reply["error"] is None
    assert reply["jsonrpc"] == "2.0"
def test_notification_method_not_found(http_client, base_url):
    """A request without "id" for method 'foobar' reports no error."""
    endpoint = base_url + "/api"
    notification = {
        "method": "foobar",
        "params": [20, 10],
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(notification),
        headers={"content-type": "application/json"})
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert "id" not in reply
    assert reply["result"] is None
    assert reply["error"] is None
    assert reply["jsonrpc"] == "2.0"
def test_notification_invalid_params(http_client, base_url):
    """A request without "id" and three params to 'divide' reports no error."""
    endpoint = base_url + "/api"
    notification = {
        "method": "divide",
        "params": [20, 10, 30],
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(notification),
        headers={"content-type": "application/json"})
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert "id" not in reply
    assert reply["result"] is None
    assert reply["error"] is None
    assert reply["jsonrpc"] == "2.0"
def test_private_call_wrong_token(http_client, base_url):
    """'private_sum' with a bad X-Testing-Token is denied (-32001)."""
    endpoint = base_url + "/api"
    request = {
        "id": 3,
        "method": "private_sum",
        "params": [13, -13],
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(request),
        headers={"content-type": "application/json",
                 "X-Testing-Token": "bad_token"})
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 3
    assert reply["result"] is None
    assert reply["error"] is not None
    error = reply["error"]
    assert error["code"] == -32001
    assert error["message"] == "Access denied"
    assert reply["jsonrpc"] == "2.0"
def test_private_call_wrong_token_and_wrong_parameters(http_client, base_url):
    """A bad token is denied even when the params are also wrong.

    Without a valid token a caller must not learn anything about a private
    method, such as which parameters it takes; the reply must therefore be
    "Access denied" rather than "Invalid params".
    """
    endpoint = base_url + "/api"
    request = {
        "id": 3,
        "method": "private_sum",
        "params": [13],
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(request),
        headers={"content-type": "application/json",
                 "X-Testing-Token": "bad_token"})
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 3
    assert reply["result"] is None
    assert reply["error"] is not None
    error = reply["error"]
    assert error["code"] == -32001
    assert error["message"] == "Access denied"
    assert reply["jsonrpc"] == "2.0"
def test_private_call_ok_token(http_client, base_url):
    """'private_sum' of 13 and -13 with the testing token answers 0."""
    endpoint = base_url + "/api"
    request = {
        "id": 3,
        "method": "private_sum",
        "params": [13, -13],
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(request),
        headers={"content-type": "application/json",
                 "X-Testing-Token": "testing_token"})
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 3
    assert reply["result"] == 0
    assert reply["error"] is None
    assert reply["jsonrpc"] == "2.0"
def test_private_call_ok_token_wrong_parameters(http_client, base_url):
    """With the testing token, wrong params give Invalid params (-32602)."""
    endpoint = base_url + "/api"
    request = {
        "id": 3,
        "method": "private_sum",
        "params": [13],
        "jsonrpc": "2.0"
    }
    response = yield http_client.fetch(
        endpoint, method="POST", body=json.dumps(request),
        headers={"content-type": "application/json",
                 "X-Testing-Token": "testing_token"})
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 3
    assert reply["result"] is None
    assert reply["error"] is not None
    error = reply["error"]
    assert error["code"] == -32602
    assert error["message"] == "Invalid params"
    assert reply["jsonrpc"] == "2.0"
def connect(self, message):
    """Accept or reject an incoming WebSSH connection.

    The connection is rejected (a message is sent and ``disconnect`` is
    called) when the global configuration does not have ``webssh == 1`` or
    when the server id cannot be parsed from the query string.  Superusers
    are accepted directly; other users are accepted only when a
    User_Server entry exists for them and the requested server id.

    Every rejection now returns immediately.  Previously execution
    continued after ``disconnect``, so an ``accept`` could still be sent
    after a rejection and an unparsable server id led to a NameError on
    ``sid``.
    """
    try:
        webssh = Global_Config.objects.get(id=1).webssh
    except Exception:
        # Any failure to read the configuration counts as "disabled".
        webssh = 0
    if webssh != 1:
        self.message.reply_channel.send({"text":json.dumps(['stdout','\033[1;3;31m????????WebSSH??~\033[0m'])},immediately=True)
        self.disconnect(message)
        return

    # NOTE(review): the original layout was ambiguous about whether a
    # connection without a user should be disconnected here; as written,
    # nothing is sent for it -- confirm the intended behavior.
    if self.message.user:
        # Look up the connecting user.
        user = User.objects.get(username=self.message.user)
        if user.is_superuser:
            self.message.reply_channel.send({"accept": True})
            return
        try:
            # The server id is taken from the part after the first '=' of
            # the query string.
            sid = int(self.message.content.get('query_string').split('=')[1])
        except Exception:
            self.message.reply_channel.send({"text":json.dumps(['stdout','\033[1;3;31m"???????????"\033[0m'])},immediately=True)
            self.disconnect(message)
            return
        # NOTE(review): if User_Server is a Django model, .get() raises
        # DoesNotExist instead of returning a falsy value, so the else
        # branch below may never run -- confirm and handle if needed.
        server = User_Server.objects.get(user_id=user.id,server_id=sid)
        if server:
            self.message.reply_channel.send({"accept": True})
        else:
            self.disconnect(message)
def posix_shell(chan,channel,log_name=None,width=90,height=40):
    """Relay output read from ``chan`` to the reply ``channel`` as JSON.

    Reads up to 1024 bytes at a time from ``chan`` and forwards each chunk
    through ``channel_layer.send`` as a JSON ``['stdout', text]`` message.
    An empty read sends a ``['disconnect', ...]`` message and ends the loop.
    Any exception other than ``socket.timeout`` ends the loop and is
    reported to the channel as a 'stdout' message.

    ``log_name``, ``width`` and ``height`` are not used in this body, and
    the collected ``stdout`` list is never read here.

    NOTE(review): the statement nesting was reconstructed from a source
    whose indentation was lost; in particular, confirm that the 'stdout'
    send belongs after the if/else rather than inside the else branch.
    """
    from OpsManage.asgi import channel_layer
    stdout = list()
    begin_time = time.time()
    last_write_time = {'last_activity_time':begin_time}
    try:
        # Timeout 0.0: recv() raises socket.timeout instead of blocking.
        chan.settimeout(0.0)
        while True:
            try:
                x = u(chan.recv(1024))
                if len(x) == 0:
                    # An empty read is treated as end of stream.
                    channel_layer.send(channel, {'text': json.dumps(['disconnect',smart_unicode('\r\n*** EOF\r\n')]) })
                    break
                # Time elapsed since the previous chunk was received.
                now = time.time()
                delay = now - last_write_time['last_activity_time']
                last_write_time['last_activity_time'] = now
                if x == "exit\r\n" or x == "logout\r\n" or x == 'logout':
                    chan.close()
                else:
                    # Record [delay, decoded text]; undecodable bytes are
                    # replaced rather than raising.
                    stdout.append([delay,codecs.getincrementaldecoder('UTF-8')('replace').decode(x)])
                channel_layer.send(channel, {'text': json.dumps(['stdout',smart_unicode(x)]) })
            except socket.timeout:
                # No data available right now; poll again.
                pass
    except Exception,e:
        channel_layer.send(channel, {'text': json.dumps(['stdout','A bug find,You can report it to me' + smart_unicode(e)]) })
    finally:
        pass
def set(cls, model, domain, child_name, nodes, selected_nodes):
    """Replace the stored tree state for the current user and view.

    Existing records matching (user, model, normalized domain, child_name)
    are deleted and a single new record holding *nodes* and
    *selected_nodes* is created.
    """
    # Normalize the json domain
    normalized_domain = json.dumps(json.loads(domain))
    user_id = Transaction().user

    stale_records = cls.search([
        ('user', '=', user_id),
        ('model', '=', model),
        ('domain', '=', normalized_domain),
        ('child_name', '=', child_name),
    ])
    cls.delete(stale_records)

    fresh_state = {
        'user': user_id,
        'model': model,
        'domain': normalized_domain,
        'child_name': child_name,
        'nodes': nodes,
        'selected_nodes': selected_nodes,
    }
    cls.create([fresh_state])
def get(cls, model, domain, child_name):
    """Return ``(nodes, selected_nodes)`` stored for the current user.

    Looks up at most one record matching (user, model, normalized domain,
    child_name).  When none is found, or a stored value is falsy, the
    corresponding class default is returned instead.
    """
    # Normalize the json domain
    normalized_domain = json.dumps(json.loads(domain))
    user_id = Transaction().user
    try:
        # Unpacking a result that is not exactly one record raises
        # ValueError, which is taken to mean "no stored state".
        record_id, = cls.search([
            ('user', '=', user_id),
            ('model', '=', model),
            ('domain', '=', normalized_domain),
            ('child_name', '=', child_name),
        ], limit=1)
    except ValueError:
        return (cls.default_nodes(), cls.default_selected_nodes())

    stored = cls(record_id)
    nodes = stored.nodes or cls.default_nodes()
    selected = stored.selected_nodes or cls.default_selected_nodes()
    return (nodes, selected)
def write_to_cache(self):
    ''' Writes data in JSON format to a file

    Serializes ``self.data`` and ``self.inventory`` under the keys 'data'
    and 'inventory' (sorted keys, 2-space indent) and writes the result to
    ``self.cache_filename``, replacing any previous content.
    '''

    data = {'data': self.data, 'inventory': self.inventory}
    json_data = json.dumps(data, sort_keys=True, indent=2)

    # The context manager closes the file even when write() raises, so the
    # handle is never leaked.
    with open(self.cache_filename, 'w') as cache:
        cache.write(json_data)

###########################################################################
# Utilities
###########################################################################
def json_format_dict(self, data, pretty=False):
    ''' Converts a dict to a JSON object and dumps it as a formatted
    string

    With ``pretty`` set, keys are sorted and the output is indented by two
    spaces; otherwise the default compact form is produced.
    '''

    dump_options = {'sort_keys': True, 'indent': 2} if pretty else {}
    return json.dumps(data, **dump_options)


# Run the script