我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用json.JSONDecodeError()。
def tba_get(self, path):
    """Query the TBA API and return the decoded JSON response.

    :param path: (str) Request path, without the API address prefix
        (https://www.thebluealliance.com/api/v2/)
    :return: The object parsed from the response body. When the body is not
        valid JSON the URL and the raw body are printed and ``None`` is
        returned.
    :raises Exception: if ``self.app_id['X-TBA-App-Id']`` is empty.
    """
    if self.app_id['X-TBA-App-Id'] == "":
        raise Exception('An API key is required for TBA. Please use set_api_key() to set one.')

    url_str = 'https://www.thebluealliance.com/api/v2/' + path
    response = self.session.get(url_str, headers=self.app_id)
    body = response.text
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        # Dump what was requested and what came back to help debugging.
        print(url_str)
        print(body)
        return None
    return parsed
def donations_helper():
    """Collect 2016-cycle independent expenditures that target known legislators.

    Builds a name -> member index from the House and Senate member lists, then
    walks every committee active in the 2016 cycle and keeps each expenditure
    whose ``candidate_name`` is found in that index. Kept expenditures are
    tagged with ``committee_id`` and ``propublica_candidate_id``.

    :return: list of expenditure dicts.
    """
    print("donations_helper")
    FecApiObj = FECAPI(FEC_APIKEY)
    committees = FecApiObj.get_committees()
    PPCampFinObj = CampaignFinanceAPI(ProPublica_APIKEY)
    PPCongressApi = CongressAPI(ProPublica_APIKEY)

    legislator_index = dict()
    for chamber in ('house', 'senate'):
        members = PPCongressApi.list_members(chamber)["results"][0]["members"]
        for member in members:
            full_name = str(member['first_name']) + " " + str(member['last_name'])
            legislator_index[full_name] = member

    print("starting to iterate through superpacs")
    donations = []
    for committee in committees:
        if 2016 not in committee['cycles']:
            continue
        try:
            indepExpend = PPCampFinObj.get_indep_expends(str(committee['committee_id']))
            for expend in indepExpend["results"]:
                try:
                    expend['committee_id'] = str(committee['committee_id'])
                    candidate = legislator_index[expend['candidate_name']]
                    expend['propublica_candidate_id'] = str(candidate['id'])
                    donations.append(expend)
                except KeyError:
                    # Candidate is not a known legislator (or a field is missing): skip it.
                    pass
        except JSONDecodeError:
            # The API returned a non-JSON body for this committee: skip it.
            pass
    return donations
def load(self):
    """Load every JSON settings file and publish the values in ``self.settings``.

    The parent ``load()`` runs first. Each file in ``self.files`` (relative to
    ``self.directory``) is then parsed and merged in order, so later files
    override earlier ones. Keys are upper-cased when stored.

    :raises ImproperlyConfigured: if any file contains invalid JSON.
    """
    # Prepare + load directory.
    super().load()

    merged = dict()
    try:
        for file_name in self.files:
            with open(os.path.join(self.directory, file_name), 'r') as file_handle:
                merged.update(json.load(file_handle))
    except json.JSONDecodeError as e:
        message = 'Your settings file(s) contain invalid JSON syntax! Please fix and restart!, {}'.format(str(e))
        raise ImproperlyConfigured(message)

    # Expose the merged values with upper-cased keys.
    for key, value in merged.items():
        self.settings[key.upper()] = value
def from_json(cls, data):
    """Build a ``cls`` instance from a JSON string or a mapping.

    ``data`` is returned untouched when it already is a ``cls`` instance. A
    string is decoded with ``json.loads`` first. Keys are translated through
    ``cls._toPy`` (unknown keys are kept as-is) and passed to ``cls`` as
    keyword arguments; ``None`` or an empty mapping yields ``cls()``.

    :raises json.JSONDecodeError: if ``data`` is a malformed JSON string.
    :raises TypeError: if ``cls`` rejects the resulting keyword arguments.
    """
    if isinstance(data, cls):
        return data
    if isinstance(data, str):
        # Decoding errors propagate to the caller unchanged.
        data = json.loads(data)
    kwargs = {cls._toPy.get(key, key): value for key, value in (data or {}).items()}
    return cls(**kwargs)
def execute(self):
    """Run the trading system once, then reschedule if an interval is set.

    Read timeouts, connection errors and JSON decoding errors are ignored.
    A ``TradingSystemException`` is printed with a timestamp; any other
    exception is printed with its args and a traceback. When
    ``self.interval`` is truthy a ``threading.Timer`` is started that calls
    ``execute`` again after that many seconds.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    try:
        self.system.run()
    except (ReadTimeout, ConnectionError, JSONDecodeError):
        pass
    except exceptions.TradingSystemException as e:
        stamp = datetime.now().strftime(timestamp_format)
        print('{time} - {text}'.format(time=stamp, text=str(e)))
    except Exception as e:
        stamp = datetime.now().strftime(timestamp_format)
        print('{time} - {text} - {args}'.format(time=stamp, text=str(e), args=e.args))
        traceback.print_exc()

    if self.interval:
        threading.Timer(self.interval, self.execute).start()
def assertJSON(self, response):
    """Assert that a non-empty response body is canonically formatted JSON.

    The body must parse as JSON (otherwise ``self.fail`` is called) and must
    equal its own re-serialisation with two-space indent, sorted keys and a
    trailing newline. An empty body is accepted without checks.
    """
    body = response.text
    if not body:
        return
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        self.fail("Response data is not JSON.")
        return
    formatted = json.dumps(data, indent=2, sort_keys=True, separators=(',', ': '))
    reference = "{formatted_data}\n".format(formatted_data=formatted)
    self.assertEqual(reference, response.text)
def get_phenolist():
    """Load the phenotype list and URL-quote every ``phenocode``.

    The file location comes from ``common_filepaths['phenolist']``.

    :return: the parsed list, with each entry's ``phenocode`` passed through
        ``urllib.parse.quote_plus``.
    :raises PheWebError: if the file is missing or unreadable.
    :raises json.JSONDecodeError: if the file is not valid JSON (a hint is
        printed before the error is re-raised).
    """
    # TODO: should this be memoized?
    from .file_utils import common_filepaths
    filepath = common_filepaths['phenolist']
    try:
        with open(os.path.join(filepath)) as f:
            phenolist = json.load(f)
    except (FileNotFoundError, PermissionError):
        message = (
            "You need a file to define your phenotypes at '{}'.\n".format(filepath) +
            "For more information on how to make one, see <https://github.com/statgen/pheweb#3-make-a-list-of-your-phenotypes>"
        )
        raise PheWebError(message)
    except json.JSONDecodeError:
        hint = ("Your file at '{}' contains invalid json.\n".format(filepath) +
                "The error it produced was:")
        print(hint)
        raise
    for pheno in phenolist:
        pheno['phenocode'] = urllib.parse.quote_plus(pheno['phenocode'])
    return phenolist
def with_json(self, json_dict: dict) -> "Statistic":
    """Check that the current request carried the expected JSON body.

    Both the expected dict and the actual request body are normalised with
    ``json.dumps(..., sort_keys=True)`` before comparison, so key order does
    not matter. A mismatch, or a body that is not valid JSON, appends a
    description to ``self._error_messages``.

    :param json_dict: the JSON payload the request is expected to contain.
    :return: ``self``, so that checks can be chained.
    """
    body = json.dumps(json_dict, sort_keys=True)
    # "ignore" drops undecodable bytes. The previous handler name, "skip", is
    # not a registered codec error handler and made decode() raise LookupError
    # as soon as the body contained invalid UTF-8.
    actual_body = self.get_current_request().body.decode("utf-8", errors="ignore")
    try:
        actual_json_dict = json.loads(actual_body)
    except json.JSONDecodeError:
        requested_time = self._current_request_index + 1
        self._error_messages.append(f"\nFor the {requested_time} time: with json {body}.\n"
                                    f"But for the {requested_time} time: json was corrupted "
                                    f"{actual_body!r}.")
        return self

    actual_body = json.dumps(actual_json_dict, sort_keys=True)
    if body != actual_body:
        requested_time = self._current_request_index + 1
        self._error_messages.append(f"\nFor the {requested_time} time: with json {body}.\n"
                                    f"But for the {requested_time} time: json was {actual_body}.")
    return self
def _load_stored_result_from_file(self, default=None):
    """Return the deserialised results file, or ``default`` when unavailable.

    ``default`` is returned when the file cannot be read. When the file is
    readable but not valid JSON it is renamed to
    ``corrupted.<8 hex chars>.<results file name>`` inside the results
    directory, an error is logged, and ``default`` is returned.
    """
    try:
        with open(self._results_file_path) as fp:
            raw = fp.read()
    except IOError:
        return default

    try:
        return json.loads(raw)
    except json.JSONDecodeError as e:
        # Keep the unreadable content around under a unique name.
        unique = str(uuid.uuid4())[:8]
        backup_file_name = "corrupted.{}.{}".format(unique, self._results_file_name)
        logger.error(
            "Unable to parse file {}: {}, renaming to {} and creating new file".format(
                self._results_file_path,
                e,
                backup_file_name,
            )
        )
        dst = "{}/{}".format(self._results_directory, backup_file_name)
        os.rename(self._results_file_path, dst)
        return default
def setup_logging(config_file_path, log_level=logging.INFO):
    """
    Logging configuration helper.

    Loads a JSON ``dictConfig`` document and applies it. If the file cannot be
    read or parsed, or the configuration is rejected, falls back to
    ``logging.basicConfig`` at ``log_level`` and logs the failure (with
    traceback) on the root logger.

    :param config_file_path: file path to logging configuration file.
        https://docs.python.org/3/library/logging.config.html#object-connections
    :param log_level: level used for the fallback configuration;
        defaults to logging.INFO
    :return: None - access the logger by name as described in the config--or
        the "root" logger as a backup.
    """
    try:
        with open(config_file_path, 'rt') as f:
            config = json.load(f)
        logging.config.dictConfig(config)
    except (ValueError, IOError, OSError):
        # json.JSONDecodeError is throwable in Python3.5+ -- subclass of ValueError.
        # basicConfig's keyword is ``level``; the former ``log_level=`` keyword is
        # rejected with ValueError, which broke this fallback path itself.
        logging.basicConfig(level=log_level)
        logging.root.exception(
            "Could not load specified logging configuration '{}'. Verify the filepath exists and is compliant with: "
            "[https://docs.python.org/3/library/logging.config.html#object-connections]".format(config_file_path))
def __init__(self, callResult):
    """Summarise an HTTP call result as ``success`` / ``text`` / ``response``.

    When the body is JSON and carries a truthy ``status`` entry, the response
    came from the API server: the JSON is stored in ``self.response``,
    ``self.text`` becomes ``"<code>: <info>"`` and ``self.success`` is True
    for code 200. In every other case (non-JSON body, or JSON without a
    status) ``self.text`` is built from the HTTP status code and reason.

    :param callResult: response object exposing ``json()``, ``status_code``
        and ``reason``.
    """
    self.success = False
    try:
        # decode from json if possible
        result = callResult.json()
    except JSONDecodeError:
        result = None

    # A payload without a "status" entry used to raise KeyError here; it is now
    # treated like any other response that did not come from the API server.
    status = result.get('status') if isinstance(result, dict) else None
    if status:
        # so store a copy of the json
        self.response = result
        code = status['code']
        # assemble a description of the result
        self.text = str(code) + ': ' + status['info']
        # a 200 OK means the json object has the answer data
        if code == 200:
            self.success = True
    else:
        # otherwise, assemble a description from the HTTP results
        self.text = 'Error ' + str(callResult.status_code) + ': ' + callResult.reason
# chat API call
def get_manifest(self, archive):
    """Extract and validate ``manifest.json`` from an uploaded plugin archive.

    :param archive: uploaded file exposing ``temporary_file_path()``.
    :return: the parsed manifest.
    :raises ValidationError: if the upload is not a zip, cannot be found on
        disk, has no manifest, or the manifest is invalid JSON or fails
        ``validate_manifest``.
    """
    try:
        with ZipFile(archive.temporary_file_path()) as plugin:
            print(plugin.namelist())
            prefix = self.get_prefix(plugin)
            prefix = prefix + '/' if len(prefix) else ''
            manifest_name = '{}manifest.json'.format(prefix)
            with plugin.open(manifest_name) as myfile:
                manifest = json.loads(myfile.read())
            validate_manifest(manifest)
            return manifest
    except BadZipFile:
        raise ValidationError('Bad .zip format')
    except FileNotFoundError:
        raise ValidationError('Error with upload, please try again')
    except KeyError:
        # Raised when the archive has no entry with the manifest name.
        raise ValidationError('No manifest.json found in archive')
    except json.JSONDecodeError:
        raise ValidationError('Error with manifest.json, bad Json Format')
    except avasdk.exceptions.ValidationError as e:
        raise ValidationError('Error in manifest.json ({})'.format(e))
def req_json(self, req):
    """Return the request's JSON body, or ``None`` when there is no body.

    :param req: request exposing ``content_length``, ``content_type``,
        ``stream``, ``path`` and ``context``.
    :return: the decoded JSON value, or ``None`` for an empty or unread body.
    :raises errors.InvalidFormat: if the content type is not
        ``application/json`` or the body is not valid JSON.
    """
    if req.content_length in (None, 0):
        return None

    if req.content_type is None or req.content_type.lower() != 'application/json':
        raise errors.InvalidFormat("Requires application/json payload")

    raw_body = req.stream.read(req.content_length or 0)
    if raw_body is None:
        return None

    text = raw_body.decode('utf-8')
    try:
        return json.loads(text)
    except json.JSONDecodeError as jex:
        print("Invalid JSON in request: \n%s" % text)
        self.error(req.context, "Invalid JSON in request: \n%s" % text)
        raise errors.InvalidFormat("%s: Invalid JSON in body: %s" % (req.path, jex))
def _parse_and_update_body(self, handler_def):
    """Replace a non-empty request body with its parsed form.

    The body is decoded as UTF-8 JSON. When ``handler_def.consumes`` is set,
    the parsed JSON is additionally converted with its ``from_json``. An
    empty body is left untouched.

    :raises BadRequestError: if the body is not valid JSON, or if
        ``from_json`` raises ``ValidationError``.
    """
    raw = self.request.body
    if not raw:
        return

    try:
        json_body = json.loads(raw.decode('utf-8'))
    except json.JSONDecodeError:
        raise BadRequestError(
            "Malformed request body. JSON is expected."
        )

    if handler_def.consumes:
        try:
            new_body = handler_def.consumes.from_json(json_body)
        except ValidationError:
            # TODO: log warning or error
            raise BadRequestError("Bad data structure.")
    else:
        new_body = json_body
    self.request.body = new_body
def parseMessage(self, msg):
    """Decode a JSON message and route it by its ``msgtype``.

    The raw message is printed first. Messages that are not valid JSON are
    dropped silently; an unknown ``msgtype`` is ignored.

    :param msg: JSON text containing at least a ``msgtype`` entry.
    """
    print(msg)
    try:
        decoded = json.loads(msg)
    except json.JSONDecodeError:
        return

    msg_type = decoded["msgtype"]
    routes = (
        ("control", "handleControl"),
        ("sheetdelta", "passSheetDelta"),
        ("request", "handleRequest"),
        ("nodedata", "passNodedata"),
    )
    for kind, handler_name in routes:
        if msg_type == kind:
            getattr(self, handler_name)(decoded)
            break
def discoverWorkers(self):
    """Discover new workers via udp broadcasts

    Polls the discovery socket without blocking. A datagram that decodes to
    JSON containing both ``ip`` and ``port`` announces a worker, named by its
    ``host`` entry or ``"<ip>:<port>"``. Unknown workers get a tree item, are
    registered in ``self.workers``, ticked and synchronised. Undecodable JSON
    is ignored.
    """
    readable, _writable, _errored = select([self.discoverysocket], [], [], 0)
    if not readable:
        return

    received = self.discoverysocket.recvfrom(4096)[0]
    try:
        discoverydata = json.loads(bytes.decode(received))
    except json.JSONDecodeError:
        discoverydata = {}

    if not ("ip" in discoverydata and "port" in discoverydata):
        return

    if "host" in discoverydata:
        name = discoverydata["host"]
    else:
        name = discoverydata["ip"] + ":" + str(discoverydata["port"])
    if name in self.workers:
        return

    # NOTE(review): the original comment says "Type 1000 for Worker Item" but
    # the value passed is 1001 -- confirm which one is intended.
    treeItem = QTreeWidgetItem(1001)
    treeItem.setText(0, name)
    self.treeWidget.addTopLevelItem(treeItem)
    self.grabPeriodicInfos()  # Grab monitor data
    worker = Worker(discoverydata, treeItem, nodeDataJar=self.nodeDataJar)
    self.workers[name] = worker
    self.workers[name].tick(self.sheetDeltaMemory)
    self.workers[name].synchronize()
async def _ws_recv_handler(self):
    """Receive websocket messages forever and dispatch them.

    Command responses and notifications arrive on the same websocket. A
    decoded message containing ``status`` is a command response and is put on
    ``self._resp_queue``; anything else is an event and is sent through the
    signal named by its ``event`` entry with its ``sender`` entry as argument.
    Malformed messages are logged and skipped.

    The function uses ``await`` and therefore has to be a coroutine function;
    a plain ``def`` here does not compile. The loop ends only when
    ``self._websocket.recv()`` raises.
    """
    while True:
        raw = await self._websocket.recv()
        try:
            if isinstance(raw, bytes):
                raw = raw.decode()
            recv = ejson_loads(raw)
            if 'status' in recv:
                # Message response
                self._resp_queue.put_nowait(recv)
            else:
                # Event
                self._signal_ns.signal(recv['event']).send(recv['sender'])
        except (KeyError, TypeError, UnicodeDecodeError, json.JSONDecodeError):
            # Undecodable bytes are as invalid as broken JSON and must not
            # terminate the receive loop.
            logger.warning('Backend server sent invalid message: %s', raw)
def handshake(self):
    """Generator driving the challenge/response handshake through effects.

    Sends a challenge, receives the peer's answer, validates it against
    ``HandshakeAnswerSchema`` and verifies the signature with the public key
    fetched for the claimed identity. On success a "done" message is sent and
    ``self.id`` is set to that identity.

    :raises HandshakeError: if the handshake was already done, the answer is
        not decodable, or the signature/identity check fails. For the last
        two cases the error is also sent to the peer first.
    """
    if self.id:
        raise HandshakeError('Handshake already done.')

    challenge = _generate_challenge()
    query = {'handshake': 'challenge', 'challenge': challenge}
    yield Effect(EHandshakeSend(ejson_dumps(query)))
    raw_resp = yield Effect(EHandshakeRecv())

    try:
        answer = ejson_loads(raw_resp)
    except (TypeError, json.JSONDecodeError):
        error = HandshakeError('Invalid challenge response format')
        yield Effect(EHandshakeSend(error.to_raw()))
        raise error

    answer = HandshakeAnswerSchema().load(answer)
    claimed_identity = answer['identity']
    try:
        pubkey = yield Effect(EPubKeyGet(claimed_identity))
        pubkey.verify(answer['answer'], challenge.encode())
        yield Effect(EHandshakeSend('{"status": "ok", "handshake": "done"}'))
        self.id = claimed_identity
    except (TypeError, PubKeyNotFound, InvalidSignature):
        error = HandshakeError('Invalid signature, challenge or identity')
        yield Effect(EHandshakeSend(error.to_raw()))
        raise error
def load_string(json_string):
    """Deserialize `json_string` to a Python object when possible.

    If `json_string` is not a valid JSON document (or is not a type that
    ``json.loads`` accepts), it is returned unchanged. It is used in the
    context of activity results: floto handles str and JSON serialized
    results.

    Parameters
    ----------
    json_string : any

    Returns
    -------
    obj
    """
    try:
        return json.loads(json_string)
    except (TypeError, json.JSONDecodeError):
        return json_string
def _json_get(inp):
    """
    Get a Python object (list or dict) regardless of whether data is passed
    as a JSON string, a file path, or is already a python object.

    Returns a ``(data, dataformat)`` pair where ``dataformat`` is "native" if
    the input was already a dict or list, "str" if it was a JSON string, or
    the path itself if the input was a file path (storing the filename there
    saves keeping track of one more variable).
    """
    if isinstance(inp, (dict, list)):
        return inp, "native"
    try:
        return json.loads(inp), "str"
    except json.JSONDecodeError:
        # Not JSON text, so treat the input as the path of a JSON file.
        with open(inp, encoding="utf-8") as f:
            return json.load(f), inp
def post(self):
    """Forward the posted JSON as a query and return the outcome as a dict.

    The result holds ``response_code`` and ``response``; the latter is the
    decoded JSON when the backend answer is JSON, otherwise the raw answer.
    On a gRPC deadline error only ``response_code`` is set (to the timeout
    code); other ``_Rendezvous`` errors are logged and leave the dict empty.
    """
    request_body = json.dumps(request.get_json())
    channel = get_channel_name_from_json(request.get_json())
    query_data = {}

    try:
        # TODO: consider making this call asynchronous.
        response = ServerComponents().query(request_body, channel)
        logging.debug(f"query result : {response}")
        query_data['response_code'] = str(response.response_code)
        try:
            decoded = json.loads(response.response)
        except json.JSONDecodeError:
            logging.warning("your response is not json, your response(" + str(response.response) + ")")
            decoded = response.response
        query_data['response'] = decoded
    except _Rendezvous as e:
        logging.error(f'Execute Query Error : {e}')
        if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
            # TODO: the original note here was mis-encoded; it appears to ask
            # for the REST timeout handling to be extracted into a method.
            logging.debug("gRPC timeout !!!")
            query_data['response_code'] = str(message_code.Response.timeout_exceed)

    return query_data
def get(self):
    """Look up a transaction by hash and return it as a dict.

    The result carries ``response_code``, ``data``, ``meta``, ``more_info``
    and the base64-encoded ``signature`` and ``public_key``. ``data`` is the
    decoded JSON when possible, otherwise the raw value; empty ``data`` or
    ``meta`` are reported as ``""``.

    :raises json.JSONDecodeError: if a non-empty ``meta`` is not valid JSON
        (unlike ``data``, ``meta`` has no raw fallback).
    """
    args = ServerComponents().parser.parse_args()
    response = ServerComponents().get_transaction(args['hash'], get_channel_name_from_args(args))

    tx_data = {}
    tx_data['response_code'] = str(response.response_code)

    tx_data['data'] = ""
    # Compare by value: ``is not 0`` tested object identity against a literal.
    if len(response.data) != 0:
        try:
            tx_data['data'] = json.loads(response.data)
        except json.JSONDecodeError:
            logging.warning("your data is not json, your data(%s)", response.data)
            tx_data['data'] = response.data

    tx_data['meta'] = ""
    if len(response.meta) != 0:
        tx_data['meta'] = json.loads(response.meta)

    tx_data['more_info'] = response.more_info
    tx_data['signature'] = base64.b64encode(response.signature).decode()
    tx_data['public_key'] = base64.b64encode(response.public_key).decode()
    return tx_data
def get(self):
    """Look up the invoke result of a transaction and return it as a dict.

    On success the dict holds the backend ``response_code`` and the decoded
    result (tagged with ``jsonrpc: "2.0"``) under ``response``. When the
    result is empty or not valid JSON, ``response_code`` is the stringified
    fail code instead.
    """
    logging.debug('transaction result')
    args = ServerComponents().parser.parse_args()
    logging.debug('tx_hash : ' + args['hash'])
    channel_name = get_channel_name_from_args(args)
    response = ServerComponents().get_invoke_result(args['hash'], channel_name)

    verify_result = dict()
    verify_result['response_code'] = str(response.response_code)
    # Compare by value: ``is not 0`` tested object identity against a literal.
    if len(response.result) != 0:
        try:
            result = json.loads(response.result)
            result['jsonrpc'] = '2.0'
            verify_result['response'] = result
        except json.JSONDecodeError:
            # Log the field that was actually parsed (``result``; the handler
            # used to read ``response.data``) and stringify the code like
            # every other branch does.
            logging.warning("your data is not json, your data(%s)", response.result)
            verify_result['response_code'] = str(message_code.Response.fail)
    else:
        verify_result['response_code'] = str(message_code.Response.fail)
    return verify_result
def jwks_to_keyjar(jwks, iss=''):
    """
    Convert a JWKS to a KeyJar instance.

    :param jwks: String representation of a JWKS, or the already decoded dict
    :param iss: issuer the keys are imported under
    :return: A :py:class:`oic.utils.keyio.KeyJar` instance
    :raises ValueError: if ``jwks`` is a string that is not valid JSON
    """
    if isinstance(jwks, dict):
        key_set = jwks
    else:
        try:
            key_set = json.loads(jwks)
        except json.JSONDecodeError:
            raise ValueError('No proper JSON')

    kj = KeyJar()
    kj.import_jwks(key_set, issuer=iss)
    return kj
def get_key_from_headers(self, request, key_names, key_in_body=False):
    """Return the first key found under any of ``key_names``, else ``None``.

    With ``key_in_body`` the JSON request body is searched and the headers
    are never consulted; an undecodable body yields ``None``. Otherwise each
    name is looked up in ``request.META`` as ``HTTP_<NAME>``, upper-cased
    with dashes turned into underscores.
    """
    if key_in_body:
        try:
            body = json.loads(request.body.decode('utf-8'))
        except json.JSONDecodeError:
            return None
        for candidate in key_names:
            if candidate in body:
                return body[candidate]
        return None

    for candidate in key_names:
        meta_name = 'HTTP_{0}'.format(candidate.upper().replace('-', '_'))
        if meta_name in request.META:
            return request.META[meta_name]
    return None
def check_files():
    """Make sure ``data/audio/settings.json`` exists and has every default key.

    A missing file is created from the defaults. An existing file that cannot
    be decoded is overwritten with the defaults. Any default key absent from
    the stored settings is added, and the file is saved whenever its key set
    differs from the defaults.
    """
    default = {"VOLUME": 50, "MAX_LENGTH": 3700, "VOTE_ENABLED": True,
               "MAX_CACHE": 0, "SOUNDCLOUD_CLIENT_ID": None,
               "TITLE_STATUS": True, "AVCONV": False, "VOTE_THRESHOLD": 50,
               "SERVERS": {}}
    settings_path = "data/audio/settings.json"

    if not os.path.isfile(settings_path):
        print("Creating default audio settings.json...")
        dataIO.save_json(settings_path, default)
        return

    # consistency check
    try:
        current = dataIO.load_json(settings_path)
    except JSONDecodeError:
        # settings.json keeps getting corrupted for unknown reasons. Let's
        # try to keep it from making the cog load fail.
        dataIO.save_json(settings_path, default)
        current = dataIO.load_json(settings_path)

    if current.keys() == default.keys():
        return

    for key in default.keys():
        if key not in current.keys():
            current[key] = default[key]
            print("Adding " + str(key) + " field to audio settings.json")
    dataIO.save_json(settings_path, current)
def listen(self, ws, environ):
    """Serve one websocket until it closes, errors, or sends invalid JSON.

    The socket is registered on entry and removed on exit. Every JSON
    message is answered in its own greenlet, using a fresh HTTP request
    built from a copy of the environ of the socket's initial request.
    """
    self._add(ws)
    while not ws.closed:
        try:
            message = ws.receive()
        except WebSocketError:
            break
        if message is None:
            continue
        try:
            message = json.loads(message)
        except json.JSONDecodeError:
            break

        # Odoo heavily relies on httprequests, for each message
        # a new httprequest will be created. This request will be
        # based on the original environ from the socket initialization
        # request.
        httprequest = werkzeug.wrappers.Request(environ.copy())
        for prepare in (odoo.http.root.setup_session,
                        odoo.http.root.setup_db,
                        odoo.http.root.setup_lang):
            prepare(httprequest)
        gevent.spawn(self.respond, ws, httprequest, message)
    self._remove(ws)
def __get_conf_from_file(self, file_name):
    """Populate the configuration attributes from a JSON settings file.

    Every attribute falls back to its built-in default when the file does not
    define the corresponding key. Any failure prints a short message and
    terminates the process with exit status 1.

    :param file_name: path of the JSON settings file.
    """
    # (attribute, key in the settings file, default value)
    options = (
        ('TRUST_P', 'trust_p', 0.45),
        ('COUNTRY', 'country', 'JP'),
        ('COVER_MIN_SIZE', 'cover_min_size', 1000),
        ('SAVE_COVER_TYPE', 'save_cover_type', 1),
        ('DOWNLOAD_DIR', 'download_dir', './cover'),
        ('SEARCH_POSTFIX_NAME', 'search_postfix_name', 'cue,m4a,flac'),
        ('FILE_NAME_FORMAT', 'file_name_format', ''),
        ('REPLACE_COVER', 'replace_cover', 1),
        ('SEARCH_THREAD_NUM', 'search_thread_num', 6),
        ('DOWNLOAD_THREAD_NUM', 'download_thread_num', 5),
    )
    try:
        # The context manager closes the handle; it used to be left open.
        with open(file_name, 'r') as json_file:
            setting_dict = json.load(json_file)
        for attribute, key, default in options:
            setattr(self, attribute, setting_dict.get(key, default))
    except FileNotFoundError:
        print("Conf file 'setting.json' not found")
        raise SystemExit(1)
    except json.JSONDecodeError:
        print("Conf file err")
        raise SystemExit(1)
    except Exception:
        print("Some err occur")
        raise SystemExit(1)
def load_from_persistent_storage(self):
    """Decode every project file that exists into a ``{key: value}`` dict.

    Files with invalid JSON are logged and skipped; missing files are skipped
    silently. An empty dict is returned when there is no project directory.
    """
    # precedence: files > (modules, scripts)
    # Note that `files` will only be changed manually
    # It is not updated by changing `modules` or `scripts`.
    # However, `files` will affect `modules` and `scripts`.
    decoded = {}
    if not (self.project_directory and self.project_directory.exists()):
        return decoded

    for key, project_file in self.project_files.items():
        try:
            decoded[key] = serialize.decode(category_type[key], json.loads(project_file.read_text()))
        except json.JSONDecodeError as err:
            logger.error(err)
        except FileNotFoundError:
            # A project file that was never written is not an error.
            pass
    return decoded
def fetch_valid_json(cmd):
    """Run a command and return its stdout parsed as JSON.

    Asserts that the command exits with status 0 and writes nothing to
    stderr.

    :param cmd: program and arguments
    :type cmd: [str]
    :returns: parsed JSON AST
    :raises Exception: if stdout is not valid JSON
    """
    returncode, stdout, stderr = exec_command(cmd)

    assert returncode == 0
    assert stderr == b''

    text = stdout.decode('utf-8')
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        error_text = 'Command "{}" returned invalid JSON'.format(' '.join(cmd))
        raise Exception(error_text)
def do_request(ns):
    """Send the request described by the parsed arguments and show the reply.

    :param ns: argument namespace providing ``command``, ``api_url`` and
        optionally ``destinations``.
    :return: the HTTP status code, or ``-1`` when the response body is not
        JSON (status, error and raw text are printed in that case).
    """
    method = METHODS[ns.command]
    destinations = dictify(ns.destinations) if hasattr(ns, 'destinations') else None
    payload = jsonify(ns, destinations=destinations or None)
    validate(ns.api_url, throw=True)

    response = requests.request(method, ns.api_url / 'autocert', json=payload)
    status = response.status_code
    try:
        body = response.json()
        output(body)
    except JSONDecodeError as jde:
        print('status =', status)
        print('JSONDecodeError =', jde)
        print('text =', response.text)
        return -1
    return status
def read(filepath):
    """Load the keystore configuration as a dict.

    ``filepath`` (with ``~`` expanded) is used when it is an existing file;
    otherwise ``./.keystorerc`` is tried.

    :return: the parsed configuration, or ``None`` when no config file is
        found, it cannot be opened, or it is not valid JSON (the reason is
        printed to stderr).
    """
    try:
        expanded = os.path.expanduser(filepath)
        if os.path.isfile(expanded):
            path = expanded
        elif os.path.isfile('./.keystorerc'):
            path = './.keystorerc'
        else:
            raise OSError('''The config file .keystorerc is not found in home or local working directory. Please refer to https://pypi.python.org/pypi/keystore for setting up a .keystorerc file.''')

        with open(path) as f:
            return json.loads(f.read())
    except OSError as err:
        print('Unable to open config: {}'.format(err), file=sys.stderr)
    except json.JSONDecodeError as err:
        print('Unable to parse config: {}'.format(err), file=sys.stderr)
    return None
def show_response(self, response):
    """Render an HTTP response as syntax-highlighted text.

    A body served as exactly ``application/json`` is pretty-printed with a
    two-space indent; any other body, or one that fails to decode, is shown
    verbatim.
    """
    body = None
    if response.headers.get('Content-Type', None) == 'application/json':
        try:
            body = json.dumps(response.json(), indent=2)
        except json.JSONDecodeError:
            body = None
    if body is None:
        body = response.text

    # The raw version is an integer such as 11, rendered as "1.1".
    http_version = str(float(response.raw.version) / 10)
    http_txt = self.HTTP_TPL.substitute(
        http_version=http_version,
        status_code=response.status_code,
        reason=response.reason,
        headers=self.key_value_pairs(response.headers),
        body=body,
    )
    return highlight(http_txt, self.http_lexer, self.formatter)
def populate_user_blacklist(self):
    """Resolve every blacklisted user name to its user id.

    Each user's detail page is fetched. When the reply is JSON, the id found
    under ``user -> id`` is stored as the value for that name in
    ``self.user_blacklist``, the fact is logged, and the method sleeps for a
    random time below five seconds. A non-JSON reply is logged as a deleted
    account or invalid link.
    """
    from json import JSONDecodeError

    for user in self.user_blacklist:
        info = self.s.get(self.url_user_detail % (user))

        # prevent error if 'Account of user was deleted or link is invalid
        try:
            all_data = json.loads(info.text)
        except JSONDecodeError:
            self.write_log('Account of user %s was deleted or link is '
                           'invalid' % (user))
            continue

        # prevent exception if user have no media
        id_user = all_data['user']['id']
        # Update the user_name with the user_id
        self.user_blacklist[user] = id_user
        self.write_log("Blacklisted user %s added with ID: %s" % (user, id_user))
        time.sleep(5 * random.random())
def load_json(ctx, filename):
    """Load JSON from ``filename``, or from stdin when no name is given.

    Without a filename and with an interactive stdin, usage help is printed
    and the program exits with status 0. I/O and decoding errors are reported
    on stderr and the program exits with status 1.
    """
    if filename is None:
        if sys.stdin.isatty():
            click.echo(ctx.get_usage())
            click.echo("Try `jsoncut --help' for more information.")
            sys.exit(0)
        else:
            filename = '-'

    try:
        with click.open_file(filename) as file_:
            return json.load(file_)
    except (EnvironmentError, json.JSONDecodeError) as e:
        if isinstance(e, EnvironmentError) and not sys.stdin.isatty():
            # Drain piped input before reporting the I/O error.
            sys.stdin.read()
        click.echo(exc.default_error_mesg_fmt(e), err=True)
        sys.exit(1)
def get_latest_log_entry_for(name, **kwargs):
    """Return the most recent log entry stored for ``name``.

    :param name: value matched against the ``name`` column of ``log``.
    :param kwargs: pass ``successfully_uploaded=True`` to consider only rows
        with ``uploaded = 1``.
    :return: ``(result, uploadable, uploaded)`` where ``result`` is the
        decoded ``info_json``. ``({}, None, None)`` is returned when there is
        no matching row or its JSON cannot be decoded.
    """
    db = connect_db()
    c = db.cursor()

    refine_search = ''
    # Read the flag from kwargs. The former test compared the string literal
    # 'successfully_uploaded' with True, which can never hold, so the filter
    # was silently never applied.
    if kwargs.get('successfully_uploaded'):
        refine_search = 'AND uploaded = 1 '

    c.execute('SELECT info_json, uploadable, uploaded FROM log WHERE name=? ' + refine_search +
              'ORDER BY time DESC LIMIT 1', (name,))
    try:
        entry = c.fetchone()
        # entry is None when nothing matched; subscripting it raises TypeError.
        result = json.loads(entry[0])
        uploadable = entry[1]
        uploaded = entry[2]
    except (json.JSONDecodeError, TypeError):
        result = {}
        uploadable = None
        uploaded = None
    return result, uploadable, uploaded
def get_candles_df(self, currency_pair, epoch_start, epoch_end, period=False):
    """
    Returns the latest candle of the requested range as a pandas dataframe.

    Only the last row of the candlestick data is kept; ``close`` and
    ``volume`` are cast to float and a ``pair`` column is added. On a
    Poloniex or JSON decoding error the problem is printed in red and an
    empty dataframe is returned.
    """
    try:
        candles = self.get_candles(currency_pair, epoch_start, epoch_end, period)
        df = pd.DataFrame(candles).tail(1)
        for column in ('close', 'volume'):
            df[column] = df[column].astype(float)
        df['pair'] = currency_pair
        return df
    except (PoloniexError, JSONDecodeError) as e:
        print()
        print(colored('!!! Got exception while retrieving polo data:' + str(e) + ', pair: ' + currency_pair, 'red'))
    return pd.DataFrame()
def recv(self):
    """Receive one websocket frame and dispatch the events it contains.

    The frame is read with a 0.1 s timeout. A frame holding a single JSON
    document is dispatched as ``_handle_event(method, params)``. When
    decoding stops part-way (for instance several documents concatenated in
    one frame), the text before the failure position is passed as-is to
    ``_handle_event`` and decoding continues with the remainder.

    :return: the number of ``_handle_event`` calls made; 0 on timeout.
    """
    n = 0
    self._soc.settimeout(0.1)
    try:
        got = self._soc.recv()
        while 1:
            try:
                val = json.loads(got)
                self._handle_event(val['method'], val['params'])
                n += 1
                break
            except json.JSONDecodeError as e:
                if e.pos == 0:
                    # Nothing decodable at the start (empty or garbage
                    # remainder): slicing would not shrink ``got`` and the
                    # loop would spin forever, so give up on this frame.
                    break
                self._handle_event(got[:e.pos])
                n += 1
                got = got[e.pos:]
    except websocket.WebSocketTimeoutException:
        pass
    finally:
        # Always restore blocking mode, even if a handler raised.
        self._soc.settimeout(None)
    return n
def read_json(conn, length):
    """
    Reads json request of the given length from the client socket.
    Specified length must be available in the socket; otherwise function
    will raise BlockingIOError.

    If the peer closes the connection before ``length`` bytes arrive,
    reading stops and whatever was received is decoded.

    :param conn: active client socket to read data from
    :param length: length of the json content
    :return: dictionary corresponding to received json, or None when no
        content was received
    :rtype: dict
    :raise json.JSONDecodeError: if the received content is not valid json
    :raise BlockingIOError:
    """
    buffer = io.BytesIO()
    while length > 0:
        chunk = conn.recv(min(2048, length))
        if not chunk:
            # recv() returns b'' once the peer has closed the connection;
            # without this check the loop would never terminate.
            break
        length -= buffer.write(chunk)

    content = buffer.getvalue().decode('utf-8')
    if not content:
        return None
    return json.loads(content)
def parse_sofa_message(message):
    """Parse a raw SOFA message into its typed message object.

    :param message: text matching ``SOFA_REGEX`` (named groups ``type`` and
        ``json``).
    :return: an instance of the class registered for the message type, built
        from the JSON body's fields.
    :raises SyntaxError: for an unparseable message, invalid JSON body,
        unknown type, or a body the message class does not accept.
    :raises NotImplementedError: for a valid type that has no implementation.
    """
    match = SOFA_REGEX.match(message)
    if not match:
        raise SyntaxError("Invalid SOFA message")

    try:
        body = json.loads(match.group('json'))
    except json.JSONDecodeError:
        raise SyntaxError("Invalid SOFA message: body is not valid json")

    sofa_type = match.group('type').lower()
    if sofa_type not in VALID_SOFA_TYPES:
        raise SyntaxError("Invalid SOFA type")
    if sofa_type not in IMPLEMENTED_SOFA_TYPES:
        raise NotImplementedError("SOFA type '{}' has not been implemented yet".format(match.group('type')))

    try:
        return IMPLEMENTED_SOFA_TYPES[sofa_type](**body)
    except TypeError:
        raise SyntaxError("Invalid SOFA message: body contains unexpected fields")
async def __call__(self, request):
    """Handle a single request or a batch of requests.

    The function awaits the per-request handler and therefore has to be a
    coroutine function; a plain ``def`` here does not compile.

    :param request: raw ``bytes``/``str`` to decode, or an already decoded
        request object (a list is treated as a batch).
    :return: the handler result for a single request; for a batch, the list
        of truthy results, or ``None`` when every entry produced none (all
        notifications). A parse-error response is returned for undecodable
        input.
    """
    if isinstance(request, (bytes, str)):
        try:
            request = json_decode(request)
        except json.JSONDecodeError:
            return _parse_error(request)

    # standard single request
    if not isinstance(request, list):
        return await self._handle_single_request(request)

    # batch request: keep only the entries that produced a response
    resp = []
    for single in request:
        result = await self._handle_single_request(single)
        if result:
            resp.append(result)

    # if all were notifications
    if not resp:
        return None
    return resp
def __init__(self, *args, **kwargs):
    """Initialise the model from an optional JSON source plus keyword values.

    :param args: optionally one positional value, either an object accepted
        by ``from_dict`` or a JSON string that is decoded first.
    :param kwargs: extra values, applied after the positional source.
    :raises TypeError: if the positional value is a string that is not valid
        JSON.
    """
    super_self = super()
    super_self.__init__()
    # Set through the parent's __setattr__ rather than a plain assignment.
    super_self.__setattr__('__values__', {})

    json_obj = args[0] if len(args) > 0 else {}
    if isinstance(json_obj, str):
        try:
            json_obj = json.loads(json_obj)
        except json.JSONDecodeError:
            raise TypeError('json object is not valid (dict-like or json string) for conversion to model: {!r}'
                            .format(json_obj))

    self.from_dict(json_obj)
    self.from_dict(kwargs)
def json(self, branch='master', filename=''):
    """Retrieve _filename_ from GitLab and decode it as JSON.

    Args:
        branch (str): Git Branch to find file.
        filename (str): Name of file to retrieve.

    Returns:
        dict: Decoded JSON.

    Raises:
        SystemExit: Invalid JSON provided.
    """
    file_contents = self.get(branch=branch, filename=filename)

    try:
        json_dict = json.loads(file_contents)
    # TODO: Use json.JSONDecodeError when Python 3.4 has been deprecated
    except ValueError as error:
        template = ('"{filename}" appears to be invalid json. '
                    'Please validate it with http://jsonlint.com. '
                    'JSON decoder error:\n'
                    '{error}')
        raise SystemExit(template.format(filename=filename, error=error))

    LOG.debug('JSON object:\n%s', json_dict)
    return json_dict
def process_log(file_path):
    """
    Parse a task log into a dataframe plus its header metadata.

    Expects header as first line in log file. Header begins with comment
    character '#'. The rest of the line is a json string dump of a dictionary
    that contains the following keys:
        name: str, Task name
        maintainer: str, Name of person
        tags: list of tags associated with the task. can be empty
        properties: list of properties associated with the task. can be empty
        run_id: str, a run ID for the task run
        timestamp: str, timestamp for the task run

    A header that is not valid json yields metadata with empty ``name``,
    ``timestamp`` and ``run_id``. A ``timestamp`` entry is converted with
    ``dateutil_parse``.

    :param file_path: path of the log file, or an object with ``readline()``
    :return: ``(df, metadata)`` where ``df`` is the result of ``parse_log``
    :raises ValueError: if the first line does not start with '#'
    """
    # read header
    if isinstance(file_path, str):
        with open(file_path) as f:
            header = f.readline()
    else:
        header = file_path.readline()

    if not header.startswith("#"):
        raise ValueError("Expecting header in log file")

    try:
        metadata = json.loads(header[1:])
        if 'timestamp' in metadata:
            metadata['timestamp'] = dateutil_parse(metadata['timestamp'])
    except JSONDecodeError:
        metadata = {"name": "", "timestamp": "", "run_id": ""}

    return parse_log(file_path), metadata
def validate_json(value):
    """Validator that accepts only text that parses as JSON.

    :param value: the text to check.
    :raises ValidationError: if ``value`` is not valid JSON.
    """
    try:
        json.loads(value)
    except json.JSONDecodeError:
        raise ValidationError("Not valid json")
    return None
def submit(request):
    """Endpoint for submitting new crawls.

    Expects POST['data'] to be populated with JSON data for a single document
    group. The data is stored keyed by its SHA-1, processed and indexed.

    :return: a JSON response ``{'status': 'ok', 'new': <bool>}`` on success,
        a 400 response for missing or invalid data, or a 500 response when
        processing fails with ``ProcessingInputError``.
    """
    # TODO authentication? Secret keys?
    # TODO stop processing the documents when submitted; use processing queues
    input_raw = request.POST.get('data')
    if not input_raw:
        return HttpResponseBadRequest('POST["data"] is not set!')

    try:
        payload = json.loads(input_raw)
    except json.JSONDecodeError:
        return HttpResponseBadRequest('POST["data"] is not valid json')

    submitted, created = models.SubmittedData.objects.update_or_create(
        sha1=hash.dict_sha1(payload),
        data=payload
    )

    try:
        doc, new = process(submitted)
        index_data(doc)
    except ProcessingInputError as e:
        print(e)
        return HttpResponseServerError('error: ' + str(e))

    return JsonResponse({
        'status': 'ok',
        'new': new,
    })
async def method(self, key, **data):
    """Return the result of executing the VK API method `key`.

    Function for special cases only! This method handles neither API errors
    nor captcha.

    The function uses ``async with`` / ``await`` and therefore has to be a
    coroutine function; a plain ``def`` here does not compile.

    :param key: name of the VK API method.
    :param data: parameters posted to the method. The special keyword
        ``_replace_nl`` (default True) controls whether newlines in string
        values are replaced by ``<br>``; it is never sent to VK.
    :return: the ``response`` entry of the first parsed reply that has one,
        otherwise ``False``.
    """
    url = f"https://api.vk.com/method/{key}?access_token={self.token}&v={VERSION}"

    # Take the control flag out before touching the values: it used to be
    # processed like a parameter, so an explicit ``_replace_nl=True`` crashed
    # on ``True.replace``.
    replace_nl = data.pop("_replace_nl", True)
    if replace_nl:
        # Only strings can contain newlines; other values pass through.
        data = {k: v.replace("\n", "<br>") if isinstance(v, str) else v
                for k, v in data.items()}

    async with self.session.post(url, data=data, **self.req_kwargs) as resp:
        try:
            for result in json_iter_parse(await resp.text()):
                if 'response' in result:
                    return result['response']
        except json.JSONDecodeError:
            self.logger.error("Error while executing vk method: vk's response is wrong!")
            return False

    return False
def _get_page(self, page_number, max_retries=5):
    """Fetch one page of the fans list and record the fan ids.

    The ids found on the page are prepended, comma-terminated, to
    ``self._fans_ids``.

    :param page_number: 1-based page to request; page count and fan total
        are only read from the first page.
    :param max_retries: how many times a failed request is retried. The
        request used to be retried through unbounded recursion, which ends in
        ``RecursionError`` when the network stays down.
    :return: ``(pages, fansnumber)`` (both 0 unless read from page 1), or
        ``None`` when the server does not answer with status 200 or every
        attempt failed.
    """
    params = {
        'mid': str(self._mid),
        'page': str(page_number),
        '_': '1496132105785'
    }
    url = "http://space.bilibili.com/ajax/friend/GetFansList?" + urlencode(params)

    response = None
    for _attempt in range(max_retries + 1):
        try:
            response = requests.get(url)
            break
        except RequestException:
            continue
    if response is None or response.status_code != 200:
        return None

    pages = 0
    fansnumber = 0
    fans_ids = ""
    try:
        data = json.loads(response.text)
        if data and (data.get('status') is True):
            if 'data' in data.keys():
                if page_number == 1:
                    pages = data.get('data').get('pages')
                    fansnumber = data.get('data').get('results')
                for fans in data.get('data').get('list'):
                    fans_ids = str(fans.get('fid')) + ',' + fans_ids
        elif data.get('data') == "????????":
            # NOTE(review): this literal looks mis-encoded (probably a
            # server-side message); it is kept exactly as found.
            pages = 0
            fansnumber = 0
    except JSONDecodeError:
        pass

    self._fans_ids = fans_ids + self._fans_ids
    return pages, fansnumber
def _getinfo(self, cont):
    """Extract the video ids from a JSON video-list payload.

    :param cont: JSON text expected to hold ``data -> vlist``, a list of
        video entries with an ``aid``.
    :return: the ids joined newest-last-first (each followed by a comma, in
        reverse list order), or ``None`` when the text is not JSON or has no
        ``data`` entry.
    """
    try:
        data = json.loads(cont)
    except JSONDecodeError:
        return None

    if not (data and 'data' in data.keys()):
        return None

    aids = ""
    for video in data.get('data').get('vlist'):
        # Prepending reverses the order of the list in the result.
        aids = str(video.get('aid')) + ',' + aids
    return aids