The following 50 code examples, extracted from open-source Python projects, illustrate how to use simplejson.load().
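Before the extracted examples, here is a minimal sketch of the call itself: simplejson.load() parses JSON from any readable file-like object (the file name below is a hypothetical placeholder):

import simplejson

# Parse a JSON document from an open file object.
# 'settings.json' is a hypothetical example file.
with open('settings.json', 'r') as fp:
    data = simplejson.load(fp)

# simplejson.loads() is the string-based counterpart.
data = simplejson.loads('{"key": "value"}')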
def __init__(self, path='config.json'):
    CONFIG_PATH = os.path.join(sys.path[0], path)
    dict.__init__(self)
    self.path = path
    try:
        self.load()
    except:
        print "Creating configuration file"
        self['hs_threshold1'] = 100
        self['hs_threshold2'] = 200
        self['hs_apertura'] = 1
        self['hs_blur'] = 3
        self['hs_minLineLength'] = 100
        self['hs_maxLineGap'] = 10
        self['hs_kernel_dilate'] = 3
        self['hs_kernel_erode'] = 3
        self['hs_param1'] = 3
        self['hs_param2'] = 3
        self['hs_param3'] = 3
        self['hs_dilate_iteracciones'] = 1
        self.save()
def test_object_pairs_hook(self):
    s = '{"xkd":1, "kcw":2, "art":3, "hxm":4, "qrt":5, "pad":6, "hoy":7}'
    p = [("xkd", 1), ("kcw", 2), ("art", 3), ("hxm", 4),
         ("qrt", 5), ("pad", 6), ("hoy", 7)]
    self.assertEqual(json.loads(s), eval(s))
    self.assertEqual(json.loads(s, object_pairs_hook=lambda x: x), p)
    self.assertEqual(json.load(StringIO(s), object_pairs_hook=lambda x: x), p)
    od = json.loads(s, object_pairs_hook=OrderedDict)
    self.assertEqual(od, OrderedDict(p))
    self.assertEqual(type(od), OrderedDict)
    # the object_pairs_hook takes priority over the object_hook
    self.assertEqual(json.loads(s, object_pairs_hook=OrderedDict,
                                object_hook=lambda x: None),
                     OrderedDict(p))
def main():
    if len(sys.argv) == 1:
        infile = sys.stdin
        outfile = sys.stdout
    elif len(sys.argv) == 2:
        infile = open(sys.argv[1], 'r')
        outfile = sys.stdout
    elif len(sys.argv) == 3:
        infile = open(sys.argv[1], 'r')
        outfile = open(sys.argv[2], 'w')
    else:
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")
    with infile:
        try:
            obj = json.load(infile, object_pairs_hook=json.OrderedDict, use_decimal=True)
        except ValueError:
            raise SystemExit(sys.exc_info()[1])
    with outfile:
        json.dump(obj, outfile, sort_keys=True, indent='    ', use_decimal=True)
        outfile.write('\n')
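Several examples in this listing pass use_decimal=True. This is a documented simplejson option that parses JSON real numbers as decimal.Decimal rather than float; a minimal sketch with a made-up value:

import decimal
import simplejson

# With use_decimal=True, simplejson parses JSON reals as
# decimal.Decimal instead of float, preserving exact values.
obj = simplejson.loads('{"price": 19.99}', use_decimal=True)
assert obj['price'] == decimal.Decimal('19.99')

The same flag works with simplejson.load() on file objects.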
def _get_config_dict():
    """Returns a dict containing the key/values in the config file.

    If the file doesn't exist, it is created, and an empty dict is
    returned.
    """
    try:
        conf_file = file(config_file_path)
        config_dct = json.load(conf_file)
        conf_file.close()
    except IOError:
        # File doesn't exist
        config_dct = {}
        # Create the file
        _write_config_dict(config_dct)
    return config_dct
def get_host_info(self):
    ''' Get variables about a specific host '''

    if len(self.index) == 0:
        # Need to load index from cache
        self.load_index_from_cache()

    if not self.args.host in self.index:
        # try updating the cache
        self.do_api_calls_update_cache()
        if not self.args.host in self.index:
            # host might not exist anymore
            return self.json_format_dict({}, True)

    (region, instance_id) = self.index[self.args.host]

    instance = self.get_instance(region, instance_id)
    return self.json_format_dict(self.get_host_info_dict_from_instance(instance), True)
def main():
    import sys
    if len(sys.argv) == 1:
        infile = sys.stdin
        outfile = sys.stdout
    elif len(sys.argv) == 2:
        infile = open(sys.argv[1], 'rb')
        outfile = sys.stdout
    elif len(sys.argv) == 3:
        infile = open(sys.argv[1], 'rb')
        outfile = open(sys.argv[2], 'wb')
    else:
        raise SystemExit("%s [infile [outfile]]" % (sys.argv[0],))
    try:
        obj = simplejson.load(infile)
    except ValueError, e:
        raise SystemExit(e)
    simplejson.dump(obj, outfile, sort_keys=True, indent=4)
    outfile.write('\n')
def main(): """Main script function""" # Create simulation object, and start streaming SPEAD heaps sender = PulsarSender() # Parse command line arguments args = parse_command_line() # Initialise logging. _log = _init_log(level=logging.DEBUG if args.verbose else logging.INFO) # Load configuration. _log.info('Loading config: {}'.format(args.config_file.name)) _config = json.load(args.config_file) if args.print_settings: _log.debug('Settings:\n {}'.format(json.dumps(_config, indent=4, sort_keys=True))) sender.send(_config, _log, 1, 1)
def main():
    if len(sys.argv) == 1:
        infile = sys.stdin
        outfile = sys.stdout
    elif len(sys.argv) == 2:
        infile = open(sys.argv[1], 'rb')
        outfile = sys.stdout
    elif len(sys.argv) == 3:
        infile = open(sys.argv[1], 'rb')
        outfile = open(sys.argv[2], 'wb')
    else:
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")
    try:
        obj = json.load(infile, object_pairs_hook=json.OrderedDict, use_decimal=True)
    except ValueError, e:
        raise SystemExit(e)
    json.dump(obj, outfile, sort_keys=True, indent='    ', use_decimal=True)
    outfile.write('\n')
def parse_readme_json(self, path, **parser_kwargs):
    """Parse preprocessed README.json file.

    :param path: path to README.json file
    :type path: str
    :param parser_kwargs: additional arguments for markup parser
    :return: parsed raw/plain content
    """
    with open(path, 'r') as f:
        file_content = json.load(f)

    if not file_content:
        raise ValueError("No content in '%s'" % path)

    if 'content' not in file_content.keys():
        raise ValueError("No content in '%s', bogus README.json format?" % path)

    if 'type' not in file_content.keys():
        raise ValueError("No content type in '%s', bogus README.json format?" % path)

    return self.parse(file_content['content'], file_content['type'], **parser_kwargs)
def main():
    if len(sys.argv) == 1:
        infile = sys.stdin
        outfile = sys.stdout
    elif len(sys.argv) == 2:
        infile = open(sys.argv[1], 'rb')
        outfile = sys.stdout
    elif len(sys.argv) == 3:
        infile = open(sys.argv[1], 'rb')
        outfile = open(sys.argv[2], 'wb')
    else:
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")
    try:
        try:
            obj = json.load(infile, object_pairs_hook=json.OrderedDict, use_decimal=True)
        except ValueError, e:
            raise SystemExit(e)
        json.dump(obj, outfile, sort_keys=True, indent='    ', use_decimal=True)
        outfile.write('\n')
    finally:
        infile.close()
        outfile.close()
def get_stats(soup):
    """
    Get Question stats
    :param soup:
    :return:
    """
    question_title = (soup.find_all("a", class_="question-hyperlink")[0].get_text())
    question_stats = (soup.find_all("span", class_="vote-count-post")[0].get_text())
    try:
        question_stats = "Votes " + question_stats + " | " + \
            (((soup.find_all("div", class_="module question-stats")[0]
               .get_text()).replace("\n", " ")).replace(" ", " | "))
    except IndexError as e:
        question_stats = "Could not load statistics."
    question_desc = (soup.find_all("div", class_="post-text")[0])
    add_urls(question_desc)
    question_desc = question_desc.get_text()
    question_stats = ' '.join(question_stats.split())
    return question_title, question_desc, question_stats
def main():
    if len(sys.argv) == 1:
        infile = sys.stdin
        outfile = sys.stdout
    elif len(sys.argv) == 2:
        infile = open(sys.argv[1], 'rb')
        outfile = sys.stdout
    elif len(sys.argv) == 3:
        infile = open(sys.argv[1], 'rb')
        outfile = open(sys.argv[2], 'wb')
    else:
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")
    try:
        obj = simplejson.load(infile)
    except ValueError, e:
        raise SystemExit(e)
    simplejson.dump(obj, outfile, sort_keys=True, indent=4)
    outfile.write('\n')
def api_query(self, resource="/domains", method="GET", data=None):
    url = EXO_DNS_BASEURL + resource
    if data:
        data = json.dumps(data)

    response, info = fetch_url(
        module=self.module,
        url=url,
        data=data,
        method=method,
        headers=self.headers,
        timeout=self.module.params.get('api_timeout'),
    )

    if info['status'] not in (200, 201, 204):
        self.module.fail_json(msg="%s returned %s, with body: %s" % (url, info['status'], info['msg']))

    try:
        return json.load(response)
    except Exception:
        return {}
def __init__(self, path, json_kw=None, mode=0600, object_hook=None):
    """Create a JSONStore object backed by the file at `path`.

    If a dict is passed in as `json_kw`, it will be used as keyword
    arguments to the json module.
    """
    self.path = path
    self.json_kw = json_kw or {}
    self.mode = mode
    self.object_hook = object_hook

    self._data = {}

    self._synced_json_kw = None
    self._needs_sync = False

    if not os.path.exists(path):
        self.sync(force=True)  # write empty dict to disk
        return

    # load the whole store
    with __builtin__.open(path, 'r') as fp:
        self.update(json.load(fp, object_hook=self.object_hook))
def create_cluster(name, datacenter):
    test_cluster = load_cluster(name)
    if test_cluster is not None:
        print('Cluster %s already exists. Use a different name or load this one instead of creating.' % (name))
        return None

    cluster = collections.OrderedDict()
    cluster['name'] = name
    cluster['dc'] = datacenter
    cluster['servers'] = []
    cluster['clients'] = []
    save_cluster(cluster)
    return cluster
def load_from_json(self, filepath):
    try:
        with open(filepath, 'r') as f:
            # object_pairs_hook and object_hook ensure that data is retained
            # in the same order as the file. Keeps it readable when written back.
            self.plan = json.load(f,
                                  object_pairs_hook=collections.OrderedDict,
                                  object_hook=collections.OrderedDict)
            # TODO If version is not current schema version, the file
            # should be upgraded
            validated = self.validate()
            if not validated:
                logger.error_msg("%s is not a valid JSON cluster plan file" % (filepath))
                return False
            return True
    except JSONDecodeError as e:
        logger.error_msg(str(e))
        return False
def evaluate_apk(permissions, perm_file, model_file):
    fd = open(perm_file, 'r')
    perm_list = simplejson.load(fd)
    fd.close()
    # permissions = get_permissions(filename)
    bitmap = perm_bitmap(perm_list, permissions) + [True]
    temp = tempfile.mkstemp(suffix='.arff')
    arff.dump(temp[1], [bitmap], names=perm_list + ['Class'])
    output = subprocess.check_output(['java', 'weka.classifiers.bayes.NaiveBayesUpdateable',
                                      '-p', '0', '-T', temp[1], '-l', model_file])
    #os.remove(temp[1])
    virus = output.split()[13] == '1:True'
    assurance = output.split()[14]
    if assurance == '+':
        assurance = output.split()[15]
    return (virus, str(assurance))
def load(self, path):
    with open(path, 'r') as fp:
        d = simplejson.load(fp)
    self.X = d['contact_length'] / 2.
    self.Y = d['contact_width'] / 2.
    self.com_height = d['com_height']
    self.dict_repr = d
    self.friction = d['friction']
    self.path = path
    if 'zmp_height' in d:
        self.zmp_height = d['zmp_height']
    self.contacts = {}
    for (name, c) in d['contacts'].iteritems():
        self.contacts[name] = self.create_contact(
            pos=c['pos'], rpy=c['rpy'], name=name, visible=True)
    self.stances = []
    for (i, stance) in enumerate(d['stances']):
        self.stances.append(Stance(self, i, stance))
def main(sys_argv=sys.argv):
    template, context = parse_args(sys_argv, USAGE)

    if template.endswith('.mustache'):
        template = template[:-9]

    renderer = Renderer()

    try:
        template = renderer.load_template(template)
    except TemplateNotFoundError:
        pass

    try:
        context = json.load(open(context))
    except IOError:
        context = json.loads(context)

    rendered = renderer.render(template, context)
    print rendered
def changeWwwSetting(settingName, value):
    wwwSettingsFileName = util.addSlash(config['wwwPath']) + 'userSettings.json'
    if os.path.exists(wwwSettingsFileName):
        wwwSettingsFile = open(wwwSettingsFileName, 'r+b')
        try:
            wwwSettings = json.load(wwwSettingsFile)  # read existing settings
        except json.JSONDecodeError:
            logMessage("Error in decoding userSettings.json, creating new empty json file")
            wwwSettings = {}  # start with a fresh file when the json is corrupt.
    else:
        wwwSettingsFile = open(wwwSettingsFileName, 'w+b')  # create new file
        wwwSettings = {}

    wwwSettings[settingName] = str(value)
    wwwSettingsFile.seek(0)
    wwwSettingsFile.write(json.dumps(wwwSettings))
    wwwSettingsFile.truncate()
    wwwSettingsFile.close()
def load(filepath=None, data=None, **kwargs):
    """
    Loads a geojson file or dictionary, validates it, and returns a
    GeojsonFile instance.

    In order for a geojson dict to be considered a file, it cannot just
    be single geometries, so this class always saves them as the
    toplevel FeatureCollection type, and requires the files it loads to
    be the same.

    To load with a different text encoding use the 'encoding' argument.

    Parameters:

    - **filepath** (optional): The path of a geojson file to load.
    - **data** (optional): A complete geojson dictionary to load.

    Returns:

    - A GeojsonFile instance.
    """
    return GeojsonFile(filepath, data, **kwargs)
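A brief usage sketch for the geojson loader above; the 'encoding' keyword comes from its docstring, while the file name is an assumption:

# Hypothetical call to the load() helper defined above;
# 'sites.geojson' is an assumed example file.
geofile = load(filepath='sites.geojson', encoding='utf-8')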
def getload():
    with open(filetow) as data_file:
        data = json.load(data_file)
    for x in data['price_usd']:
        date = int(x[0] / 1000)
        Avg = x[1]
        # redis
        try:
            pipe = r.pipeline()
            pipe.zadd(r_SS_BTC_USD_24H_HISTORY, date, str(date) + ':' + str(Avg))
            response = pipe.execute()
        except Exception as e:
            print(e.args[0])
            sys.exit()

#-----------
def generate_storage_data(project_name):
    """ Create orchs from template for OPC storage """
    data_type = "storage"
    data_file = 'orch_templates/storage.template'
    with open(data_file) as data_file:
        data = json.load(data_file)
    sections = get_section_data(project_name, data_type)
    for section in sections:
        data['description'] = section + " Commerce Storage"
        data['name'] = compute_name + "/" + section
        data['oplans'][0]['objects'][0]['name'] = compute_name + "/" + section
        data['oplans'][0]['objects'][0]['description'] = get_config_item(project_name, section, 'description', data_type)
        data['oplans'][0]['objects'][0]['size'] = get_config_item(project_name, section, 'size', data_type)
        data['oplans'][0]['objects'][0]['properties'][0] = get_config_item(project_name, section, 'properties', data_type)
        write_orch_data(data, section, data_type, project_name)
def get_host_info(self):
    ''' Get variables about a specific host '''

    if len(self.index) == 0:
        # Need to load index from cache
        self.load_index_from_cache()

    if self.args.host not in self.index:
        # try updating the cache
        self.do_api_calls_update_cache()
        if self.args.host not in self.index:
            # host might not exist anymore
            return self.json_format_dict({}, True)

    (region, instance_id) = self.index[self.args.host]

    instance = self.get_instance(region, instance_id)
    return self.json_format_dict(self.get_host_info_dict_from_instance(instance), True)
def load(cls, config_file):
    """
    Load experiment parameters from a JSON configuration file

    Parameters
    ----------
    config_file: string
        path to a JSON configuration file

    Returns
    -------
    dictionary (or list of dictionaries) of parameters to pass to a behavior
    """
    try:
        import simplejson as json
    except ImportError:
        import json
    with open(config_file, 'rb') as config:
        parameters = json.load(config)
    return parameters
def load_schema_file(schema_file, language_code=None):
    """
    Load a schema, optionally for a specified language.

    :param schema_file: The name of the schema e.g. census_household.json
    :param language_code: ISO 2-character code for language e.g. 'en', 'cy'
    """
    language_code = language_code or DEFAULT_LANGUAGE_CODE
    schema_path = get_schema_file_path(schema_file, language_code)

    if language_code != DEFAULT_LANGUAGE_CODE and not os.path.exists(schema_path):
        logger.info("couldn't find requested language schema, falling back to 'en'",
                    schema_file=schema_file, language_code=language_code,
                    schema_path=schema_path)
        schema_path = get_schema_file_path(schema_file, DEFAULT_LANGUAGE_CODE)

    logger.info('loading schema', schema_file=schema_file,
                language_code=language_code, schema_path=schema_path)

    try:
        with open(schema_path, encoding='utf8') as json_data:
            return json.load(json_data, use_decimal=True)
    except FileNotFoundError as e:
        logger.error('no schema file exists', filename=schema_path)
        raise e
def changeWwwSetting(settingName, value):
    # Only update the www settings when using a configFile-based
    # installation (that is, brewpi-www)
    if configFile is not None:
        wwwSettingsFileName = util.addSlash(config['wwwPath']) + 'userSettings.json'
        if os.path.exists(wwwSettingsFileName):
            wwwSettingsFile = open(wwwSettingsFileName, 'r+b')
            try:
                wwwSettings = json.load(wwwSettingsFile)  # read existing settings
            except json.JSONDecodeError:
                logMessage("Error in decoding userSettings.json, creating new empty json file")
                wwwSettings = {}  # start with a fresh file when the json is corrupt.
        else:
            wwwSettingsFile = open(wwwSettingsFileName, 'w+b')  # create new file
            wwwSettings = {}

        wwwSettings[settingName] = str(value)
        wwwSettingsFile.seek(0)
        wwwSettingsFile.write(json.dumps(wwwSettings))
        wwwSettingsFile.truncate()
        wwwSettingsFile.close()
def load_files(avg_file, std_file):
    # load files
    with open(avg_file) as f:
        avg = simplejson.load(f)
    with open(std_file) as f:
        std = simplejson.load(f)

    std = np.array(std)
    print std
    std = np.true_divide(std, 2.)
    print std

    avg = np.array(avg)
    avg_upper = avg + std
    avg_lower = avg - std
    return avg, avg_upper, avg_lower
def read_from_file(path="", raw=False): if path == "": return False if not xbmcvfs.exists(path): return False try: with open(path) as f: logger.info("opened textfile %s." % (path)) if not raw: result = json.load(f) else: result = f.read() return result except: logger.info("failed to load textfile: " + path) return False
def loadFromDatabase(self):
    self.initializeHigherEntities()
    entities_to_load = int(db.getConfig()["entities_to_load"])
    log("Entities to load: " + str(entities_to_load))
    # Read all entities from database and populate internal data structure with them.
    self.loadEntities(entities_to_load)
    # sort entities by lat, lng so we can easily find a segment in a box
    self.entities.sort(key=lambda x: (x.lat, x.lng))
    self.eid_to_index = {}
    index = 0
    for entity in self.entities:
        self.eid_to_index[entity.eid] = index
        index += 1
    self.loadEntitieProperties()
    self.populateHigherEntities()
    self.generateSubcities()
    self.flattenHigherEntities()
    self.createJSONs()  # serialize the state needed for serving into directory.