我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用json.dump()。
def error(self, message, ensure_ascii=False):
    """Abort the analyzer and emit ``message`` as a JSON error report.

    The analyzer input is echoed back in the report to ease debugging.
    Credential-like entries of its ``config`` section are overwritten with
    ``'REMOVED'`` first; the redaction is applied to the stored input object
    itself, not to a copy.

    :param message: Error message
    :param ensure_ascii: Force ascii output. Default: False
    """
    analyzerInput = self.__input

    # Scrub anything that looks like a secret before echoing the input back.
    for secret in ('password', 'key', 'apikey', 'api_key'):
        if secret in analyzerInput.get('config', {}):
            analyzerInput['config'][secret] = 'REMOVED'

    payload = {'success': False,
               'input': analyzerInput,
               'errorMessage': message}
    json.dump(payload, self.fpoutput, ensure_ascii=ensure_ascii)

    # Force exit after error
    sys.exit(1)
def register(name):
    """Register ``name`` with the remote API under a freshly generated RSA key pair.

    Prints a notice and does nothing else when ``check_name`` reports the name
    with status ``'error'``. Otherwise the key pair is pickled to
    ``KEY_LOCATION/.key`` and the public components are posted to the API.

    :param name: name to register
    """
    # hit api to see if name is already registered
    if check_name(name)['status'] == 'error':
        print('{} already registered.'.format(name))
        return

    # generate new keypair
    # NOTE(review): 512-bit RSA keys are generally considered weak - confirm this is intended.
    public_key, private_key = rsa.newkeys(512)

    if not os.path.exists(KEY_LOCATION):
        os.mkdir(KEY_LOCATION)

    # save to disk
    with open('{}/.key'.format(KEY_LOCATION), 'wb') as key_file:
        pickle.dump((public_key, private_key), key_file, pickle.HIGHEST_PROTOCOL)

    response = requests.post(
        '{}/names'.format(API_LOCATION),
        data={'name': name, 'n': public_key.n, 'e': public_key.e})

    if response.json()['status'] == 'success':
        outcome = 'Successfully registered new name: {}'
    else:
        outcome = 'Error registering name: {}'
    print(outcome.format(name))
def report(self, full_report, ensure_ascii=False):
    """Write the analyzer result as a JSON document to ``self.fpoutput``.

    The document has the keys ``success`` (always ``True``), ``summary``,
    ``artifacts`` and ``full``. Building the summary is best-effort: if
    ``self.summary`` raises an ordinary exception an empty summary is used.
    Interpreter-level signals (``KeyboardInterrupt``, ``SystemExit``) are no
    longer swallowed and propagate to the caller.

    :param full_report: Analyzer results as dict.
    :param ensure_ascii: Force ascii output. Default: False
    """
    try:
        summary = self.summary(full_report)
    except Exception:
        # A broken summary must not prevent the full report from being emitted.
        summary = {}

    report = {
        'success': True,
        'summary': summary,
        'artifacts': self.artifacts(full_report),
        'full': full_report,
    }
    json.dump(report, self.fpoutput, ensure_ascii=ensure_ascii)
def write(self, path=None, fileobj=None, legacy=False, skip_unknown=True):
    """Validate the metadata and write it to ``path`` or ``fileobj``.

    Exactly one of ``path`` and ``fileobj`` must be given. With ``legacy``
    set, writing is delegated to the legacy metadata object (converting to it
    first when needed); otherwise the data is written as sorted, indented,
    ASCII-only JSON.

    :param path: destination file name
    :param fileobj: destination file-like object
    :param legacy: write through the legacy metadata object
    :param skip_unknown: forwarded to the legacy writer
    :raises ValueError: if both or neither of ``path``/``fileobj`` are given
    """
    if [path, fileobj].count(None) != 1:
        raise ValueError('Exactly one of path and fileobj is needed')
    self.validate()

    if legacy:
        legacy_md = self._legacy if self._legacy else self._to_legacy()
        if path:
            legacy_md.write(path, skip_unknown=skip_unknown)
        else:
            legacy_md.write_file(fileobj, skip_unknown=skip_unknown)
        return

    d = self._from_legacy() if self._legacy else self._data
    dump_options = dict(ensure_ascii=True, indent=2, sort_keys=True)
    if fileobj:
        json.dump(d, fileobj, **dump_options)
    else:
        with codecs.open(path, 'w', 'utf-8') as f:
            json.dump(d, f, **dump_options)
def save(self, pypi_version, current_time):
    """Record the latest version-check result for this interpreter prefix.

    The state file is a JSON object keyed by ``sys.prefix``; the entry for
    the current prefix is replaced while other entries are kept. Nothing is
    written when ``check_path_owner`` rejects the state directory.

    :param pypi_version: version string to store
    :param current_time: object with ``strftime``, formatted with
        ``SELFCHECK_DATE_FMT``
    """
    state_dir = os.path.dirname(self.statefile_path)

    # Check to make sure that we own the directory
    if not check_path_owner(state_dir):
        return

    # Now that we've ensured the directory is owned by this user, we'll go
    # ahead and make sure that all our directories are created.
    ensure_dir(state_dir)

    # Attempt to write out our version check file
    with lockfile.LockFile(self.statefile_path):
        state = {}
        if os.path.exists(self.statefile_path):
            with open(self.statefile_path) as statefile:
                state = json.load(statefile)

        state[sys.prefix] = {
            "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
            "pypi_version": pypi_version,
        }

        with open(self.statefile_path, "w") as statefile:
            json.dump(state, statefile, sort_keys=True, separators=(",", ":"))
def encode_and_store(batch_x, output_dir, file_name):
    """Encode a batch with the module-level auto-encoder and store it as JSON.

    Each sample is standardised on its own (mean subtracted, divided by its
    standard deviation) before being passed to ``AUTO_ENCODER.transform``.

    Args:
        batch_x: batch of images (described by the original author as 32*32).
        output_dir: directory prefix for the output; it is concatenated with
            ``file_name`` directly, so it must already end with a separator.
        file_name: base name of the JSON file, without extension.
    """
    global AUTO_ENCODER
    if AUTO_ENCODER is None:
        load_AE()

    norm_batch = np.zeros(batch_x.shape)
    for idx, sample in enumerate(batch_x):
        norm_batch[idx] = (sample - np.mean(sample)) / np.std(sample)

    encoded = AUTO_ENCODER.transform(norm_batch).tolist()
    output_dict = {'name': file_name, 'encoded': encoded}

    with open(output_dir + file_name + '.json', 'w') as f:
        json.dump(output_dict, f)
def write_log(target_uri, doi, pmid, found_rrids, head, body, text, h):
    """Write one JSON log record to ``logs/rrid-<timestamp>.json``.

    The timestamp is the current local time to the second, with ``:`` and
    ``-`` removed. When every value in ``found_rrids`` is
    ``'Already Annotated'``, the ``head``, ``body`` and ``text`` fields are
    logged as ``None``.

    :param found_rrids: mapping whose values are inspected and whose size is
        logged as ``count``
    :param h: object providing the ``group`` attribute written to the log
    """
    stamp = datetime.now().isoformat()[0:19]
    for separator in (':', '-'):
        stamp = stamp.replace(separator, '')

    distinct = list(set(found_rrids.values()))
    if len(distinct) == 1 and distinct[0] == 'Already Annotated':
        head = body = text = None

    log = {
        'target_uri': target_uri,
        'group': h.group,
        'doi': doi,
        'pmid': pmid,
        'found_rrids': found_rrids,
        'count': len(found_rrids),
        'head': head,
        'body': body,
        'text': text,
    }

    fname = 'logs/rrid-%s.json' % stamp
    with open(fname, 'wt') as f:
        json.dump(log, f, sort_keys=True, indent=4)
def export(metadata, start, end, container_image_pattern):
    """Dump every app in ``APPS`` and write a ``metadata.json`` next to the dumps.

    ``metadata`` is updated in place with ``start``, ``end`` and ``services``
    (one ``dump_app`` result per app). The export directory is
    ``<metrics_export>/<UTC timestamp>-<measurement_name>`` and is created
    when missing.

    :param metadata: dict providing ``metrics_export`` and ``measurement_name``
    :param start: object with ``isoformat``; stored with a ``Z`` suffix
    :param end: object with ``isoformat``; stored with a ``Z`` suffix
    :param container_image_pattern: forwarded to ``dump_app``
    """
    metadata["start"] = start.isoformat() + "Z"
    metadata["end"] = end.isoformat() + "Z"
    metadata["services"] = []

    prefix = datetime.utcnow().strftime("%Y%m%d%H%M%S-")
    path = os.path.join(metadata["metrics_export"],
                        prefix + metadata["measurement_name"])
    if not os.path.isdir(path):
        os.makedirs(path)

    for app in APPS:
        service = dump_app(app, path, start, end, container_image_pattern)
        metadata["services"].append(service)

    with open(os.path.join(path, "metadata.json"), "w+") as f:
        json.dump(metadata, f, cls=Encoder, sort_keys=True, indent=4)
        f.flush()
def keystone_auth(auth_details):
    """Authenticate against Keystone and cache the auth reference on disk.

    The v3 client is used when ``OS_AUTH_URL`` ends with ``v3``, the v2
    client otherwise. Any failure while building the client is reported
    through ``status_err``. Writing ``TOKEN_FILE`` is best-effort: an
    ``IOError`` there is ignored.

    :param auth_details: mapping with ``OS_AUTH_URL``, ``OS_TENANT_NAME``,
        ``OS_USERNAME`` and ``OS_PASSWORD``
    :returns: ``keystone.auth_ref``
    """
    try:
        use_v3 = auth_details['OS_AUTH_URL'].endswith('v3')
        k_client = k3_client if use_v3 else k2_client
        tenant_name = auth_details['OS_TENANT_NAME']
        keystone = k_client.Client(
            username=auth_details['OS_USERNAME'],
            password=auth_details['OS_PASSWORD'],
            tenant_name=tenant_name,
            auth_url=auth_details['OS_AUTH_URL'])
    except Exception as e:
        # NOTE(review): presumably status_err terminates the process; if it
        # returns, ``keystone`` is unbound below - confirm.
        status_err(str(e))

    try:
        with open(TOKEN_FILE, 'w') as token_file:
            json.dump(keystone.auth_ref, token_file)
    except IOError:
        # if we can't write the file we go on
        pass

    return keystone.auth_ref
def crypto_spot(bot, trigger):
    """Report the spot price of the coin named in ``trigger.group(1)``.

    Fixes over the previous version:

    * an unknown currency now stops after the "Invalid currency!" reply
      instead of going on to query the API;
    * the price cache path is passed through ``os.path.expanduser`` because
      ``open`` does not expand ``~`` itself.

    :param bot: object with a ``say(text)`` method
    :param trigger: match-like object; group 1 is the currency symbol
    """
    import os  # local import: only needed to expand the cache path

    global last_prices

    from_cur = trigger.group(1).lower()
    if from_cur not in main_coins:
        bot.say("Invalid currency!")
        return

    api_result = requests.get(single_url.format(from_cur)).json()
    price = float(api_result["last_price"])

    # XRP is shown with four decimals, everything else with two.
    digits = from_cur != 'xrp'

    last_prices.setdefault(from_cur, 0)
    diffStr = getDiffString(price, last_prices[from_cur], digits)
    last_prices[from_cur] = price

    with open(os.path.expanduser('~/.sopel/cur_py_cache'), 'w') as outfile:
        json.dump(last_prices, outfile)

    bot.say("{0}: ${1:.{2}f}{3}".format(from_cur, price, 2 if digits else 4, diffStr))
def write_store(self, store):
    """Commit ``store`` to the file at ``self._path``.

    ``store`` is serialised to JSON, encrypted with ``self._box`` under a
    freshly generated nonce, and wrapped as a hex string in an outer JSON
    object together with ``hash``, ``salt`` and ``iterations``.
    """
    plaintext = json.dumps(store).encode('utf-8')
    nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE)
    ciphertext = self._box.encrypt(plaintext, nonce)

    outer_data = {
        'hash': self._hash,
        'salt': bytes_to_hex_str(self._salt),
        'iterations': self._iterations,
        'enc_blob': bytes_to_hex_str(ciphertext),
    }

    with open(self._path, 'w', encoding='ascii') as fw:
        json.dump(outer_data, fw)
def finish(self):
    """Run the parent ``finish`` and then write ``templatebuiltins.js``.

    The generated file assigns a JSON object to the JavaScript variable
    ``django_template_builtins``. It lists the names of all ``templatetag``
    and ``templatefilter`` cross-reference targets whose first value
    component is ``ref/templates/builtins``.
    """
    super().finish()
    self.info(bold("writing templatebuiltins.js..."))
    xrefs = self.env.domaindata["std"]["objects"]

    def names_of(kind):
        # Names of the builtin-reference entries of one object type.
        return [
            name
            for ((objtype, name), (docname, _anchor)) in xrefs.items()
            if objtype == kind and docname == "ref/templates/builtins"
        ]

    templatebuiltins = {
        "ttags": names_of("templatetag"),
        "tfilters": names_of("templatefilter"),
    }

    outfilename = os.path.join(self.outdir, "templatebuiltins.js")
    with open(outfilename, 'w') as fp:
        fp.write('var django_template_builtins = ')
        json.dump(templatebuiltins, fp)
        fp.write(';\n')
def main(args, outs):
    """Write the summary JSON and build the web summary HTML.

    Genome list, chemistry description and cell count are read from
    ``args.filtered_matrices``. The summary JSON is written to
    ``outs.summary`` and then handed to the web-summary builder.
    """
    matrices_h5 = args.filtered_matrices
    genomes = cr_matrix.GeneBCMatrices.load_genomes_from_h5(matrices_h5)
    chemistry = cr_matrix.GeneBCMatrices.load_chemistry_from_h5(matrices_h5)
    total_cells = cr_matrix.GeneBCMatrices.count_cells_from_h5(matrices_h5)

    summary = {
        'chemistry_description': chemistry,
        'filtered_bcs_transcriptome_union': total_cells,
    }
    with open(outs.summary, 'w') as f:
        json.dump(summary, f, indent=4, sort_keys=True)

    sample_properties = cr_webshim.get_sample_properties(
        args.analysis_id,
        args.analysis_desc,
        genomes,
        version=martian.get_pipelines_version())

    sample_data_paths = cr_webshim_data.SampleDataPaths(
        summary_path=outs.summary,
        analysis_path=args.analysis,
    )

    sample_data = cr_webshim.load_sample_data(sample_properties, sample_data_paths)
    cr_webshim.build_web_summary_html(
        outs.web_summary, sample_properties, sample_data, PIPELINE_REANALYZE)
def join(args, outs, chunk_defs, chunk_outs):
    """Merge the per-chunk outputs into the stage-level outputs.

    Raw and filtered matrices (HDF5 and MEX), filtered molecules and barcode
    summaries are concatenated in chunk order. The merged summary is written
    to ``outs.summary`` as JSON. ``chunk_defs`` is not used.
    """
    def gather(field):
        # One named output from every chunk, in chunk order.
        return [getattr(chunk_out, field) for chunk_out in chunk_outs]

    matrix_attrs = cr_matrix.make_matrix_attrs_aggr(args.gem_group_index, "Unknown")

    cr_matrix.concatenate_h5(gather('raw_matrices_h5'),
                             outs.raw_matrices_h5, extra_attrs=matrix_attrs)
    cr_matrix.concatenate_h5(gather('filtered_matrices_h5'),
                             outs.filtered_matrices_h5, extra_attrs=matrix_attrs)

    cr_matrix.concatenate_mex_dirs(gather('raw_matrices_mex'), outs.raw_matrices_mex)
    cr_matrix.concatenate_mex_dirs(gather('filtered_matrices_mex'), outs.filtered_matrices_mex)

    merged_molecules = gather('filtered_molecules')
    cr_mol_counter.MoleculeCounter.concatenate(outs.filtered_molecules, merged_molecules)

    barcode_summaries = gather('barcode_summary_h5')
    merge_barcode_summaries(barcode_summaries, outs.barcode_summary_h5)

    # merge summaries
    summary = merge_summaries(chunk_outs)
    with open(outs.summary, 'w') as f:
        json.dump(summary, f, indent=4, sort_keys=True)
def main(args, outs):
    """Run the external ``chunk_reads`` tool and collect its chunk list.

    ``args.read_chunk`` is written to ``chunk_args.json`` for the tool. After
    the tool has run, ``outs.out_chunks`` holds one copy of
    ``args.read_chunk`` per entry of ``read_chunks.json``, each with its
    ``read_chunks`` key set to that entry.

    The Python 2-only ``print`` statements were replaced by single-argument
    ``print(...)`` calls, which print the same text on Python 2 and parse on
    Python 3.

    :raises subprocess.CalledProcessError: if ``chunk_reads`` exits non-zero
    """
    # Write read_chunk for consumption by Rust
    with open("chunk_args.json", "w") as f:
        json.dump(args.read_chunk, f)

    output_path = martian.make_path("")
    prefix = "fastq_chunk"
    chunk_reads_args = ['chunk_reads',
                        '--reads-per-fastq', str(args.reads_per_file),
                        output_path, prefix,
                        "--martian-args", "chunk_args.json"]
    print("running chunk reads: [%s]" % str(chunk_reads_args))
    subprocess.check_call(chunk_reads_args)

    with open(os.path.join(output_path, "read_chunks.json")) as f:
        chunk_results = json.load(f)

    outs.out_chunks = []

    # Write out a new chunk entry for each resulting chunk
    for chunk in chunk_results:
        print(args.read_chunk)
        chunk_copy = args.read_chunk.copy()
        print(chunk_copy)
        chunk_copy['read_chunks'] = chunk
        outs.out_chunks.append(chunk_copy)
def requests_with_cache(dir):
    """Decorator factory caching a function's JSON-serialisable result on disk.

    The cache file lives in ``dir`` and is named after the ``param`` keyword
    argument (``default.json`` when absent), with ``/`` and ``_`` replaced by
    ``-``. The decorated function must be called with keyword arguments only.

    The cache file is now created only after the wrapped function has
    returned and its result has been serialised. Previously the file was
    opened for writing first, so a failing call left an empty file behind and
    every later call failed while loading it.

    :param dir: directory holding the cache files (must already exist)
    :returns: a decorator
    """
    import functools  # local import keeps the block self-contained

    def decorator(func):
        @functools.wraps(func)
        def wrapper(**kwargs):
            cache_key = str(kwargs.get("param", "default.json"))
            cache_url = dir + "/" + cache_key.replace("/", "-").replace("_", "-")

            if os.path.isfile(cache_url):
                with open(cache_url, 'r') as f:
                    print(cache_url)
                    return json.load(f)

            # Compute and serialise before touching the disk, so that an
            # exception never leaves a truncated or empty cache entry.
            ret = func(**kwargs)
            payload = json.dumps(ret)
            with open(cache_url, 'w') as f:
                f.write(payload)
            return ret

        return wrapper

    return decorator
def main():
    """Pretty-print JSON: ``prog [infile [outfile]]``.

    Reads from ``infile`` (default stdin) and writes the document with sorted
    keys and a four-space indent to ``outfile`` (default stdout), followed by
    a newline.

    Changes from the previous version: ``except ValueError as e`` replaces
    the Python 2-only comma form, files are opened in text mode because
    ``json.dump`` writes ``str``, and files opened here are always closed.

    :raises SystemExit: on a wrong argument count or invalid JSON input
    """
    argc = len(sys.argv)
    if argc not in (1, 2, 3):
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")

    infile = sys.stdin
    outfile = sys.stdout
    opened = []  # only the handles created here get closed
    try:
        if argc >= 2:
            infile = open(sys.argv[1], 'r')
            opened.append(infile)
        if argc == 3:
            outfile = open(sys.argv[2], 'w')
            opened.append(outfile)

        try:
            obj = json.load(infile)
        except ValueError as e:
            raise SystemExit(e)

        json.dump(obj, outfile, sort_keys=True, indent=4)
        outfile.write('\n')
    finally:
        for handle in opened:
            handle.close()
def output_json(svc_msg, service_types):
    """Append one device record to ``DATA2`` and rewrite ``device-scanner.json``.

    ``DATA2`` is rebound to a new list that ends with the record built from
    ``svc_msg``. Any error while writing the file is reported with a printed
    hint rather than raised.

    :param svc_msg: message object exposing the ``get*`` accessors used below
    :param service_types: value stored under ``servicetypes``
    """
    global DATA2
    outfname = 'device-scanner.json'

    record = {
        'serverid': svc_msg.getServerId(),
        'devicename': svc_msg.getDeviceName(),
        'devicedescription': svc_msg.getDeviceDescription(),
        'hostname': svc_msg.getHostname(),
        'portnumber': svc_msg.getPortNumber(),
        'urlprefix': svc_msg.getUrlPrefix(),
        'servicetypes': service_types,
    }
    DATA2 = DATA2 + [record]

    try:
        with open(outfname, 'w') as outfile:
            json.dump(DATA2, outfile)
    except Exception:
        print("You need to configure the webserver path if you want to output json")
def main():
    """Scrape hero win rates for two patches and store the per-hero difference.

    For every hero on the 7.05 page the 7.04 win rate is subtracted from the
    7.05 one. The resulting mapping is written as JSON to
    ``$FDOTA/fantasydota/junk/windiff_705``.
    """
    def winrates(page_url):
        # Lazily yield (hero, winrate) for each table row of the page.
        driver.get(page_url)
        for row in driver.find_elements_by_xpath("//table/tbody/tr"):
            cells = row.find_elements_by_xpath("td")
            yield cells[1].text, float(cells[2].get_attribute("data-value"))

    old_winrates = dict(winrates("https://www.dotabuff.com/heroes/winning?date=patch_7.04"))

    win_rate_diff = {}
    for hero, winrate in winrates("https://www.dotabuff.com/heroes/winning?date=patch_7.05"):
        win_rate_diff[hero] = winrate - old_winrates[hero]

    target = os.environ.get('FDOTA') + '/fantasydota/junk/windiff_705'
    with open(target, 'w') as f:
        json.dump(win_rate_diff, f)
def add_manifest(name_module=None, version_modul=None, url=None):
    """Append a module entry to ``installed_module`` in ``manifest.json``.

    Fixes over the previous version:

    * the default version ``"0.1.0"`` is now applied (it used to be assigned
      to an unrelated local, so ``None`` was stored);
    * the manifest is no longer deleted before being rewritten, so a failed
      write cannot lose the file.

    :param name_module: module name (required)
    :param version_modul: module version, default ``"0.1.0"``
    :param url: module URL, default ``""``
    :returns: ``None`` on success, a message string when ``name_module`` is
        missing, or the caught exception object when updating fails
    """
    if name_module is None:
        return "please insert your name_module"
    if version_modul is None:
        version_modul = "0.1.0"
    if url is None:
        url = ""

    try:
        filename = 'manifest.json'
        with open(filename, 'r') as data_file:
            data_json = json.load(data_file)

        data_json['installed_module'].append({
            'name_module': name_module,
            'version_module': version_modul,
            'url_module': url,
        })

        with open(filename, 'w') as data_file:
            json.dump(data_json, data_file)
    except Exception as e:
        # Existing contract: failures are handed back to the caller, not raised.
        return e
def del_manifest(name_module=None):
    """Remove every entry named ``name_module`` from ``manifest.json``.

    The previous version deleted entries while indexing over the original
    list length, which raised a swallowed ``IndexError`` unless the match was
    the last entry, and relied on the Python 2-only ``xrange``. The entries
    are now filtered in one pass and the file is rewritten once, in place.

    :param name_module: module name (required)
    :returns: ``"modul has deleted"`` when the manifest was processed (even
        if nothing matched, as before), a message string when
        ``name_module`` is missing, or ``None`` when the manifest cannot be
        read, parsed or written
    """
    filename = 'manifest.json'
    if name_module is None:
        return "please insert your modul name"

    try:
        with open(filename, 'r') as data_file:
            data_json = json.load(data_file)

        modules = data_json['installed_module']
        remaining = [entry for entry in modules
                     if entry['name_module'] != name_module]

        # Only touch the file when something was actually removed.
        if len(remaining) != len(modules):
            data_json['installed_module'] = remaining
            with open(filename, 'w') as data_file:
                json.dump(data_json, data_file)

        return "modul has deleted"
    except (IOError, OSError, ValueError, KeyError, TypeError):
        # Best-effort by contract: an unusable manifest yields None.
        return None
def create_requirement(name_module=None, version_module=None, url_endpoint=None,
                       requirement=None, comment=None, url=None):
    """Write ``modules/<name_module>/requirement.json`` describing a module.

    :param name_module: module name (required); also the target folder
    :param version_module: version, default ``"0.1.0"``
    :param url_endpoint: endpoint URL; when given the type is ``end_point``,
        otherwise the type is ``function`` and the endpoint is empty
    :param requirement: requirement name, default ``"openedoo_core"``
    :param comment: free text, default ``"my module name is <name>"``
    :param url: stored as given
    :returns: ``"module has created"``, or a message string when
        ``name_module`` is missing
    """
    if name_module is None:
        return "please insert name module"

    if comment is None:
        comment = "my module name is {name}".format(name=name_module)
    if requirement is None:
        requirement = "openedoo_core"
    if version_module is None:
        version_module = "0.1.0"

    if url_endpoint is None:
        # The original formatted an empty template here, which is always ''.
        endpoint, kind = '', 'function'
    else:
        endpoint, kind = url_endpoint, 'end_point'

    data_json = {"name": name_module,
                 "version": version_module,
                 "requirement": requirement,
                 "pip_library": [],
                 "comment": comment,
                 "type": kind,
                 "url": url,
                 "url_endpoint": endpoint}

    target = 'modules/{folder}/{filename}'.format(folder=name_module,
                                                  filename='requirement.json')
    with open(target, 'w') as data_file:
        json.dump(data_json, data_file)
    return "module has created"
def save(self):
    """Checkpoint the session and store the configuration next to it.

    Does nothing (apart from an optional debug message) when the current
    global step equals ``config['last_checkpoint']``. Otherwise the session
    is saved under ``<results_dir>/checkpoint``, ``last_checkpoint`` is
    updated, and the configuration is written to ``<results_dir>/config.json``.
    """
    global_step = self.sess.run(tf.train.get_global_step(self.graph))

    if self.config['last_checkpoint'] == global_step:
        if self.config['debug']:
            print('Model has already been saved during the current global step.')
        return

    print('Saving to %s with global_step %d.' % (self.config['results_dir'], global_step))
    checkpoint_prefix = os.path.join(self.config['results_dir'], 'checkpoint')
    self.saver.save(self.sess, checkpoint_prefix, global_step)
    self.config['last_checkpoint'] = global_step

    # Also save the configuration
    json_file = os.path.join(self.config['results_dir'], 'config.json')
    with open(json_file, 'w') as config_out:
        json.dump(self.config, config_out, cls=utilities.NumPyCompatibleJSONEncoder)
def save(self, config_file_path):
    """Save credentials to file.

    Only the services that are set (not ``None``) are written: ``lastfm``
    contributes its session key, ``spotify`` its token fields.

    Args:
        config_file_path (path-like object): Path to the credentials file.
            The file is opened and closed by this method.
    """
    spotify_fields = ('access_token', 'token_type', 'refresh_token', 'scope')

    with open(config_file_path, 'w') as f:
        data = {}
        if self.lastfm is not None:
            data['lastfm'] = {'session_key': self.lastfm.session_key}
        if self.spotify is not None:
            data['spotify'] = {
                field: getattr(self.spotify, field) for field in spotify_fields
            }
        json.dump(data, f)
def add_text(self, tag, text_string, global_step=None):
    """Add text data to summary.

    The first time a tag is seen it is also recorded in ``self.text_tags``,
    and the full tag list is rewritten to
    ``<logdir>/plugins/tensorboard_text/tensors.json``.

    Args:
        tag (string): Data identifier
        text_string (string): String to save
        global_step (int): Global step value to record

    Examples::

        writer.add_text('lstm', 'This is an lstm', 0)
        writer.add_text('rnn', 'This is an rnn', 10)
    """
    self.file_writer.add_summary(text(tag, text_string), global_step)

    if tag in self.text_tags:
        return

    self.text_tags.append(tag)
    extensionDIR = self.file_writer.get_logdir() + '/plugins/tensorboard_text/'
    if not os.path.exists(extensionDIR):
        os.makedirs(extensionDIR)
    with open(extensionDIR + 'tensors.json', 'w') as fp:
        json.dump(self.text_tags, fp)
def set(self, url, content):
    """Store ``content`` for ``url`` in the file cache and drop stale entries.

    The cache file is opened and locked first. If the lock cannot be
    obtained the call only logs a debug message. Any exception is logged as
    a warning rather than raised, and the file is always unlocked and closed.

    :param url: cache key
    :param content: value stored together with the current timestamp
    """
    f = LockedFile(self._file, 'r+', 'r')
    try:
        f.open_and_lock()
        if not f.is_locked():
            logger.debug('Could not obtain a lock for the cache file.')
            return

        cache = _read_or_initialize_cache(f)
        cache[url] = (content, _to_timestamp(datetime.datetime.now()))

        # Remove stale cache.
        for key, (_, stored_at) in list(cache.items()):
            if _to_timestamp(datetime.datetime.now()) >= stored_at + self._max_age:
                del cache[key]

        # Rewrite the whole file from the start.
        f.file_handle().truncate(0)
        f.file_handle().seek(0)
        json.dump(cache, f.file_handle())
    except Exception as e:
        logger.warning(e, exc_info=True)
    finally:
        f.unlock_and_close()
def apply_settings(self, widget, data=None):
    """Store the dialog's choices in ``self.path`` and quit the GTK main loop.

    The ``white_bg``, ``auto_levels`` and ``resolution`` properties of
    ``self.data`` are refreshed from the widgets, then ``self.data`` is
    written as JSON.

    The configuration file is now written inside a ``with`` block, so the
    handle is closed even if serialisation fails. It is opened in text mode
    because ``json.dump`` writes ``str``.

    :param widget: the widget that triggered the callback (unused)
    :param data: optional callback payload (unused)
    """
    # NOTE(review): the original comments in this handler were lost to an
    # encoding error and could not be recovered.
    self.window.hide()

    properties = self.data['properties']
    properties['white_bg'] = bool(self.white_bg_check.get_active())
    properties['auto_levels'] = bool(self.auto_levels_check.get_active())
    properties['resolution'] = int(self.resolution_cb.get_active_text())

    with open(self.path, 'w') as config:
        json.dump(self.data, config, indent=3)

    gtk.main_quit()
def register(self, name, serializer):
    """Register ``serializer`` object under ``name``.

    Raises :class:`AttributeError` if ``serializer`` is invalid, i.e. lacks
    a ``load`` or ``dump`` attribute.

    .. note::

        ``name`` will be used as the file extension of the saved files.

    :param name: Name to register ``serializer`` under
    :type name: ``unicode`` or ``str``
    :param serializer: object with ``load()`` and ``dump()`` methods
    """
    # Basic validation: attribute access raises for anything missing.
    for required in ('load', 'dump'):
        getattr(serializer, required)

    self._serializers[name] = serializer
def dump(cls, obj, file_obj):
    """Serialize object ``obj`` to open pickle file.

    The highest pickle protocol available is used.

    .. versionadded:: 1.8

    :param obj: Python object to serialize
    :type obj: Python object
    :param file_obj: file handle
    :type file_obj: ``file`` object
    """
    # A negative protocol number selects pickle.HIGHEST_PROTOCOL.
    result = pickle.dump(obj, file_obj, protocol=pickle.HIGHEST_PROTOCOL)
    return result


# Set up default manager and register built-in serializers
def save(self):
    """Save settings to JSON file specified in ``self._filepath``.

    If you're using this class via :attr:`Workflow.settings`, which
    you probably are, ``self._filepath`` will be ``settings.json``
    in your workflow's data directory (see :attr:`~Workflow.datadir`).

    Nothing is written while ``self._nosave`` is set.
    """
    if self._nosave:
        return

    # Copy into a plain dict before serialising.
    data = {}
    data.update(self)

    # NOTE(review): the ``encoding`` keyword is only accepted by Python 2's
    # json.dump - confirm the supported interpreter before porting.
    with LockFile(self._filepath), atomic_writer(self._filepath, 'wb') as file_obj:
        json.dump(data, file_obj, sort_keys=True, indent=2, encoding='utf-8')


# dict methods
def sync(self):
    """Write dict to disk.

    The data is dumped to ``<filename>.tmp`` and then moved over the real
    file. If dumping fails the temporary file is removed and the exception
    re-raised. Nothing happens when the store was opened with flag ``'r'``.
    ``self.mode``, when set, is applied to the final file.
    """
    if self.flag == 'r':
        return

    target = self.filename
    scratch = target + '.tmp'
    if self.format == 'pickle':
        open_mode = 'wb'
    else:
        open_mode = 'w'

    handle = open(scratch, open_mode)
    try:
        self.dump(handle)
    except Exception:
        os.remove(scratch)
        raise
    finally:
        handle.close()

    shutil.move(scratch, self.filename)    # atomic commit
    if self.mode is not None:
        os.chmod(self.filename, self.mode)
def save_all_recommend_item():
    """Print the full recommendation list and save it to two JSON files.

    ``testItemData.json`` is written with the default ASCII escaping;
    ``testItemDataEncoded.json`` keeps non-ASCII characters as they are.
    """
    reco = Reco(json_data, show_external_data=False)
    all_list = reco.get_all_list()

    print(json.dumps(all_list, indent=4, sort_keys=True, ensure_ascii=False))

    outputs = (
        ('testItemData.json', True),
        ('testItemDataEncoded.json', False),
    )
    for file_name, ascii_only in outputs:
        with open(file_name, 'w') as outfile:
            json.dump(all_list, outfile, indent=4, ensure_ascii=ascii_only)
async def twitter_add(self, ctx, handle: str):
    '''
    Add a Twitter handle to a text channel
    A delay of up to 2 min. is possible due to Twitter rate limits

    Declared ``async`` because the body awaits the bot and the stream
    listener; as a plain ``def`` it is a syntax error.
    '''
    channel = ctx.message.channel
    channels = self.feeds_info["channels"]

    if handle in channels.get(channel.id, {}).get("handles", []):
        await self.bot.embed_reply(":no_entry: This text channel is already following that Twitter handle")
        return

    message, embed = await self.bot.embed_reply(":hourglass: Please wait")
    try:
        await self.stream_listener.add_feed(channel, handle)
    except tweepy.error.TweepError as e:
        embed.description = ":no_entry: Error: {}".format(e)
        await self.bot.edit_message(message, embed = embed)
        return

    if channel.id in channels:
        channels[channel.id]["handles"].append(handle)
    else:
        channels[channel.id] = {"name" : channel.name, "handles" : [handle]}

    # Persist the updated feed list before confirming to the user.
    with open("data/twitter_feeds.json", 'w') as feeds_file:
        json.dump(self.feeds_info, feeds_file, indent = 4)

    embed.description = "Added the Twitter handle, [`{0}`](https://twitter.com/{0}), to this text channel".format(handle)
    await self.bot.edit_message(message, embed = embed)
async def twitter_remove(self, ctx, handle: str):
    '''
    Remove a Twitter handle from a text channel
    A delay of up to 2 min. is possible due to Twitter rate limits

    Declared ``async`` because the body awaits the bot and the stream
    listener; as a plain ``def`` it is a syntax error.
    '''
    channel = ctx.message.channel
    followed = self.feeds_info["channels"].get(channel.id, {}).get("handles", [])

    try:
        followed.remove(handle)
    except ValueError:
        await self.bot.embed_reply(":no_entry: This text channel isn't following that Twitter handle")
        return

    # Persist the removal first, then update the live stream.
    with open("data/twitter_feeds.json", 'w') as feeds_file:
        json.dump(self.feeds_info, feeds_file, indent = 4)

    message, embed = await self.bot.embed_reply(":hourglass: Please wait")
    await self.stream_listener.remove_feed(channel, handle)
    embed.description = "Removed the Twitter handle, [`{0}`](https://twitter.com/{0}), from this text channel.".format(handle)
    await self.bot.edit_message(message, embed = embed)
async def generate_erps_dict(self):
    """Download the RPS-101 outcome list and save it as ``data/erps_dict.json``.

    The page text is lower-cased, "video game" is shortened to "game", and
    the last line is dropped. A line starting with a digit opens a new
    object (its last word is the object name); every other line maps its
    last word to the words before it.

    Declared ``async`` because the body uses ``async with`` and ``await``;
    as a plain ``def`` it is a syntax error. The local formerly named
    ``object`` was renamed so it no longer shadows the builtin.
    """
    async with clients.aiohttp_session.get("http://www.umop.com/rps101/alloutcomes.htm") as resp:
        data = await resp.text()

    raw_text = BeautifulSoup(data).text
    raw_text = re.sub("\n+", '\n', raw_text).strip()
    raw_text = raw_text.lower().replace("video game", "game")
    lines = raw_text.split('\n')[:-1]

    objects = {}
    current = lines[0].split()[-1]
    outcomes = {}
    for line in lines[1:]:
        if line[0].isdigit():
            # A numbered line starts the next object; store the previous one.
            objects[current] = outcomes
            current = line.split()[-1]
            outcomes = {}
        else:
            words = line.split()
            outcomes[words[-1]] = ' '.join(words[:-1])
    objects[current] = outcomes

    with open("data/erps_dict.json", 'w') as erps_file:
        json.dump(objects, erps_file, indent = 4)
async def tag(self, ctx, tag: str = ""):
    '''Tags/notes that you can trigger later

    A personal tag of the message author takes precedence over a global tag
    of the same name. Triggering a global tag increments its usage counter
    and rewrites ``data/tags.json``. For an unknown tag, close matches among
    the personal and global tag names are suggested.

    Declared ``async`` because the body awaits the bot; as a plain ``def``
    it is a syntax error.
    '''
    if not tag:
        await self.bot.embed_reply("Add a tag with `{0}tag add [tag] [content]`\nUse `{0}tag [tag]` to trigger the tag you added\n`{0}tag edit [tag] [content]` to edit it and `{0}tag delete [tag]` to delete it".format(ctx.prefix))
        return

    author_id = ctx.message.author.id
    global_tags = self.tags_data["global"]

    if tag in self.tags_data.get(author_id, {}).get("tags", []):
        await self.bot.reply(self.tags_data[author_id]["tags"][tag])
    elif tag in global_tags:
        await self.bot.reply(global_tags[tag]["response"])
        global_tags[tag]["usage_counter"] += 1
        with open("data/tags.json", 'w') as tags_file:
            json.dump(self.tags_data, tags_file, indent = 4)
    else:
        known = list(self.tags_data.get(author_id, {}).get("tags", {}).keys()) + list(global_tags.keys())
        close_matches = difflib.get_close_matches(tag, known)
        close_matches = "\nDid you mean:\n{}".format('\n'.join(close_matches)) if close_matches else ""
        await self.bot.embed_reply("Tag not found{}".format(close_matches))
def extract(url):
    """Scrape one result page, record its metadata and download its image.

    On success ``final_data[img_no]`` receives the ``type``, ``items`` and
    ``img`` fields. The image is saved as ``<path><img_no>.png``,
    ``data_new.json`` is rewritten, and a progress line is printed. Pages
    that cannot be fetched or parsed, or that contain no result block, are
    skipped.

    The bare ``except:`` clauses were narrowed to ``except Exception`` so
    Ctrl-C and ``SystemExit`` are no longer swallowed, and ``urlretrieve``
    is resolved for both Python 3 and Python 2.

    :param url: address of the page to scrape
    """
    try:
        from urllib.request import urlretrieve  # Python 3
    except ImportError:
        from urllib import urlretrieve  # Python 2

    global img_no

    try:
        img_no += 1
        r = requests.get(url)
        tree = html.fromstring(r.text)
        # The two steps were joined by a line continuation originally;
        # whitespace between XPath steps is insignificant.
        div = tree.xpath('//table[@class="masterresultstable"]'
                         '//div[@class="meshtext-wrapper-left"]')
    except Exception:
        div = []

    if not div:
        return
    div = div[0]

    typ = div.xpath('.//strong/text()')[0]
    items = div.xpath('.//li/text()')
    img = tree.xpath('//img[@id="theImage"]/@src')[0]

    final_data[img_no] = {
        'type': typ,
        'items': items,
        'img': domain + img,
    }

    try:
        urlretrieve(domain + img, path + str(img_no) + ".png")
        with open('data_new.json', 'w') as f:
            json.dump(final_data, f)
        output = "Downloading Images : {}".format(img_no)
        sys.stdout.write("\r\x1b[K" + output)
        sys.stdout.flush()
    except Exception:
        # Best-effort download: a failed image must not stop the crawl.
        return
def save_cache(self, file):
    """Write ``self.cache`` to the path ``file`` as indented, key-sorted JSON.

    :param file: path of the file to (over)write
    """
    import json

    dump_options = {'sort_keys': True, 'indent': 4}
    with open(file, 'w') as cache_out:
        json.dump(self.cache, cache_out, **dump_options)
def yaml(self):
    """Return the result of ``yaml.dump(self.data)``, i.e. the object's data as YAML."""
    serialized = yaml.dump(self.data)
    return serialized
def save(self):
    """Save this config to disk.

    If the charm is using the :mod:`Services Framework <services.base>`
    or the :meth:`@hook <Hooks.hook>` decorator, this is called
    automatically at the end of successful hook execution. Otherwise it
    should be called directly by user code.

    To disable automatic saves, set ``implicit_save=False`` on this
    instance.
    """
    # The object itself is what gets serialised to ``self.path``.
    with open(self.path, 'w') as config_out:
        json.dump(self, config_out)
def _save_ready_file(self):
    """Write ``self._ready`` to ``self._ready_file`` as a JSON list.

    Does nothing while ``self._ready`` is ``None``.
    """
    ready = self._ready
    if ready is None:
        return

    with open(self._ready_file, 'w') as ready_out:
        json.dump(list(ready), ready_out)
def generate(location):
    """CLI wizard for creating a new contract from a template.

    Loads the first ``*.json`` payload found in ``location`` and lets the
    user override each value (an empty answer keeps the current one). The
    first ``*.tsol`` template is then compiled against the payload. Finally
    the user chooses between generating a ``.sol`` contract and exporting
    the payload as a new ``.json`` implementation file.

    Every file is now opened in a ``with`` block. Previously the handles
    were never closed, including the ones used for writing the generated
    output.

    :param location: directory expected to contain a ``*.tsol`` and a
        ``*.json`` file
    """
    if not directory_has_smart_contract(location):
        print('Provided directory does not contain a *.tsol and *.json or does not compile.')
        return

    payload_path = glob.glob(os.path.join(location, '*.json'))[0]
    with open(payload_path) as payload_file:
        example_payload = json.load(payload_file)
    print(example_payload)

    # Only values are replaced, so iterating the dict while assigning is safe.
    for k in example_payload:
        value = input(k + ':')
        if value != '':
            example_payload[k] = value
    print(example_payload)

    code_path = glob.glob(os.path.join(location, '*.tsol'))
    with open(code_path[0]) as code_file:
        tsol.compile(code_file, example_payload)
    print('Code compiles with new payload.')

    while True:
        selection = input('(G)enerate Solidity contract or (E)xport implementation:').lower()
        if selection == 'g':
            output_name = input('Name your contract file without an extension:')
            with open(code_path[0]) as code_file:
                code = tsol.generate_code(code_file.read(), example_payload)
            with open(os.path.join(location, '{}.sol'.format(output_name)), 'w') as contract_out:
                contract_out.write(code)
            break
        if selection == 'e':
            output_name = input('Name your implementation file without an extension:')
            with open(os.path.join(location, '{}.json'.format(output_name)), 'w') as payload_out:
                json.dump(example_payload, payload_out)
            break
def send_feedback(self):
    """Print stored items to console/Alfred as JSON."""
    stream = sys.stdout
    json.dump(self.obj, stream)
    # Flush so the consumer sees the complete document immediately.
    stream.flush()