我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用json.JSONDecoder()。
def query(location, cty_codes, query_method, fuzzy): results = [] try: base_url = get_geonames_base_url() username = get_geonames_user_name() query_string = base_url + 'username={user}&{query_method}={name}&' \ 'style=FULL&orderby={order}&startRow=0&maxRows=5&fuzzy={fuzzy}' \ .format(user=username, query_method=query_method, name=quote(location), order='relevance', fuzzy=fuzzy) if cty_codes and len(cty_codes) > 0: query_string = query_string + '&' + '&'.join([('country={}'.format(c)) for c in cty_codes]) json_decode = json.JSONDecoder() # used to parse json response response = urlopen(query_string) response_string = response.read().decode('utf-8') parsed_response = json_decode.decode(response_string) if parsed_response.get('geonames') and len(parsed_response.get('geonames')) > 0: for item in parsed_response['geonames']: results.append(parse(item)) except URLError as e: logger.info("Oops! something didn't go well") logger.info(e) return results
def json_loads(data): """ It works as json.loads but supporting multiple encodings in the same string and accepting an `str` parameter that won't be converted to unicode. :param data: the string to load the objects from :type data: str :returns: the corresponding python object result of parsing 'data', this behaves similarly as json.loads, with the exception of that returns always `str` instead of `unicode`. """ obj = None with CustomJsonScanner(): # We need to use the cls parameter in order to trigger the code # that will let us control the string parsing method. obj = json.loads(data, cls=json.JSONDecoder) return obj
def action_varexport(self, ids, session=None): V = models.Variable qry = session.query(V).filter(V.id.in_(ids)).all() var_dict = {} d = json.JSONDecoder() for var in qry: val = None try: val = d.decode(var.val) except: val = var.val var_dict[var.key] = val response = make_response(json.dumps(var_dict, sort_keys=True, indent=4)) response.headers["Content-Disposition"] = "attachment; filename=variables.json" return response
def export_helper(filepath): session = settings.Session() qry = session.query(Variable).all() session.close() var_dict = {} d = json.JSONDecoder() for var in qry: val = None try: val = d.decode(var.val) except Exception: val = var.val var_dict[var.key] = val with open(filepath, 'w') as varfile: varfile.write(json.dumps(var_dict, sort_keys=True, indent=4)) print("{} variables successfully exported to {}".format(len(var_dict), filepath))
def json_parse(fileobj, decoder=json.JSONDecoder(), buffersize=2048): """ Small function to parse a file containing JSON objects separated by a new line. This format is used in the live-rundata-xx.json files produces by SMAC. taken from http://stackoverflow.com/questions/21708192/how-do-i-use-the-json-module-to-read-in-one-json-object-at-a-time/21709058#21709058 """ buffer = '' for chunk in iter(functools.partial(fileobj.read, buffersize), ''): buffer += chunk buffer = buffer.strip(' \n') while buffer: try: result, index = decoder.raw_decode(buffer) yield result buffer = buffer[index:] except ValueError: # Not enough data to decode, read more break
def __parse_crypto_keys(self, headerdata): self.__set_master_token(headerdata['keyresponsedata']['mastertoken']) # Init Decryption enc_key = headerdata['keyresponsedata']['keydata']['encryptionkey'] hmac_key = headerdata['keyresponsedata']['keydata']['hmackey'] encrypted_encryption_key = base64.standard_b64decode(enc_key) encrypted_sign_key = base64.standard_b64decode(hmac_key) cipher_rsa = PKCS1_OAEP.new(self.rsa_key) # Decrypt encryption key cipher_raw = cipher_rsa.decrypt(encrypted_encryption_key) encryption_key_data = json.JSONDecoder().decode(cipher_raw) self.encryption_key = base64key_decode(encryption_key_data['k']) # Decrypt sign key sign_key_raw = cipher_rsa.decrypt(encrypted_sign_key) sign_key_data = json.JSONDecoder().decode(sign_key_raw) self.sign_key = base64key_decode(sign_key_data['k']) self.__save_msl_data() self.handshake_performed = True
def __load_msl_data(self): raw_msl_data = self.load_file( msl_data_path=self.kodi_helper.msl_data_path, filename='msl_data.json') msl_data = json.JSONDecoder().decode(raw_msl_data) # Check expire date of the token raw_token = msl_data['tokens']['mastertoken']['tokendata'] base_token = base64.standard_b64decode(raw_token) master_token = json.JSONDecoder().decode(base_token) exp = int(master_token['expiration']) valid_until = datetime.utcfromtimestamp(exp) present = datetime.now() difference = valid_until - present difference = difference.total_seconds() / 60 / 60 # If token expires in less then 10 hours or is expires renew it if difference < 10: self.__load_rsa_keys() self.__perform_key_handshake() return self.__set_master_token(msl_data['tokens']['mastertoken']) enc_key = msl_data['encryption_key'] self.encryption_key = base64.standard_b64decode(enc_key) self.sign_key = base64.standard_b64decode(msl_data['sign_key'])
def __getVideoFileUrl (self, videoInfo) : videoInfo = json.JSONDecoder().decode(videoInfo) if 'stream' in videoInfo['data'] : temp = {} for x in videoInfo['data']['stream'] : temp[x['stream_type']] = [] for url in x['segs'] : if url['key'] != '-1' : temp[x['stream_type']].append(url['cdn_url']) if len(temp) > 0 : fileUrl = temp else : fileUrl = False return fileUrl
def __getVideoID(self, link): result = re.findall(r"/ac([\d_]*)", link) if len(result) > 0 : videoIDList = result[0].split('_') videoID = videoIDList[0] if len(videoIDList) > 1: videoPart = int(videoIDList[1]) - 1 else : videoPart = 0 pageHeader, pageBody = self.Tools.getPage(self.idInfoUrl + str(videoID), ['deviceType:2', 'User-Agent:Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1']) videoInfo = json.JSONDecoder().decode(pageBody) if videoInfo['code'] == 200: videoID = videoInfo['data']['videos'][videoPart]['videoId'] else: videoID = False else : videoID = False return videoID
def __getVideoFileUrl (self, confgFileUrl, siteType = 1) : if siteType != 1: confgFileUrl = confgFileUrl + '&site=' + str(siteType) pageHeader, pageBody = self.Tools.getPage(confgFileUrl) info = re.findall(r"^.*\((.*)\);", pageBody) info = info[0].decode('gbk').encode('utf-8') info = json.JSONDecoder().decode(info) if info.has_key('data') : if len(info['data']['urls']['m3u8'][self.videoTypeList[self.videoType]]) > 0 : fileUrl = info['data']['urls']['m3u8'][self.videoTypeList[self.videoType]][0] else : fileUrl = info['data']['urls']['m3u8'][self.videoTypeList['n']][0] fileUrlBase = re.findall(r"^(.*)\.m3u8\?", fileUrl) if len(fileUrlBase) > 0 : fileUrl = fileUrlBase[0] + self.fileUrlSuffix else : fileUrl = False else : fileUrl = self.__getVideoFileUrl(confgFileUrl, 2) return fileUrl
def main(): # pylint: disable=I0011,C0111 args = parse_args() parsed_project_index = sorted( json.JSONDecoder( object_pairs_hook=collections.OrderedDict ).decode(open(args.project_index).read()), key=lambda repo: repo['path'] ) json_string = strip_trailing_whitespace( json.dumps(parsed_project_index, sort_keys=False, indent=2) ) open(args.project_index, 'w').write(json_string) return 0
def object_hook(self, object_dict): """ Hook which when passed into a json.JSONDecoder will replace each dict in a json string with its index and convert the dict to an object as defined by the passed in condition_decoder. The newly created condition object is appended to the conditions_list. Args: object_dict: Dict representing an object. Returns: An index which will be used as the placeholder in the condition_structure """ instance = self.decoder(object_dict) self.condition_list.append(instance) self.index += 1 return self.index
def loads(conditions_string): """ Deserializes the conditions property into its corresponding components: the condition_structure and the condition_list. Args: conditions_string: String defining valid and/or conditions. Returns: A tuple of (condition_structure, condition_list). condition_structure: nested list of operators and placeholders for operands. condition_list: list of conditions whose index correspond to the values of the placeholders. """ decoder = ConditionDecoder(_audience_condition_deserializer) # Create a custom JSONDecoder using the ConditionDecoder's object_hook method # to create the condition_structure as well as populate the condition_list json_decoder = json.JSONDecoder(object_hook=decoder.object_hook) # Perform the decoding condition_structure = json_decoder.decode(conditions_string) condition_list = decoder.condition_list return (condition_structure, condition_list)
def get_tweets(user, auth_token): tweets = [] global tweet_gets_in_interval while len(tweets) < MAX_TWEETS_PER_USER: if len(tweets) == 0: lowest_id = "" # first run else: lowest_id = "&max_id=" + str(tweets[len(tweets) - 1].id - 1) req = urllib.request.Request(TWEETS_API_URL + "?screen_name=" + user + "&count=" + str(TWEETS_PER_REQ) + "&include_rts=false" + lowest_id) req.add_header("Authorization", auth_token) raw = urllib.request.urlopen(req).read().decode('utf-8') processed = json.JSONDecoder().decode(raw) if len(processed) == 0: break for tweet in processed: tweets.append(Tweet(tweet)) return tweets
def get_api(self, url): # return the json version get = None get = self.get_raw(url) if get != None: try: return json.JSONDecoder().decode(get) except Exception as e: self._error = "API response has invalid JSON format" self._error_msg = str(e.reason) self._update_ready = None return None else: return None # create a working directory and download the new files
def get_access_token(self, client_id, client_secret, auth_code): """ Aquire an OAuth2 access token To be run one time (then use refresh token) """ # curl -X POST -d "client_id=YOUR_CLIENT_ID&client_secret=YOUR_CLIENT_SECRET&grant_type=authorization_code&code=THE_GIVEN_CODE" https://www.freesound.org/apiv2/oauth2/access_token/ data = "client_id=%(client_id)s&client_secret=%(client_secret)s&grant_type=authorization_code&code=%(auth_code)s"%locals() print data request = urllib2.Request( 'https://www.freesound.org/apiv2/oauth2/access_token/', data=data ) # request.add_header('Accept', 'application/json') # try: response = urllib2.urlopen(request) # except urllib2.HTTPError, exc: # if exc.code == 401: # Unauthorized # raise Unauthorized("Bad request") return json.JSONDecoder().decode(response)
def refresh_token(self, client_id, client_secret, refresh_token): """ To get a new access token using your refresh token you basically need to repeat Step 3 setting the grant_type parameter to ‘refresh_token’ (instead of ‘authorization_code’) and adding a refresh_token parameter with your refresh token (instead of adding the code parameter with the authorization code). See the following example: # curl -X POST -d "client_id=YOUR_CLIENT_ID&client_secret=YOUR_CLIENT_SECRET&grant_type=refresh_token&refresh_token=REFRESH_TOKEN" "https://www.freesound.org/apiv2/oauth2/access_token/" """ data = "client_id=%(client_id)s&client_secret=%(client_secret)s&grant_type=refresh_token&refresh_token=%(refresh_token)s"%locals() print data request = urllib2.Request( 'https://www.freesound.org/apiv2/oauth2/access_token/', data=data ) # request.add_header('Accept', 'application/json') # try: response = urllib2.urlopen(request) # except urllib2.HTTPError, exc: # if exc.code == 401: # Unauthorized # raise Unauthorized("Bad request") print(response) return json.JSONDecoder().decode(response)
def action_varexport(self, ids): V = models.Variable session = settings.Session() qry = session.query(V).filter(V.id.in_(ids)).all() session.close() var_dict = {} d = json.JSONDecoder() for var in qry: val = None try: val = d.decode(var.val) except: val = var.val var_dict[var.key] = val response = make_response(json.dumps(var_dict, sort_keys=True, indent=4)) response.headers["Content-Disposition"] = "attachment; filename=variables.json" return response
def __init__(self, **kw_args): """Compose the standard JSONDecoder with a custom object_hook. The custom object_hook will recognize a dictionary that represents a ClientData object, and decode it as a ClientData object. All other objects will get passed to the standard JSONDecoder. Args: Same arguments as JSONDecoder.__init__() with the exception that 'strict' is always set to False. If an 'object_hook' is supplied then it will be called by _object_decode() if the object is not interpreted as ClientData. """ self._other_object_hook = None kw_args_new = kw_args.copy() if 'object_hook' in kw_args: self._other_object_hook = kw_args['object_hook'] kw_args_new['object_hook'] = self._object_decode # Note: strict=False because the notes attribute might contain # line feeds. # kw_args_new['strict'] = False self._decoder = json.JSONDecoder(**kw_args_new)
def test_jsonDecoder(self): '''SockJSWireProtocolWrapper can use a json.JSONDecoder subclass for receives. ''' class SetDecoder(json.JSONDecoder): def __init__(self, *args, **kwargs): kwargs['object_hook'] = self.set_object_hook super(SetDecoder, self).__init__(*args, **kwargs) def set_object_hook(self, obj): if isinstance(obj, dict) and obj.get('!set'): return set(obj['!set']) return obj factory = P.SockJSWireProtocolWrappingFactory( self.wrappedFactory, jsonDecoder=SetDecoder) encodingProtocol = factory.buildProtocol(self.address) encodingProtocol.makeConnection(self.transport) encodingProtocol.dataReceived(b'{"!set": [1, 2, 3]}') self.assertEqual(self.receivedData, [{1, 2, 3}])
def create_cars_from_collision_json(collision_json): """ Creates all the cars stored in a json-string reporting a collision. The json must have the form {"time":<string with specified format>, "message":{"collision_code":<string>, "collision_initial_conditions":<list of cars>}}. All followers have been assigned. Time format: "%Y-%m-%d %H:%M:%S,%f". :param collision_json: <string> json with a collision report. :return: dictionary with the cars. The key value is the name of the car. """ collision_information = JSONDecoder().decode(collision_json) json_cars = collision_information["message"]["collision_initial_conditions"] cars_dict = {} for json_car in json_cars: cars_dict[json_car["car_name"]] = create_car_from_json(json_car) for json_car in json_cars: if json_car["following"] in cars_dict: car = cars_dict[json_car["car_name"]] cars_dict[json_car["following"]].add_follower(car) car.set_following(True) car.set_controller(follower_controller) return cars_dict
def create_times_from_collision_json(collision_json): """ Obtains all the times at which all the cars stored in the collision json -string entered the intersection (or where created). The json must have the form {"time":<string with specified format>, "message":{"collision_code":<string>, "collision_initial_conditions":<list of cars>}}. Also returns the time at which the collision in the jason should start. Time format: "%Y-%m-%d %H:%M:%S,%f". :param collision_json: <string >json with a collision report. :return: dictionary with the cars. The key value is the name of the car. """ collision_information = JSONDecoder().decode(collision_json) json_cars = collision_information["message"]["collision_initial_conditions"] time_format = '%Y-%m-%d %H:%M:%S,%f' collision_time = datetime.strptime(collision_information["time"], time_format) car_creation_times = [] for json_car in json_cars: car_creation_times.append((datetime.fromtimestamp(json_car["creation_time"]), json_car["car_name"])) car_creation_times.sort(key=lambda x: x[0]) start_simulation_time = collision_time for car_time in car_creation_times: if car_time[0] < start_simulation_time: start_simulation_time = car_time[0] return start_simulation_time, car_creation_times
def parse_message(self, message): tag, _, data = message.partition(" ") if tag in CLIENT: parsed = (CLIENT[tag],) elif tag in SERVER: parsed = (SERVER[tag],) else: raise ValueError("Unknown message type %s" % tag) decoder = JSONDecoder() while data: data = data.lstrip() try: decoded, end = decoder.raw_decode(data) except JSONDecodeError: break else: parsed += (decoded,) data = data[end:] return parsed
def upgrade(self, cfg): fields = [] fields_env = {} # ENV gets priority: ENV < config.json for name, data in self.base_config.items(): if name not in cfg: cfg[name] = data fields.append(name) if name in os.environ: # Use JSON decoder to get same behaviour as config file fields_env[name] = json.JSONDecoder().decode(os.environ[name]) logger.info("Using ENV setting %s=%s", name, fields_env[name]) # Only rewrite config file if new fields added if len(fields): logger.warn("Upgraded config, added %d new field(s): %r", len(fields), fields) self.save(cfg) # Update in-memory config with environment settings cfg.update(fields_env) return cfg
def __init__(self, config="grid.json"): if os.path.exists("exp/gridlogs") is False: os.makedirs("exp/gridlogs") with open(config) as f: s = f.read() conf = json.JSONDecoder().decode(s) self.filename = conf["filename"] self.gpus = conf["gpus"] self.prefix = conf["prefix"] if "prefix" in conf else "-" self.processes = [None for _ in xrange(len(self.gpus))] self.runs = self.get_runs(conf["grid"]) self.launched_runs = 0 self.finished_runs = 0 print "%d total runs on gpus %s:" % (len(self.runs), self.gpus) for i in xrange(len(self.runs)): print self.runs[i]
def find_problematic_elements(self, error, batch): try: content = json.JSONDecoder().decode(error.content) message = content['error']['message'] except KeyError: return [] pattern = (r'timeSeries\[(\d+?)\]\.metric\.labels\[\d+?\]' r' had an invalid value of "(\w+?)"') found = [] for match in re.finditer(pattern, message): ts_index = int(match.group(1)) label = match.group(2) metric = batch[ts_index]['metric'] metric_type = metric['type'] found.append((self.add_label_and_retry, label, metric_type, batch[ts_index])) return found
def parse_json(json_to_parse): """ Parse JSON string to OrderedDict and return :param json_to_parse: string representation of JSON :return: OrderedDict representation of JSON """ return json.JSONDecoder(object_pairs_hook=collections.OrderedDict).decode(json_to_parse.decode('utf-8'))
def use_startstyle(self, inlinestyletext): # type: (Optional[str]) -> None if inlinestyletext: # Transform YAML-like JSON # e.g. '{based_on_style: pep8, column_limit: 79}' # into '{"based_on_style": "pep8", "column_limit": 79}' # which can be parsed as JSON. inlinestyletext = re.sub(r'([a-zA-Z_]\w+)', r'"\1"', inlinestyletext) d = json.JSONDecoder().decode(inlinestyletext) # type: ignore self.initial_style = style_make(d)
def json_iter_parse(response_text): decoder = json.JSONDecoder(strict=False) idx = 0 while idx < len(response_text): obj, idx = decoder.raw_decode(response_text, idx) yield obj
def __init__(self, user_mode): self.user_mode = user_mode self.interface = None self.quota = None self.is_online = None self.new_api = None self.json_decoder = json.JSONDecoder() self.session = requests.Session() self.csrf_token = None self.resolver = dns.resolver.Resolver() self.resolver.nameservers = ['172.16.0.1'] self.api_host_ip = self.resolve_url(self.api_host) self.api_host_new_ip = self.resolve_url(self.api_host_new)
def get_api(self, url): # return the json version get = None get = self.get_api_raw(url) # this can fail by self-created error raising if get is not None: return json.JSONDecoder().decode( get ) else: return None # create a working directory and download the new files
def __init__(self, ctrl, vimx): import logging self.logger = logging.getLogger(__name__) self.ctrl = ctrl self.vimx = vimx self.state = OrderedDict() self.internal = {} self.json_decoder = json.JSONDecoder(object_pairs_hook=OrderedDict) self.help_flags = {"new": False, "launch_prompt": True, "session_show": True} self.bpid_map = {}
def _read_stdin(self): loads = json.JSONDecoder(parse_float=unicode, parse_int=unicode).decode while True: yield select.select([sys.stdin], [], []) line = sys.stdin.readline() if not line: break if not line.strip(): continue in_dict = loads(line) yield idiokit.send(events.Event(in_dict))
def json_decode(self, encoded_json): json_decoder = json.JSONDecoder() try: return json_decoder.decode(encoded_json) except: logging.debug("JSON error from {} container \n==\n {} \n==\n".format(self.module_cls_name, encoded_json)) return None
def __init__(self, malware, module_cls_name, task_id, cuckoo_url, all_result=False): self.json_decoder = json.JSONDecoder() self._malware = malware self._module_cls_name = module_cls_name self._task_id = task_id self._all = all_result self._ms = malware.get_module_status(self._module_cls_name) self._cuckoo_url = cuckoo_url
def __init__(self): json.JSONDecoder.__init__( self, object_hook=self.dict_to_object, )
def jsons(r, p=0): dec = json.JSONDecoder(encoding='utf-8') while True: obj,p = dec.scan_once(r.text,p) yield obj
def wiki(self, ctx, *, search_query): logger.info('!wiki ' + search_query + ' - User: ' + str(ctx.message.author)) data = self.utils.search_wiki(search_query) if len(data) == 1: result_id = 0 else: if 1 < len(data) <= 10: # Display all results then await choice i = 1 text = '' for entry in data: text += str(i) + ') **' + entry[1] + '** - *Category: ' + str(entry[2]) + '*\n' i += 1 em = discord.Embed(title='Please enter the number of the targeted wiki entry', description=text, colour=0x3D9900) await self.bot.send_message(ctx.message.channel, '', embed=em) msg = await self.bot.wait_for_message(author=ctx.message.author) if int(msg.content) >= i or int(msg.content) < 1: await self.bot.say('Invalid choice') return result_id = int(msg.content) - 1 else: await self.bot.send_message(ctx.message.channel, 'No matching entry found.') return temp = json.JSONDecoder().decode(data[result_id][3]) tempembed = discord.Embed().from_data(temp) await self.bot.send_message(ctx.message.channel, '', embed=tempembed)
def get_json(self): try: return json.loads(self.request.body.decode('utf-8')) except (json.JSONDecoder, ValueError): return {}
def json_to_prolog(jstr): return json.JSONDecoder(object_hook = _prolog_from_json).decode(jstr)
def get_api(self, url): # return the json version get = None get = self.get_api_raw(url) # this can fail by self-created error raising if get != None: return json.JSONDecoder().decode( get ) else: return None # create a working directory and download the new files
def __init__(self, *args, **kwargs): json.JSONDecoder.__init__(self, object_hook=self.object_hook, *args, **kwargs)
def __init__(self): self.buf = "" self.raw_decode = json.JSONDecoder().raw_decode
def fromjsons(values, generator=None, fillables=None, pointer_fromequal=False): if hasattr(values, "read"): def iterator(): j = json.JSONDecoder() buf = "" while True: try: obj, i = j.raw_decode(buf) except ValueError: extra = values.read(8192) if len(extra) == 0: break else: buf = buf.lstrip() + extra else: yield obj buf = buf[i:].lstrip() else: def iterator(): j = json.JSONDecoder() index = 0 while True: try: obj, i = j.raw_decode(values[index:]) except ValueError: break yield obj _, index = fromjsons._pattern.match(values, index + i).span() return fromiterdata(iterator(), generator=generator, fillables=fillables, pointer_fromequal=pointer_fromequal)
def __init__(self, date_format, *args, **kwargs): self.date_format = date_format json.JSONDecoder.__init__(self, object_hook=self.date_to_object, *args, **kwargs)
def __init__(self, base_url, key_contains=[], *args, **kwargs): self.base_url = base_url self.key_contains = key_contains json.JSONDecoder.__init__(self, object_hook=self.url_to_object, *args, **kwargs)