我们从 Python 开源项目中提取了以下 33 个代码示例,用于说明如何使用 json.decoder 模块。
def split_buffer(stream, splitter=None, decoder=lambda a: a):
    """Re-chunk a stream of text fragments using a splitter function.

    All fragments produced by ``stream_as_text(stream)`` are appended to a
    running buffer. After each append, ``splitter`` is applied repeatedly;
    every time it returns an ``(item, remainder)`` pair the item is yielded
    as-is and the remainder becomes the new buffer. A ``None`` result from
    ``splitter`` means "nothing complete yet".

    Whatever non-empty text is left once the stream is exhausted is passed
    through ``decoder`` and yielded last. When ``splitter`` is falsy,
    ``line_splitter`` is used.
    """
    if not splitter:
        splitter = line_splitter
    pending = six.text_type('')

    for fragment in stream_as_text(stream):
        pending += fragment
        pieces = splitter(pending)
        # Keep draining complete items until the splitter gives up.
        while pieces is not None:
            chunk, pending = pieces
            yield chunk
            pieces = splitter(pending)

    # Only the unterminated tail goes through the decoder.
    if pending:
        yield decoder(pending)
async def embed_from_json(self, ctx, *, js_text: str):
    """Send an embed from JSON.
    Usage: embed_from_json [json]"""
    # NOTE(review): the docstring text above is kept verbatim in case the
    # command framework surfaces it as help text -- confirm before rewording.
    #
    # This must be a coroutine: the body awaits ctx.send(), and `await`
    # inside a plain `def` is a SyntaxError.
    echeck_perms(ctx, ('bot_owner',))

    class SemiEmbed:
        # Thin wrapper whose to_dict() returns the parsed JSON unchanged.
        # Presumably to_dict() is what ctx.send(embed=...) calls -- confirm.
        def __init__(self, obj):
            self.obj = obj

        def to_dict(self):
            return self.obj

    try:
        embed_obj = json.loads(js_text)
    except json.decoder.JSONDecodeError:
        await ctx.send(':warning: **Invalid JSON data!**')
    else:
        sembed = SemiEmbed(embed_obj)
        try:
            await ctx.send(embed=sembed)
        except discord.HTTPException as e:
            # An error whose text mentions 400 is reported to the user;
            # anything else propagates.
            if '400' in str(e):
                await ctx.send(':warning: **Couldn\'t send embed, check your data!**')
            else:
                # Bare raise keeps the original traceback intact.
                raise
def split_buffer(stream, splitter=None, decoder=lambda a: a):
    """Given a generator which yields strings and a splitter function,
    joins all input, splits on the separator and yields each chunk.

    Unlike string.split(), each chunk includes the trailing
    separator, except for the last one if none was found on the end
    of the input.

    Items returned by ``splitter`` are yielded unchanged. Only the
    non-empty leftover at the end of the stream is passed through
    ``decoder``; any exception raised by that call is re-raised as
    ``StreamParseError``. ``line_splitter`` is used when ``splitter`` is
    falsy.
    """
    splitter = splitter or line_splitter
    buffered = six.text_type('')

    for data in stream_as_text(stream):
        buffered += data
        while True:
            buffer_split = splitter(buffered)
            if buffer_split is None:
                break

            item, buffered = buffer_split
            yield item

    if buffered:
        # Decode inside the try but yield outside it. With the yield inside
        # the try, an exception thrown into the suspended generator by the
        # consumer (generator.throw) would be caught here and misreported
        # as a StreamParseError.
        try:
            tail = decoder(buffered)
        except Exception as e:
            raise StreamParseError(e)
        yield tail
def json_splitter(buffer):
    """Attempt to parse a json object from a buffer. If there is at least one
    object, return it and the rest of the buffer, otherwise return None.

    Returns a ``(obj, rest)`` tuple, where ``rest`` starts at the first
    non-whitespace character after the parsed object, or ``None`` when
    ``raw_decode`` raises ``ValueError`` (no complete object yet).
    """
    # raw_decode() requires the text to *begin* with a JSON document, so a
    # buffer starting with whitespace (e.g. a newline left between
    # objects) would otherwise never parse. Only the left side is stripped:
    # trailing whitespace may belong to a JSON string that is still being
    # received and must survive in the returned remainder.
    buffer = buffer.lstrip()
    try:
        # json_decoder is a module-level decoder defined elsewhere in the file.
        obj, index = json_decoder.raw_decode(buffer)
    except ValueError:
        return None
    # WHITESPACE matches zero or more characters, so match() never returns
    # None here; it just skips the separator following the object.
    rest = buffer[json.decoder.WHITESPACE.match(buffer, index).end():]
    return obj, rest
def test_pyjson(self):
    # Each of these callables on self.json must report the corresponding
    # Python-level json submodule as its __module__.
    scanner_origin = self.json.scanner.make_scanner.__module__
    self.assertEqual(scanner_origin, 'json.scanner')

    scanstring_origin = self.json.decoder.scanstring.__module__
    self.assertEqual(scanstring_origin, 'json.decoder')

    encoder_origin = self.json.encoder.encode_basestring_ascii.__module__
    self.assertEqual(encoder_origin, 'json.encoder')
def test_cjson(self):
    # Each of these callables on self.json must report '_json' as its
    # __module__.
    expected = '_json'

    scanner_origin = self.json.scanner.make_scanner.__module__
    self.assertEqual(scanner_origin, expected)

    scanstring_origin = self.json.decoder.scanstring.__module__
    self.assertEqual(scanstring_origin, expected)

    make_encoder_origin = self.json.encoder.c_make_encoder.__module__
    self.assertEqual(make_encoder_origin, expected)

    ascii_encoder_origin = self.json.encoder.encode_basestring_ascii.__module__
    self.assertEqual(ascii_encoder_origin, expected)
def additional_tests():
    """Build and return the extra test suite for the json package.

    The suite holds, in order: the doctests of ``json``, ``json.encoder``
    and ``json.decoder``, then ``TestPyTest('test_pyjson')`` and
    ``TestCTest('test_cjson')``.
    """
    doctested_modules = (json, json.encoder, json.decoder)

    suite = unittest.TestSuite()
    suite.addTests(
        doctest.DocTestSuite(module) for module in doctested_modules
    )
    suite.addTest(TestPyTest('test_pyjson'))
    suite.addTest(TestCTest('test_cjson'))
    return suite
def loads(self, str_data):
    """Deserialize ``str_data`` from JSON.

    The decoder class is resolved with ``import_string`` from
    ``settings.MODERNRPC_JSON_DECODER`` and handed to ``json.loads`` as
    ``cls``.

    Raises:
        RPCParseError: when decoding raises ``JSONDecodeError``; the
            message is the string form of that error.
    """
    try:
        decoder_cls = import_string(settings.MODERNRPC_JSON_DECODER)
        parsed = json.loads(str_data, cls=decoder_cls)
    except JSONDecodeError as exc:
        raise RPCParseError(str(exc))
    else:
        return parsed
def json_splitter(buffer):
    """Attempt to parse a json object from a buffer. If there is at least one
    object, return it and the rest of the buffer, otherwise return None.

    Returns a ``(obj, rest)`` tuple, where ``rest`` starts at the first
    non-whitespace character after the parsed object, or ``None`` when
    ``raw_decode`` raises ``ValueError`` (no complete object yet).
    """
    # Strip leading whitespace only, because raw_decode() needs the text to
    # begin with a JSON document. Stripping the right side as well would
    # corrupt data: when the buffer ends in the middle of a JSON string,
    # trailing spaces are string content, and removing them from the
    # returned remainder turns e.g. "x y" (split across two chunks after
    # the space) into "xy".
    buffer = buffer.lstrip()
    try:
        # json_decoder is a module-level decoder defined elsewhere in the file.
        obj, index = json_decoder.raw_decode(buffer)
    except ValueError:
        return None
    # WHITESPACE matches zero or more characters, so match() never returns
    # None here; it just skips the separator following the object.
    rest = buffer[json.decoder.WHITESPACE.match(buffer, index).end():]
    return obj, rest
def log_in_for_userkey():
    """Log in repeatedly until the API returns a userkey, then persist it.

    When both ``settings.SECRETKEY`` and ``settings.ACCOUNTKEY`` are set,
    the request is authenticated with the account key and signed via
    ``apisign``; a failed attempt then terminates the program with
    ``SystemExit``. Otherwise the user is asked for a password, and a
    failed attempt resets the stored credentials and retries.

    On success the key is stored in ``settings.USERKEY`` and passed to
    ``save_userkey``.
    """
    headers = {}
    url = settings.API_LOGIN_URL + settings.APPKEY
    provided_api_app = settings.SECRETKEY and settings.ACCOUNTKEY

    while True:
        username = get_username()
        if provided_api_app:
            data = {'login': username, 'accountkey': settings.ACCOUNTKEY}
            headers = apisign(url, settings.SECRETKEY, **data)
            # The original literals had '?' where the Polish letter was
            # lost to an encoding error; the intended text is restored.
            msg = 'Nieprawidłowy login, accountkey, lub secretkey.'
        else:
            password = get_password()
            data = {'login': username, 'password': password}
            msg = 'Nieprawidłowy login lub hasło.'

        response = requests.post(url, data=data, headers=headers)
        # Drop the local references to the credentials once the request
        # has been sent.
        password = None
        data = None

        try:
            userkey = response.json()['userkey']
        except (ValueError, KeyError, TypeError):
            # ValueError covers every JSON decode error response.json() can
            # raise (json.JSONDecodeError and the simplejson/requests
            # variants all derive from it). TypeError covers a body that is
            # valid JSON but not an object, e.g. a list or null.
            logging.error(msg)
            logging.debug(traceback.format_exc())
            if provided_api_app:
                # Keys come from settings, so retrying cannot help.
                raise SystemExit
            reset_credentials()
            continue
        else:
            settings.USERKEY = userkey
            save_userkey(userkey)
            break
def load_tests(loader, _, pattern):
    """Assemble the package's tests for the unittest ``load_tests`` hook.

    A suite is seeded with the doctests of ``json``, ``json.encoder`` and
    ``json.decoder`` followed by ``TestPyTest('test_pyjson')`` and
    ``TestCTest('test_cjson')``. It is then handed, together with this
    file's directory, ``loader`` and ``pattern``, to
    ``support.load_package_tests``, whose result is returned.
    """
    doctested_modules = (json, json.encoder, json.decoder)

    suite = unittest.TestSuite()
    suite.addTests(
        doctest.DocTestSuite(module) for module in doctested_modules
    )
    suite.addTest(TestPyTest('test_pyjson'))
    suite.addTest(TestCTest('test_cjson'))

    return support.load_package_tests(
        os.path.dirname(__file__), loader, suite, pattern
    )
def decoder(jsonobj):
    """Convert a decoded JSON value into a TrackedLayer when applicable.

    A value containing ``'source'`` is turned into
    ``TrackedLayer(jsonobj['source'], jsonobj['repoUrl'])``; a missing
    ``'repoUrl'`` key therefore raises ``KeyError``. Any other value is
    returned unchanged.
    """
    if 'source' not in jsonobj:
        return jsonobj
    return TrackedLayer(jsonobj['source'], jsonobj['repoUrl'])
def readTrackedLayers():
    """Load the module-level ``tracked`` value from the "trackedlayers" file.

    The file is looked up inside ``userFolder()``. If it exists and is not
    empty, its content is decoded as JSON with ``decoder`` as the
    ``object_hook`` and the result is assigned to the global ``tracked``.
    A missing or empty file leaves ``tracked`` untouched.

    A ``KeyError`` raised anywhere in the process is deliberately swallowed
    (best-effort load), which also leaves ``tracked`` untouched.
    """
    global tracked
    try:
        filename = os.path.join(userFolder(), "trackedlayers")
        if os.path.exists(filename):
            # Read the file in one go. readlines() followed by
            # "\n".join() would double every newline, because each line
            # already carries its own terminator.
            with open(filename) as f:
                jsonstring = f.read()
            if jsonstring:
                tracked = JSONDecoder(object_hook=decoder).decode(jsonstring)
    except KeyError:
        # NOTE(review): presumably guards against the object hook hitting
        # an entry with 'source' but no 'repoUrl' -- confirm whether such
        # files should be reported instead of silently ignored.
        pass