Python json 模块,load() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 json.load()。

项目:lang-reps    作者:chaitanyamalaviya    | 项目源码 | 文件源码
def get_named_set(lang_codes, feature_set):
    """Return the feature names and values of a named feature set.

    The special name ``'id'`` is delegated to ``get_id_set``. Any other name
    is looked up in ``FEATURE_SETS``, whose entries unpack to
    ``(filename, source, prefix)``; the database stored in ``filename`` is
    loaded with ``np.load`` and sliced by language, feature and source.

    :param lang_codes: iterable of language codes; each one is normalised
        through ``get_language_code``.
    :param feature_set: ``'id'`` or a key of ``FEATURE_SETS``.
    :return: ``(feature_names, feature_values)``.

    An unknown ``feature_set`` prints an error to stderr and terminates the
    process with exit status 1.
    """
    if feature_set == 'id':
        return get_id_set(lang_codes)

    if feature_set not in FEATURE_SETS:
        print("ERROR: Invalid feature set " + feature_set, file=sys.stderr)
        # A bare sys.exit() reports success (status 0); signal the failure.
        sys.exit(1)

    filename, source, prefix = FEATURE_SETS[feature_set]
    feature_database = np.load(filename)
    lang_codes = [get_language_code(l, feature_database) for l in lang_codes]
    lang_indices = [get_language_index(l, feature_database) for l in lang_codes]
    feature_names = get_feature_names(prefix, feature_database)
    feature_indices = [get_feature_index(f, feature_database) for f in feature_names]
    source_index = get_source_index(source, feature_database)
    # Select languages, then features, then the source along the third axis.
    feature_values = feature_database["data"][lang_indices, :, :][:, feature_indices, :][:, :, source_index]
    # NOTE(review): squeezing axis 2 assumes source_index is array-like so the
    # third axis survives the indexing above - confirm in get_source_index.
    feature_values = feature_values.squeeze(axis=2)
    return feature_names, feature_values
项目:flora    作者:Lamden    | 项目源码 | 文件源码
def directory_has_smart_contract(location):
    """Report whether ``location`` holds a tsol contract that compiles.

    The first ``*.tsol`` file found is compiled with the first ``*.json``
    file (parsed into a dict) as its example payload.

    :param location: directory to search.
    :return: ``True`` if ``tsol.compile`` succeeds, ``False`` if it raises
        (the exception is printed).
    :raises AssertionError: if the directory lacks a ``*.tsol`` or a
        ``*.json`` file.
    """
    # probably makes more sense to put this inside of the tsol package
    code_paths = glob.glob(os.path.join(location, '*.tsol'))
    example_paths = glob.glob(os.path.join(location, '*.json'))

    # Raised explicitly (instead of via ``assert``) so the check survives
    # ``python -O``; the exception type callers see is unchanged.
    if not code_paths or not example_paths:
        raise AssertionError('Could not find *.tsol and *.json files in provided directory.')

    # turn the example into a dict
    with open(example_paths[0]) as example_file:
        example = json.load(example_file)

    # The compiler receives the open file object; the ``with`` block makes
    # sure the handle is released whatever the outcome.
    with open(code_paths[0]) as code:
        try:
            tsol.compile(code, example)
        except Exception as e:
            print(e)
            return False
    return True
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def load_previous(self, path=None):
        """Read the previously saved config from disk and merge it in.

        Normally this is invoked for you when the object is created, so
        calling it by hand is rarely necessary.

        :param path:

            Location of the saved config. When falsy, the current
            ``self.path`` is kept. When given, it replaces ``self.path``
            and is therefore the path this object holds from then on.

        """
        self.path = path if path else self.path
        with open(self.path) as handle:
            previous = json.load(handle)
        self._prev_dict = previous
        # Work on a deep copy so values placed in ``self`` are independent
        # of the ones kept in ``_prev_dict``.
        snapshot = copy.deepcopy(self._prev_dict)
        for key in snapshot:
            if key in self:
                continue
            self[key] = snapshot[key]
项目:uicourses_v2    作者:sumerinlan    | 项目源码 | 文件源码
def info_to_db(db, name):
    """Insert the course described by a JSON file into ``CourseExplorer``.

    :param db: database connection; ``db.cursor()`` must return a cursor
        whose ``execute`` accepts ``%s`` placeholders.
    :param name: path of a JSON file with the keys ``subject``, ``code``,
        ``title``, ``credit``, ``desc``, ``geneds`` (a list or null) and
        ``url``.

    The statement is executed but not committed here.
    """
    # Read the file up front so it is closed before any database work.
    with open(name) as data_file:
        data = json.load(data_file)

    subject = data['subject']
    code = data['code']
    title = data['title']
    credit = data['credit']
    desc = data['desc']
    geneds = data['geneds']
    # A null list is stored as an empty string, otherwise comma-joined.
    geneds = '' if geneds is None else ','.join(geneds)
    url = data['url']

    cmd = 'INSERT INTO `CourseExplorer` (`Id`, `Subject`, `Code`, `Title`, `Credit`, `Description`, `GenEd`, ' \
          '`Url`) VALUES(NULL, %s, %s, %s, %s, %s, %s, %s) '
    val = (subject, code, title, credit, desc, geneds, url)

    cur = db.cursor()
    try:
        cur.execute(cmd, val)
    finally:
        # Release the cursor even when the INSERT fails.
        cur.close()
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
def _set_asset_paths(self, app):
        """
        Load the webpack manifest and remember where the assets live.

        The manifest at ``app.settings.WEBPACK_MANIFEST_PATH`` supplies
        ``self.assets`` (its ``assets`` entry) and, unless
        ``app.settings.WEBPACK_ASSETS_URL`` is set, ``self.assets_url``
        (its ``publicPath`` entry).

        :param app: aiohttp application
        :return: None
        :raises RuntimeError: when the manifest file cannot be opened
        """
        manifest_path = app.settings.WEBPACK_MANIFEST_PATH

        try:
            with open(manifest_path, 'r') as manifest_file:
                manifest = json.load(manifest_file)
                # An explicitly configured URL wins over the manifest's own.
                self.assets_url = app.settings.WEBPACK_ASSETS_URL or manifest['publicPath']
                self.assets = manifest['assets']
        except IOError:
            raise RuntimeError(
                "'WEBPACK_MANIFEST_PATH' is required to be set and "
                "it must point to a valid json file.")
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _get_external_data(url):
    """Fetch ``url`` and return its JSON payload, or ``{}`` on any problem.

    :param url: address passed to ``urlopen``.
    :return: the decoded JSON document when the response declares an
        ``application/json`` content type; otherwise an empty dict. Failures
        are logged, never raised.
    """
    result = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        resp = urlopen(url)
        try:
            headers = resp.info()
            ct = headers.get('Content-Type')
            # A response without a Content-Type header is simply "not JSON";
            # guard against calling startswith() on None.
            if not ct or not ct.startswith('application/json'):
                logger.debug('Unexpected response for JSON request: %s', ct)
            else:
                reader = codecs.getreader('utf-8')(resp)
                result = json.load(reader)
        finally:
            # Always release the underlying connection / file handle.
            resp.close()
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return result
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def save(self, pypi_version, current_time):
        """Record the latest version check for this interpreter prefix.

        The state file maps ``sys.prefix`` to the time of the last check and
        the version that was found. Nothing is written when
        ``check_path_owner`` rejects the state file's directory.
        """
        statefile_dir = os.path.dirname(self.statefile_path)

        # Bail out unless the directory passes the ownership check.
        if not check_path_owner(statefile_dir):
            return

        # The ownership check passed, so it is fine to create any missing
        # directories now.
        ensure_dir(statefile_dir)

        # Read-modify-write of the state file happens under a lock.
        with lockfile.LockFile(self.statefile_path):
            state = {}
            if os.path.exists(self.statefile_path):
                with open(self.statefile_path) as existing:
                    state = json.load(existing)

            entry = {
                "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
                "pypi_version": pypi_version,
            }
            state[sys.prefix] = entry

            with open(self.statefile_path, "w") as output:
                json.dump(state, output, sort_keys=True,
                          separators=(",", ":"))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def load(self):
        """Populate ``self.data`` from the first existing wheel config file.

        Missing ``signers`` / ``verifiers`` lists and a missing ``schema``
        are filled in with defaults.

        :return: ``self``
        :raises ValueError: if the file's schema differs from ``self.SCHEMA``
        """
        # XXX JSON is not a great database
        for path in load_config_paths('wheel'):
            conf = os.path.join(native(path), self.CONFIG_NAME)
            if not os.path.exists(conf):
                continue
            with open(conf, 'r') as infile:
                self.data = json.load(infile)
                for section in ('signers', 'verifiers'):
                    if section not in self.data:
                        self.data[section] = []
                if 'schema' not in self.data:
                    self.data['schema'] = self.SCHEMA
                elif self.data['schema'] != self.SCHEMA:
                    message = "Bad wheel.json version {0}, expected {1}".format(
                        self.data['schema'], self.SCHEMA)
                    raise ValueError(message)
            # Only the first config file found is used.
            break
        return self
项目:ibus-replace-with-kanji    作者:esrille    | 项目源码 | 文件源码
def __load_layout(self, config):
        """Load the keyboard layout selected in the engine configuration.

        The layout path comes from the ``layout`` setting when it is a
        string value; otherwise ``layouts/roomazi.json`` under
        ``$IBUS_REPLACE_WITH_KANJI_LOCATION`` is used (and a set but
        non-string setting is unset). If the file cannot be read or parsed,
        the built-in ``roomazi.layout`` is kept.

        Also selects the kana conversion handler: the kana handler when the
        layout's ``Type`` is ``'Kana'``, the roomazi handler otherwise.

        :param config: configuration object providing ``get_value`` / ``unset``.
        :return: the layout that was loaded (or the default one).
        """
        var = config.get_value('engine/replace-with-kanji-python', 'layout')
        if var is None or var.get_type_string() != 's':
            path = os.path.join(os.getenv('IBUS_REPLACE_WITH_KANJI_LOCATION'), 'layouts')
            path = os.path.join(path, 'roomazi.json')
            if var:
                config.unset('engine/replace-with-kanji-python', 'layout')
        else:
            path = var.get_string()
        logger.info("layout: %s", path)
        layout = roomazi.layout     # Use 'roomazi' as default
        try:
            with open(path) as f:
                layout = json.load(f)
        except ValueError as error:
            logger.error("JSON error: %s", error)
        except OSError as error:
            logger.error("Error: %s", error)
        except Exception:
            # Still best-effort, but no longer swallows KeyboardInterrupt or
            # SystemExit the way a bare ``except:`` did.
            logger.error("Unexpected error: %s %s", sys.exc_info()[0], sys.exc_info()[1])
        self.__to_kana = self.__handle_roomazi_layout
        if 'Type' in layout:
            if layout['Type'] == 'Kana':
                self.__to_kana = self.__handle_kana_layout
        return layout
项目:nova    作者:hubblestack    | 项目源码 | 文件源码
def _get_cache(ttl, cache_path):
    '''
    Return the JSON stored in ``cache_path`` when the file is younger than
    ``ttl``; return an empty list when it is missing, too old, unreadable or
    not valid JSON.
    '''
    # A missing file means there is nothing cached at all.
    try:
        modified_at = os.path.getmtime(cache_path)
    except OSError:
        return []

    age = current_time() - modified_at
    if not (age < ttl):
        log.debug('%s was older than ttl', cache_path)
        return []

    log.debug('%s is less than ttl', cache_path)
    try:
        with open(cache_path) as cache_file:
            return json.load(cache_file)
    except IOError:
        return []
    except ValueError:
        log.error('%s was not json formatted', cache_path)
        return []
项目:django-magic-cards    作者:pbaranay    | 项目源码 | 文件源码
def test_update_text(self):
        # The fixture holds the card in its final, updated form.
        fixture_path = os.path.join(self.FIXTURES_DIR, 'eyes_in_the_skies.json')
        with open(fixture_path) as fixture:
            updated = json.load(fixture)

        card_entry = updated['RTR']['cards'][0]
        text_before = card_entry['originalText']
        text_after = card_entry['text']

        # Rebuild the pre-update state on an independent copy.
        outdated = copy.deepcopy(updated)
        outdated['RTR']['cards'][0]['text'] = text_before

        # First import: the card carries the original wording.
        parse_data(outdated, ['RTR'])
        card = Card.objects.first()
        self.assertEqual(card.text, text_before)

        # Second import: the stored card picks up the new wording.
        parse_data(updated, ['RTR'])
        card.refresh_from_db()
        self.assertEqual(card.text, text_after)
项目:django-magic-cards    作者:pbaranay    | 项目源码 | 文件源码
def test_update_types(self):
        # The fixture holds the card in its final, updated form.
        fixture_path = os.path.join(self.FIXTURES_DIR, 'jackal_pup.json')
        with open(fixture_path) as fixture:
            updated = json.load(fixture)

        # Give an independent copy a different, outdated subtype.
        old_subtype = 'Hound'
        outdated = copy.deepcopy(updated)
        outdated['TMP']['cards'][0]['subtypes'] = [old_subtype]

        # First import: exactly the outdated subtype is attached.
        parse_data(outdated, ['TMP'])
        card = Card.objects.first()
        self.assertEqual(card.subtypes.count(), 1)
        self.assertEqual(card.subtypes.first().name, old_subtype)

        # Second import: the subtype is replaced, not added to.
        parse_data(updated, ['TMP'])
        card.refresh_from_db()
        self.assertEqual(card.subtypes.count(), 1)
        self.assertEqual(card.subtypes.first().name, 'Jackal')
        # No row for the outdated subtype may be left behind.
        leftover = CardSubtype.objects.filter(name=old_subtype)
        self.assertFalse(leftover.exists())
项目:django-magic-cards    作者:pbaranay    | 项目源码 | 文件源码
def test_update_loyalty(self):
        """
        Simulates the upgrade process from version 0.2 to version 0.4.
        """
        fixture_path = os.path.join(self.FIXTURES_DIR, 'vraska_the_unseen.json')
        with open(fixture_path) as fixture:
            updated = json.load(fixture)

        # Drop the loyalty field from an independent copy of the data.
        outdated = copy.deepcopy(updated)
        del outdated['RTR']['cards'][0]['loyalty']

        # First import: no loyalty is stored.
        parse_data(outdated, ['RTR'])
        card = Card.objects.first()
        self.assertIsNone(card.loyalty)

        # Second import: the loyalty from the full data shows up.
        parse_data(updated, ['RTR'])
        card.refresh_from_db()
        self.assertEqual(card.loyalty, 5)
项目:rca-evaluation    作者:sieve-microservices    | 项目源码 | 文件源码
def main():
    """Print the ``time`` column of every service file of a measurement.

    The measurement directory is taken from ``sys.argv[1]``. Its
    ``metadata.json`` names the measurement, its start and end, and the
    services with their tab-separated data files.

    Exits with status 1 and a usage message when no directory is given.
    """
    if len(sys.argv) < 2:
        sys.stderr.write("USAGE: %s measurement\n" % sys.argv[0])
        sys.exit(1)
    path = sys.argv[1]

    with open(os.path.join(path, "metadata.json")) as f:
        metadata = json.load(f)

    # The last character of each timestamp is dropped before parsing
    # (presumably a trailing "Z" - confirm against the metadata writer).
    start = date(metadata["start"][:-1])
    # The upper bound comes from "end"; reading "start" twice made both
    # bounds identical.
    end = date(metadata["end"][:-1])
    # Use %-formatting: print() does not interpolate extra arguments.
    print('open measurement "%s" from "%s" to "%s"' % (metadata["name"], start, end))
    for service in metadata["services"]:
        print('open service "%s"' % service["name"])
        with open(os.path.join(path, service["filename"])) as csvfile:
            r = csv.DictReader(csvfile, dialect=csv.excel_tab)
            for row in r:
                print(row["time"])
项目:consul-pg    作者:adamcstephens    | 项目源码 | 文件源码
def configure(self):
        """Load the local config and look up the managed service in the agent.

        * ``self.config`` becomes the parsed ``self.configfile``, or ``{}``
          when that file cannot be read or parsed.
        * ``self.agent_services`` is fetched from ``/agent/services``; on
          failure the traceback is printed and the process exits with 135.
        * ``self.managed_service`` is the entry for ``self.service``, with a
          missing tag list normalised to ``[]``.
        * Only the ``"facter"`` role source is supported; anything else
          exits with 140.
        """
        # load config values
        try:
            with open(self.configfile) as configfile_contents:
                self.config = json.load(configfile_contents)
        except Exception:
            # Best-effort: an unreadable or invalid config means "no config".
            # (``Exception`` rather than a bare except, so Ctrl-C still works.)
            self.config = {}

        try:
            self.agent_services = self.api_session.get(self.api_endpoint + '/agent/services?stale').json()
        except Exception:
            print_exc()
            exit(135)
        self.managed_service = self.agent_services[self.service]

        if self.managed_service['Tags'] is None:
            self.managed_service['Tags'] = []

        if self.role_source == "facter":
            self.get_facter_state(self.DEFAULT_FACTERFILE)
        else:
            print("!! unsupported PG role source !!")
            exit(140)
项目:Telebackup    作者:LonamiWebs    | 项目源码 | 文件源码
def prompt_pick_backup(message):
    """Let the user choose one of the stored backups.

    Lists every ``backups/*.tlo`` entry with a 1-based number, asks for a
    number through ``get_integer`` using ``message`` as the prompt, and
    returns the chosen database ID together with its metadata."""

    # Database IDs are the file names without directory and extension.
    saved_db = []
    for backup_file in glob('backups/*.tlo'):
        file_name = path.split(backup_file)[1]
        saved_db.append(path.splitext(file_name)[0])

    # Show the numbered menu.
    print('Available backups databases:')
    for number, listed_id in enumerate(saved_db, 1):
        metadata = get_metadata(listed_id)
        peer_name = metadata.get('peer_name', '???')
        print('{}. {}, ID: {}'.format(number, peer_name, listed_id))

    # Menu numbers are 1-based, list indices 0-based.
    choice = get_integer(message, 1, len(saved_db))
    db_id = saved_db[choice - 1]
    return db_id, get_metadata(db_id)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _get_external_data(url):
    """Fetch ``url`` and return its JSON payload, or ``{}`` on any problem.

    :param url: address passed to ``urlopen``.
    :return: the decoded JSON document when the response declares an
        ``application/json`` content type; otherwise an empty dict. Failures
        are logged, never raised.
    """
    result = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        resp = urlopen(url)
        try:
            headers = resp.info()
            ct = headers.get('Content-Type')
            # A response without a Content-Type header is simply "not JSON";
            # guard against calling startswith() on None.
            if not ct or not ct.startswith('application/json'):
                logger.debug('Unexpected response for JSON request: %s', ct)
            else:
                reader = codecs.getreader('utf-8')(resp)
                result = json.load(reader)
        finally:
            # Always release the underlying connection / file handle.
            resp.close()
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return result
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def save(self, pypi_version, current_time):
        """Record the latest version check for this interpreter prefix.

        The state file maps ``sys.prefix`` to the time of the last check and
        the version that was found. Nothing is written when
        ``check_path_owner`` rejects the state file's directory.
        """
        statefile_dir = os.path.dirname(self.statefile_path)

        # Bail out unless the directory passes the ownership check.
        if not check_path_owner(statefile_dir):
            return

        # The ownership check passed, so it is fine to create any missing
        # directories now.
        ensure_dir(statefile_dir)

        # Read-modify-write of the state file happens under a lock.
        with lockfile.LockFile(self.statefile_path):
            state = {}
            if os.path.exists(self.statefile_path):
                with open(self.statefile_path) as existing:
                    state = json.load(existing)

            entry = {
                "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
                "pypi_version": pypi_version,
            }
            state[sys.prefix] = entry

            with open(self.statefile_path, "w") as output:
                json.dump(state, output, sort_keys=True,
                          separators=(",", ":"))
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def load(self):
        """Populate ``self.data`` from the first existing wheel config file.

        Missing ``signers`` / ``verifiers`` lists and a missing ``schema``
        are filled in with defaults.

        :return: ``self``
        :raises ValueError: if the file's schema differs from ``self.SCHEMA``
        """
        # XXX JSON is not a great database
        for path in load_config_paths('wheel'):
            conf = os.path.join(native(path), self.CONFIG_NAME)
            if not os.path.exists(conf):
                continue
            with open(conf, 'r') as infile:
                self.data = json.load(infile)
                for section in ('signers', 'verifiers'):
                    if section not in self.data:
                        self.data[section] = []
                if 'schema' not in self.data:
                    self.data['schema'] = self.SCHEMA
                elif self.data['schema'] != self.SCHEMA:
                    message = "Bad wheel.json version {0}, expected {1}".format(
                        self.data['schema'], self.SCHEMA)
                    raise ValueError(message)
            # Only the first config file found is used.
            break
        return self
项目:sstash    作者:realcr    | 项目源码 | 文件源码
def read_store(self):
        """
        Load, validate and decrypt the store kept at ``self._path``.

        The file is an ASCII JSON envelope checked against ``OUTER_SCHEMA``;
        its ``enc_blob`` entry is decrypted with ``self._box`` and the
        resulting UTF-8 JSON document is returned.

        Raises ``SSError`` for an invalid envelope and ``SSCryptoError``
        when decryption fails.
        """
        with open(self._path, 'r', encoding='ascii') as store_file:
            envelope = json.load(store_file)

        try:
            validate(envelope, OUTER_SCHEMA)
        except ValidationError:
            raise SSError("Invalid outer schema")

        ciphertext = hex_str_to_bytes(envelope["enc_blob"])
        try:
            plaintext = self._box.decrypt(ciphertext)
        except nacl.exceptions.CryptoError:
            raise SSCryptoError("Wrong password")

        decoded = plaintext.decode('utf-8')
        return json.loads(decoded)
项目:robik    作者:RecunchoMaker    | 项目源码 | 文件源码
def __init__(self, path='config.json'):
        """Create the config mapping, loading it or seeding defaults.

        ``self.load()`` is tried first; if it fails for any ordinary reason
        the default ``hs_*`` parameters are set and written with
        ``self.save()``.

        :param path: config file path, stored as ``self.path``.
        """
        # NOTE(review): the original also computed
        # os.path.join(sys.path[0], path) into an unused local; possibly
        # self.path was meant to use it - confirm before relying on relative
        # paths.
        dict.__init__(self)
        self.path = path

        try:
            self.load()
        except Exception:
            # ``Exception`` instead of a bare except, so KeyboardInterrupt and
            # SystemExit are not mistaken for a missing config file.
            print("Creando fichero de configuracion")
            self['hs_threshold1'] = 100
            self['hs_threshold2'] = 200
            self['hs_apertura'] = 1
            self['hs_blur'] = 3
            self['hs_minLineLength'] = 100
            self['hs_maxLineGap'] = 10
            self['hs_kernel_dilate'] = 3
            self['hs_kernel_erode'] = 3
            self['hs_param1'] = 3
            self['hs_param2'] = 3
            self['hs_param3'] = 3
            self['hs_dilate_iteracciones'] = 1

            self.save()
项目:PowerSpikeGG    作者:PowerSpikeGG    | 项目源码 | 文件源码
def test_match_fetching(self):
        """Check that the server correctly handles a normal request."""
        # The sample file stands in for what the Riot API would return.
        here = os.path.dirname(os.path.abspath(__file__))
        sample_path = os.sep.join([here, "samples", self.MATCH_DATA])
        with open(sample_path) as sample:
            self.match_data = json.load(sample)

        # Cache miss, then the API handler serves the sample.
        self.service.cache_manager.find_match.return_value = None
        self.service.riot_api_handler.get_match.return_value = self.match_data

        request = service_pb2.MatchRequest(id=4242,
            region=constants_pb2.EUW)
        response = self.stub.Match(request)

        # The response must equal the converted sample, and the match must
        # have been written to the cache.
        expected = JSONConverter(None).json_match_to_match_pb(self.match_data)
        self.assertEqual(response, expected)
        self.assertTrue(self.service.cache_manager.save_match.called)
项目:PowerSpikeGG    作者:PowerSpikeGG    | 项目源码 | 文件源码
def test_match_from_cache(self):
        """Check that the server returns a match from the cache."""
        # The sample file stands in for a previously cached match.
        here = os.path.dirname(os.path.abspath(__file__))
        sample_path = os.sep.join([here, "samples", self.MATCH_DATA])
        with open(sample_path) as sample:
            self.match_data = json.load(sample)

        # Cache hit: the cache manager already knows the match.
        self.service.cache_manager.find_match.return_value = self.match_data

        request = service_pb2.MatchRequest(id=4242,
            region=constants_pb2.EUW)
        response = self.stub.Match(request)

        # The cache was consulted; the API and the cache writer were not.
        cache = self.service.cache_manager
        self.assertTrue(cache.find_match.called)
        self.assertFalse(self.service.riot_api_handler.get_match.called)
        self.assertFalse(cache.save_match.called)
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def load(self):
        """Read every settings file and store its keys in upper case.

        Files listed in ``self.files`` are read from ``self.directory`` in
        order and merged, later files overriding earlier ones.

        Raises ``ImproperlyConfigured`` when a file is not valid JSON.
        """
        # Let the parent class do its part first.
        super().load()

        # Merge the JSON content of all files.
        merged = {}

        try:
            for file_name in self.files:
                full_path = os.path.join(self.directory, file_name)
                with open(full_path, 'r') as settings_file:
                    merged.update(json.load(settings_file))
        except json.JSONDecodeError as exc:
            raise ImproperlyConfigured(
                'Your settings file(s) contain invalid JSON syntax! Please fix and restart!, {}'.format(str(exc))
            )

        # Settings are exposed under upper-cased names.
        for name in merged:
            self.settings[name.upper()] = merged[name]
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def load_previous(self, path=None):
        """Read the previously saved config from disk and merge it in.

        Normally this is invoked for you when the object is created, so
        calling it by hand is rarely necessary.

        :param path:

            Location of the saved config. When falsy, the current
            ``self.path`` is kept. When given, it replaces ``self.path``
            and is therefore the path this object holds from then on.

        """
        self.path = path if path else self.path
        with open(self.path) as handle:
            previous = json.load(handle)
        self._prev_dict = previous
        # Work on a deep copy so values placed in ``self`` are independent
        # of the ones kept in ``_prev_dict``.
        snapshot = copy.deepcopy(self._prev_dict)
        for key in snapshot:
            if key in self:
                continue
            self[key] = snapshot[key]
项目:pbtk    作者:marin-m    | 项目源码 | 文件源码
def load_endpoints(self):
        """Rebuild the endpoint list from the JSON files in ``endpoints``.

        Each ``*.json`` file gets a disabled header row, followed by one
        disabled row per protobuf message name and, beneath it, one row per
        endpoint carrying the endpoint dict as user data. Finally the
        endpoint chooser view is shown.
        """
        listing = self.choose_endpoint.endpoints
        listing.clear()

        endpoints_dir = BASE_PATH / 'endpoints'
        for name in listdir(str(endpoints_dir)):
            if not name.endswith('.json'):
                continue

            # Header row named after the file.
            header = QListWidgetItem(name.split('.json')[0], listing)
            header.setFlags(header.flags() & ~Qt.ItemIsEnabled)

            # Group the file's endpoints by the last dotted component of
            # their request message name.
            by_message = defaultdict(list)
            with open(str(endpoints_dir / name)) as fd:
                for endpoint in load(fd, object_pairs_hook=OrderedDict):
                    message = endpoint['request']['proto_msg'].split('.')[-1]
                    by_message[message].append(endpoint)

            for pb_msg, endpoints in by_message.items():
                group = QListWidgetItem(' ' * 4 + pb_msg, listing)
                group.setFlags(group.flags() & ~Qt.ItemIsEnabled)

                for endpoint in endpoints:
                    # Keep only what follows scheme and host of the URL.
                    path_and_qs = '/' + endpoint['request']['url'].split('/', 3).pop()
                    entry = QListWidgetItem(' ' * 8 + path_and_qs, listing)
                    entry.setData(Qt.UserRole, endpoint)

        self.set_view(self.choose_endpoint)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def load_barcode_dist(filename, barcode_whitelist, gem_group, proportions=True):
    """ Load barcode count distribution from a json file

    The file holds one flat list of counts; the slice belonging to
    ``gem_group`` (1-based) is paired with ``barcode_whitelist`` in order.

    :param filename: path of the JSON file.
    :param barcode_whitelist: ordered list of barcodes.
    :param gem_group: 1-based gem group number selecting the slice.
    :param proportions: when true, return each count divided by the total
        (via ``tk_stats.robust_divide``); otherwise return the raw counts.
    :return: dict keyed by barcode, or ``None`` if ``filename`` is not a file.
    """
    # Input barcode whitelist must be an ordered type;
    # safeguard against it going out of sync with the distribution file
    # NOTE(review): ``None`` passes this check but fails at ``len()`` below
    # whenever the file exists - confirm what callers expect in that case.
    assert barcode_whitelist is None or isinstance(barcode_whitelist, list)

    if not os.path.isfile(filename):
        return None

    with open(filename, 'r') as f:
        values = json.load(f)

    start = (gem_group-1)*len(barcode_whitelist)
    end = gem_group*len(barcode_whitelist)
    barcode_counts = dict(zip(barcode_whitelist, values[start:end]))
    if not proportions:
        return barcode_counts

    total_barcode_counts = sum(barcode_counts.values())
    # ``items()`` works on both Python 2 and 3 (``iteritems()`` is 2-only).
    return {bc: tk_stats.robust_divide(float(value), float(total_barcode_counts))
            for bc, value in barcode_counts.items()}
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def split(args):
    """Build one chunk definition per pair of read-1 / read-2 FASTQ files.

    The number of chunks per gem group is derived from the read totals in
    ``args.reads_summary`` (a JSON file) and ``args.readpairs_per_chunk``,
    and is never less than 2.

    :param args: object with ``read1s``, ``read2s`` (equal length),
        ``reads_summary``, ``gem_groups`` and ``readpairs_per_chunk``.
    :return: ``{'chunks': [...]}`` where every chunk holds its two FASTQ
        paths and the shared ``chunks_per_gem_group`` mapping.
    """
    assert len(args.read1s) == len(args.read2s)

    with open(args.reads_summary) as f:
        reads_summary = json.load(f)

    # Determine the number of buckets required to achieve
    # the given chunk size.
    chunks_per_gem_group = {}
    for gg in args.gem_groups:
        readpairs = reads_summary['%d_total_reads_per_gem_group' % gg]
        needed = int(math.ceil(float(readpairs) / args.readpairs_per_chunk))
        chunks_per_gem_group[str(gg)] = max(2, needed)

    # Builtin zip() exists on Python 2 and 3 alike (itertools.izip does not).
    chunks = [
        {
            'read1s_chunk': fastq1,
            'read2s_chunk': fastq2,
            'chunks_per_gem_group': chunks_per_gem_group,
        }
        for fastq1, fastq2 in zip(args.read1s, args.read2s)
    ]
    return {'chunks': chunks}
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def main(args, outs):
    """Run the external ``chunk_reads`` tool and collect its chunks.

    ``args.read_chunk`` is written to ``chunk_args.json`` for the tool, the
    tool is run, and every entry of the ``read_chunks.json`` it produces is
    attached to a copy of ``args.read_chunk`` and appended to
    ``outs.out_chunks``.

    :param args: stage arguments (``read_chunk``, ``reads_per_file``).
    :param outs: stage outputs; ``out_chunks`` is (re)assigned here.
    :raises subprocess.CalledProcessError: if ``chunk_reads`` exits non-zero.
    """

    # Write read_chunk for consumption by Rust
    with open("chunk_args.json", "w") as f:
        json.dump(args.read_chunk, f)

    output_path = martian.make_path("")
    prefix = "fastq_chunk"
    chunk_reads_args = ['chunk_reads',  '--reads-per-fastq', str(args.reads_per_file), output_path, prefix, "--martian-args", "chunk_args.json"]
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print("running chunk reads: [%s]" % str(chunk_reads_args))
    subprocess.check_call(chunk_reads_args)

    with open(os.path.join(output_path, "read_chunks.json")) as f:
        chunk_results = json.load(f)

    outs.out_chunks = []

    # Write out a new chunk entry for each resulting chunk
    for chunk in chunk_results:
        print(args.read_chunk)
        chunk_copy = args.read_chunk.copy()
        print(chunk_copy)
        chunk_copy['read_chunks'] = chunk
        outs.out_chunks.append(chunk_copy)
项目:IDPanel    作者:CylanceSPEAR    | 项目源码 | 文件源码
def load_model(file_path):
        """Rebuild a ``ClassificationEngine`` from a saved model file.

        The file holds three values: the decision trees grouped by key, the
        sparse features and the total feature count. Every stored tree's
        ``model`` mapping is turned into a ``DecisionTree`` whose attributes
        are the mapping's entries.
        """
        with open(file_path, "r") as model_file:
            decision_trees, sparse_features, total_feature_count = load(model_file)

        rebuilt = {}
        for group, entries in decision_trees.items():
            group_trees = []
            for entry in entries:
                tree = DecisionTree([])
                # Copy each saved field onto the fresh tree object.
                for attribute, value in entry['model'].items():
                    setattr(tree, attribute, value)
                entry['model'] = tree
                group_trees.append(entry)
            rebuilt[group] = group_trees

        return ClassificationEngine(
            decision_trees=rebuilt,
            sparse_features=sparse_features,
            total_feature_count=total_feature_count
        )
项目:python-station-backend    作者:itielshwartz    | 项目源码 | 文件源码
def requests_with_cache(dir):
    """Decorator factory caching a function's JSON result in a directory.

    The decorated function must be called with keyword arguments only. The
    cache file is named after the ``param`` keyword (``default.json`` when
    absent), with ``/`` and ``_`` replaced by ``-``. If that file exists its
    content is returned and the function is not called; otherwise the
    function runs and its result is stored.

    :param dir: directory holding the cache files (must already exist).
    :return: the decorator.
    """
    import functools  # local import keeps this block self-contained

    def decorator(func):
        @functools.wraps(func)
        def wrapper(**kwargs):
            cache_key = str(kwargs.get("param", "default.json"))
            cache_url = dir + "/" + cache_key.replace("/", "-").replace("_", "-")
            if os.path.isfile(cache_url):
                with open(cache_url, 'r') as f:
                    print(cache_url)
                    return json.load(f)
            # Call first, create the file second: if ``func`` raises, no
            # empty cache file is left behind to break every later lookup.
            ret = func(**kwargs)
            with open(cache_url, 'w') as f:
                json.dump(ret, f)
            return ret

        return wrapper

    return decorator
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def main():
    """Validate and pretty-print a JSON document.

    Usage: ``prog [infile [outfile]]``. Input defaults to stdin and output
    to stdout. The document is written with sorted keys, a 4-space indent
    and a trailing newline.

    :raises SystemExit: with the usage string for too many arguments, or
        with the parser's error when the input is not valid JSON.
    """
    argc = len(sys.argv)
    if argc not in (1, 2, 3):
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")

    infile = sys.stdin
    outfile = sys.stdout
    try:
        if argc >= 2:
            infile = open(sys.argv[1], 'rb')
        if argc == 3:
            outfile = open(sys.argv[2], 'wb')
        try:
            obj = json.load(infile)
        except ValueError as e:
            raise SystemExit(e)
        json.dump(obj, outfile, sort_keys=True, indent=4)
        outfile.write('\n')
    finally:
        # Close only what was opened here; leave the std streams alone.
        if infile is not sys.stdin:
            infile.close()
        if outfile is not sys.stdout:
            outfile.close()
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def import_policy(cb, parser, args):
    """Create a policy from a JSON file and save it.

    :param cb: API object; ``cb.create(Policy)`` must return a new policy.
    :param parser: unused.
    :param args: parsed arguments providing ``policyfile``, ``description``,
        ``name`` and ``prioritylevel``.

    The outcome (new policy ID or the error) is printed; save errors are
    not propagated.
    """
    p = cb.create(Policy)

    # Read inside ``with`` so the file handle is closed right away.
    with open(args.policyfile, "r") as policy_file:
        p.policy = json.load(policy_file)
    p.description = args.description
    p.name = args.name
    p.priorityLevel = args.prioritylevel
    p.version = 2

    try:
        p.save()
    except ServerError as se:
        print("Could not add policy: {0}".format(str(se)))
    except Exception as e:
        print("Could not add policy: {0}".format(str(e)))
    else:
        print("Added policy. New policy ID is {0}".format(p.id))
项目:voxcelchain    作者:hiroaki-kaneda    | 项目源码 | 文件源码
def create_graph():
    """Plot accuracy and loss from ``result/log`` into ``result/log.png``.

    The log is a JSON list of records with the keys ``iteration``,
    ``main/accuracy`` and ``main/loss``. The parsed log is also printed.
    """
    logfile = 'result/log'
    # ``with`` closes the log file once it has been parsed.
    with open(logfile, 'r') as f:
        data = json.load(f)

    print(data)

    xs = [d["iteration"] for d in data]
    ys = [d["main/accuracy"] for d in data]
    ls = [d["main/loss"] for d in data]

    plt.clf()
    plt.cla()
    # Dashed red reference line at y = 1 across the whole iteration range.
    plt.hlines(1, 0, np.max(xs), colors='r', linestyles="dashed")
    plt.title(r"loss/accuracy")
    plt.plot(xs, ys, label="accuracy")
    plt.plot(xs, ls, label="loss")
    plt.legend()
    plt.savefig("result/log.png")
项目:aws-greengrass-mini-fulfillment    作者:awslabs    | 项目源码 | 文件源码
def associate_lambda(group_config, lambda_config):
        """
        Register a Lambda function in a Greengrass group config file.

        :param group_config: path handed to ``GroupConfigFile`` as
            ``config_file``; its ``lambda_functions`` entry is updated
        :param lambda_config: path of a JSON file providing ``func_name``,
            ``lambda_arn`` and ``lambda_alias``

        :return:
        """
        with open(lambda_config, "r") as lambda_file:
            cfg = json.load(lambda_file)

        config = GroupConfigFile(config_file=group_config)

        # Read the current mapping, add this function, and write it back.
        registered = config['lambda_functions']
        entry = {
            'arn': cfg['lambda_arn'],
            'arn_qualifier': cfg['lambda_alias']
        }
        registered[cfg['func_name']] = entry

        config['lambda_functions'] = registered
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def load_cache(self, file):
        """Load ``self.cache`` from a JSON file, ignoring any failure.

        :param file: path of the cache file.

        When the file is missing, unreadable or not valid JSON,
        ``self.cache`` is left untouched.
        """
        import json

        try:
            with open(file, 'r') as f:
                self.cache = json.load(f)
        except Exception:
            # Deliberately best-effort. ``Exception`` (not a bare except)
            # keeps KeyboardInterrupt and SystemExit working.
            pass
项目:ISB-CGC-pipelines    作者:isb-cgc    | 项目源码 | 文件源码
def editPipeline(args, config):
        """Let the user edit a job's stored request in a text editor.

        The request of job ``args.jobId`` is written as indented JSON to a
        temporary file and opened in ``$EDITOR`` (``/usr/bin/nano`` when the
        variable is not set). If the editor exits with status 0, the edited
        JSON is stored back on the job; otherwise an error is printed and
        the process exits with -1.

        :param args: parsed arguments providing ``jobId``.
        :param config: configuration handed to ``PipelineDbUtils``.
        """
        pipelineDbUtils = PipelineDbUtils(config)

        request = json.loads(pipelineDbUtils.getJobInfo(select=["request"], where={"job_id": args.jobId})[0].request)

        # mkstemp() returns an already-open descriptor; wrap it instead of
        # discarding it so it is closed rather than leaked.
        fd, tmp = mkstemp()
        with os.fdopen(fd, 'w') as f:
            f.write("{data}".format(data=json.dumps(request, indent=4)))

        editor = os.environ.get("EDITOR", "/usr/bin/nano")

        if subprocess.call([editor, tmp]) == 0:
            with open(tmp, 'r') as f:
                request = json.load(f)

            pipelineDbUtils.updateJob(args.jobId, keyName="job_id", setValues={"request": json.dumps(request)})
        else:
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print("ERROR: there was a problem editing the request")
            exit(-1)
项目:openhealthalgorithms    作者:openhealthalgorithms    | 项目源码 | 文件源码
def load_messages():
        """Return the ``body.messages`` section of the guideline content.

        The content is read from ``guideline/guideline_content.json`` next
        to this module.
        """
        module_dir = os.path.dirname(os.path.realpath(__file__))
        file_path = '%s/guideline/%s' % (module_dir, 'guideline_content.json')
        with open(file_path) as content_file:
            content = json.load(content_file)

        body = content["body"]
        return body["messages"]
项目:openhealthalgorithms    作者:openhealthalgorithms    | 项目源码 | 文件源码
def load_guidelines(guideline_key):
        """Return the parsed guideline file for ``guideline_key``.

        The file is ``guideline/guideline_<guideline_key>.json`` next to
        this module.
        """
        module_dir = os.path.dirname(os.path.realpath(__file__))
        filename = 'guideline_%s.json' % guideline_key
        file_path = '%s/guideline/%s' % (module_dir, filename)
        with open(file_path) as guideline_file:
            return json.load(guideline_file)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def _load_ready_file(self):
        """Populate ``self._ready`` from the ready file, at most once.

        Does nothing when ``self._ready`` is already set.  Otherwise the
        JSON content of ``self._ready_file`` is read into a set, or an
        empty set is used when that file does not exist.
        """
        if self._ready is not None:
            return

        if not os.path.exists(self._ready_file):
            self._ready = set()
            return

        with open(self._ready_file) as ready_fp:
            self._ready = set(json.load(ready_fp))
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def get_matchmaker_map(mm_file='/etc/oslo/matchmaker_ring.json'):
    """Return the decoded JSON content of the matchmaker ring file.

    :param mm_file: path of the JSON file to read
    :returns: the decoded JSON document, or an empty dict when ``mm_file``
        is not an existing regular file
    """
    if not os.path.isfile(mm_file):
        return {}

    with open(mm_file, 'r') as ring_file:
        return json.load(ring_file)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def _git_yaml_load(projects_yaml):
    """
    Load the specified yaml into a dictionary.

    :param projects_yaml: YAML document as a string (or stream)
    :returns: the parsed document, or ``None`` when ``projects_yaml`` is
        empty or ``None``
    """
    if not projects_yaml:
        return None

    # safe_load instead of load: calling yaml.load() without an explicit
    # Loader raises TypeError on PyYAML >= 6 and, on older releases, can
    # construct arbitrary Python objects from tagged input.  Plain-data
    # documents parse identically.
    return yaml.safe_load(projects_yaml)
项目:lang-reps    作者:chaitanyamalaviya    | 项目源码 | 文件源码
def get_id_set(lang_codes):
    """Build one-hot language-identity features for ``lang_codes``.

    Feature names are ``"ID_"`` plus the upper-cased entries of the
    ``langs`` array in ``family_features.npz``.  The value matrix has one
    row per requested language and one column per feature name; each row
    is zero except for a 1.0 at the index that ``get_language_index``
    reports for that language.

    :param lang_codes: language codes to look up
    :returns: tuple ``(feature_names, values)``
    """
    database = np.load("family_features.npz")
    resolved_codes = [get_language_code(code, database) for code in lang_codes]
    feature_names = ["ID_" + lang.upper() for lang in database["langs"]]

    values = np.zeros((len(resolved_codes), len(feature_names)))
    for row, code in enumerate(resolved_codes):
        column = get_language_index(code, database)
        values[row, column] = 1.0

    return feature_names, values
项目:flora    作者:Lamden    | 项目源码 | 文件源码
def generate(location):
    """Interactive CLI wizard for creating a new contract from a template.

    Expects ``location`` to hold a ``*.tsol`` template and a ``*.json``
    example payload (checked with ``directory_has_smart_contract``).  The
    user is prompted for a replacement for every payload key (an empty
    answer keeps the existing value), the template is compiled against the
    edited payload, and the user then chooses between writing the generated
    code to ``<name>.sol`` or exporting the payload to ``<name>.json``
    inside ``location``.

    :param location: directory containing the template and example payload
    """
    if not directory_has_smart_contract(location):
        print('Provided directory does not contain a *.tsol and *.json or does not compile.')
        return

    # Every file below is opened in a ``with`` block so the handle is closed
    # (and written data flushed) deterministically instead of being leaked.
    payload_path = glob.glob(os.path.join(location, '*.json'))[0]
    with open(payload_path) as payload_file:
        example_payload = json.load(payload_file)
    print(example_payload)
    # Only existing keys are reassigned, so mutating while iterating is safe.
    for k, _ in example_payload.items():
        value = input(k + ':')
        if value != '':
            example_payload[k] = value
    print(example_payload)

    code_path = glob.glob(os.path.join(location, '*.tsol'))[0]
    with open(code_path) as code_file:
        tsol.compile(code_file, example_payload)
    print('Code compiles with new payload.')

    # Keep asking until the user picks one of the two supported actions.
    while True:
        selection = input('(G)enerate Solidity contract or (E)xport implementation:').lower()
        if selection == 'g':
            output_name = input('Name your contract file without an extension:')
            with open(code_path) as code_file:
                code = tsol.generate_code(code_file.read(), example_payload)
            with open(os.path.join(location, '{}.sol'.format(output_name)), 'w') as out_file:
                out_file.write(code)
            break

        if selection == 'e':
            output_name = input('Name your implementation file without an extension:')
            with open(os.path.join(location, '{}.json'.format(output_name)), 'w') as out_file:
                json.dump(example_payload, out_file)
            break
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def __init__(self, path=None, filename=None):
        """Set up the offset store and load any offsets saved on disk.

        The spec file is ``<path>/<filename>``, defaulting to
        ``/tmp/kafka_offset_specs.json``.  When it exists, every entry is
        turned into an ``OffsetSpec`` stored in ``self._kafka_offsets``
        under its key.  Any error while reading or decoding the file is
        logged and otherwise ignored; entries loaded before the error are
        kept.

        :param path: directory of the spec file (``/tmp/`` when falsy)
        :param filename: spec file name (``kafka_offset_specs.json`` when
            falsy)
        """
        self.kafka_offset_spec_file = os.path.join(
            (path or "/tmp/"), (filename or 'kafka_offset_specs.json'))

        self._kafka_offsets = {}
        if os.path.exists(self.kafka_offset_spec_file):
            try:
                # Context manager so the handle is closed even when the
                # content fails to parse (it used to be left open).
                with open(self.kafka_offset_spec_file) as f:
                    kafka_offset_dict = json.load(f)
                for key, value in kafka_offset_dict.items():
                    log.info("Found offset %s: %s", key, value)
                    self._kafka_offsets[key] = OffsetSpec(
                        app_name=value.get('app_name'),
                        topic=value.get('topic'),
                        partition=value.get('partition'),
                        from_offset=value.get('from_offset'),
                        until_offset=value.get('until_offset'),
                        batch_time=value.get('batch_time'),
                        last_updated=value.get('last_updated'),
                        revision=value.get('revision')
                    )
            except Exception:
                # Best-effort load: a bad file must not prevent start-up.
                log.info('Invalid or corrupts offsets file found at %s,'
                         ' starting over', self.kafka_offset_spec_file)
        else:
            log.info('No kafka offsets found at startup')
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def load_offset_file_as_json(self, file_path):
        """Return the decoded JSON content of the file at ``file_path``.

        :param file_path: path of the JSON file to read
        :returns: the decoded JSON document
        """
        with open(file_path, 'r') as offset_file:
            return json.load(offset_file)
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def register(self, name, serializer):
        """Register ``serializer`` object under ``name``.

        Raises :class:`AttributeError` if ``serializer`` is invalid, i.e.
        lacks a ``load`` or ``dump`` attribute.

        .. note::

            ``name`` will be used as the file extension of the saved files.

        :param name: Name to register ``serializer`` under
        :type name: ``unicode`` or ``str``
        :param serializer: object with ``load()`` and ``dump()``
            methods

        """
        # Basic validation: attribute access raises AttributeError when a
        # required method is missing, before anything is stored.
        for required in ('load', 'dump'):
            getattr(serializer, required)

        self._serializers[name] = serializer
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def load(cls, file_obj):
        """Deserialize and return the object stored in an open JSON file.

        .. versionadded:: 1.8

        :param file_obj: file handle
        :type file_obj: ``file`` object
        :returns: object loaded from JSON file
        :rtype: object

        """
        decoded = json.load(file_obj)
        return decoded
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def load(cls, file_obj):
        """Deserialize and return the object stored in an open pickle file.

        Uses ``cPickle``.  Unpickling can execute arbitrary code, so only
        pass files from a trusted source.

        .. versionadded:: 1.8

        :param file_obj: file handle
        :type file_obj: ``file`` object
        :returns: object loaded from pickle file
        :rtype: object

        """
        unpickled = cPickle.load(file_obj)
        return unpickled
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def load(cls, file_obj):
        """Deserialize and return the object stored in an open pickle file.

        Uses ``pickle``.  Unpickling can execute arbitrary code, so only
        pass files from a trusted source.

        .. versionadded:: 1.8

        :param file_obj: file handle
        :type file_obj: ``file`` object
        :returns: object loaded from pickle file
        :rtype: object

        """
        unpickled = pickle.load(file_obj)
        return unpickled