我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用simplejson.JSONDecodeError()。
def test_invalid_escape_sequences(self): # incomplete escape sequence self.assertRaises(json.JSONDecodeError, json.loads, '"\\u') self.assertRaises(json.JSONDecodeError, json.loads, '"\\u1') self.assertRaises(json.JSONDecodeError, json.loads, '"\\u12') self.assertRaises(json.JSONDecodeError, json.loads, '"\\u123') self.assertRaises(json.JSONDecodeError, json.loads, '"\\u1234') # invalid escape sequence self.assertRaises(json.JSONDecodeError, json.loads, '"\\u123x"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\u12x4"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\u1x34"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ux234"') if sys.maxunicode > 65535: # invalid escape sequence for low surrogate self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u0"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u00"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u000"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u000x"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u00x0"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u0x00"') self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\ux000"')
def create(self, url, session=None, method=None, **params):
    """Create a new resource

    :param string url: The API-specific portion of the URL path
    :param Session session: HTTP client session
    :param string method: HTTP method (default POST)
    :returns: the decoded JSON body of the response, or the response
        object itself when its body cannot be decoded as JSON
    """
    response = self._request(method or 'POST', url, session=session, **params)

    # Should this move into _requests()?
    try:
        decoded = response.json()
    except json.JSONDecodeError:
        # Not JSON: hand the raw response back to the caller.
        return response
    return decoded
def __init__(self):
    """Populate ``self.jsondata`` from the file at ``self.jsonpath``.

    The defaults from ``self.defaultjson()`` are kept when the file is
    missing or empty. Raises ``JsonError`` when the file content is not
    valid JSON.
    """
    # Start from the defaults; they are only replaced once the file has
    # actually been parsed.
    self.jsondata = self.defaultjson()
    if not os.path.exists(self.jsonpath):
        return

    try:
        with open(self.jsonpath, 'r') as handle:
            raw = handle.read()
        if len(raw) == 0:
            return
        self.jsondata = simplejson.loads(raw)
        self.checkjson()
    except simplejson.JSONDecodeError:
        raise JsonError("%s does not contain valid JSON" % self.jsonpath)
def changeWwwSetting(settingName, value):
    """Store ``str(value)`` under ``settingName`` in ``userSettings.json``.

    The file lives in ``config['wwwPath']``. Existing settings are read and
    updated; if the file is missing it is created, and if its content is
    not valid JSON it is replaced by a fresh settings object.
    """
    wwwSettingsFileName = util.addSlash(config['wwwPath']) + 'userSettings.json'
    fileExists = os.path.exists(wwwSettingsFileName)

    # 'r+b' keeps the existing content readable; 'w+b' creates a new file.
    # The ``with`` block guarantees the handle is closed even when reading
    # or writing raises.
    with open(wwwSettingsFileName, 'r+b' if fileExists else 'w+b') as wwwSettingsFile:
        wwwSettings = {}
        if fileExists:
            try:
                wwwSettings = json.load(wwwSettingsFile)  # read existing settings
            except json.JSONDecodeError:
                logMessage("Error in decoding userSettings.json, creating new empty json file")
                wwwSettings = {}  # start with a fresh file when the json is corrupt.

        wwwSettings[settingName] = str(value)
        wwwSettingsFile.seek(0)
        # The file is opened in binary mode, so write bytes explicitly.
        wwwSettingsFile.write(json.dumps(wwwSettings).encode('utf-8'))
        wwwSettingsFile.truncate()
def changeWwwSetting(settingName, value):
    """Store ``str(value)`` under ``settingName`` in ``userSettings.json``.

    Does nothing unless ``configFile`` is set. The file lives in
    ``config['wwwPath']``. Existing settings are read and updated; if the
    file is missing it is created, and if its content is not valid JSON it
    is replaced by a fresh settings object.
    """
    # We only update changeWwwSetting if we're using a configFile based
    # installation (That is - brewpi-www)
    if configFile is None:
        return

    wwwSettingsFileName = util.addSlash(config['wwwPath']) + 'userSettings.json'
    fileExists = os.path.exists(wwwSettingsFileName)

    # 'r+b' keeps the existing content readable; 'w+b' creates a new file.
    # The ``with`` block guarantees the handle is closed even when reading
    # or writing raises.
    with open(wwwSettingsFileName, 'r+b' if fileExists else 'w+b') as wwwSettingsFile:
        wwwSettings = {}
        if fileExists:
            try:
                wwwSettings = json.load(wwwSettingsFile)  # read existing settings
            except json.JSONDecodeError:
                logMessage("Error in decoding userSettings.json, creating new empty json file")
                wwwSettings = {}  # start with a fresh file when the json is corrupt.

        wwwSettings[settingName] = str(value)
        wwwSettingsFile.seek(0)
        # The file is opened in binary mode, so write bytes explicitly.
        wwwSettingsFile.write(json.dumps(wwwSettings).encode('utf-8'))
        wwwSettingsFile.truncate()
def receive_packet(self):
    """
    Receive up to 1024 bytes from ``self.request`` and decode them as JSON.

    Increments ``self.packet_counts`` after a successful decode.

    :return: the decoded JSON payload
    :raises RuntimeError: when the received data is not valid JSON
    """
    raw = self.request.recv(1024)
    if VERBOSE:
        print("[{}] ???: {}".format(self.packet_counts, raw), end="\n\n")

    try:
        payload = simplejson.loads(raw)
    except simplejson.JSONDecodeError:
        # Show the offending data before giving up.
        print(raw)
        raise RuntimeError("[*] ????????, ?????????")

    self.packet_counts += 1
    return payload
def p_b_ot():
    """Read params and body from the current CherryPy request.

    ``params`` is decoded from the JSON in the ``params`` request header.
    The body is decoded as JSON when possible, otherwise it is unpickled.

    Returns a tuple ``(params, body, obj_type, js_func, event_stream)``.
    """
    request = cherrypy.request
    params = json.loads(request.headers['params'])
    body = request.body.read()
    event_stream = False
    try:
        body = json.loads(body)
        js_func = bool('js_func' in body and body['js_func'])
        if 'event_stream' in body and body['event_stream']:
            event_stream = True
    except json.JSONDecodeError:
        # NOTE(review): unpickling a request body executes arbitrary code
        # if the sender is untrusted -- confirm this endpoint is only
        # reachable by trusted clients.
        body = pickle.loads(body)
        js_func = False

    obj_type = params['type'] if 'type' in params else None
    return params, body, obj_type, js_func, event_stream
def is_avro_schema_compatible(request):
    """Run the schema compatibility check described by the request body.

    Builds an ``AvroSchemaCompatibilityRequest`` from ``request.json_body``
    and passes its schema JSON, namespace and source to
    ``_is_schema_compatible``. A JSON decoding failure is logged and
    re-raised as an invalid-schema exception.
    """
    try:
        compat_request = requests_v1.AvroSchemaCompatibilityRequest(
            **request.json_body
        )
        return _is_schema_compatible(
            compat_request.schema_json,
            compat_request.namespace,
            compat_request.source,
        )
    except simplejson.JSONDecodeError as e:
        # NOTE(review): this handler reads request.json_body again; if the
        # decode error came from json_body itself these reads may raise
        # too -- confirm.
        log.exception(
            "Failed to construct AvroSchemaCompatibilityRequest. {}".format(
                request.json_body
            )
        )
        detail = 'Error "{error}" encountered decoding JSON: "{schema}"'.format(
            error=str(e),
            schema=request.json_body['schema']
        )
        raise exceptions_v1.invalid_schema_exception(detail)
def get_schema_migration(request):
    """Build a schema migration for the target type named in the request.

    Reads ``new_schema``, ``old_schema`` and ``target_schema_type`` from
    ``request.json_body``. Raises the unsupported-target-schema exception
    when no strategy is registered for the target type, and the
    invalid-schema exception when a schema is not valid JSON.
    """
    new_schema_json = request.json_body.get('new_schema')
    old_schema_json = request.json_body.get('old_schema')
    target_schema_type = request.json_body.get('target_schema_type')

    migrate = SCHEMA_MIGRATION_STRATEGY_MAP.get(target_schema_type)
    if migrate is None:
        raise exceptions_v1.unsupported_target_schema_exception()

    try:
        new_schema = json.loads(new_schema_json)
        # A missing/empty old schema is treated as an empty schema.
        old_schema = json.loads(old_schema_json) if old_schema_json else {}
        return migrate(new_avro_schema=new_schema, old_avro_schema=old_schema)
    except json.JSONDecodeError:
        raise exceptions_v1.invalid_schema_exception()
def get_api_token(self, refresh_token):
    """Retrieve API token given a refresh token

    Args:
        refresh_token (str): refresh token used to make a request for a
            new API token

    Returns:
        str: the ``id_token`` field of the JSON response

    Raises:
        RefreshTokenException: when the response cannot be decoded as JSON
    """
    try:
        pending = self.client.Authentication.post_tokens(
            refreshToken={'refresh_token': refresh_token})
        response = pending.future.result()
        return response.json()['id_token']
    except JSONDecodeError:
        raise RefreshTokenException('Error using refresh token, please '
                                    'verify it is valid')
def fetch_sourcemap(url, project=None, release=None, allow_scraping=True):
    """Obtain the sourcemap body for ``url`` and convert it to an index.

    Data URIs are base64-decoded in place; any other URL is retrieved with
    ``fetch_file``. Raises ``UnparseableSourcemap`` (carrying the URL) when
    the body cannot be turned into an index.
    """
    if is_data_uri(url):
        body = base64.b64decode(url[BASE64_PREAMBLE_LENGTH:])
    else:
        body = fetch_file(
            url,
            project=project,
            release=release,
            allow_scraping=allow_scraping,
        ).body

    # According to various specs[1][2] a SourceMap may be prefixed to force
    # a Javascript load error.
    # [1] https://docs.google.com/document/d/1U1RGAehQwRypUTovF1KRlpiOFze0b-_2gc6fAH0KY0k/edit#heading=h.h7yy76c5il9v
    # [2] http://www.html5rocks.com/en/tutorials/developertools/sourcemaps/#toc-xssi
    guard_prefixes = (")]}'\n", ")]}\n")
    if body.startswith(guard_prefixes):
        # Drop the guard line and keep everything after the first newline.
        _guard, body = body.split('\n', 1)

    try:
        return sourcemap_to_index(body)
    except (JSONDecodeError, ValueError, AssertionError) as exc:
        logger.warn(unicode(exc))
        raise UnparseableSourcemap({
            'url': url,
        })
def test_failures(self):
    """Each document in JSONDOCS must fail to decode, except the SKIPS."""
    # Documents are numbered from 1 to match the failN.json naming.
    for number, doc in enumerate(JSONDOCS, 1):
        if number in SKIPS:
            # Skipped documents are expected to decode without error.
            json.loads(doc)
            continue
        try:
            json.loads(doc)
        except json.JSONDecodeError:
            continue
        self.fail("Expected failure for fail%d.json: %r" % (number, doc))
def test_array_decoder_issue46(self): # http://code.google.com/p/simplejson/issues/detail?id=46 for doc in [u'[,]', '[,]']: try: json.loads(doc) except json.JSONDecodeError: e = sys.exc_info()[1] self.assertEqual(e.pos, 1) self.assertEqual(e.lineno, 1) self.assertEqual(e.colno, 2) except Exception: e = sys.exc_info()[1] self.fail("Unexpected exception raised %r %s" % (e, e)) else: self.fail("Unexpected success parsing '[,]'")
def test_truncated_input(self):
    """Truncated documents must fail with the expected message and position.

    Each case is ``(document, expected message prefix, expected error pos)``.
    """
    test_cases = [
        ('', 'Expecting value', 0),
        ('[', "Expecting value or ']'", 1),
        ('[42', "Expecting ',' delimiter", 3),
        ('[42,', 'Expecting value', 4),
        ('["', 'Unterminated string starting at', 1),
        ('["spam', 'Unterminated string starting at', 1),
        ('["spam"', "Expecting ',' delimiter", 7),
        ('["spam",', 'Expecting value', 8),
        ('{', 'Expecting property name enclosed in double quotes', 1),
        ('{"', 'Unterminated string starting at', 1),
        ('{"spam', 'Unterminated string starting at', 1),
        ('{"spam"', "Expecting ':' delimiter", 7),
        ('{"spam":', 'Expecting value', 8),
        ('{"spam":42', "Expecting ',' delimiter", 10),
        ('{"spam":42,', 'Expecting property name enclosed in double quotes', 11),
        ('"', 'Unterminated string starting at', 0),
        ('"spam', 'Unterminated string starting at', 0),
        ('[,', "Expecting value", 1),
    ]
    for document, prefix, position in test_cases:
        try:
            json.loads(document)
        except json.JSONDecodeError as exc:
            # Only the beginning of the message is compared.
            self.assertEqual(
                exc.msg[:len(prefix)],
                prefix,
                "%r doesn't start with %r for %r" % (exc.msg, prefix, document))
            self.assertEqual(
                exc.pos,
                position,
                "pos %r != %r for %r" % (exc.pos, position, document))
        except Exception as exc:
            self.fail("Unexpected exception raised %r %s" % (exc, exc))
        else:
            self.fail("Unexpected success parsing '%r'" % (document,))
def test_decode_error(self):
    """The decode error for trailing data must report start and end location."""
    err = None
    try:
        json.loads('{}\na\nb')
    except json.JSONDecodeError as exc:
        err = exc
    else:
        self.fail('Expected JSONDecodeError')

    expected_location = (
        ('lineno', 2),
        ('colno', 1),
        ('endlineno', 3),
        ('endcolno', 2),
    )
    for attr, expected in expected_location:
        self.assertEqual(getattr(err, attr), expected)
def test_error_is_pickable(self):
    """A decode error must keep msg, doc, pos and end across a pickle round trip."""
    err = None
    try:
        json.loads('{}\na\nb')
    except json.JSONDecodeError as exc:
        err = exc
    else:
        self.fail('Expected JSONDecodeError')

    restored = pickle.loads(pickle.dumps(err))
    for attr in ('msg', 'doc', 'pos', 'end'):
        self.assertEqual(getattr(err, attr), getattr(restored, attr))
def _request_backend(self, context, data_dict, obj_name, action):
    """Relay a request for ``obj_name`` and return ``(status_code, body)``.

    The encoded context and ``data_dict`` are sent as a JSON document to
    ``PLUGIN_URL_PREFIX/obj_name``. ``body`` is the decoded JSON response,
    or ``{'message': <raw content>}`` when the response is not JSON.
    """
    payload = json.dumps({
        'context': self._encode_context(context, action, obj_name),
        'data': data_dict,
    })
    response = self._relay_request(
        "%s/%s" % (self.PLUGIN_URL_PREFIX, obj_name), data=payload)

    try:
        body = response.json()
    except JSONDecodeError:
        # Non-JSON reply: wrap the raw content so callers still get a dict.
        body = {'message': response.content}
    return response.status_code, body
def from_string(cls, string):
    """Build an instance from a JSON string via ``cls.from_dict``.

    Raises ``JsonRpcParseError`` when a JSON decode error occurs.
    """
    try:
        decoded = json.loads(string)
        return cls.from_dict(decoded)
    except json.JSONDecodeError:
        raise JsonRpcParseError()
def parse_json_structure(string_item):
    """
    Given a raw representation of a json structure, returns the parsed
    corresponding data structure (``JsonRpcRequest`` or
    ``JsonRpcRequestBatch``). Decoded values that are neither an object nor
    an array yield ``None``.

    :param string_item: the raw JSON text; must be a ``str``
    :return: the parsed request, a request batch, or ``None``
    :raises TypeError: when ``string_item`` is not a ``str``
    :raises JsonRpcParseError: when the text is not valid JSON
    :raises JsonRpcInvalidRequestError: when the text is an empty array
    """
    if not isinstance(string_item, str):
        raise TypeError(
            "Expected str but got {} instead".format(type(string_item).__name__))

    try:
        decoded = json.loads(string_item)
    except json.JSONDecodeError:
        raise JsonRpcParseError()

    if isinstance(decoded, dict):
        return JsonRpcRequest.from_dict(decoded)
    if not isinstance(decoded, list):
        return None
    if not decoded:
        raise JsonRpcInvalidRequestError()

    batch = JsonRpcRequestBatch([])
    for entry in decoded:
        try:
            # handles the case of valid batch but with invalid
            # requests.
            if not isinstance(entry, dict):
                raise JsonRpcInvalidRequestError()
            # is dict, all fine
            parsed = JsonRpcRequest.from_dict(entry)
        except JsonRpcInvalidRequestError:
            parsed = GenericResponse.INVALID_REQUEST
        batch.add_item(parsed)
    return batch
def _get_request_exception_message(request, url, params, exception):
    """
    Try to extract a message from a requests exception.

    Uses the ``message`` field of the JSON body of ``request`` when it is
    available, otherwise falls back to ``str(exception)``. The message is
    logged as a warning and returned.
    """
    try:
        message = request.json()['message']
    except (AttributeError, KeyError, JSONDecodeError, NameError, TypeError):
        message = str(exception)

    warning_text = 'error while fetching url %s, %s : %s' % (url, params, message)
    Logger.warning(unicode(warning_text))
    return message
def retrieveSavedProfile():
    """
    Retrieves the user's saved profile after a "socli -u" command.
    Asks the user to enter a User ID and saves it if no user is stored yet.
    Exits with status 1 when the data file cannot be parsed (the file is
    deleted first) or when the entered ID is not an integer.

    :return: The stored user ID
    """
    global data_file
    global app_data
    user = None
    try:
        load_datafile()
        if "user" not in app_data:
            # Manually raising to reach the first-time-setup handler below
            raise FileNotFoundError
        user = app_data["user"]
    except JSONDecodeError:
        # This maybe some write failures
        del_datafile()
        print_warning("Error in parsing the data file, it will be now deleted. Please rerun the "
                      "socli -u command.")
        exit(1)
    except FileNotFoundError:
        print_warning("Default user not set...\n")
        try:
            # Code to execute when first time user runs socli -u
            entered = inputs("Enter your Stackoverflow User ID: ")
            app_data['user'] = int(entered)
            save_datafile()
            user = app_data['user']
            print_green("\nUserID saved...\n")
        except ValueError:
            print_warning("\nUser ID must be an integer.")
            print(
                "\nFollow the instructions on this page to get your User ID: http://meta.stackexchange.com/a/111130")
            exit(1)
    return user
def probe(self, source: str) -> bool:
    """Run ffprobe on ``source`` and store the parsed result in ``self.media``.

    Returns ``False`` when the parsed result has no ``streams`` attribute,
    otherwise the number of streams (truthy when at least one exists).
    ``FileNotFoundError`` and JSON decode errors are logged and re-raised.
    """
    try:
        ffprobe_args = '-v error -show_streams -show_format -of json "{}"'.format(source)
        raw_output = self.cmdExec(self.backends.ffprobe, ffprobe_args, output=True)
        self.media = Munch.fromDict(json.loads(raw_output))
        if not hasattr(self.media, 'streams'):
            return False
        return len(self.media.streams)
    except FileNotFoundError:
        self.logger.exception('Probe media file not found: {}'.format(source), exc_info=True)
        raise
    except json.JSONDecodeError:
        self.logger.exception('Error decoding ffprobe JSON output', exc_info=True)
        raise