The following code examples, extracted from open-source Python projects, illustrate how to use json.load().
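Before the project examples, here is a minimal sketch of the pattern most of them share: open a file (or any object with a read() method) and hand it to json.load(), which parses the JSON document into plain Python objects; json.loads(), which appears in a couple of examples below, does the same for a string already in memory. The file name "settings.json" and the "debug" key are only illustrative placeholders.

import json

# Minimal usage sketch of json.load(): parse JSON from an open file object.
# "settings.json" is a hypothetical file used only for illustration.
with open("settings.json", "r", encoding="utf-8") as f:
    settings = json.load(f)  # yields dicts, lists, strings, numbers, bools, None

print(settings.get("debug", False))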
def get_named_set(lang_codes, feature_set):
    if feature_set == 'id':
        return get_id_set(lang_codes)

    if feature_set not in FEATURE_SETS:
        print("ERROR: Invalid feature set " + feature_set, file=sys.stderr)
        sys.exit()

    filename, source, prefix = FEATURE_SETS[feature_set]
    feature_database = np.load(filename)
    lang_codes = [ get_language_code(l, feature_database) for l in lang_codes ]
    lang_indices = [ get_language_index(l, feature_database) for l in lang_codes ]
    feature_names = get_feature_names(prefix, feature_database)
    feature_indices = [ get_feature_index(f, feature_database) for f in feature_names ]
    source_index = get_source_index(source, feature_database)
    feature_values = feature_database["data"][lang_indices,:,:][:,feature_indices,:][:,:,source_index]
    feature_values = feature_values.squeeze(axis=2)
    return feature_names, feature_values
def directory_has_smart_contract(location):
    # returns bool if there is a tsol contract in said directory
    # probably makes more sense to put this inside of the tsol package
    code_path = glob.glob(os.path.join(location, '*.tsol'))
    example = glob.glob(os.path.join(location, '*.json'))

    assert len(code_path) > 0 and len(example) > 0, 'Could not find *.tsol and *.json files in provided directory.'

    # pop off the first file name and turn the code into a file object
    code = open(code_path[0])

    # turn the example into a dict
    with open(example[0]) as e:
        example = json.load(e)

    try:
        tsol.compile(code, example)
    except Exception as e:
        print(e)
        return False
    return True
def load_previous(self, path=None):
    """Load previous copy of config from disk.

    In normal usage you don't need to call this method directly - it
    is called automatically at object initialization.

    :param path: File path from which to load the previous config. If
        `None`, config is loaded from the default location. If `path` is
        specified, subsequent `save()` calls will write to the same path.
    """
    self.path = path or self.path
    with open(self.path) as f:
        self._prev_dict = json.load(f)
    for k, v in copy.deepcopy(self._prev_dict).items():
        if k not in self:
            self[k] = v
def info_to_db(db, name):
    cur = db.cursor()
    with open(name) as data_file:
        data = json.load(data_file)

    subject = data['subject']
    code = data['code']
    title = data['title']
    credit = data['credit']
    desc = data['desc']
    geneds = data['geneds']
    if geneds is None:
        geneds = ''
    else:
        geneds = ','.join(geneds)
    url = data['url']

    cmd = 'INSERT INTO `CourseExplorer` (`Id`, `Subject`, `Code`, `Title`, `Credit`, `Description`, `GenEd`, ' \
          '`Url`) VALUES(NULL, %s, %s, %s, %s, %s, %s, %s) '
    val = (subject, code, title, credit, desc, geneds, url)
    cur.execute(cmd, val)
    # if subject == 'PHYS':
    #     print cmd % val
def _set_asset_paths(self, app):
    """
    Read in the manifest json file which acts as a manifest for assets.
    This allows us to get the asset path as well as hashed names.

    :param app: aiohttp application
    :return: None
    """
    webpack_stats = app.settings.WEBPACK_MANIFEST_PATH

    try:
        with open(webpack_stats, 'r') as stats_json:
            stats = json.load(stats_json)

            if app.settings.WEBPACK_ASSETS_URL:
                self.assets_url = app.settings.WEBPACK_ASSETS_URL
            else:
                self.assets_url = stats['publicPath']

            self.assets = stats['assets']
    except IOError:
        raise RuntimeError(
            "'WEBPACK_MANIFEST_PATH' is required to be set and "
            "it must point to a valid json file.")
def _get_external_data(url):
    result = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        resp = urlopen(url)
        headers = resp.info()
        ct = headers.get('Content-Type')
        if not ct.startswith('application/json'):
            logger.debug('Unexpected response for JSON request: %s', ct)
        else:
            reader = codecs.getreader('utf-8')(resp)
            #data = reader.read().decode('utf-8')
            #result = json.loads(data)
            result = json.load(reader)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return result
def save(self, pypi_version, current_time):
    # Check to make sure that we own the directory
    if not check_path_owner(os.path.dirname(self.statefile_path)):
        return

    # Now that we've ensured the directory is owned by this user, we'll go
    # ahead and make sure that all our directories are created.
    ensure_dir(os.path.dirname(self.statefile_path))

    # Attempt to write out our version check file
    with lockfile.LockFile(self.statefile_path):
        if os.path.exists(self.statefile_path):
            with open(self.statefile_path) as statefile:
                state = json.load(statefile)
        else:
            state = {}

        state[sys.prefix] = {
            "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
            "pypi_version": pypi_version,
        }

        with open(self.statefile_path, "w") as statefile:
            json.dump(state, statefile, sort_keys=True,
                      separators=(",", ":"))
def load(self):
    # XXX JSON is not a great database
    for path in load_config_paths('wheel'):
        conf = os.path.join(native(path), self.CONFIG_NAME)
        if os.path.exists(conf):
            with open(conf, 'r') as infile:
                self.data = json.load(infile)
            for x in ('signers', 'verifiers'):
                if not x in self.data:
                    self.data[x] = []
            if 'schema' not in self.data:
                self.data['schema'] = self.SCHEMA
            elif self.data['schema'] != self.SCHEMA:
                raise ValueError(
                    "Bad wheel.json version {0}, expected {1}".format(
                        self.data['schema'], self.SCHEMA))
            break
    return self
def __load_layout(self, config):
    var = config.get_value('engine/replace-with-kanji-python', 'layout')
    if var is None or var.get_type_string() != 's':
        path = os.path.join(os.getenv('IBUS_REPLACE_WITH_KANJI_LOCATION'), 'layouts')
        path = os.path.join(path, 'roomazi.json')
        if var:
            config.unset('engine/replace-with-kanji-python', 'layout')
    else:
        path = var.get_string()
    logger.info("layout: %s", path)
    layout = roomazi.layout  # Use 'roomazi' as default
    try:
        with open(path) as f:
            layout = json.load(f)
    except ValueError as error:
        logger.error("JSON error: %s", error)
    except OSError as error:
        logger.error("Error: %s", error)
    except:
        logger.error("Unexpected error: %s %s", sys.exc_info()[0], sys.exc_info()[1])
    self.__to_kana = self.__handle_roomazi_layout
    if 'Type' in layout:
        if layout['Type'] == 'Kana':
            self.__to_kana = self.__handle_kana_layout
    return layout
def _get_cache(ttl, cache_path):
    '''
    If url contains valid cache, returns it, else returns empty list.
    '''
    # Check if we have a valid cached version.
    try:
        cached_time = os.path.getmtime(cache_path)
    except OSError:
        return []

    if current_time() - cached_time < ttl:
        log.debug('%s is less than ttl', cache_path)
        try:
            with open(cache_path) as json_file:
                loaded_json = json.load(json_file)
                return loaded_json
        except IOError:
            return []
        except ValueError:
            log.error('%s was not json formatted', cache_path)
            return []
    else:
        log.debug('%s was older than ttl', cache_path)
        return []
def test_update_text(self):
    with open(os.path.join(self.FIXTURES_DIR, 'eyes_in_the_skies.json')) as f:
        final_data = json.load(f)
    original_text = final_data['RTR']['cards'][0]['originalText']
    final_text = final_data['RTR']['cards'][0]['text']

    # Copy the data and munge it into its original state.
    original_data = copy.deepcopy(final_data)
    original_data['RTR']['cards'][0]['text'] = original_text

    # Import the original data.
    parse_data(original_data, ['RTR'])
    eyes_in_the_skies = Card.objects.first()
    self.assertEqual(eyes_in_the_skies.text, original_text)

    # Import the final, updated data.
    parse_data(final_data, ['RTR'])
    eyes_in_the_skies.refresh_from_db()
    self.assertEqual(eyes_in_the_skies.text, final_text)
def test_update_types(self):
    with open(os.path.join(self.FIXTURES_DIR, 'jackal_pup.json')) as f:
        final_data = json.load(f)

    # Copy the data and munge the types.
    original_data = copy.deepcopy(final_data)
    original_subtype = 'Hound'
    original_data['TMP']['cards'][0]['subtypes'] = [original_subtype]

    # Import the original data.
    parse_data(original_data, ['TMP'])
    jackal_pup = Card.objects.first()
    self.assertEqual(jackal_pup.subtypes.count(), 1)
    self.assertEqual(jackal_pup.subtypes.first().name, original_subtype)

    # Import the final, updated data.
    parse_data(final_data, ['TMP'])
    jackal_pup.refresh_from_db()
    self.assertEqual(jackal_pup.subtypes.count(), 1)
    self.assertEqual(jackal_pup.subtypes.first().name, 'Jackal')

    # The Hound subtype has been deleted.
    self.assertFalse(CardSubtype.objects.filter(name=original_subtype).exists())
def test_update_loyalty(self):
    """
    Simulates the upgrade process from version 0.2 to version 0.4.
    """
    with open(os.path.join(self.FIXTURES_DIR, 'vraska_the_unseen.json')) as f:
        final_data = json.load(f)

    # Copy the data and munge it to remove the loyalty.
    original_data = copy.deepcopy(final_data)
    del original_data['RTR']['cards'][0]['loyalty']

    # Import the original data.
    parse_data(original_data, ['RTR'])
    vraska = Card.objects.first()
    self.assertIsNone(vraska.loyalty)

    # Import the final, updated data.
    parse_data(final_data, ['RTR'])
    vraska.refresh_from_db()
    self.assertEqual(vraska.loyalty, 5)
def main():
    if len(sys.argv) < 2:
        sys.stderr.write("USAGE: %s measurement\n" % sys.argv[0])
        sys.exit(1)
    path = sys.argv[1]

    with open(os.path.join(path, "metadata.json")) as f:
        metadata = json.load(f)
    start = date(metadata["start"][:-1])
    end = date(metadata["start"][:-1])
    print('open measurement "%s" from "%s" to "%s"' % (metadata["name"], start, end))

    for service in metadata["services"]:
        print('open service "%s"' % service["name"])
        with open(os.path.join(path, service["filename"])) as csvfile:
            r = csv.DictReader(csvfile, dialect=csv.excel_tab)
            for row in r:
                print(row["time"])
def configure(self):
    # load config values
    try:
        with open(self.configfile) as configfile_contents:
            self.config = json.load(configfile_contents)
    except:
        self.config = {}

    try:
        self.agent_services = self.api_session.get(self.api_endpoint + '/agent/services?stale').json()
    except:
        print_exc()
        exit(135)

    self.managed_service = self.agent_services[self.service]
    if self.managed_service['Tags'] == None:
        self.managed_service['Tags'] = []

    if self.role_source == "facter":
        self.get_facter_state(self.DEFAULT_FACTERFILE)
    else:
        print("!! unsupported PG role source !!")
        exit(140)
def prompt_pick_backup(message):
    """Prompts the user to pick an existing database, and returns the
       selected choice database ID and its metadata"""

    # First load all the saved databases (splitting extension and path)
    saved_db = [path.splitext(path.split(f)[1])[0] for f in glob('backups/*.tlo')]

    # Then prompt the user
    print('Available backups databases:')
    for i, db_id in enumerate(saved_db):
        metadata = get_metadata(db_id)
        print('{}. {}, ID: {}'.format(i + 1,
                                      metadata.get('peer_name', '???'),
                                      db_id))

    db_id = saved_db[get_integer(message, 1, len(saved_db)) - 1]
    return db_id, get_metadata(db_id)
def load(self):
    # XXX JSON is not a great database
    for path in load_config_paths('wheel'):
        conf = os.path.join(native(path), self.CONFIG_NAME)
        if os.path.exists(conf):
            with open(conf, 'r') as infile:
                self.data = json.load(infile)
            for x in ('signers', 'verifiers'):
                if x not in self.data:
                    self.data[x] = []
            if 'schema' not in self.data:
                self.data['schema'] = self.SCHEMA
            elif self.data['schema'] != self.SCHEMA:
                raise ValueError(
                    "Bad wheel.json version {0}, expected {1}".format(
                        self.data['schema'], self.SCHEMA))
            break
    return self
def read_store(self):
    """
    Get the store from file.
    """
    with open(self._path, 'r', encoding='ascii') as fr:
        outer_data = json.load(fr)

    try:
        validate(outer_data, OUTER_SCHEMA)
    except ValidationError:
        raise SSError("Invalid outer schema")

    enc_bytes = hex_str_to_bytes(outer_data["enc_blob"])
    try:
        inner_data = self._box.decrypt(enc_bytes)
    except nacl.exceptions.CryptoError:
        raise SSCryptoError("Wrong password")

    return json.loads(inner_data.decode('utf-8'))
def __init__(self, path='config.json'):
    CONFIG_PATH = os.path.join(sys.path[0], path)
    dict.__init__(self)
    self.path = path
    try:
        self.load()
    except:
        print "Creating configuration file"
        self['hs_threshold1'] = 100
        self['hs_threshold2'] = 200
        self['hs_apertura'] = 1
        self['hs_blur'] = 3
        self['hs_minLineLength'] = 100
        self['hs_maxLineGap'] = 10
        self['hs_kernel_dilate'] = 3
        self['hs_kernel_erode'] = 3
        self['hs_param1'] = 3
        self['hs_param2'] = 3
        self['hs_param3'] = 3
        self['hs_dilate_iteracciones'] = 1
        self.save()
def test_match_fetching(self):
    """Check if the server handles a normal request correctly."""
    # Forward sample data fetched from the Riot API directly as result
    this_dir = os.path.dirname(os.path.abspath(__file__))
    with open(os.sep.join([this_dir, "samples", self.MATCH_DATA])) as f:
        self.match_data = json.load(f)

    # Setup mocks
    self.service.cache_manager.find_match.return_value = None
    self.service.riot_api_handler.get_match.return_value = self.match_data

    response = self.stub.Match(service_pb2.MatchRequest(id=4242, region=constants_pb2.EUW))

    # Check the conversion of the sample is matching the response
    converter = JSONConverter(None)
    expected = converter.json_match_to_match_pb(self.match_data)
    self.assertEqual(response, expected)
    self.assertTrue(self.service.cache_manager.save_match.called)
def test_match_from_cache(self):
    """Check if the server returns a match from the cache."""
    # Forward sample data fetched from the Riot API directly as result
    this_dir = os.path.dirname(os.path.abspath(__file__))
    with open(os.sep.join([this_dir, "samples", self.MATCH_DATA])) as f:
        self.match_data = json.load(f)

    # Setup mocks
    self.service.cache_manager.find_match.return_value = self.match_data

    response = self.stub.Match(service_pb2.MatchRequest(id=4242, region=constants_pb2.EUW))

    # Check the conversion of the sample is matching the response
    self.assertTrue(self.service.cache_manager.find_match.called)
    self.assertFalse(self.service.riot_api_handler.get_match.called)
    self.assertFalse(self.service.cache_manager.save_match.called)
def load(self):
    # Prepare + load directory.
    super().load()

    # Load the files and parse JSON.
    parsed_settings = dict()
    try:
        for file_name in self.files:
            file_path = os.path.join(self.directory, file_name)
            with open(file_path, 'r') as file_handle:
                parsed_settings.update(json.load(file_handle))
    except json.JSONDecodeError as e:
        raise ImproperlyConfigured(
            'Your settings file(s) contain invalid JSON syntax! Please fix and restart!, {}'.format(str(e))
        )

    # Loop and set in local settings (+ uppercase keys).
    for key, value in parsed_settings.items():
        self.settings[key.upper()] = value
def load_endpoints(self):
    self.choose_endpoint.endpoints.clear()
    for name in listdir(str(BASE_PATH / 'endpoints')):
        if name.endswith('.json'):
            item = QListWidgetItem(name.split('.json')[0], self.choose_endpoint.endpoints)
            item.setFlags(item.flags() & ~Qt.ItemIsEnabled)
            pb_msg_to_endpoints = defaultdict(list)

            with open(str(BASE_PATH / 'endpoints' / name)) as fd:
                for endpoint in load(fd, object_pairs_hook=OrderedDict):
                    pb_msg_to_endpoints[endpoint['request']['proto_msg'].split('.')[-1]].append(endpoint)

            for pb_msg, endpoints in pb_msg_to_endpoints.items():
                item = QListWidgetItem(' ' * 4 + pb_msg, self.choose_endpoint.endpoints)
                item.setFlags(item.flags() & ~Qt.ItemIsEnabled)

                for endpoint in endpoints:
                    path_and_qs = '/' + endpoint['request']['url'].split('/', 3).pop()
                    item = QListWidgetItem(' ' * 8 + path_and_qs, self.choose_endpoint.endpoints)
                    item.setData(Qt.UserRole, endpoint)

    self.set_view(self.choose_endpoint)
def load_barcode_dist(filename, barcode_whitelist, gem_group, proportions=True):
    """ Load barcode count distribution from a json file """
    # Input barcode whitelist must be an ordered type;
    # safeguard against it going out of sync with the distribution file
    assert barcode_whitelist is None or isinstance(barcode_whitelist, list)

    if not os.path.isfile(filename):
        return None

    with open(filename, 'r') as f:
        values = json.load(f)

    start = (gem_group - 1) * len(barcode_whitelist)
    end = gem_group * len(barcode_whitelist)
    barcode_counts = {bc: value for bc, value in zip(barcode_whitelist, values[start:end])}

    if proportions:
        total_barcode_counts = sum(barcode_counts.values())
        barcode_dist = {bc: tk_stats.robust_divide(float(value), float(total_barcode_counts))
                        for bc, value in barcode_counts.iteritems()}
        return barcode_dist
    else:
        return barcode_counts
def split(args):
    assert len(args.read1s) == len(args.read2s)

    chunks = []

    # Determine the number of buckets required to achieve
    # the given chunk size.
    chunks_per_gem_group = {}
    with open(args.reads_summary) as f:
        reads_summary = json.load(f)
        for gg in args.gem_groups:
            readpairs = reads_summary['%d_total_reads_per_gem_group' % gg]
            chunks_per_gem_group[str(gg)] = max(2,
                                                int(math.ceil(float(readpairs) /
                                                              args.readpairs_per_chunk)))

    for fastq1, fastq2 in itertools.izip(args.read1s, args.read2s):
        chunks.append({
            'read1s_chunk': fastq1,
            'read2s_chunk': fastq2,
            'chunks_per_gem_group': chunks_per_gem_group,
        })

    return {'chunks': chunks}
def main(args, outs):
    # Write read_chunk for consumption by Rust
    with open("chunk_args.json", "w") as f:
        json.dump(args.read_chunk, f)

    output_path = martian.make_path("")
    prefix = "fastq_chunk"
    chunk_reads_args = ['chunk_reads', '--reads-per-fastq', str(args.reads_per_file),
                        output_path, prefix, "--martian-args", "chunk_args.json"]
    print "running chunk reads: [%s]" % str(chunk_reads_args)
    subprocess.check_call(chunk_reads_args)

    with open(os.path.join(output_path, "read_chunks.json")) as f:
        chunk_results = json.load(f)

    outs.out_chunks = []

    # Write out a new chunk entry for each resulting chunk
    for chunk in chunk_results:
        print args.read_chunk
        chunk_copy = args.read_chunk.copy()
        print chunk_copy
        chunk_copy['read_chunks'] = chunk
        outs.out_chunks.append(chunk_copy)
def load_model(file_path):
    with open(file_path, "r") as f:
        decision_trees, sparse_features, total_feature_count = load(f)

    dts = {}
    for key in decision_trees.keys():
        dts[key] = []
        for dt in decision_trees[key]:
            d = DecisionTree([])
            for k in dt['model'].keys():
                setattr(d, k, dt['model'][k])
            dt['model'] = d
            dts[key].append(dt)

    return ClassificationEngine(
        decision_trees=dts,
        sparse_features=sparse_features,
        total_feature_count=total_feature_count
    )
def requests_with_cache(dir):
    def decorator(func):
        def wrapper(**kwargs):
            cache_key = str(kwargs.get("param", "default.json"))
            cache_url = dir + "/" + cache_key.replace("/", "-").replace("_", "-")
            if os.path.isfile(cache_url):
                with open(cache_url, 'r') as f:
                    print(cache_url)
                    return json.load(f)
            with open(cache_url, 'w+') as f:
                ret = func(**kwargs)
                json.dump(ret, f)
                return ret
        return wrapper
    return decorator
def main():
    if len(sys.argv) == 1:
        infile = sys.stdin
        outfile = sys.stdout
    elif len(sys.argv) == 2:
        infile = open(sys.argv[1], 'rb')
        outfile = sys.stdout
    elif len(sys.argv) == 3:
        infile = open(sys.argv[1], 'rb')
        outfile = open(sys.argv[2], 'wb')
    else:
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")
    try:
        obj = json.load(infile)
    except ValueError, e:
        raise SystemExit(e)
    json.dump(obj, outfile, sort_keys=True, indent=4)
    outfile.write('\n')
def import_policy(cb, parser, args):
    p = cb.create(Policy)
    p.policy = json.load(open(args.policyfile, "r"))
    p.description = args.description
    p.name = args.name
    p.priorityLevel = args.prioritylevel
    p.version = 2

    try:
        p.save()
    except ServerError as se:
        print("Could not add policy: {0}".format(str(se)))
    except Exception as e:
        print("Could not add policy: {0}".format(str(e)))
    else:
        print("Added policy. New policy ID is {0}".format(p.id))
def create_graph():
    logfile = 'result/log'
    xs = []
    ys = []
    ls = []
    f = open(logfile, 'r')
    data = json.load(f)
    print(data)

    for d in data:
        xs.append(d["iteration"])
        ys.append(d["main/accuracy"])
        ls.append(d["main/loss"])

    plt.clf()
    plt.cla()
    plt.hlines(1, 0, np.max(xs), colors='r', linestyles="dashed")
    plt.title(r"loss/accuracy")
    plt.plot(xs, ys, label="accuracy")
    plt.plot(xs, ls, label="loss")
    plt.legend()
    plt.savefig("result/log.png")
def associate_lambda(group_config, lambda_config):
    """
    Associate the Lambda described in the `lambda_config` with the
    Greengrass Group described by the `group_config`

    :param group_config: `gg_group_setup.GroupConfigFile` to store the group
    :param lambda_config: the configuration describing the Lambda to
        associate with the Greengrass Group
    :return:
    """
    with open(lambda_config, "r") as f:
        cfg = json.load(f)

    config = GroupConfigFile(config_file=group_config)

    lambdas = config['lambda_functions']
    lambdas[cfg['func_name']] = {
        'arn': cfg['lambda_arn'],
        'arn_qualifier': cfg['lambda_alias']
    }

    config['lambda_functions'] = lambdas
def load_cache(self, file):
    import json
    try:
        with open(file, 'r') as f:
            self.cache = json.load(f)
    except:
        # Fail silently
        pass
def editPipeline(args, config):
    pipelineDbUtils = PipelineDbUtils(config)

    request = json.loads(pipelineDbUtils.getJobInfo(select=["request"],
                                                    where={"job_id": args.jobId})[0].request)

    _, tmp = mkstemp()
    with open(tmp, 'w') as f:
        f.write("{data}".format(data=json.dumps(request, indent=4)))

    if "EDITOR" in os.environ.keys():
        editor = os.environ["EDITOR"]
    else:
        editor = "/usr/bin/nano"

    if subprocess.call([editor, tmp]) == 0:
        with open(tmp, 'r') as f:
            request = json.load(f)

        pipelineDbUtils.updateJob(args.jobId, keyName="job_id",
                                  setValues={"request": json.dumps(request)})
    else:
        print "ERROR: there was a problem editing the request"
        exit(-1)
def load_messages():
    filename = 'guideline_content.json'
    file_path = ('%s/guideline/%s' % (
        os.path.dirname(os.path.realpath(__file__)),
        filename
    ))
    with open(file_path) as json_data:
        data = json.load(json_data)
    return data["body"]["messages"]
def load_guidelines(guideline_key):
    filename = 'guideline_%s.json' % guideline_key
    file_path = ('%s/guideline/%s' % (
        os.path.dirname(os.path.realpath(__file__)),
        filename
    ))
    with open(file_path) as json_data:
        data = json.load(json_data)
    return data
def _load_ready_file(self):
    if self._ready is not None:
        return
    if os.path.exists(self._ready_file):
        with open(self._ready_file) as fp:
            self._ready = set(json.load(fp))
    else:
        self._ready = set()
def get_matchmaker_map(mm_file='/etc/oslo/matchmaker_ring.json'):
    mm_map = {}
    if os.path.isfile(mm_file):
        with open(mm_file, 'r') as f:
            mm_map = json.load(f)
    return mm_map
def _git_yaml_load(projects_yaml):
    """
    Load the specified yaml into a dictionary.
    """
    if not projects_yaml:
        return None

    return yaml.load(projects_yaml)
def get_id_set(lang_codes):
    feature_database = np.load("family_features.npz")
    lang_codes = [ get_language_code(l, feature_database) for l in lang_codes ]
    all_languages = list(feature_database["langs"])
    feature_names = [ "ID_" + l.upper() for l in all_languages ]
    values = np.zeros((len(lang_codes), len(feature_names)))
    for i, lang_code in enumerate(lang_codes):
        feature_index = get_language_index(lang_code, feature_database)
        values[i, feature_index] = 1.0
    return feature_names, values
def generate(location):
    # cli wizard for creating a new contract from a template
    if directory_has_smart_contract(location):
        example_payload = json.load(open(glob.glob(os.path.join(location, '*.json'))[0]))
        print(example_payload)

        for k, v in example_payload.items():
            value = input(k + ':')
            if value != '':
                example_payload[k] = value
        print(example_payload)

        code_path = glob.glob(os.path.join(location, '*.tsol'))
        tsol.compile(open(code_path[0]), example_payload)
        print('Code compiles with new payload.')

        selection = ''
        while True:
            selection = input('(G)enerate Solidity contract or (E)xport implementation:')
            if selection.lower() == 'g':
                output_name = input('Name your contract file without an extension:')
                code = tsol.generate_code(open(code_path[0]).read(), example_payload)
                open(os.path.join(location, '{}.sol'.format(output_name)), 'w').write(code)
                break
            if selection.lower() == 'e':
                output_name = input('Name your implementation file without an extension:')
                json.dump(example_payload, open(os.path.join(location, '{}.json'.format(output_name)), 'w'))
                break
    else:
        print('Provided directory does not contain a *.tsol and *.json or does not compile.')
def __init__(self, path=None, filename=None):
    self.kafka_offset_spec_file = os.path.join(
        (path or "/tmp/"), (filename or 'kafka_offset_specs.json'))
    self._kafka_offsets = {}
    if os.path.exists(self.kafka_offset_spec_file):
        try:
            f = open(self.kafka_offset_spec_file)
            kafka_offset_dict = json.load(f)
            for key, value in kafka_offset_dict.items():
                log.info("Found offset %s: %s", key, value)
                self._kafka_offsets[key] = OffsetSpec(
                    app_name=value.get('app_name'),
                    topic=value.get('topic'),
                    partition=value.get('partition'),
                    from_offset=value.get('from_offset'),
                    until_offset=value.get('until_offset'),
                    batch_time=value.get('batch_time'),
                    last_updated=value.get('last_updated'),
                    revision=value.get('revision')
                )
        except Exception:
            log.info('Invalid or corrupt offsets file found at %s,'
                     ' starting over' % self.kafka_offset_spec_file)
    else:
        log.info('No kafka offsets found at startup')
def load_offset_file_as_json(self, file_path):
    with open(file_path, 'r') as f:
        json_file = json.load(f)
    return json_file
def register(self, name, serializer):
    """Register ``serializer`` object under ``name``.

    Raises :class:`AttributeError` if ``serializer`` is invalid.

    .. note::

        ``name`` will be used as the file extension of the saved files.

    :param name: Name to register ``serializer`` under
    :type name: ``unicode`` or ``str``
    :param serializer: object with ``load()`` and ``dump()`` methods

    """
    # Basic validation
    getattr(serializer, 'load')
    getattr(serializer, 'dump')

    self._serializers[name] = serializer
def load(cls, file_obj): """Load serialized object from open JSON file. .. versionadded:: 1.8 :param file_obj: file handle :type file_obj: ``file`` object :returns: object loaded from JSON file :rtype: object """ return json.load(file_obj)
def load(cls, file_obj): """Load serialized object from open pickle file. .. versionadded:: 1.8 :param file_obj: file handle :type file_obj: ``file`` object :returns: object loaded from pickle file :rtype: object """ return cPickle.load(file_obj)
def load(cls, file_obj): """Load serialized object from open pickle file. .. versionadded:: 1.8 :param file_obj: file handle :type file_obj: ``file`` object :returns: object loaded from pickle file :rtype: object """ return pickle.load(file_obj)