Python json 模块,dump() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用json.dump()

项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def error(self, message, ensure_ascii=False):
        """Abort the analyzer and emit a JSON error document.

        The analyzer input is echoed back in the output (handy when debugging
        a failure), with the ``password``, ``key``, ``apikey`` and ``api_key``
        entries of its ``config`` section overwritten by ``'REMOVED'``. The
        stored input object itself is modified, not a copy. The process then
        exits with status 1.

        :param message: Error message
        :param ensure_ascii: Force ascii output. Default: False. Toggling it
            can help when fighting ascii <-> utf-8 issues."""

        echoed_input = self.__input
        for secret_name in ('password', 'key', 'apikey', 'api_key'):
            if secret_name in echoed_input.get('config', {}):
                echoed_input['config'][secret_name] = 'REMOVED'

        payload = {'success': False,
                   'input': echoed_input,
                   'errorMessage': message}
        json.dump(payload, self.fpoutput, ensure_ascii=ensure_ascii)

        # Force exit after error
        sys.exit(1)
项目:flora    作者:Lamden    | 项目源码 | 文件源码
def register(name):
    """Register ``name`` with the remote API using a freshly generated RSA keypair.

    When the name lookup reports ``'error'`` the name is treated as already
    taken and nothing else happens. Otherwise a keypair is generated, pickled
    to ``KEY_LOCATION/.key`` and the public numbers are posted to the API.
    """
    # hit api to see if name is already registered
    lookup = check_name(name)
    if lookup['status'] == 'error':
        print('{} already registered.'.format(name))
        return

    # generate new keypair
    # NOTE(review): 512-bit RSA is generally considered weak -- confirm intent.
    pub, priv = rsa.newkeys(512)

    if not os.path.exists(KEY_LOCATION):
        os.mkdir(KEY_LOCATION)

    # save to disk
    key_path = '{}/.key'.format(KEY_LOCATION)
    with open(key_path, 'wb') as f:
        pickle.dump((pub, priv), f, pickle.HIGHEST_PROTOCOL)

    payload = {'name' : name, 'n' : pub.n, 'e' : pub.e}
    r = requests.post('{}/names'.format(API_LOCATION), data = payload)
    if r.json()['status'] == 'success':
        outcome = 'Successfully registered new name: {}'
    else:
        outcome = 'Error registering name: {}'
    print(outcome.format(name))
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def report(self, full_report, ensure_ascii=False):
        """Write the analyzer result as a JSON document to ``self.fpoutput``.

        The summary is best-effort: if ``self.summary`` raises, an empty
        summary is reported instead of failing the whole report.

        :param full_report: Analyzer results as dict.
        :param ensure_ascii: Force ascii output. Default: False"""

        try:
            summary = self.summary(full_report)
        except Exception:
            # Only ``Exception`` is caught so that KeyboardInterrupt and
            # SystemExit still propagate instead of being silently swallowed.
            summary = {}

        report = {
            'success': True,
            'summary': summary,
            'artifacts': self.artifacts(full_report),
            'full': full_report
        }
        json.dump(report, self.fpoutput, ensure_ascii=ensure_ascii)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def write(self, path=None, fileobj=None, legacy=False, skip_unknown=True):
        """Validate the metadata and write it to ``path`` or ``fileobj``.

        Exactly one of ``path`` and ``fileobj`` must be given, otherwise
        ``ValueError`` is raised. With ``legacy`` true the legacy metadata
        object does the writing; otherwise the data is dumped as sorted,
        2-space-indented, ASCII-only JSON.
        """
        missing = [path, fileobj].count(None)
        if missing != 1:
            raise ValueError('Exactly one of path and fileobj is needed')
        self.validate()

        if legacy:
            legacy_md = self._legacy if self._legacy else self._to_legacy()
            if path:
                legacy_md.write(path, skip_unknown=skip_unknown)
            else:
                legacy_md.write_file(fileobj, skip_unknown=skip_unknown)
            return

        d = self._from_legacy() if self._legacy else self._data
        dump_options = dict(ensure_ascii=True, indent=2, sort_keys=True)
        if fileobj:
            json.dump(d, fileobj, **dump_options)
            return
        with codecs.open(path, 'w', 'utf-8') as f:
            json.dump(d, f, **dump_options)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def save(self, pypi_version, current_time):
        """Record the result of a version check for the current ``sys.prefix``.

        Does nothing when the state file's directory fails the ownership
        check. Otherwise the existing JSON state (if any) is loaded under a
        lock, the entry for ``sys.prefix`` is replaced, and the file is
        rewritten in compact, key-sorted form.
        """
        statefile_dir = os.path.dirname(self.statefile_path)

        # Check to make sure that we own the directory
        if not check_path_owner(statefile_dir):
            return

        # Now that we've ensured the directory is owned by this user, we'll go
        # ahead and make sure that all our directories are created.
        ensure_dir(statefile_dir)

        # Attempt to write out our version check file
        with lockfile.LockFile(self.statefile_path):
            state = {}
            if os.path.exists(self.statefile_path):
                with open(self.statefile_path) as statefile:
                    state = json.load(statefile)

            entry = {
                "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
                "pypi_version": pypi_version,
            }
            state[sys.prefix] = entry

            with open(self.statefile_path, "w") as statefile:
                json.dump(state, statefile, sort_keys=True,
                          separators=(",", ":"))
项目:AVSR-Deep-Speech    作者:pandeydivesh15    | 项目源码 | 文件源码
def encode_and_store(batch_x, output_dir, file_name):
    """
    Normalise a batch, encode it with the autoencoder and store the result as JSON.

    Args:
        1. batch_x:         Batch of 32*32 images which will go inside our autoencoder.
        2. output_dir:      Dir path for storing all encoded features for given `batch_x`.
                            It is concatenated directly with `file_name`, so it
                            needs to end with a path separator.
        3. file_name:       File name of JSON file (without the `.json` suffix).
    """
    global AUTO_ENCODER
    if AUTO_ENCODER is None:
        load_AE()

    # Each item is shifted to zero mean and scaled by its own standard deviation.
    norm_batch = np.zeros(batch_x.shape)
    for index, image in enumerate(batch_x):
        norm_batch[index] = (image - np.mean(image)) / np.std(image)

    encoded = AUTO_ENCODER.transform(norm_batch).tolist()
    output_dict = {
        'name' : file_name,
        'encoded': encoded}

    json_path = output_dir+file_name+'.json'
    with open(json_path, 'w') as f:
        json.dump(output_dict, f)
项目:scibot    作者:SciCrunch    | 项目源码 | 文件源码
def write_log(target_uri, doi, pmid, found_rrids, head, body, text, h):
    """Write one JSON log entry describing a processed page to ``logs/``.

    The file is named ``rrid-<timestamp>.json`` using the current local time
    to the second. When every value in ``found_rrids`` is
    ``'Already Annotated'`` the ``head``, ``body`` and ``text`` fields are
    logged as ``None``.
    """
    stamp = datetime.now().isoformat()[0:19]
    now = stamp.replace(':','').replace('-','')

    distinct_results = set(found_rrids.values())
    if len(distinct_results) == 1 and next(iter(distinct_results)) == 'Already Annotated':
        head = body = text = None

    log = {'target_uri':target_uri,
           'group':h.group,
           'doi':doi,
           'pmid':pmid,
           'found_rrids':found_rrids,
           'count':len(found_rrids),
           'head':head,
           'body':body,
           'text':text,
          }
    fname = 'logs/' + 'rrid-%s.json' % now
    with open(fname, 'wt') as f:
        json.dump(log, f, sort_keys=True, indent=4)
项目:rca-evaluation    作者:sieve-microservices    | 项目源码 | 文件源码
def export(metadata, start, end, container_image_pattern):
    """Dump metrics for every app in ``APPS`` and write ``metadata.json`` beside them.

    ``metadata`` is updated in place with the ``start``/``end`` timestamps
    (ISO format plus a ``Z`` suffix) and a ``services`` list holding the
    result of ``dump_app`` for each app. Output goes to a directory named
    ``<UTC timestamp>-<measurement_name>`` under ``metadata["metrics_export"]``.
    """
    metadata["start"] = start.isoformat() + "Z"
    metadata["end"] = end.isoformat() + "Z"
    metadata["services"] = []

    stamp = datetime.utcnow().strftime("%Y%m%d%H%M%S-")
    export_dir = os.path.join(metadata["metrics_export"], stamp + metadata["measurement_name"])
    if not os.path.isdir(export_dir):
        os.makedirs(export_dir)

    for app in APPS:
        service = dump_app(app, export_dir, start, end, container_image_pattern)
        metadata["services"].append(service)

    metadata_path = os.path.join(export_dir, "metadata.json")
    with open(metadata_path, "w+") as f:
        json.dump(metadata, f, cls=Encoder, sort_keys=True, indent=4)
        f.flush()
项目:rca-evaluation    作者:sieve-microservices    | 项目源码 | 文件源码
def keystone_auth(auth_details):
        """Authenticate against Keystone and return the auth reference.

        The v3 client is used when ``OS_AUTH_URL`` ends with ``v3``, the v2
        client otherwise. The auth reference is also written to
        ``TOKEN_FILE`` on a best-effort basis.

        NOTE(review): if ``status_err`` returns instead of exiting,
        ``keystone`` is unbound below -- confirm it always terminates.
        """
        try:
            uses_v3 = auth_details['OS_AUTH_URL'].endswith('v3')
            k_client = k3_client if uses_v3 else k2_client
            tenant_name = auth_details['OS_TENANT_NAME']
            keystone = k_client.Client(username=auth_details['OS_USERNAME'],
                                       password=auth_details['OS_PASSWORD'],
                                       tenant_name=tenant_name,
                                       auth_url=auth_details['OS_AUTH_URL'])
        except Exception as e:
            status_err(str(e))

        try:
            with open(TOKEN_FILE, 'w') as token_file:
                json.dump(keystone.auth_ref, token_file)
        except IOError:
            # Caching the token is optional; carry on if the file is not writable.
            pass

        return keystone.auth_ref
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def write(self, path=None, fileobj=None, legacy=False, skip_unknown=True):
        """Validate the metadata and write it to ``path`` or ``fileobj``.

        Exactly one of ``path`` and ``fileobj`` must be given, otherwise
        ``ValueError`` is raised. With ``legacy`` true the legacy metadata
        object does the writing; otherwise the data is dumped as sorted,
        2-space-indented, ASCII-only JSON.
        """
        missing = [path, fileobj].count(None)
        if missing != 1:
            raise ValueError('Exactly one of path and fileobj is needed')
        self.validate()

        if legacy:
            legacy_md = self._legacy if self._legacy else self._to_legacy()
            if path:
                legacy_md.write(path, skip_unknown=skip_unknown)
            else:
                legacy_md.write_file(fileobj, skip_unknown=skip_unknown)
            return

        d = self._from_legacy() if self._legacy else self._data
        dump_options = dict(ensure_ascii=True, indent=2, sort_keys=True)
        if fileobj:
            json.dump(d, fileobj, **dump_options)
            return
        with codecs.open(path, 'w', 'utf-8') as f:
            json.dump(d, f, **dump_options)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def save(self, pypi_version, current_time):
        """Record the result of a version check for the current ``sys.prefix``.

        Does nothing when the state file's directory fails the ownership
        check. Otherwise the existing JSON state (if any) is loaded under a
        lock, the entry for ``sys.prefix`` is replaced, and the file is
        rewritten in compact, key-sorted form.
        """
        statefile_dir = os.path.dirname(self.statefile_path)

        # Check to make sure that we own the directory
        if not check_path_owner(statefile_dir):
            return

        # Now that we've ensured the directory is owned by this user, we'll go
        # ahead and make sure that all our directories are created.
        ensure_dir(statefile_dir)

        # Attempt to write out our version check file
        with lockfile.LockFile(self.statefile_path):
            state = {}
            if os.path.exists(self.statefile_path):
                with open(self.statefile_path) as statefile:
                    state = json.load(statefile)

            entry = {
                "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
                "pypi_version": pypi_version,
            }
            state[sys.prefix] = entry

            with open(self.statefile_path, "w") as statefile:
                json.dump(state, statefile, sort_keys=True,
                          separators=(",", ":"))
项目:sopel-modules    作者:phixion    | 项目源码 | 文件源码
def crypto_spot(bot, trigger):
  """Report the spot price of the requested coin and how it moved since the last query.

  The currency is taken from the first trigger group (case-insensitive).
  Unknown currencies are rejected with a message and nothing else happens.
  The latest price per currency is kept in ``last_prices`` and persisted to
  ``~/.sopel/cur_py_cache``.
  """
  import os  # local import: only needed to expand the cache path

  global last_prices
  from_cur = trigger.group(1).lower()
  if from_cur not in main_coins:
    bot.say("Invalid currency!")
    # Stop here: previously the API was still queried with the invalid symbol.
    return

  api_result = requests.get(single_url.format(from_cur)).json()
  last_price = float(api_result["last_price"])

  if from_cur not in last_prices:
    last_prices[from_cur] = 0
  # XRP is shown with 4 decimals, every other coin with 2.
  digits = from_cur != 'xrp'
  diffStr = getDiffString(last_price, last_prices[from_cur], digits)
  last_prices[from_cur] = last_price
  # open() does not expand "~", so resolve the home directory explicitly.
  with open(os.path.expanduser('~/.sopel/cur_py_cache'), 'w') as outfile:
    json.dump(last_prices, outfile)
  bot.say("{0}: ${1:.{2}f}{3}".format(from_cur, last_price, 2 if digits else 4, diffStr))
项目:sstash    作者:realcr    | 项目源码 | 文件源码
def write_store(self,store):
        """
        Commit store to file.

        The store is serialised to JSON, encrypted with ``self._box`` under a
        fresh random nonce, hex-encoded and written to ``self._path`` together
        with the hash, salt and iteration count held on this object.
        """
        plaintext = json.dumps(store).encode('utf-8')
        nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE)
        ciphertext = self._box.encrypt(plaintext,nonce)

        outer_data = {
            'hash': self._hash,
            'salt': bytes_to_hex_str(self._salt),
            'iterations': self._iterations,
            'enc_blob': bytes_to_hex_str(ciphertext),
        }

        with open(self._path,'w',encoding='ascii') as fw:
            json.dump(outer_data,fw)
项目:docs    作者:hasura    | 项目源码 | 文件源码
def finish(self):
        """Run the parent ``finish`` and then write ``templatebuiltins.js``.

        The generated file assigns a JSON object to the JavaScript variable
        ``django_template_builtins``; it lists the names of the
        ``templatetag`` and ``templatefilter`` cross-reference targets whose
        document is ``ref/templates/builtins``.
        """
        super().finish()
        self.info(bold("writing templatebuiltins.js..."))
        xrefs = self.env.domaindata["std"]["objects"]

        def builtin_names(objtype):
            # Names of all std-domain objects of ``objtype`` documented on the builtins page.
            return [
                n for ((t, n), (k, a)) in xrefs.items()
                if t == objtype and k == "ref/templates/builtins"
            ]

        templatebuiltins = {
            "ttags": builtin_names("templatetag"),
            "tfilters": builtin_names("templatefilter"),
        }
        outfilename = os.path.join(self.outdir, "templatebuiltins.js")
        with open(outfilename, 'w') as fp:
            fp.write('var django_template_builtins = ')
            json.dump(templatebuiltins, fp)
            fp.write(';\n')
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def main(args, outs):
    """Write the summary JSON for the filtered matrices and build the web summary HTML."""
    matrices_h5 = args.filtered_matrices
    genomes = cr_matrix.GeneBCMatrices.load_genomes_from_h5(matrices_h5)
    chemistry = cr_matrix.GeneBCMatrices.load_chemistry_from_h5(matrices_h5)
    total_cells = cr_matrix.GeneBCMatrices.count_cells_from_h5(matrices_h5)

    summary = {
        'chemistry_description': chemistry,
        'filtered_bcs_transcriptome_union': total_cells,
    }
    with open(outs.summary, 'w') as f:
        json.dump(summary, f, indent=4, sort_keys=True)

    sample_properties = cr_webshim.get_sample_properties(
        args.analysis_id,
        args.analysis_desc,
        genomes,
        version=martian.get_pipelines_version(),
    )

    # The web summary is rendered from the summary file written above plus the analysis output.
    sample_data_paths = cr_webshim_data.SampleDataPaths(
        summary_path=outs.summary,
        analysis_path=args.analysis,
    )

    sample_data = cr_webshim.load_sample_data(sample_properties, sample_data_paths)
    cr_webshim.build_web_summary_html(
        outs.web_summary, sample_properties, sample_data, PIPELINE_REANALYZE)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def join(args, outs, chunk_defs, chunk_outs):
    """Merge the per-chunk outputs (matrices, molecules, barcode summaries, summary JSON) into ``outs``."""

    def from_chunks(attr):
        # Collect one named output from every chunk, in chunk order.
        return [getattr(chunk_out, attr) for chunk_out in chunk_outs]

    matrix_attrs = cr_matrix.make_matrix_attrs_aggr(args.gem_group_index, "Unknown")
    cr_matrix.concatenate_h5(from_chunks('raw_matrices_h5'), outs.raw_matrices_h5,
                             extra_attrs=matrix_attrs)
    cr_matrix.concatenate_h5(from_chunks('filtered_matrices_h5'), outs.filtered_matrices_h5,
                             extra_attrs=matrix_attrs)

    cr_matrix.concatenate_mex_dirs(from_chunks('raw_matrices_mex'), outs.raw_matrices_mex)
    cr_matrix.concatenate_mex_dirs(from_chunks('filtered_matrices_mex'), outs.filtered_matrices_mex)

    merged_molecules = from_chunks('filtered_molecules')
    cr_mol_counter.MoleculeCounter.concatenate(outs.filtered_molecules, merged_molecules)

    barcode_summaries = from_chunks('barcode_summary_h5')
    merge_barcode_summaries(barcode_summaries, outs.barcode_summary_h5)

    # merge summaries
    summary = merge_summaries(chunk_outs)
    with open(outs.summary, 'w') as f:
        json.dump(summary, f, indent=4, sort_keys=True)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def main(args, outs):
    """Split the reads of ``args.read_chunk`` by running the external ``chunk_reads`` tool.

    The read chunk is handed to the tool through ``chunk_args.json``; the
    tool's ``read_chunks.json`` is then read back and one copy of the
    original read chunk per resulting chunk is stored in ``outs.out_chunks``.
    """

    # Write read_chunk for consumption by Rust
    with open("chunk_args.json", "w") as f:
        json.dump(args.read_chunk, f)

    output_path = martian.make_path("")
    prefix = "fastq_chunk"
    chunk_reads_args = ['chunk_reads',  '--reads-per-fastq', str(args.reads_per_file), output_path, prefix, "--martian-args", "chunk_args.json"]
    # Single-argument parenthesised print behaves the same on Python 2 and 3.
    print("running chunk reads: [%s]" % str(chunk_reads_args))
    subprocess.check_call(chunk_reads_args)

    with open(os.path.join(output_path, "read_chunks.json")) as f:
        chunk_results = json.load(f)

    outs.out_chunks = []

    # Write out a new chunk entry for each resulting chunk
    for chunk in chunk_results:
        print(args.read_chunk)
        chunk_copy = args.read_chunk.copy()
        print(chunk_copy)
        chunk_copy['read_chunks'] = chunk
        outs.out_chunks.append(chunk_copy)
项目:python-station-backend    作者:itielshwartz    | 项目源码 | 文件源码
def requests_with_cache(dir):
    """Decorator factory caching a keyword-only function's JSON result on disk.

    The cache file lives in ``dir`` and is named after the ``param`` keyword
    argument (``default.json`` when absent) with ``/`` and ``_`` replaced by
    ``-``. On a hit the cached JSON is returned (and the path printed)
    without calling the function.

    :param dir: Directory that holds the cache files; it must already exist.
    """
    import functools  # local import: keeps the snippet self-contained

    def decorator(func):
        @functools.wraps(func)
        def wrapper(**kwargs):
            cache_key = str(kwargs.get("param", "default.json"))
            cache_url = dir + "/" + cache_key.replace("/", "-").replace("_", "-")
            if os.path.isfile(cache_url):
                with open(cache_url, 'r') as f:
                    print(cache_url)
                    return json.load(f)
            # Call first, write second: if ``func`` raises, no empty cache
            # file is left behind to break every later lookup.
            ret = func(**kwargs)
            with open(cache_url, 'w+') as f:
                json.dump(ret, f)
            return ret

        return wrapper

    return decorator
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def main():
    """Pretty-print a JSON document: ``prog [infile [outfile]]``.

    Reads from stdin and writes to stdout unless file names are given on the
    command line. The output is key-sorted, indented by 4 spaces and ends with
    a newline. Invalid JSON or a wrong argument count exits via ``SystemExit``.
    """
    opened = []  # only the files opened here get closed, never stdin/stdout
    try:
        if len(sys.argv) == 1:
            infile = sys.stdin
            outfile = sys.stdout
        elif len(sys.argv) == 2:
            infile = open(sys.argv[1], 'rb')
            opened.append(infile)
            outfile = sys.stdout
        elif len(sys.argv) == 3:
            infile = open(sys.argv[1], 'rb')
            opened.append(infile)
            # Text mode: json.dump writes str, which a binary file rejects on Python 3.
            outfile = open(sys.argv[2], 'w')
            opened.append(outfile)
        else:
            raise SystemExit(sys.argv[0] + " [infile [outfile]]")
        try:
            obj = json.load(infile)
        except ValueError as e:
            raise SystemExit(e)
        json.dump(obj, outfile, sort_keys=True, indent=4)
        outfile.write('\n')
    finally:
        for handle in opened:
            handle.close()
项目:wpw-sdk-python    作者:WPTechInnovation    | 项目源码 | 文件源码
def output_json(svc_msg, service_types):
    """
    Append the described service to ``DATA2`` and rewrite ``device-scanner.json``.

    Any failure while writing the file is reported with a printed hint
    instead of being raised.
    """
    global DATA2
    outfname = 'device-scanner.json'
    entry = {'serverid': svc_msg.getServerId(),
             'devicename': svc_msg.getDeviceName(),
             'devicedescription': svc_msg.getDeviceDescription(),
             'hostname': svc_msg.getHostname(),
             'portnumber': svc_msg.getPortNumber(),
             'urlprefix': svc_msg.getUrlPrefix(),
             'servicetypes': service_types,
             }
    # Rebind rather than extend in place, so the previous list object is left untouched.
    DATA2 = DATA2 + [entry]
    try:
        with open(outfname, 'w') as outfile:
            json.dump(DATA2, outfile)
    except Exception:
        hint = ("You need to configure the webserver path" +
                " if you want to output json")
        print(hint)
项目:fantasy-dota-heroes    作者:ThePianoDentist    | 项目源码 | 文件源码
def main():
    """Scrape hero win rates for patches 7.04 and 7.05 and store the per-hero difference as JSON.

    The result is written to ``$FDOTA/fantasydota/junk/windiff_705``.
    """

    def hero_and_winrate(row):
        # Column 1 holds the hero name, column 2 the win rate in its data-value attribute.
        cells = row.find_elements_by_xpath("td")
        return cells[1].text, float(cells[2].get_attribute("data-value"))

    driver.get("https://www.dotabuff.com/heroes/winning?date=patch_7.04")
    old_winrates = {}
    for row in driver.find_elements_by_xpath("//table/tbody/tr"):
        hero, winrate = hero_and_winrate(row)
        old_winrates[hero] = winrate

    driver.get("https://www.dotabuff.com/heroes/winning?date=patch_7.05")
    win_rate_diff = {}
    for row in driver.find_elements_by_xpath("//table/tbody/tr"):
        hero, winrate = hero_and_winrate(row)
        win_rate_diff[hero] = winrate - old_winrates[hero]

    out_path = os.environ.get('FDOTA') + '/fantasydota/junk/windiff_705'
    with open(out_path, 'w') as f:
        json.dump(win_rate_diff, f)
项目:openedoo    作者:openedoo    | 项目源码 | 文件源码
def add_manifest(name_module=None,version_modul=None,url=None):
    """Append a module entry to ``installed_module`` in ``manifest.json``.

    :param name_module: Module name; required.
    :param version_modul: Module version. Default: ``"0.1.0"``.
    :param url: Module URL. Default: empty string.
    :return: ``None`` on success, a message string when ``name_module`` is
        missing, or the caught exception object when the update fails.
    """
    if name_module is None:
        return "please insert your name_module"
    if version_modul is None:
        # The default used to be stored in an unused variable, so ``None`` was written.
        version_modul = "0.1.0"
    if url is None:
        url = ""
    try:
        filename = 'manifest.json'
        with open(filename,'r') as data_file:
            data_json = json.load(data_file)
        new_data={'name_module':name_module,'version_module':version_modul,'url_module':url}
        data_json['installed_module'].append(new_data)
        # Opening with 'w' truncates the file; deleting it first only risked
        # losing the manifest when the update above failed.
        with open(filename,'w') as data_file:
            json.dump(data_json, data_file)
    except Exception as e:
        return e
项目:openedoo    作者:openedoo    | 项目源码 | 文件源码
def del_manifest(name_module=None):
    """Remove every entry named ``name_module`` from ``manifest.json``.

    :param name_module: Module name; required.
    :return: ``"modul has deleted"`` when the manifest was processed (whether
        or not an entry matched), a message string when ``name_module`` is
        missing, or ``None`` when the manifest could not be read or updated.
    """
    try:
        filename = 'manifest.json'
        if name_module is None:
            return "please insert your modul name"
        with open(filename,'r') as data_file:
            data_json = json.load(data_file)
        installed = data_json['installed_module']
        # Build a filtered list instead of deleting while indexing: the old
        # loop ran past the shortened list and skipped the following entry.
        remaining = [entry for entry in installed
                     if entry['name_module'] != name_module]
        if len(remaining) != len(installed):
            data_json['installed_module'] = remaining
            with open(filename,'w') as data_file:
                json.dump(data_json, data_file)
        return "modul has deleted"
    except Exception:
        # Best-effort, as before: failures are reported as ``None``.
        return None
项目:openedoo    作者:openedoo    | 项目源码 | 文件源码
def create_requirement(name_module=None,version_module=None,url_endpoint=None,requirement=None,comment=None,url=None):
    """Write ``modules/<name_module>/requirement.json`` describing a module.

    Defaults: version ``"0.1.0"``, requirement ``"openedoo_core"`` and a
    generated comment. Without ``url_endpoint`` the module is recorded with
    type ``function`` and an empty endpoint; otherwise with type
    ``end_point`` and the given endpoint.

    :return: ``"module has created"``, or a message string when
        ``name_module`` is missing.
    """
    if name_module is None:
        return "please insert name module"

    if comment is None:
        comment = "my module name is {name}".format(name=name_module)
    if requirement is None:
        requirement = "openedoo_core"
    if version_module is None:
        version_module = "0.1.0"

    if url_endpoint is None:
        # NOTE(review): the template is empty, so this always yields '' --
        # confirm whether a real URL pattern was intended.
        endpoint_value = ''.format(url=name_module)
        endpoint_type = 'function'
    else:
        endpoint_value = url_endpoint
        endpoint_type = 'end_point'

    data_json = {"name":name_module,
    "version": version_module,
    "requirement":requirement,
    "pip_library":[],
    "comment":comment,
    "type":endpoint_type,
    "url":url,
    "url_endpoint":endpoint_value}

    target = 'modules/{folder}/{filename}'.format(folder=name_module,filename='requirement.json')
    with open(target,'w') as data_file:
        json.dump(data_json, data_file)
    return "module has created"
项目:sea-lion-counter    作者:rdinse    | 项目源码 | 文件源码
def save(self):
    """Save a checkpoint and the configuration for the current global step.

    Nothing is written when a checkpoint for the current global step has
    already been saved. The configuration is stored as ``config.json`` in
    ``self.config['results_dir']``.
    """
    global_step = self.sess.run(tf.train.get_global_step(self.graph))

    if self.config['last_checkpoint'] == global_step:
      if self.config['debug']:
        print('Model has already been saved during the current global step.')
      # Skip regardless of the debug flag; the flag only controls the message.
      # Previously the duplicate save was skipped only in debug mode.
      return

    print('Saving to %s with global_step %d.' % (self.config['results_dir'], global_step))

    self.saver.save(self.sess, os.path.join(self.config['results_dir'], 'checkpoint'), global_step)
    self.config['last_checkpoint'] = global_step

    # Also save the configuration
    json_file = os.path.join(self.config['results_dir'], 'config.json')
    with open(json_file, 'w') as f:
      json.dump(self.config, f, cls=utilities.NumPyCompatibleJSONEncoder)
项目:spotify-connect-scrobbler    作者:jeschkies    | 项目源码 | 文件源码
def save(self, config_file_path):
        """Save credentials to file.

        Only the services that are configured (``lastfm`` / ``spotify`` not
        ``None``) are written.

        Args:
            config_file_path (path-like object): Path to file containing
            credentials. The file is opened and closed by this method.
        """
        # Build the payload before opening the file: opening with 'w'
        # truncates it, so a failure here must not wipe stored credentials.
        data = {}
        if self.lastfm is not None:
            data['lastfm'] = {'session_key': self.lastfm.session_key}

        if self.spotify is not None:
            data['spotify'] = {
                'access_token': self.spotify.access_token,
                'token_type': self.spotify.token_type,
                'refresh_token': self.spotify.refresh_token,
                'scope': self.spotify.scope
            }

        with open(config_file_path, 'w') as f:
            json.dump(data, f)
项目:tensorboard    作者:dmlc    | 项目源码 | 文件源码
def add_text(self, tag, text_string, global_step=None):
        """Add text data to summary.

        The first time a tag is seen it is also recorded in
        ``plugins/tensorboard_text/tensors.json`` under the log directory.

        Args:
            tag (string): Data identifier
            text_string (string): String to save
            global_step (int): Global step value to record

        Examples::

            writer.add_text('lstm', 'This is an lstm', 0)
            writer.add_text('rnn', 'This is an rnn', 10)

        """
        self.file_writer.add_summary(text(tag, text_string), global_step)
        if tag in self.text_tags:
            return

        # New tag: remember it and refresh the tag list on disk.
        self.text_tags.append(tag)
        extensionDIR = self.file_writer.get_logdir() + '/plugins/tensorboard_text/'
        if not os.path.exists(extensionDIR):
            os.makedirs(extensionDIR)
        tensors_path = extensionDIR + 'tensors.json'
        with open(tensors_path, 'w') as fp:
            json.dump(self.text_tags, fp)
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def set(self, url, content):
    """Store ``content`` for ``url`` in the cache file and drop stale entries.

    The cache file is rewritten only while its lock is held; if the lock
    cannot be obtained this is logged at debug level and nothing is written.
    Any exception is logged as a warning, and the file is always unlocked
    and closed.
    """
    f = LockedFile(self._file, 'r+', 'r')
    try:
      f.open_and_lock()
      if not f.is_locked():
        logger.debug('Could not obtain a lock for the cache file.')
      else:
        cache = _read_or_initialize_cache(f)
        cache[url] = (content, _to_timestamp(datetime.datetime.now()))
        # Remove stale cache.
        for cached_url, (_, stored_at) in list(cache.items()):
          if _to_timestamp(datetime.datetime.now()) >= stored_at + self._max_age:
            del cache[cached_url]
        # Rewrite the whole file from the start.
        f.file_handle().truncate(0)
        f.file_handle().seek(0)
        json.dump(cache, f.file_handle())
    except Exception as e:
      logger.warning(e, exc_info=True)
    finally:
      f.unlock_and_close()
项目:id_photo    作者:aeifn    | 项目源码 | 文件源码
def apply_settings(self, widget, data=None):
    """Persist the choices made in the settings window and quit the GTK main loop.

    The ``white_bg``, ``auto_levels`` and ``resolution`` properties are taken
    from the corresponding widgets and ``self.data`` is written as JSON to
    ``self.path``.
    """
    # Hide the settings window before persisting the choices.
    self.window.hide()
    properties = self.data['properties']
    properties['white_bg'] = bool(self.white_bg_check.get_active())
    properties['auto_levels'] = bool(self.auto_levels_check.get_active())
    properties['resolution'] = int(self.resolution_cb.get_active_text())
    # ``with`` closes the file even when serialisation fails.
    with open(self.path, 'wb') as config:
      json.dump(self.data, config, indent=3)
    gtk.main_quit()

  # ??? ??????? ???????????? ?????????????? ?????????? ???????
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def register(self, name, serializer):
        """Register ``serializer`` object under ``name``.

        Raises :class:`AttributeError` if ``serializer`` is invalid, i.e.
        lacks a ``load`` or ``dump`` attribute.

        .. note::

            ``name`` will be used as the file extension of the saved files.

        :param name: Name to register ``serializer`` under
        :type name: ``unicode`` or ``str``
        :param serializer: object with ``load()`` and ``dump()``
            methods

        """

        # Basic validation: attribute access raises for a missing method.
        for required in ('load', 'dump'):
            getattr(serializer, required)

        self._serializers[name] = serializer
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def dump(cls, obj, file_obj):
        """Pickle ``obj`` into the open file ``file_obj`` using the highest protocol.

        .. versionadded:: 1.8

        :param obj: Python object to serialize
        :type obj: Python object
        :param file_obj: file handle
        :type file_obj: ``file`` object

        """

        # A negative protocol selects pickle.HIGHEST_PROTOCOL.
        newest_protocol = -1
        return pickle.dump(obj, file_obj, protocol=newest_protocol)


# Set up default manager and register built-in serializers
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def save(self):
        """Save settings to JSON file specified in ``self._filepath``

        If you're using this class via :attr:`Workflow.settings`, which
        you probably are, ``self._filepath`` will be ``settings.json``
        in your workflow's data directory (see :attr:`~Workflow.datadir`).

        Nothing is written while ``self._nosave`` is set.
        """
        if self._nosave:
            return
        # Snapshot the mapping into a plain dict before serialising.
        snapshot = dict(self)
        with LockFile(self._filepath):
            with atomic_writer(self._filepath, 'wb') as file_obj:
                json.dump(snapshot, file_obj, sort_keys=True, indent=2,
                          encoding='utf-8')

    # dict methods
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def register(self, name, serializer):
        """Register ``serializer`` object under ``name``.

        Raises :class:`AttributeError` if ``serializer`` is invalid, i.e.
        lacks a ``load`` or ``dump`` attribute.

        .. note::

            ``name`` will be used as the file extension of the saved files.

        :param name: Name to register ``serializer`` under
        :type name: ``unicode`` or ``str``
        :param serializer: object with ``load()`` and ``dump()``
            methods

        """

        # Basic validation: attribute access raises for a missing method.
        for required in ('load', 'dump'):
            getattr(serializer, required)

        self._serializers[name] = serializer
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def dump(cls, obj, file_obj):
        """Pickle ``obj`` into the open file ``file_obj`` using the highest protocol.

        .. versionadded:: 1.8

        :param obj: Python object to serialize
        :type obj: Python object
        :param file_obj: file handle
        :type file_obj: ``file`` object

        """

        # A negative protocol selects pickle.HIGHEST_PROTOCOL.
        newest_protocol = -1
        return pickle.dump(obj, file_obj, protocol=newest_protocol)


# Set up default manager and register built-in serializers
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def write(self, path=None, fileobj=None, legacy=False, skip_unknown=True):
        """Validate the metadata and write it to ``path`` or ``fileobj``.

        Exactly one of ``path`` and ``fileobj`` must be given, otherwise
        ``ValueError`` is raised. With ``legacy`` true the legacy metadata
        object does the writing; otherwise the data is dumped as sorted,
        2-space-indented, ASCII-only JSON.
        """
        missing = [path, fileobj].count(None)
        if missing != 1:
            raise ValueError('Exactly one of path and fileobj is needed')
        self.validate()

        if legacy:
            legacy_md = self._legacy if self._legacy else self._to_legacy()
            if path:
                legacy_md.write(path, skip_unknown=skip_unknown)
            else:
                legacy_md.write_file(fileobj, skip_unknown=skip_unknown)
            return

        d = self._from_legacy() if self._legacy else self._data
        dump_options = dict(ensure_ascii=True, indent=2, sort_keys=True)
        if fileobj:
            json.dump(d, fileobj, **dump_options)
            return
        with codecs.open(path, 'w', 'utf-8') as f:
            json.dump(d, f, **dump_options)
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def save(self, pypi_version, current_time):
        """Record the result of a version check for the current ``sys.prefix``.

        Does nothing when the state file's directory fails the ownership
        check. Otherwise the existing JSON state (if any) is loaded under a
        lock, the entry for ``sys.prefix`` is replaced, and the file is
        rewritten in compact, key-sorted form.
        """
        statefile_dir = os.path.dirname(self.statefile_path)

        # Check to make sure that we own the directory
        if not check_path_owner(statefile_dir):
            return

        # Now that we've ensured the directory is owned by this user, we'll go
        # ahead and make sure that all our directories are created.
        ensure_dir(statefile_dir)

        # Attempt to write out our version check file
        with lockfile.LockFile(self.statefile_path):
            state = {}
            if os.path.exists(self.statefile_path):
                with open(self.statefile_path) as statefile:
                    state = json.load(statefile)

            entry = {
                "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
                "pypi_version": pypi_version,
            }
            state[sys.prefix] = entry

            with open(self.statefile_path, "w") as statefile:
                json.dump(state, statefile, sort_keys=True,
                          separators=(",", ":"))
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def sync(self):
        """Write dict to disk.

        Does nothing when the store was opened read-only (``flag == 'r'``).
        The data is first dumped to ``<filename>.tmp`` and then moved over the
        real file; if dumping fails the temporary file is removed and the
        exception is re-raised. ``self.mode``, when set, is applied with
        ``os.chmod``.
        """
        if self.flag == 'r':
            return
        filename = self.filename
        tempname = filename + '.tmp'
        fileobj = open(tempname, 'wb' if self.format=='pickle' else 'w')
        try:
            try:
                self.dump(fileobj)
            finally:
                # Close before any cleanup: removing a file that is still
                # open fails on Windows and would mask the original error.
                fileobj.close()
        except Exception:
            os.remove(tempname)
            raise
        shutil.move(tempname, filename)    # atomic commit
        if self.mode is not None:
            os.chmod(filename, self.mode)
项目:caly-recommend-system    作者:CalyFactory    | 项目源码 | 文件源码
def save_all_recommend_item():
    """Print every recommendation item and save the list to two JSON files.

    ``testItemData.json`` is written with ASCII escapes, while
    ``testItemDataEncoded.json`` keeps non-ASCII characters as they are.
    """
    reco = Reco(json_data, show_external_data = False)
    all_list = reco.get_all_list()

    pretty = json.dumps(
        all_list,
        indent = 4,
        sort_keys=True,
        ensure_ascii=False
    )
    print(pretty)

    outputs = (
        ('testItemData.json', {}),
        ('testItemDataEncoded.json', {'ensure_ascii': False}),
    )
    for target, extra_options in outputs:
        with open(target, 'w') as outfile:
            json.dump(all_list, outfile, indent = 4, **extra_options)

# ??? ??
项目:caly-recommend-system    作者:CalyFactory    | 项目源码 | 文件源码
def save_all_recommend_item():
    """Print every recommendation item and save the list to two JSON files.

    Builds a ``Reco`` from the ``json_data`` name in the enclosing scope,
    fetches its full list with ``get_all_list()``, pretty-prints it to
    stdout, and then writes it twice:

    * ``testItemData.json`` with non-ASCII characters escaped (the
      ``json.dump`` default), and
    * ``testItemDataEncoded.json`` with non-ASCII characters written as-is.
    """
    reco = Reco(json_data, show_external_data=False)
    all_list = reco.get_all_list()

    print(json.dumps(all_list, indent=4, sort_keys=True, ensure_ascii=False))

    with open('testItemData.json', 'w', encoding='utf-8') as outfile:
        json.dump(all_list, outfile, indent=4)

    # ensure_ascii=False emits raw non-ASCII text, so the file encoding must
    # be explicit; the platform default may be unable to encode it.
    with open('testItemDataEncoded.json', 'w', encoding='utf-8') as outfile:
        json.dump(all_list, outfile, indent=4, ensure_ascii=False)

# ??? ??
项目:Harmonbot    作者:Harmon758    | 项目源码 | 文件源码
async def twitter_add(self, ctx, handle : str):
        '''
        Add a Twitter handle to a text channel
        A delay of up to 2 min. is possible due to Twitter rate limits
        '''
        # Must be a coroutine: the body awaits bot and stream-listener calls.
        channel = ctx.message.channel
        channels = self.feeds_info["channels"]
        # Refuse duplicates before touching the stream listener.
        if handle in channels.get(channel.id, {}).get("handles", []):
            await self.bot.embed_reply(":no_entry: This text channel is already following that Twitter handle")
            return
        message, embed = await self.bot.embed_reply(":hourglass: Please wait")
        try:
            await self.stream_listener.add_feed(channel, handle)
        except tweepy.error.TweepError as e:
            # Report the failure by editing the "Please wait" placeholder.
            embed.description = ":no_entry: Error: {}".format(e)
            await self.bot.edit_message(message, embed = embed)
            return
        # Record the handle only after the feed was added successfully.
        if channel.id in channels:
            channels[channel.id]["handles"].append(handle)
        else:
            channels[channel.id] = {"name" : channel.name, "handles" : [handle]}
        # Persist the updated feed table.
        with open("data/twitter_feeds.json", 'w') as feeds_file:
            json.dump(self.feeds_info, feeds_file, indent = 4)
        embed.description = "Added the Twitter handle, [`{0}`](https://twitter.com/{0}), to this text channel".format(handle)
        await self.bot.edit_message(message, embed = embed)
项目:Harmonbot    作者:Harmon758    | 项目源码 | 文件源码
async def twitter_remove(self, ctx, handle : str):
        '''
        Remove a Twitter handle from a text channel
        A delay of up to 2 min. is possible due to Twitter rate limits
        '''
        # Must be a coroutine: the body awaits bot and stream-listener calls.
        channel = ctx.message.channel
        try:
            # For an unknown channel the fallback empty list makes remove()
            # raise ValueError, so both "not followed" cases share one path.
            self.feeds_info["channels"].get(channel.id, {}).get("handles", []).remove(handle)
        except ValueError:
            await self.bot.embed_reply(":no_entry: This text channel isn't following that Twitter handle")
        else:
            # Persist the removal before updating the stream listener.
            with open("data/twitter_feeds.json", 'w') as feeds_file:
                json.dump(self.feeds_info, feeds_file, indent = 4)
            message, embed = await self.bot.embed_reply(":hourglass: Please wait")
            await self.stream_listener.remove_feed(channel, handle)
            embed.description = "Removed the Twitter handle, [`{0}`](https://twitter.com/{0}), from this text channel.".format(handle)
            await self.bot.edit_message(message, embed = embed)
项目:Harmonbot    作者:Harmon758    | 项目源码 | 文件源码
async def generate_erps_dict(self):
        """Download the RPS-101 outcomes page and save it as a nested dict.

        The page text is lower-cased, "video game" is shortened to "game",
        and the text is split into lines (the last line is discarded).  A
        line starting with a digit begins a new object whose name is the
        line's last word; every other line adds an entry to the current
        object, keyed by the line's last word and valued with the words
        before it.  The result is written to ``data/erps_dict.json``.
        """
        # Must be a coroutine: the body uses `async with` and `await`.
        async with clients.aiohttp_session.get("http://www.umop.com/rps101/alloutcomes.htm") as resp:
            data = await resp.text()
        raw_text = BeautifulSoup(data).text
        raw_text = re.sub("\n+", '\n', raw_text).strip()
        raw_text = raw_text.lower().replace("video game", "game")
        lines = raw_text.split('\n')[:-1]
        objects = {}
        # The first line names the first object.
        current_name = lines[0].split()[-1]
        current_info = {}
        for line in lines[1:]:
            if line[0].isdigit():
                # Header line: store the finished object and start the next.
                objects[current_name] = current_info
                current_name = line.split()[-1]
                current_info = {}
            else:
                words = line.split()
                current_info[words[-1]] = ' '.join(words[:-1])
        # Store the final object, which no later header line flushes.
        objects[current_name] = current_info
        with open("data/erps_dict.json", 'w') as erps_file:
            json.dump(objects, erps_file, indent = 4)
项目:Harmonbot    作者:Harmon758    | 项目源码 | 文件源码
async def tag(self, ctx, tag : str = ""):
        '''Tags/notes that you can trigger later'''
        # Must be a coroutine: the body awaits bot replies.
        if not tag:
            await self.bot.embed_reply("Add a tag with `{0}tag add [tag] [content]`\nUse `{0}tag [tag]` to trigger the tag you added\n`{0}tag edit [tag] [content]` to edit it and `{0}tag delete [tag]` to delete it".format(ctx.prefix))
            return
        # The author's own tags; an empty mapping when they have none.
        personal_tags = self.tags_data.get(ctx.message.author.id, {}).get("tags", {})
        if tag in personal_tags:
            # Personal tags take precedence over global ones.
            await self.bot.reply(personal_tags[tag])
        elif tag in self.tags_data["global"]:
            global_tag = self.tags_data["global"][tag]
            await self.bot.reply(global_tag["response"])
            # Global tags keep a usage counter, saved after every trigger.
            global_tag["usage_counter"] += 1
            with open("data/tags.json", 'w') as tags_file:
                json.dump(self.tags_data, tags_file, indent = 4)
        else:
            # Suggest similarly named personal and global tags.
            candidates = list(personal_tags) + list(self.tags_data["global"])
            close_matches = difflib.get_close_matches(tag, candidates)
            close_matches = "\nDid you mean:\n{}".format('\n'.join(close_matches)) if close_matches else ""
            await self.bot.embed_reply("Tag not found{}".format(close_matches))
项目:X-ray-classification    作者:bendidi    | 项目源码 | 文件源码
def extract(url):
    """Scrape one result page, record its metadata and download its image.

    Increments the global ``img_no``, fetches ``url`` and looks for the
    "meshtext-wrapper-left" div inside the "masterresultstable" table.  If
    the fetch or parse fails, or no such div exists, the function returns
    without recording anything.

    Otherwise the div's first ``<strong>`` text, its ``<li>`` texts and the
    page's "theImage" source are stored in ``final_data[img_no]``.  The image
    is then saved to ``path + str(img_no) + ".png"``, ``final_data`` is
    written to ``data_new.json`` and a progress line is printed.  Errors in
    that last stage are ignored (best effort).
    """
    global img_no

    try:
        img_no += 1
        r = requests.get(url)
        tree = html.fromstring(r.text)

        div = tree.xpath('//table[@class="masterresultstable"]\
            //div[@class="meshtext-wrapper-left"]')
    except Exception:
        # Catch Exception rather than everything, so Ctrl-C and SystemExit
        # still stop the scraper.
        div = []

    if not div:
        return
    div = div[0]

    typ = div.xpath('.//strong/text()')[0]
    items = div.xpath('.//li/text()')
    img = tree.xpath('//img[@id="theImage"]/@src')[0]

    final_data[img_no] = {
        'type': typ,
        'items': items,
        'img': domain + img,
    }
    try:
        urllib.urlretrieve(domain + img, path + str(img_no) + ".png")
        with open('data_new.json', 'w') as f:
            json.dump(final_data, f)

        output = "Downloading Images : {}".format(img_no)
        # "\r\x1b[K" returns to line start and clears it, so the progress
        # message is redrawn in place.
        sys.stdout.write("\r\x1b[K" + output)
        sys.stdout.flush()
    except Exception:
        # Best effort: skip this image on download or write failure.
        return
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def save_cache(self, file):
        """Serialize ``self.cache`` as JSON into the file at path ``file``.

        The output is indented by four spaces with keys sorted; an existing
        file is overwritten.
        """
        import json

        with open(file, 'w') as handle:
            json.dump(self.cache, handle, sort_keys=True, indent=4)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def yaml(self):
        """Return ``self.data`` serialized with ``yaml.dump``."""
        serialized = yaml.dump(self.data)
        return serialized
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def save(self):
        """Write this config to ``self.path`` as JSON.

        When the charm uses the :mod:`Services Framework <services.base>`
        or the :meth:`@hook <Hooks.hook>` decorator, this runs automatically
        at the end of a successful hook execution; otherwise user code
        should call it directly.

        Set ``implicit_save=False`` on this instance to disable the
        automatic save.

        """
        with open(self.path, 'w') as config_file:
            json.dump(self, config_file)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def _save_ready_file(self):
        """Write ``self._ready`` to ``self._ready_file`` as a JSON list.

        Nothing is written while ``self._ready`` is None.
        """
        if self._ready is not None:
            ready_items = list(self._ready)
            with open(self._ready_file, 'w') as ready_file:
                json.dump(ready_items, ready_file)
项目:flora    作者:Lamden    | 项目源码 | 文件源码
def generate(location):
    """Interactive CLI wizard that creates a contract from a template.

    When ``directory_has_smart_contract(location)`` is false, an error
    message is printed and nothing else happens.  Otherwise:

    1. The first ``*.json`` file in ``location`` is loaded as the example
       payload, and the user is prompted for a new value per key (an empty
       answer keeps the existing value).
    2. The first ``*.tsol`` file is compiled against the edited payload.
    3. The user is asked repeatedly until they choose to (G)enerate a
       ``.sol`` file from the template or (E)xport the payload as a new
       ``.json`` file; the output is written into ``location``.

    :param location: directory holding the ``*.tsol`` / ``*.json`` pair
    """
    if not directory_has_smart_contract(location):
        print('Provided directory does not contain a *.tsol and *.json or does not compile.')
        return

    payload_path = glob.glob(os.path.join(location, '*.json'))[0]
    with open(payload_path) as payload_file:
        example_payload = json.load(payload_file)
    print(example_payload)
    # Only existing keys are reassigned, so mutating during iteration is safe.
    for k in example_payload:
        value = input(k + ':')
        if value != '':
            example_payload[k] = value
    print(example_payload)

    code_path = glob.glob(os.path.join(location, '*.tsol'))
    with open(code_path[0]) as code_file:
        tsol.compile(code_file, example_payload)
    print('Code compiles with new payload.')

    # Keep asking until one of the two recognised choices is entered.
    while True:
        choice = input('(G)enerate Solidity contract or (E)xport implementation:').lower()
        if choice == 'g':
            output_name = input('Name your contract file without an extension:')
            with open(code_path[0]) as code_file:
                code = tsol.generate_code(code_file.read(), example_payload)
            # Context managers guarantee the output is flushed and closed.
            with open(os.path.join(location, '{}.sol'.format(output_name)), 'w') as out_file:
                out_file.write(code)
            break

        if choice == 'e':
            output_name = input('Name your implementation file without an extension:')
            with open(os.path.join(location, '{}.json'.format(output_name)), 'w') as out_file:
                json.dump(example_payload, out_file)
            break
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def send_feedback(self):
        """Write ``self.obj`` to standard output as JSON, then flush it."""
        stream = sys.stdout
        json.dump(self.obj, stream)
        stream.flush()