我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 simplejson.loads()。
def _assert_logged_info_correct( self, mock_logger, messages_already_published, topic, topic_offsets, message_count ): assert mock_logger.info.call_count == 1 (log_line,), _ = mock_logger.info.call_args logged_info = json.loads(log_line) assert logged_info['topic'] == topic assert logged_info['message_count'] == message_count assert logged_info['saved_offset'] == topic_offsets.get(topic, 0) assert logged_info['already_published_count'] == messages_already_published assert logged_info['high_watermark'] == ( topic_offsets.get(topic, 0) + messages_already_published )
def get_cookies(self):
    """Yield cookies found for every entry of ``self.cookie_files``.

    For each cookie file, the rows of the ``moz_cookies`` table are read
    from the copy handed out by ``create_local_copy`` and yielded through
    ``create_cookie``. Afterwards a ``sessionstore.js`` file located next
    to the cookie file is parsed (when present) and its per-window cookies
    are yielded with an expiry one week from now.

    The sqlite connection and the session file are always closed, even if
    the consumer stops iterating early or an error is raised.
    """
    for cookie_file in self.cookie_files:
        with create_local_copy(cookie_file) as tmp_cookie_file:
            con = sqlite3.connect(tmp_cookie_file)
            try:
                cur = con.cursor()
                cur.execute('select host, path, isSecure, expiry, name, value from moz_cookies')
                for item in cur.fetchall():
                    yield create_cookie(*item)
            finally:
                # Runs on normal exhaustion, on errors and when the
                # generator is closed before being fully consumed.
                con.close()

            # current sessions are saved in sessionstore.js
            session_file = os.path.join(os.path.dirname(cookie_file), 'sessionstore.js')
            if not os.path.exists(session_file):
                print('Firefox session filename does not exist:', session_file)
                continue

            try:
                with open(session_file, 'rb') as session_fh:
                    json_data = json.loads(session_fh.read().decode('utf-8'))
            except ValueError as e:
                print('Error parsing firefox session JSON:', str(e))
                continue

            # Session cookies carry no expiry of their own; give them a week.
            expires = str(int(time.time()) + 3600 * 24 * 7)
            for window in json_data.get('windows', []):
                for cookie in window.get('cookies', []):
                    yield create_cookie(cookie.get('host', ''), cookie.get('path', ''), False, expires,
                                        cookie.get('name', ''), cookie.get('value', ''))
def test_namedtuple_dump(self):
    """``json.dump`` writes namedtuple-like values as their ``_asdict()``.

    Each sample value is dumped with four keyword combinations, and the
    decoded output must equal ``_asdict()`` every time.
    """
    keyword_variants = (
        {},
        {'namedtuple_as_object': True},
        {'tuple_as_array': False},
        {'namedtuple_as_object': True, 'tuple_as_array': False},
    )
    for v in [Value(1), Point(1, 2), DuckValue(1), DuckPoint(1, 2)]:
        d = v._asdict()
        for dump_kwargs in keyword_variants:
            sio = StringIO()
            json.dump(v, sio, **dump_kwargs)
            self.assertEqual(d, json.loads(sio.getvalue()))
def test_invalid_escape_sequences(self): # incomplete escape sequence self.assertRaises(json.JSONDecodeError, json.loads, '"\\u') self.assertRaises(json.JSONDecodeError, json.loads, '"\\u1') self.assertRaises(json.JSONDecodeError, json.loads, '"\\u12') self.assertRaises(json.JSONDecodeError, json.loads, '"\\u123') self.assertRaises(json.JSONDecodeError, json.loads, '"\\u1234') # invalid escape sequence self.assertRaises(json.JSONDecodeError, json.loads, '"\\u123x"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\u12x4"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\u1x34"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ux234"') if sys.maxunicode > 65535: # invalid escape sequence for low surrogate self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u0"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u00"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u000"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u000x"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u00x0"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u0x00"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\ux000"')
def test_object_pairs_hook(self): s = '{"xkd":1, "kcw":2, "art":3, "hxm":4, "qrt":5, "pad":6, "hoy":7}' p = [("xkd", 1), ("kcw", 2), ("art", 3), ("hxm", 4), ("qrt", 5), ("pad", 6), ("hoy", 7)] self.assertEqual(json.loads(s), eval(s)) self.assertEqual(json.loads(s, object_pairs_hook=lambda x: x), p) self.assertEqual(json.load(StringIO(s), object_pairs_hook=lambda x: x), p) od = json.loads(s, object_pairs_hook=OrderedDict) self.assertEqual(od, OrderedDict(p)) self.assertEqual(type(od), OrderedDict) # the object_pairs_hook takes priority over the object_hook self.assertEqual(json.loads(s, object_pairs_hook=OrderedDict, object_hook=lambda x: None), OrderedDict(p))
def run_command(session, args):
    """Run a JSON-encoded command after checking it against ALLOWED_CMDS.

    ``args['cmd']`` is a JSON document holding the command; when it is
    non-empty its first element must be listed in ``ALLOWED_CMDS``,
    otherwise ``PluginError`` is raised. ``args['cmd_input']`` (JSON,
    defaults to ``null``) is passed to ``_run_command`` as its second
    argument.

    Returns a JSON string with ``returncode``, ``out`` and ``err``; falsy
    ``out``/``err`` values are replaced by empty strings.
    """
    cmd = json.loads(args.get('cmd'))
    if cmd:
        executable = cmd[0]
        if executable not in ALLOWED_CMDS:
            raise PluginError(
                _("Dom0 execution of '%s' is not permitted") % executable)

    cmd_input = json.loads(args.get('cmd_input', 'null'))
    returncode, out, err = _run_command(cmd, cmd_input)

    # This runs in Dom0, will return to neutron-ovs-agent in compute node
    return json.dumps({
        'returncode': returncode,
        'out': out or "",
        'err': err or "",
    })
def _get_agent_features(self, arg_dict): """Return an array of features that an agent supports.""" timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT)) tmp_id = commands.getoutput("uuidgen") dct = {} dct.update(arg_dict) dct["value"] = json.dumps({"name": "features", "value": ""}) dct["path"] = "data/host/%s" % tmp_id xenstore.write_record(self, dct) try: resp = _wait_for_agent(self, tmp_id, dct, timeout) except TimeoutError, e: # noqa raise PluginError(e) response = json.loads(resp) if response['returncode'] != 0: return response["message"].split(",") else: return {}
def set_config(self, arg_dict):
    """Store, replace or delete one key in the persisted configuration.

    ``arg_dict["params"]`` is decoded as JSON; if decoding fails for any
    reason the value is used as-is. It must provide ``"key"`` and
    ``"value"``. A ``None`` value removes the key (if present); any other
    value overwrites it. The updated configuration is then written back
    with ``_write_config_dict``.
    """
    conf = _get_config_dict()
    params = arg_dict["params"]
    try:
        dct = json.loads(params)
    except Exception:
        # Not JSON: fall back to using the parameter object directly.
        dct = params

    key, val = dct["key"], dct["value"]
    if val is not None:
        conf.update({key: val})
    else:
        # Delete the key, if present
        conf.pop(key, None)
    _write_config_dict(conf)
def iptables_config(session, args):
    """Run an iptables save/restore command described by ``args``.

    ``args['cmd_args']`` (required) is a JSON list holding the command and
    its arguments; ``args['process_input']`` (optional) is passed through
    to ``_run_command``. Only ``iptables-save``, ``iptables-restore``,
    ``ip6tables-save`` and ``ip6tables-restore`` are accepted as the
    command name.

    Returns a JSON string ``{"out": <result>, "err": ""}``.
    Raises ``pluginlib.PluginError`` for an empty or disallowed command.
    """
    # command should be either save or restore
    logging.debug("iptables_config:enter")
    logging.debug("iptables_config: args=%s", args)
    cmd_args = pluginlib.exists(args, 'cmd_args')
    logging.debug("iptables_config: cmd_args=%s", cmd_args)
    process_input = pluginlib.optional(args, 'process_input')
    logging.debug("iptables_config: process_input=%s", process_input)

    # Build a real list: on Python 3 ``map`` returns an iterator, which
    # supports neither len() nor indexing.
    cmd = [str(part) for part in json.loads(cmd_args)]

    # either execute iptable-save or iptables-restore
    # command must be only one of these two
    # process_input must be used only with iptables-restore
    allowed = ('iptables-save', 'iptables-restore',
               'ip6tables-save', 'ip6tables-restore')
    if cmd and cmd[0] in allowed:
        result = _run_command(cmd, process_input)
        ret_str = json.dumps(dict(out=result, err=''))
        logging.debug("iptables_config:exit")
        return ret_str

    # else don't do anything and return an error
    raise pluginlib.PluginError("Invalid iptables command")
def load_payload(self, payload, serializer=None):
    """Deserialize ``payload`` with the given or the default serializer.

    When ``serializer`` is omitted, ``self.serializer`` is used together
    with the ``self.is_text_serializer`` flag; otherwise the flag is
    computed for the override. For text serializers the payload is decoded
    from UTF-8 before loading.

    Any exception raised while decoding or loading is wrapped in
    :class:`BadPayload`, with the cause attached as ``original_error``.
    """
    if serializer is not None:
        is_text = is_text_serializer(serializer)
    else:
        serializer, is_text = self.serializer, self.is_text_serializer

    try:
        data = payload.decode('utf-8') if is_text else payload
        return serializer.loads(data)
    except Exception as e:
        raise BadPayload('Could not load the payload because an '
                         'exception occurred on unserializing the data',
                         original_error=e)
def zone_exists(module, base_url, zone):
    ''' Look up ``zone`` below ``base_url`` and report its kind.

    Returns None when the server answers 422 (treated as "not found") or
    when the reply has no ``kind`` value; otherwise returns the ``kind``
    string uppercased. Any status other than 200/422 is reported through
    ``module.fail_json``. '''
    url = "{0}/{1}".format(base_url, zone)
    response, info = fetch_url(module, url, headers=headers)

    status = info['status']
    if status == 422:  # not found
        return None
    if status != 200:
        module.fail_json(msg="failed to check zone %s at %s: %s" % (zone, url, info['msg']))

    kind = json.loads(response.read()).get('kind', None)
    return kind if kind is None else kind.upper()
def zone_list(module, base_url, zone=None):
    ''' Fetch the zones from ``base_url`` and return the matching ones.

    Each result is a dict with ``name``, ``kind`` (lowercased) and
    ``serial``. When ``zone`` is given it is used as an fnmatch pattern
    against the zone name; otherwise every zone is returned. A non-200
    status is reported through ``module.fail_json``. '''
    url = "{0}".format(base_url)
    response, info = fetch_url(module, url, headers=headers)
    if info['status'] != 200:
        module.fail_json(msg="failed to enumerate zones at %s: %s" % (url, info['msg']))

    zones = json.loads(response.read())
    return [
        {
            'name': z['name'],
            'kind': z['kind'].lower(),
            'serial': z['serial'],
        }
        for z in zones
        if zone is None or fnmatch.fnmatch(z['name'], zone)
    ]
def loads(s, *args, **kwargs):
    """Decode ``s`` with :func:`json.loads`, installing the module's hook.

    The ``object_hook`` keyword is always replaced by a wrapper around this
    module's ``object_hook``; all other arguments are passed through.

    :Parameters:
      - `compile_re` (optional, default ``True``): forwarded to the object
        hook as its second argument and removed from the keyword arguments
        before ``json.loads`` is called.

    Raises ``Exception`` when no json library is available.
    """
    if not json_lib:
        raise Exception("No json library available")

    compile_re = kwargs.pop('compile_re', True)

    def _hook(dct):
        return object_hook(dct, compile_re)

    kwargs['object_hook'] = _hook
    return json.loads(s, *args, **kwargs)
def on_message(self, message):
    """Handle one JSON message carrying ``pattern`` and ``command``.

    When both values are non-empty, the names of the hosts matching the
    pattern are collected in ``self.asset_name_str``, two status messages
    are written back, and a ``MyThread`` running ``self.run_cmd`` is
    queued on the class-level ``tasks`` list. Afterwards every queued task
    that is not alive is marked as a daemon and started; the
    ``RuntimeError`` raised for threads that cannot be (re)started is
    ignored.
    """
    payload = json.loads(message)
    pattern = payload.get('pattern', '')
    self.command = payload.get('command', '')
    self.asset_name_str = ''

    if pattern and self.command:
        for host in self.runner.inventory.get_hosts(pattern=pattern):
            self.asset_name_str += '%s ' % host.name
        # NOTE(review): the '????: ' prefix looks like mis-encoded text --
        # confirm the intended label before changing it.
        self.write_message('????: ' + self.asset_name_str)
        self.write_message('<span style="color: yellow">Ansible> %s</span>\n\n' % self.command)
        worker = MyThread(target=self.run_cmd, args=(self.command, pattern))
        self.__class__.tasks.append(worker)

    for task in self.__class__.tasks:
        if not task.is_alive():
            try:
                task.setDaemon(True)
                task.start()
            except RuntimeError:
                # Deliberately ignored, as in the original best-effort loop.
                pass
def test_rpc_call_with_positional_parameters(http_client, base_url):
    """POST two ``subtract`` calls with positional params and check replies."""
    url = base_url + "/api"
    # (params, request id, expected result)
    cases = (
        ([42, 23], 1, 19),
        ([23, 42], 2, -19),
    )
    for params, request_id, expected in cases:
        body = json.dumps({"jsonrpc": "2.0", "method": "subtract",
                           "params": params, "id": request_id})
        result = yield http_client.fetch(
            url, method="POST", body=body,
            headers={"content-type": "application/json"})
        assert result.code == 200
        response_body = json.loads(result.body)
        assert response_body["jsonrpc"] == "2.0"
        assert response_body["result"] == expected
        assert response_body["id"] == request_id
def test_rpc_call_with_named_parameters(http_client, base_url):
    """POST two ``subtract`` calls with named params and check the replies."""
    url = base_url + "/api"
    # (params, request id, expected result)
    cases = (
        ({"a": 42, "b": 23}, 3, 19),
        ({"b": 42, "a": 23}, 4, -19),
    )
    for params, request_id, expected in cases:
        body = json.dumps({"jsonrpc": "2.0", "method": "subtract",
                           "params": params, "id": request_id})
        result = yield http_client.fetch(
            url, method="POST", body=body,
            headers={"content-type": "application/json"})
        assert result.code == 200
        response_body = json.loads(result.body)
        assert response_body["jsonrpc"] == "2.0"
        assert response_body["result"] == expected
        assert response_body["id"] == request_id
def test_simple_call_no_parameters(http_client, base_url):
    """Call ``say_hello`` without params and expect the result "hello"."""
    request = {"id": 1, "method": "say_hello", "jsonrpc": "2.0"}
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    assert reply["id"] == 1
    assert reply["error"] is None
    assert reply["result"] == "hello"
    assert reply["jsonrpc"] == "2.0"
def test_simple_call_positional_arguments(http_client, base_url):
    """Call ``subtract`` with ``[10, 20]`` and expect the result -10."""
    request = {
        "id": 1,
        "method": "subtract",
        "params": [10, 20],
        "jsonrpc": "2.0",
    }
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    assert reply["id"] == 1
    assert reply["error"] is None
    assert reply["result"] == -10
    assert reply["jsonrpc"] == "2.0"
def test_simple_call_keyword_arguments(http_client, base_url):
    """Call ``subtract`` with named params (a=10, b=20) and expect -10."""
    request = {
        "id": 1,
        "method": "subtract",
        "params": {"b": 20, "a": 10},
        "jsonrpc": "2.0",
    }
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    assert reply["id"] == 1
    assert reply["error"] is None
    assert reply["result"] == -10
    assert reply["jsonrpc"] == "2.0"
def test_simple_call_raise_exception(http_client, base_url):
    """Call ``divide`` with b=0 and expect an "Internal error" reply.

    The error must carry code -32603 and data describing a
    ``ZeroDivisionError``.
    """
    request = {
        "id": 1,
        "method": "divide",
        "params": {"a": 10, "b": 0},
        "jsonrpc": "2.0",
    }
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    assert reply["id"] == 1
    assert reply["result"] is None
    error = reply["error"]
    assert error is not None
    assert error["code"] == -32603
    assert error["message"] == "Internal error"
    assert error["data"]["class"] == "ZeroDivisionError"
    assert error["data"]["info"] == "division by zero"
    assert reply["jsonrpc"] == "2.0"
def test_simple_call_wrong_arguments_too_few_positional_arguments(http_client, base_url):
    """Call ``subtract`` with one positional arg; expect "Invalid params"."""
    request = {
        "id": 1,
        "method": "subtract",
        "params": [10],
        "jsonrpc": "2.0",
    }
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 1
    assert reply["result"] is None
    error = reply["error"]
    assert error is not None
    assert error["code"] == -32602
    assert error["message"] == "Invalid params"
    assert reply["jsonrpc"] == "2.0"
def test_simple_call_wrong_arguments_too_many_positional_arguments(http_client, base_url):
    """Call ``subtract`` with three positional args; expect "Invalid params"."""
    request = {
        "id": 1,
        "method": "subtract",
        "params": [10, 20, 30],
        "jsonrpc": "2.0",
    }
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 1
    assert reply["result"] is None
    error = reply["error"]
    assert error is not None
    assert error["code"] == -32602
    assert error["message"] == "Invalid params"
    assert reply["jsonrpc"] == "2.0"
def test_simple_call_wrong_arguments_too_few_keyword_arguments(http_client, base_url):
    """Call ``subtract`` with only ``a`` named; expect "Invalid params"."""
    request = {
        "id": 1,
        "method": "subtract",
        "params": {"a": 10},
        "jsonrpc": "2.0",
    }
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 1
    assert reply["result"] is None
    error = reply["error"]
    assert error is not None
    assert error["code"] == -32602
    assert error["message"] == "Invalid params"
    assert reply["jsonrpc"] == "2.0"
def test_simple_call_wrong_arguments_too_many_keyword_arguments(http_client, base_url):
    """Call ``subtract`` with an extra named arg; expect "Invalid params"."""
    request = {
        "id": 1,
        "method": "subtract",
        "params": {"a": 10, "b": 20, "c": 30},
        "jsonrpc": "2.0",
    }
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 1
    assert reply["result"] is None
    error = reply["error"]
    assert error is not None
    assert error["code"] == -32602
    assert error["message"] == "Invalid params"
    assert reply["jsonrpc"] == "2.0"
def test_notification_valid(http_client, base_url):
    """Send a ``subtract`` request without an id.

    The reply must have no ``id`` and a null ``result`` and ``error``.
    """
    request = {
        "method": "subtract",
        "params": [20, 10],
        "jsonrpc": "2.0",
    }
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert "id" not in reply
    assert reply["result"] is None
    assert reply["error"] is None
    assert reply["jsonrpc"] == "2.0"
def test_notification_method_not_found(http_client, base_url):
    """Send an id-less request for the method ``foobar``.

    The reply must have no ``id`` and a null ``result`` and ``error``.
    """
    request = {
        "method": "foobar",
        "params": [20, 10],
        "jsonrpc": "2.0",
    }
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert "id" not in reply
    assert reply["result"] is None
    assert reply["error"] is None
    assert reply["jsonrpc"] == "2.0"
def test_notification_invalid_params(http_client, base_url):
    """Send an id-less ``divide`` request with three positional params.

    The reply must have no ``id`` and a null ``result`` and ``error``.
    """
    request = {
        "method": "divide",
        "params": [20, 10, 30],
        "jsonrpc": "2.0",
    }
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert "id" not in reply
    assert reply["result"] is None
    assert reply["error"] is None
    assert reply["jsonrpc"] == "2.0"
def test_notification_internal_error(http_client, base_url):
    """Send an id-less ``divide`` request with a zero divisor.

    The reply must have no ``id`` and a null ``result`` and ``error``.
    """
    request = {
        "method": "divide",
        "params": [20, 0],
        "jsonrpc": "2.0",
    }
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert "id" not in reply
    assert reply["result"] is None
    assert reply["error"] is None
    assert reply["jsonrpc"] == "2.0"
def test_private_call_wrong_token(http_client, base_url):
    """Call ``private_sum`` with a bad ``X-Testing-Token`` header.

    The reply must carry error code -32001 ("Access denied").
    """
    request = {
        "id": 3,
        "method": "private_sum",
        "params": [13, -13],
        "jsonrpc": "2.0",
    }
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json",
                 "X-Testing-Token": "bad_token"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 3
    assert reply["result"] is None
    error = reply["error"]
    assert error is not None
    assert error["code"] == -32001
    assert error["message"] == "Access denied"
    assert reply["jsonrpc"] == "2.0"
def test_private_call_ok_token(http_client, base_url):
    """Call ``private_sum`` with the accepted token and expect the result 0."""
    # without a valid token, somebody should not be able to get information
    # about certain methods, such as the parameters
    request = {
        "id": 3,
        "method": "private_sum",
        "params": [13, -13],
        "jsonrpc": "2.0",
    }
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json",
                 "X-Testing-Token": "testing_token"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 3
    assert reply["result"] == 0
    assert reply["error"] is None
    assert reply["jsonrpc"] == "2.0"
def test_private_call_ok_token_wrong_parameters(http_client, base_url):
    """Call ``private_sum`` with the accepted token but only one param.

    The reply must carry error code -32602 ("Invalid params").
    """
    # without a valid token, somebody should not be able to get information
    # about certain methods, such as the parameters
    request = {
        "id": 3,
        "method": "private_sum",
        "params": [13],
        "jsonrpc": "2.0",
    }
    response = yield http_client.fetch(
        base_url + "/api",
        method="POST",
        body=json.dumps(request),
        headers={"content-type": "application/json",
                 "X-Testing-Token": "testing_token"},
    )
    assert response.code == 200

    reply = json.loads(response.body)
    print(reply)
    assert reply["id"] == 3
    assert reply["result"] is None
    error = reply["error"]
    assert error is not None
    assert error["code"] == -32602
    assert error["message"] == "Invalid params"
    assert reply["jsonrpc"] == "2.0"
def get_keys(cls, records):
    """Return one description dict per record.

    Every dict holds ``id``, ``name``, ``string`` and ``type_``. For
    ``selection`` records it also holds ``selection`` (a list of
    ``(key, label)`` pairs) and ``sorted``; for ``float``/``numeric``
    records it holds ``digits`` as ``(16, record.digits)``.

    The selection pairs start from the record re-read in the language
    returned by ``Config.get_language()`` and are then updated with the
    pairs of ``record`` itself, so the ordering comes from the former.
    """
    pool = Pool()
    Config = pool.get('ir.configuration')
    keys = []
    for record in records:
        new_key = {
            'id': record.id,
            'name': record.name,
            'string': record.string,
            'type_': record.type_,
            }
        if record.type_ == 'selection':
            with Transaction().set_context(language=Config.get_language()):
                english_key = cls(record.id)
                selection = OrderedDict(json.loads(
                        english_key.selection_json))
            selection.update(dict(json.loads(record.selection_json)))
            # items() is a view on Python 3; materialise it so the result
            # is a plain list on both Python 2 and 3.
            new_key['selection'] = list(selection.items())
            new_key['sorted'] = record.selection_sorted
        elif record.type_ in ('float', 'numeric'):
            new_key['digits'] = (16, record.digits)
        keys.append(new_key)
    return keys
def _create_json_typecasters(oid, array_oid, loads=None, name='JSON'):
    """Build the scalar and array typecasters for a JSON column type.

    ``loads`` defaults to ``json.loads``; ``ImportError`` is raised when no
    json module is available and none was supplied. ``None`` input values
    are passed through as ``None``.

    Returns ``(JSON, JSONARRAY)``; ``JSONARRAY`` is ``None`` when
    ``array_oid`` is ``None``.
    """
    if loads is None:
        if json is None:
            raise ImportError("no json module available")
        loads = json.loads

    def typecast_json(s, cur):
        return None if s is None else loads(s)

    JSON = new_type((oid, ), name, typecast_json)

    JSONARRAY = None
    if array_oid is not None:
        JSONARRAY = new_array_type((array_oid, ), "%sARRAY" % name, JSON)

    return JSON, JSONARRAY
def _query(self, path, before=None, after=None):
    """Yield the decoded JSON records returned for ``path``.

    The request goes to ``<self.server>/lookup/<path>`` with
    ``self.apikey`` in the ``X-Api-Key`` header. ``self.limit`` (when
    truthy) and the time bounds are sent as query parameters:

    * both bounds given: ``time_first_after=after`` and
      ``time_last_before=before``;
    * only one given: ``time_first_before=before`` or
      ``time_last_after=after``.

    The response is read line by line, each line being one JSON document.
    It is closed when iteration finishes, fails, or is abandoned.
    """
    url = '%s/lookup/%s' % (self.server, path)

    params = {}
    if self.limit:
        params['limit'] = self.limit
    if before and after:
        params['time_first_after'] = after
        params['time_last_before'] = before
    else:
        if before:
            params['time_first_before'] = before
        if after:
            params['time_last_after'] = after
    if params:
        url += '?{0}'.format(urllib.urlencode(params))

    req = urllib2.Request(url)
    req.add_header('Accept', 'application/json')
    req.add_header('X-Api-Key', self.apikey)

    http = urllib2.urlopen(req)
    try:
        while True:
            line = http.readline()
            if not line:
                break
            yield json.loads(line)
    finally:
        # Release the connection even if the consumer stops early.
        http.close()
def load_from_cache(self):
    ''' Load ``self.data`` and ``self.inventory`` from the JSON cache file.

    Reads ``self.cache_filename``; if the file cannot be opened or read
    (IOError) both attributes fall back to empty dicts. Invalid JSON or
    missing keys are not handled here and propagate to the caller. '''
    try:
        # The context manager closes the file even if read() fails.
        with open(self.cache_filename, 'r') as cache:
            data = json.loads(cache.read())
    except IOError:
        data = {'data': {}, 'inventory': {}}

    self.data = data['data']
    self.inventory = data['inventory']
def load_index_from_cache(self):
    ''' Read the JSON file at ``self.cache_path_index`` into ``self.index``.

    Errors opening the file or decoding its content propagate to the
    caller. '''
    # The context manager closes the handle, which was previously leaked.
    with open(self.cache_path_index, 'r') as cache:
        json_index = cache.read()
    self.index = json.loads(json_index)
def parse_lsblk_json(output):
    """Return the set of all device names found in lsblk-style JSON.

    ``output`` is a JSON document with a top-level ``blockdevices`` list.
    Every entry contributes its ``name``, and entries under ``children``
    are visited recursively.
    """
    def walk(devices):
        for device in devices:
            yield device['name']
            for child_name in walk(device.get('children', [])):
                yield child_name

    return set(walk(json.loads(output)['blockdevices']))
def _assert_schema_values(self, actual, expected_resp): attrs = ( 'schema_id', 'base_schema_id', 'status', 'primary_keys', 'created_at', 'updated_at' ) self._assert_equal_multi_attrs(actual, expected_resp, *attrs) assert actual.schema_json == simplejson.loads(expected_resp.schema) self._assert_topic_values(actual.topic, expected_resp.topic) self._assert_note_values(actual.note, expected_resp.note)
def biz_schema_two(self, biz_schema):
    """Register a copy of ``biz_schema`` extended with an int field ``bar``.

    The new field has a default of 0. The schema is registered under the
    same namespace and source as ``biz_schema``, and the result of
    ``self._register_avro_schema`` is returned.
    """
    extra_field = {'type': 'int', 'doc': 'test', 'name': 'bar', 'default': 0}

    new_schema = simplejson.loads(biz_schema.schema)
    new_schema['fields'].append(extra_field)

    source = biz_schema.topic.source
    return self._register_avro_schema(
        namespace=source.namespace.name,
        source=source.name,
        schema=simplejson.dumps(new_schema),
    )
def example_compatible_schema(example_schema):
    """Return ``example_schema`` (JSON text) with one extra int field.

    The appended field is named ``good_compatible_field`` and has a
    default value of 1.
    """
    extra_field = {
        "type": "int",
        "name": "good_compatible_field",
        "doc": "test",
        "default": 1,
    }
    parsed = simplejson.loads(example_schema)
    parsed['fields'].append(extra_field)
    return simplejson.dumps(parsed)
def example_non_compatible_schema(example_schema):
    """Return ``example_schema`` (JSON text) with one extra string field.

    The appended field is named ``good_non_compatible_field`` and has no
    default value.
    """
    extra_field = {
        'doc': 'test',
        'type': 'string',
        'name': 'good_non_compatible_field',
    }
    parsed = simplejson.loads(example_schema)
    parsed['fields'].append(extra_field)
    return simplejson.dumps(parsed)