我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用simplejson.dump()。(Translation: the following 50 code examples, extracted from open-source Python projects, illustrate how to use simplejson.dump().)
def convert_sql_to_avsc(self, avsc_file_path, sql_file_path):
    """Convert a Redshift SQL file into an Avro schema (.avsc) file.

    Reads the SQL text at sql_file_path, builds an Avro record through
    RedshiftSQLToAVSCConverter (using the namespace/schema options on
    self.options), and writes it to avsc_file_path as sorted, indented JSON.
    """
    with open(sql_file_path) as sql_file:
        sql_content = sql_file.read()
    # Build the converter and pull out the generated Avro record.
    record = RedshiftSQLToAVSCConverter(
        sql_content=sql_content,
        base_namespace=self.options.base_namespace,
        default_schema=self.options.default_schema,
    ).avro_record
    with open(avsc_file_path, 'w') as avsc_file:
        self.log.info('Writing "{0}"'.format(avsc_file_path))
        json.dump(record, avsc_file, indent=' ', sort_keys=True)
def test_tuple_array_dump(self):
    """Tuples dump as JSON arrays by default and with tuple_as_array=True;
    with tuple_as_array=False they are rejected unless `default` handles them."""
    t = (1, 2, 3)
    expect = json.dumps(list(t))

    def dumped(**kwargs):
        # Round-trip helper: dump `t` with the given kwargs, return the text.
        out = StringIO()
        json.dump(t, out, **kwargs)
        return out.getvalue()

    # Default is True
    self.assertEqual(expect, dumped())
    self.assertEqual(expect, dumped(tuple_as_array=True))
    self.assertRaises(TypeError, json.dump, t, StringIO(),
                      tuple_as_array=False)
    # Ensure that the "default" does not get called
    self.assertEqual(expect, dumped(default=repr))
    self.assertEqual(expect, dumped(tuple_as_array=True, default=repr))
    # Ensure that the "default" gets called
    self.assertEqual(json.dumps(repr(t)),
                     dumped(tuple_as_array=False, default=repr))
def test_namedtuple_dump(self):
    """Namedtuples (and duck-typed equivalents) dump as JSON objects for
    every combination of namedtuple_as_object / tuple_as_array flags tried."""
    for v in [Value(1), Point(1, 2), DuckValue(1), DuckPoint(1, 2)]:
        expected = v._asdict()
        # All four original flag combinations must produce the dict form.
        for kwargs in ({},
                       {'namedtuple_as_object': True},
                       {'tuple_as_array': False},
                       {'namedtuple_as_object': True,
                        'tuple_as_array': False}):
            out = StringIO()
            json.dump(v, out, **kwargs)
            self.assertEqual(expected, json.loads(out.getvalue()))
def main():
    """Pretty-print JSON: read from infile (or stdin), write sorted,
    indented output to outfile (or stdout). Usage: prog [infile [outfile]]"""
    argc = len(sys.argv)
    if argc == 1:
        infile, outfile = sys.stdin, sys.stdout
    elif argc == 2:
        infile, outfile = open(sys.argv[1], 'r'), sys.stdout
    elif argc == 3:
        infile = open(sys.argv[1], 'r')
        outfile = open(sys.argv[2], 'w')
    else:
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")
    with infile:
        try:
            # OrderedDict + use_decimal preserve key order and numeric precision.
            obj = json.load(infile,
                            object_pairs_hook=json.OrderedDict,
                            use_decimal=True)
        except ValueError:
            raise SystemExit(sys.exc_info()[1])
    with outfile:
        json.dump(obj, outfile, sort_keys=True, indent=' ', use_decimal=True)
        outfile.write('\n')
def textanalyze(self, index_name, analyzer, text):
    """POST the given text to the Azure Search "analyze" REST endpoint
    for index_name using the named analyzer.

    :param index_name: name of the Azure Search index
    :param analyzer: analyzer name to run the text through
    :param text: text to analyze
    :return: the response body decoded as a UTF-8 string
    """
    # Create JSON string for request body.
    reqobject = {}
    reqobject['text'] = text
    reqobject['analyzer'] = analyzer
    # FIX: renamed from `io` -- that local shadowed the stdlib `io` module.
    buf = StringIO()
    json.dump(reqobject, buf)
    req_body = buf.getvalue()
    # HTTP request to Azure search REST API
    conn = httplib.HTTPSConnection(self.__api_url)
    conn.request("POST",
                 u"/indexes/{0}/analyze?api-version={1}".format(
                     index_name, _AZURE_SEARCH_API_VERSION),
                 req_body, self.headers)
    response = conn.getresponse()
    data = (response.read()).decode('utf-8')
    conn.close()
    return data
def main():
    """Pretty-print JSON from infile (or stdin) to outfile (or stdout),
    with sorted keys and 4-space indent. Usage: prog [infile [outfile]]"""
    import sys
    if len(sys.argv) == 1:
        infile = sys.stdin
        outfile = sys.stdout
    elif len(sys.argv) == 2:
        infile = open(sys.argv[1], 'rb')
        outfile = sys.stdout
    elif len(sys.argv) == 3:
        infile = open(sys.argv[1], 'rb')
        outfile = open(sys.argv[2], 'wb')
    else:
        raise SystemExit("%s [infile [outfile]]" % (sys.argv[0],))
    try:
        obj = simplejson.load(infile)
    # FIX: `except ValueError, e` is Python-2-only syntax (a SyntaxError on
    # Python 3); `as e` works on Python 2.6+ and 3.x alike.
    except ValueError as e:
        raise SystemExit(e)
    simplejson.dump(obj, outfile, sort_keys=True, indent=4)
    outfile.write('\n')
def main():
    """Pretty-print JSON from infile (or stdin) to outfile (or stdout),
    preserving key order and decimal precision. Usage: prog [infile [outfile]]"""
    if len(sys.argv) == 1:
        infile = sys.stdin
        outfile = sys.stdout
    elif len(sys.argv) == 2:
        infile = open(sys.argv[1], 'rb')
        outfile = sys.stdout
    elif len(sys.argv) == 3:
        infile = open(sys.argv[1], 'rb')
        outfile = open(sys.argv[2], 'wb')
    else:
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")
    try:
        obj = json.load(infile, object_pairs_hook=json.OrderedDict,
                        use_decimal=True)
    # FIX: `except ValueError, e` is Python-2-only syntax; `as e` is
    # valid on Python 2.6+ and required on Python 3.
    except ValueError as e:
        raise SystemExit(e)
    json.dump(obj, outfile, sort_keys=True, indent=' ', use_decimal=True)
    outfile.write('\n')
def main():
    """Pretty-print JSON from infile (or stdin) to outfile (or stdout);
    both streams are closed in a finally block. Usage: prog [infile [outfile]]"""
    if len(sys.argv) == 1:
        infile = sys.stdin
        outfile = sys.stdout
    elif len(sys.argv) == 2:
        infile = open(sys.argv[1], 'rb')
        outfile = sys.stdout
    elif len(sys.argv) == 3:
        infile = open(sys.argv[1], 'rb')
        outfile = open(sys.argv[2], 'wb')
    else:
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")
    try:
        try:
            obj = json.load(infile, object_pairs_hook=json.OrderedDict,
                            use_decimal=True)
        # FIX: `except ValueError, e` is Python-2-only syntax; `as e`
        # works on Python 2.6+ and 3.x.
        except ValueError as e:
            raise SystemExit(e)
        json.dump(obj, outfile, sort_keys=True, indent=' ', use_decimal=True)
        outfile.write('\n')
    finally:
        # NOTE: when no args are given this also closes sys.stdin/stdout,
        # matching the original behavior.
        infile.close()
        outfile.close()
def main():
    """Pretty-print JSON from infile (or stdin) to outfile (or stdout)
    with sorted keys and 4-space indent. Usage: prog [infile [outfile]]"""
    if len(sys.argv) == 1:
        infile = sys.stdin
        outfile = sys.stdout
    elif len(sys.argv) == 2:
        infile = open(sys.argv[1], 'rb')
        outfile = sys.stdout
    elif len(sys.argv) == 3:
        infile = open(sys.argv[1], 'rb')
        outfile = open(sys.argv[2], 'wb')
    else:
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")
    try:
        obj = simplejson.load(infile)
    # FIX: `except ValueError, e` is Python-2-only syntax; `as e` is
    # valid on Python 2.6+ and required on Python 3.
    except ValueError as e:
        raise SystemExit(e)
    simplejson.dump(obj, outfile, sort_keys=True, indent=4)
    outfile.write('\n')
def dump_batch(name, batch, batch_number):
    '''
    Dumps data batch into json file

    Parameters
    ------------
    * name (string): filename prefix [<name>-<batch_number>] (may include path)
    * batch (QuestionKlerk[]): array of questions (batch)
    * batch_number (int): batch number

    Returns
    ------------
    (boolean): True if dumped, False otherwise
    '''
    batch_dict = [q.to_dict() for q in batch]
    # Idiom fix: test container truthiness instead of len(...) == 0.
    if not batch_dict:
        return False
    with open(name + '-' + str(batch_number), 'w', encoding='utf-8') as fp_out:
        json.dump(batch_dict, fp_out)
    return True
def make_request():
    """Fetch BTC_DASH 5-minute candlestick data from the Poloniex public
    API and save the raw JSON response to the file named by the module-level
    `filetow`. Exits the process with status 1 on any failure."""
    startepoch = '1391806500'
    # FIX: the URL contained the mojibake "¤cyPair" -- "&curren" had been
    # collapsed into the HTML currency-sign entity. Restored the intended
    # "&currencyPair" query parameter.
    URL = ("https://poloniex.com/public?command=returnChartData"
           "&currencyPair=BTC_DASH&start=" + startepoch +
           "&end=9999999999&period=300")
    USERAGET = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_1) AppleWebKit/602.2.14 (KHTML, like Gecko) Version/10.0.1 Safari/602.2.14'
    headers = {'user-agent': USERAGET}
    try:
        response = requests.get(URL, headers=headers)
        if response.status_code == requests.codes.ok:
            with open(filetow, 'w') as fp:
                json.dump(response.json(), fp)
    # FIX: `e` was never bound in this handler, so the print itself raised
    # NameError; bind the exception with `as e`.
    except requests.exceptions.RequestException as e:
        print(e.args[0])
        sys.exit(1)
    except Exception as e:
        print(e.args[0])
        sys.exit(1)
def make_request():
    """Fetch bitcoin datapoints from the CoinMarketCap v1 API and save the
    raw JSON response to the file named by the module-level `filetow`.
    Exits the process with status 1 on any failure."""
    URL = "https://api.coinmarketcap.com/v1/datapoints/bitcoin/"
    USERAGET = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_1) AppleWebKit/602.2.14 (KHTML, like Gecko) Version/10.0.1 Safari/602.2.14'
    headers = {'user-agent': USERAGET}
    try:
        response = requests.get(URL, headers=headers)
        if response.status_code == requests.codes.ok:
            with open(filetow, 'w') as fp:
                json.dump(response.json(), fp)
    # FIX: `e` was never bound in this handler, so the print itself raised
    # NameError; bind the exception with `as e`.
    except requests.exceptions.RequestException as e:
        print(e.args[0])
        sys.exit(1)
    except Exception as e:
        print(e.args[0])
        sys.exit(1)
def solve_with_pool():
    """Fetch verify codes for ids 1..1000 in parallel with a process pool,
    persist them to result_check_code.txt as JSON, then call vote() with them.

    (The original Chinese comments were mojibake; rewritten in English.)
    """
    manager = multiprocessing.Manager()
    pool = multiprocessing.Pool(40)
    # Shared dict so worker processes can record their results.
    check_codes = manager.dict()
    # Idiom fix: a pass-through comprehension [i for i in range(...)] was
    # replaced with list(range(...)).
    pool.map(partial(get_verify_code, check_codes=check_codes),
             list(range(1, 1000 + 1)))
    # Snapshot the shared dict into a plain dict before serializing.
    print(check_codes)
    check_codes = dict(check_codes)
    with open("result_check_code.txt", "w") as f:
        json.dump(check_codes, f)
    # Cast the votes with the collected codes.
    vote(check_codes)
def run(file_name, demo=True):
    """Convert an XML corpus file to one JSON file per document.

    (Original docstring/comments were mojibake; rewritten in English.)

    :param file_name: str, path of the source XML file
    :param demo: bool, when True write into the demo output directory
        ("demo_xml2json_result") instead of "xml2json_result"
    :return: None; results are written to disk as news_data<i>.json files
    """
    dir_path = "xml2json_result" if not demo else "demo_xml2json_result"
    print("[*] ???? {} ?? xml2json ??, ????? {}".format(file_name, dir_path))
    os.makedirs(dir_path, exist_ok=True)
    # Iterate document by document, converting each to a dict and dumping it.
    for i, each_doc in enumerate(get_docs_from_file(file_name)):
        doc_dict = parse_doc_to_dict(each_doc)
        with codecs.open(os.path.join(dir_path, "news_data{}.json".format(i)), "w") as f:
            json.dump(doc_dict, f)
    print("[*] xml2json ??")
def count_domain(file_name, demo=True):
    """Count how many documents in the XML corpus belong to each URL domain.

    (Original docstring/comments were mojibake; rewritten in English.)

    :param file_name: str, path of the source XML file
    :param demo: bool, when True the result file name gets a "demo_" prefix
    :return: None; the {domain: count} mapping is written as JSON to
        <demo_>domain_parse_result.json
    """
    result_dict = dict()
    result_file_name = "{}domain_parse_result.json".format("demo_" if demo else "")
    print("[*] ???? {} ????????, ?????? {}".format(file_name, result_file_name))
    # Iterate document by document.
    for i, each_doc in enumerate(get_docs_from_file(file_name)):
        print("[*] ????? {} ?, ?? {} ?".format(i + 1, 1294234))
        # Parse the doc, take its url, and extract the domain from it.
        doc_dict = parse_doc_to_dict(each_doc)
        url = doc_dict["url"]
        domain = extract_domain_from_url(url)
        # Tally the domain occurrence.
        result_dict[domain] = result_dict.get(domain, 0) + 1
    # Persist the counts as a JSON file.
    with open(result_file_name, "w") as f:
        json.dump(result_dict, f)
def scrap(self): print 'Starting Scrapping' with open(self.basePath+'data.txt','r') as f: jsonData=simplejson.load(f) for i in range(1,self.depth+1): try: url=self.baseUrl+'/user/ettv/uploads' r=requests.get(url,params={'page':i},verify = certifi.where()) except requests.exceptions.Timeout: print 'Timeout has occured' except requests.exceptions.RequestException as e: print e soup=BeautifulSoup(r.text,'html.parser') for x in soup.findAll('a',{'class':'cellMainLink'}): jsonData[x.string]=x['href'] print 'End of Page',i with open(self.basePath + 'data.txt', 'w') as outfile: simplejson.dump(jsonData, outfile, sort_keys = True, indent = 4, ensure_ascii=False) sleep(5) print 'End Scrapping'
def save_json(self, filename, data):
    """Save container json to a file in the image TAG directory that
    has been previously selected via cd_imagerepo() or if the file
    starts with "/" to that specific file.

    :param filename: absolute path (leading "/") or a name relative to
        the currently selected tag directory
    :param data: JSON-serializable object to write
    :return: True on success, False on any failure
    """
    if filename.startswith("/"):
        out_filename = filename
    else:
        # A repo/tag must be selected and both directories must exist.
        if not (self.cur_repodir and self.cur_tagdir):
            return False
        if not os.path.exists(self.cur_repodir):
            return False
        if not os.path.exists(self.cur_tagdir):
            return False
        out_filename = self.cur_tagdir + "/" + filename
    try:
        # FIX: context manager replaces the manual open/close bookkeeping,
        # which duplicated the close call on both the error and success paths.
        with open(out_filename, 'w') as outfile:
            json.dump(data, outfile)
    except (IOError, OSError, AttributeError, ValueError, TypeError):
        return False
    return True
def get_park_effects(team):
    '''
    Get park adjustments for a team's park

    :param str team: team to get park effect data for
    :return tuple[str]: batter effect, pitcher effect
    '''
    global cached_effects
    # Normalize alternate team spellings to the canonical key.
    if team in team_disambiguations:
        team = team_disambiguations[team]
    if not cached_effects:
        if os.path.exists(cache_file):
            # FIX: the original `json.load(open(...))` never closed the
            # file handle; a context manager guarantees it is released.
            with open(cache_file, 'r') as inf:
                cached_effects = json.load(inf)
        else:
            cached_effects = {}
    if team in cached_effects:
        return cached_effects[team]
    # Cache miss: fetch from the web and persist the updated cache.
    effects = _load_parkeffect_from_url(team)
    cached_effects[team] = effects
    with open(cache_file, 'w') as outf:
        json.dump(cached_effects, outf)
    return effects