The following 33 code examples, extracted from open-source Python projects, illustrate how to use ujson.dump().
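Before the extracted examples, a minimal sketch of the basic call pattern may help (the file name and data here are hypothetical, not taken from any of the projects below): ujson.dump(obj, fp) serializes obj to JSON and writes the text directly to the open file object fp, mirroring the standard library's json.dump.

import ujson

# Hypothetical data and file name, for illustration only.
record = {"id": 1, "name": "example", "tags": ["a", "b"]}

with open("record.json", "w") as fp:
    # Serialize `record` and write the JSON text to the open file handle.
    ujson.dump(record, fp)

Several of the examples below import ujson under the alias json, so calls written as json.dump(...) still exercise ujson.dump().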
def _update_json_file(data_dir, data, model_str):
    if len(model_str) > 100:
        model_str = model_str[:100]
    model_dir = os.path.abspath(os.path.join(data_dir, data['model']))
    _ensure_dirs(model_dir)
    record_path = os.path.abspath(os.path.join(
        model_dir, '%s_%s.json' % (data['id'], model_str)))
    updated_paths = [record_path]
    # Name changed
    if not os.path.exists(record_path):
        for root, dirs, files in os.walk(model_dir):
            for file in files:
                if file.startswith(data['id']):
                    print(file)
                    old = os.path.abspath(os.path.join(root, file))
                    os.rename(old, record_path)
                    updated_paths.append(old)
    with open(record_path, 'w') as f:
        json.dump(data, f, indent=4, sort_keys=True)
    return updated_paths
def wait_for_processing(self, task):
    """Wait for any background processing threads to finish"""
    if self.video_processing is not None:
        logging.debug('Waiting for video processing to finish')
        self.video_processing.communicate()
        self.video_processing = None
        if not self.job['keepvideo']:
            try:
                os.remove(task['video_file'])
            except Exception:
                pass
    opt = None
    if self.optimization is not None:
        opt = self.optimization.join()
    if self.wpt_result is not None:
        self.process_optimization_results(self.wpt_result['pageData'],
                                          self.wpt_result['requests'], opt)
        if self.path_base is not None:
            devtools_file = self.path_base + '_devtools_requests.json.gz'
            with gzip.open(devtools_file, 'wb', 7) as f_out:
                json.dump(self.wpt_result, f_out)
def convert_image(options):
    print 'loading cache'
    options['cache'] = json.load(open('color_cache.json'))
    print 'beginning conversion'

    im = Image.open(options['filename'])
    o = convert_frame(im, options)
    o += ANSI_RESET + '\n\n'

    # Save to ANSI file and add SAUCE record
    if options['output_file'] is not sys.stdout:
        save_frame(o, options)
        add_sauce(o, options)
    # Output to console (unicode)
    else:
        print_frame(o, options)

    # json.dump(options['cache'], open('color_cache.json','w'))

# MAIN FUNCTION
def refresh(self):
    self.status_file = self.path / "status.json"
    self.status = (
        {} if not self.status_file.exists() else
        json.loads(self.status_file.open().read())
    )
    for files in self.iter_file_lists():
        for file_obj in files:
            self.archive.downloader.add_file(file_obj)
            output_dir = self.path / ts2ymd(file_obj["created"])
            if not output_dir.exists():
                output_dir.mkdir()
            output_file = output_dir / (file_obj["id"] + ".json")
            with open_atomic(str(output_file)) as f:
                json.dump(file_obj, f)
def query(self, *args, onlyCached=False, **kwargs):
    queryString, hashString = self._queryString(*args, **kwargs)
    filename = self.__cacheDir + '/' + self._prefix + '-' + self.__hash(hashString)
    if not os.path.exists(self.__cacheDir):
        os.makedirs(self.__cacheDir)
    if os.path.exists(filename):
        with open(filename, 'r') as file:
            data = ujson.load(file)
    elif onlyCached:
        print('[' + self._prefix + '] data not cached: ' + queryString)
        return None
    else:
        print('[' + self._prefix + '] downloading data: ' + queryString)
        if self._waitForReady() == None:
            if self.__lastQuery and self.__waitBetweenQueries and \
                    time.time() - self.__lastQuery < self.__waitBetweenQueries:
                time.sleep(self.__waitBetweenQueries - time.time() + self.__lastQuery)
            self.__lastQuery = time.time()
        data = self.__query(queryString)
        with open(filename, 'w') as file:
            ujson.dump(data, file)
    result = self._rawToResult(data, queryString)
    if not self._isValid(result):
        raise(Exception('[' + self._prefix + '] error in result (' + filename + '): ' + queryString))
    return result
def write(self):
    """Write out the resulting json data"""
    if self.out_file is not None and len(self.result['pageData']) and \
            len(self.result['requests']):
        try:
            _, ext = os.path.splitext(self.out_file)
            if ext.lower() == '.gz':
                with gzip.open(self.out_file, 'wb') as f_out:
                    json.dump(self.result, f_out)
            else:
                with open(self.out_file, 'w') as f_out:
                    json.dump(self.result, f_out)
        except Exception:
            logging.critical("Error writing to " + self.out_file)
def save(self):
    """Save a snapshot of the dataset file in JSON-Line format."""
    with storage.open(self.path, 'w', self._compression_level) as fp:
        for dp in self._datapoints_query:
            json.dump((dp.pid, dp.json), fp)
            fp.write('\n')
def write_json_file(filename, path, result):
    """Writes the result to json with the given filename.

    Args:
        filename (str): Filename to write to.
        path (str): Directory path to use.
    """
    with open(path + filename + ".json", "w+") as json_file:
        ujson.dump(result, json_file)
        json_file.close()
def write_json_rows(rows, docfilepath, encode_html=True):
    """ Note that we are appending"""
    with open(docfilepath, 'a') as fp:
        for row in rows:
            ujson.dump(row, fp, encode_html_chars=encode_html)
            fp.write('\n')

#-----------------------------------------------------------
#-----------------------------------------------------------
def export_data(data, output_file, format='json', compress=False):
    """Writes and optionally compresses Mixpanel data to disk in json or csv format

    :param data: A list of Mixpanel events or People profiles, if format='json', arbitrary json can be exported
    :param output_file: Name of file to write to
    :param format: Output format can be 'json' or 'csv' (Default value = 'json')
    :param compress: Option to gzip output (Default value = False)
    :type data: list
    :type output_file: str
    :type format: str
    :type compress: bool
    """
    with open(output_file, 'w+') as output:
        if format == 'json':
            json.dump(data, output)
        elif format == 'csv':
            Mixpanel._write_items_to_csv(data, output_file)
        else:
            msg = "Invalid format - must be 'json' or 'csv': format = " + str(format) + '\n' \
                  + "Dumping json to " + output_file
            Mixpanel.LOGGER.warning(msg)
            json.dump(data, output)

    if compress:
        Mixpanel._gzip_file(output_file)
        os.remove(output_file)
def _prep_event_for_import(event, token, timezone_offset):
    """Takes an event dict and modifies it to meet the Mixpanel /import HTTP spec or dumps it to disk if it is invalid

    :param event: A Mixpanel event dict
    :param token: A Mixpanel project token
    :param timezone_offset: UTC offset (number of hours) of the timezone setting for the project that exported
        the data. Needed to convert the timestamp back to UTC prior to import.
    :type event: dict
    :type token: str
    :type timezone_offset: int | float
    :return: Mixpanel event dict with token added and timestamp adjusted to UTC
    :rtype: dict
    """
    # The /import API requires a 'time' and 'distinct_id' property, if either of those are missing we dump that
    # event to a log of invalid events and return
    if ('time' not in event['properties']) or ('distinct_id' not in event['properties']):
        Mixpanel.LOGGER.warning('Event missing time or distinct_id property, dumping to invalid_events.txt')
        with open('invalid_events.txt', 'a') as invalid:
            json.dump(event, invalid)
            invalid.write('\n')
        return
    event_copy = deepcopy(event)
    # transforms timestamp to UTC
    event_copy['properties']['time'] = int(int(event['properties']['time']) - (timezone_offset * 3600))
    event_copy['properties']['token'] = token
    return event_copy
def _create_json_file(data_dir, data, model_str):
    if len(model_str) > 100:
        model_str = model_str[:100]
    model_dir = os.path.abspath(os.path.join(data_dir, data['model']))
    _ensure_dirs(model_dir)
    record_path = os.path.abspath(os.path.join(
        model_dir, '%s_%s.json' % (data['id'], model_str)))
    with open(record_path, 'w') as f:
        json.dump(data, f, indent=4, sort_keys=True)
    return [record_path]
def _update_fs(items, data_dir, create=False, update=False, delete=False):
    """
    CREATE OPERATION HANDLING IS CONCERNED ONLY ABOUT ROOT models
    NON ROOT MODEL ITEMS ARE IGNORED

    We don't dump non-root models, since they belong to one root model anyway
    and their data will be saved as part of that root model.

    For example, if a user created a task (non-root model) and assigned it to a contact,
    the cached data will look like
    {'changes': {'updated': IdentitySet([<crm.contact.models.Contact object at 0x7f5748a1f5c0>]),
                 'deleted': IdentitySet([]),
                 'created': IdentitySet([<crm.task.models.Task object at 0x7f574929dba8>])}}
    so the Contact object has been updated anyway, and we handle that update by
    adding the task to the contact data in the file system.

    :param items: list of model dictionaries that was created
    :param data_dir: where to save files
    :return: newly created file paths
    :rtype: list
    """
    all_paths = []
    for item in items:
        data = item['data']
        if not _is_root_model(data['model']):
            continue
        if create:
            paths = _create_json_file(data_dir, data, item['obj_as_str'])
        elif update:
            paths = _update_json_file(data_dir, data, item['obj_as_str'])
        elif delete:
            paths = _delete_json_file(data_dir, data, item['obj_as_str'])
        all_paths.extend(paths)
    return all_paths
def dumpdata():
    """
    Dump data table models into filesystem. Only Root models are dumped:
    'Company', 'Contact', 'Deal', 'Sprint', 'Project', 'Organization', 'User'
    """
    data_dir = app.config["DATA_DIR"]
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)

    for model in RootModel.__subclasses__():
        model_dir = os.path.abspath(os.path.join(data_dir, model.__name__))
        if not os.path.exists(model_dir):
            os.mkdir(model_dir)
        for obj in model.query.all():
            obj_as_str = str(obj).replace('/', '_')
            if len(obj_as_str) > 100:
                obj_as_str = obj_as_str[:100]
            record_path = os.path.abspath(os.path.join(
                model_dir, '%s_%s.json' % (obj.id, obj_as_str)))
            data = obj.as_dict()
            with open(record_path, 'w') as f:
                json.dump(data, f, indent=4, sort_keys=True)
def write(obj, fn):
    with open(fn, "w") as f:
        json.dump(obj, f, indent=4)
def process_requests(self, request_timings, task):
    """Convert all of the request and page events into the format needed for WPT"""
    result = {}
    result['requests'] = self.merge_requests(request_timings)
    result['pageData'] = self.calculate_page_stats(result['requests'])
    devtools_file = os.path.join(task['dir'], task['prefix'] + '_devtools_requests.json.gz')
    with gzip.open(devtools_file, 'wb', 7) as f_out:
        json.dump(result, f_out)
def main():
    """ Main entry-point when running on the command-line"""
    import argparse
    parser = argparse.ArgumentParser(description='Chrome trace parser.', prog='trace-parser')
    parser.add_argument('-v', '--verbose', action='count',
                        help="Increase verbosity (specify multiple times for more)" \
                             ". -vvvv for full debug output.")
    parser.add_argument('-l', '--logfile', help="File name for the mozilla log.")
    parser.add_argument('-s', '--start',
                        help="Start Time in UTC with microseconds YYYY-MM-DD HH:MM:SS.xxxxxx.")
    parser.add_argument('-o', '--out', help="Output requests json file.")
    options, _ = parser.parse_known_args()

    # Set up logging
    log_level = logging.CRITICAL
    if options.verbose == 1:
        log_level = logging.ERROR
    elif options.verbose == 2:
        log_level = logging.WARNING
    elif options.verbose == 3:
        log_level = logging.INFO
    elif options.verbose >= 4:
        log_level = logging.DEBUG
    logging.basicConfig(
        level=log_level, format="%(asctime)s.%(msecs)03d - %(message)s", datefmt="%H:%M:%S")

    if not options.logfile or not options.start:
        parser.error("Input devtools file or start time is not specified.")

    parser = FirefoxLogParser()
    requests = parser.process_logs(options.logfile, options.start)
    if options.out:
        with open(options.out, 'w') as f_out:
            json.dump(requests, f_out, indent=4)
def write_json(self, out_file, json_data):
    """Write out one of the internal structures as a json blob"""
    try:
        _, ext = os.path.splitext(out_file)
        if ext.lower() == '.gz':
            with gzip.open(out_file, 'wb') as f:
                json.dump(json_data, f)
        else:
            with open(out_file, 'w') as f:
                json.dump(json_data, f)
    except BaseException:
        logging.critical("Error writing to " + out_file)
def update_browser_viewport(self, task):
    """Update the browser border size based on the measured viewport"""
    if 'actual_viewport' in task and 'width' in task and 'height' in task and \
            self.job is not None and 'browser' in self.job:
        browser = self.job['browser']
        width = max(task['width'] - task['actual_viewport']['width'], 0)
        height = max(task['height'] - task['actual_viewport']['height'], 0)
        if browser not in self.margins or self.margins[browser]['width'] != width or \
                self.margins[browser]['height'] != height:
            self.margins[browser] = {"width": width, "height": height}
            if not os.path.isdir(self.persistent_dir):
                os.makedirs(self.persistent_dir)
            margins_file = os.path.join(self.persistent_dir, 'margins.json')
            with open(margins_file, 'wb') as f_out:
                json.dump(self.margins, f_out)
def process_message(self, msg):
    """Process a message from the browser
    https://trac.webkit.org/browser/webkit/trunk/Source/JavaScriptCore/inspector/protocol"""
    try:
        if 'method' in msg and self.recording:
            parts = msg['method'].split('.')
            if len(parts) >= 2:
                category = parts[0]
                event = parts[1]
                if category == 'Page':
                    self.process_page_event(event, msg)
                elif category == 'Network':
                    self.process_network_event(event, msg)
                elif category == 'Inspector':
                    self.process_inspector_event(event)
                elif category == 'Timeline':
                    self.process_timeline_event(event, msg)
                elif category == 'Console':
                    self.process_console_event(event, msg)
    except Exception:
        pass
    if self.timeline and 'method' in msg and self.recording:
        json.dump(msg, self.timeline)
        self.timeline.write(",\n")
    if 'id' in msg:
        response_id = int(re.search(r'\d+', str(msg['id'])).group())
        if response_id in self.pending_commands:
            self.pending_commands.remove(response_id)
            self.command_responses[response_id] = msg
def cache_all_colors():
    options = {}
    options['cache'] = json.load(open('color_cache.json'))

    for r in range(255):
        for g in range(255):
            for b in range(255):
                print 'r: ' + str(r) + ' | g: ' + str(g) + ' | b: ' + str(b)
                desired_color = {
                    'r': r,
                    'g': g,
                    'b': b,
                }
                color_id = str(r).zfill(3) + str(g).zfill(3) + str(b).zfill(3)
                closest_dist = INFINITY
                closest_color_index = 0
                for i, block_char in enumerate(ANSI_SHADED_BLOCKS_TO_RGB):
                    block_char_color = {
                        'r': int(block_char['r']),
                        'g': int(block_char['g']),
                        'b': int(block_char['b']),
                    }
                    d = color_distance(block_char_color, desired_color)
                    if d < closest_dist:
                        closest_dist = d
                        closest_color_index = i
                # Add this index to our color cache so we don't have to look it up again
                options['cache'][color_id] = closest_color_index

    json.dump(options['cache'], open('color_cache.json', 'w'))

#@timing
def set_user_value(self, username, key, value):
    data = {key: value}
    try:
        self[username].update(data)
    except KeyError:
        self[username] = data
    with open(self.file, 'w') as fd:
        json.dump(self, fd)
def read_instance_types():
    redirects = read_redirects()
    subject_object = defaultdict(list)
    read_types(redirects, subject_object, 'instance_types_en.ttl')
    #read_types(redirects, subject_object, 'instance_types_transitive_en.ttl')
    #read_types(redirects, subject_object, 'instance_types_lhd_dbo_en.ttl')
    read_types(redirects, subject_object, 'instance_types_sdtyped_dbo_en.ttl')
    #read_types(redirects, subject_object, 'instance_types_dbtax_dbo_en.ttl')
    print('write json')
    with open('instance_types.json', 'w') as outfile:
        ujson.dump(subject_object, outfile)
    print('finish')
def _write_pending(self):
    try:
        self.pool.join()
    finally:
        to_write = list(self.pool.iter_incomplete())
        if not to_write:
            try:
                self.pending_file.unlink()
            except OSError:
                pass
            return

        with open_atomic(str(self.pending_file)) as f:
            json.dump(to_write, f)
def _refresh_messages(self):
    latest_archive = next(self.iter_archives(reverse=True), None)
    latest_ts = 0
    if latest_archive:
        msgs = self.load_messages(latest_archive)
        latest_ts = msgs[-1]["ts"] if msgs else 0

    slack = getattr(self.slack, self.attr)
    while True:
        resp = self._get_list(slack, latest_ts)
        assert_successful(resp)
        msgs = resp.body["messages"]
        msgs.sort(key=lambda m: m["ts"])
        if msgs and not self.path.exists():
            self.path.mkdir()
        for day, day_msgs in groupby(msgs, key=lambda m: ts2ymd(m["ts"])):
            day_msgs = list(day_msgs)
            day_archive = self.path / (day + ".json")
            cur = (
                self.load_messages(day_archive)
                if day_archive.exists() else []
            )
            cur.extend(day_msgs)
            print "%s: %s new messages in %s (saving to %s)" %(
                self.pretty_name, len(day_msgs), self.pretty_name, day_archive,
            )
            for msg in day_msgs:
                if "file" in msg or "attachments" in msg:
                    self.downloader.add_message(msg)
            with open_atomic(str(day_archive)) as f:
                json.dump(cur, f)
            if float(day_msgs[-1]["ts"]) > float(latest_ts):
                latest_ts = day_msgs[-1]["ts"]
        if not resp.body["has_more"]:
            break
def update_status(self, x):
    self.status.update(x)
    with open_atomic(str(self.status_file)) as f:
        json.dump(self.status, f)
def save_name(self, user_data):
    data = {
        "fbid": user_data.data['fbprofile']['id'],
        "showid": user_data.meta['showid'],
        "showdate": user_data.meta['showdate'],
    }
    os.makedirs("./data/" + self.name, exist_ok=True)
    filename = "./data/{}/{}.json".format(self.name, user_data.userid)
    with open(filename, "w+") as fd:
        json.dump(data, fd)
def json_export(self, sourcenode, fname, **kargs):
    """ Exports current data for later import.

    The exported data is a snapshot of current state.

    Args:
        fname: File name for the exported data.

        sourcenode: Base of sub-tree for export.
            None for complete JSON document.

        **kargs:
            ffs.

    Returns:
        When successful returns 'True', else returns either 'False',
        or raises an exception.

    Raises:
        JSONDataTargetFile:
    """
    if not sourcenode:
        sourcenode = self.data
    try:
        with open(fname, 'w') as fp:
            ret = myjson.dump(sourcenode, fp)
    except Exception as e:
        raise JSONDataTargetFile("open-" + str(e), "data.dump", str(fname))
    return True
def __repr__(self):
    """Dump data.
    """
    # io = StringIO()
    # myjson.dump(self.data, io)
    # return io.getvalue()
    return repr(self.data)
def _send_batch(self, base_url, endpoint, batch, dataset_id=None, dataset_version=None, retries=0):
    """POST a single batch of data to a Mixpanel API and return the response

    :param base_url: The base API url
    :param endpoint: Can be 'import', 'engage', 'import-events' or 'import-people'
    :param batch: List of Mixpanel event data or People updates to import.
    :param dataset_id: Dataset name to import into, required if dataset_version is specified, otherwise optional
    :param dataset_version: Dataset version to import into, required if dataset_id is specified, otherwise optional
    :param retries: Max number of times to retry if we get a HTTP 503 response (Default value = 0)
    :type base_url: str
    :type endpoint: str
    :type batch: list
    :type dataset_id: str
    :type dataset_version: str
    :type retries: int
    :raise: Raises for any HTTP error other than 503
    :return: HTTP response from Mixpanel API
    :rtype: str
    """
    try:
        params = {'data': base64.b64encode(json.dumps(batch))}
        if dataset_id:
            params['dataset_id'] = dataset_id
            params['token'] = self.token
        if dataset_version:
            params['dataset_version'] = dataset_version
        response = self.request(base_url, [endpoint], params, 'POST')
        msg = "Sent " + str(len(batch)) + " items on " + time.strftime("%Y-%m-%d %H:%M:%S") + "!"
        Mixpanel.LOGGER.debug(msg)
        return response
    except urllib2.HTTPError as err:
        # In the event of a 503 we will try to send again
        if err.code == 503:
            if retries < self.max_retries:
                Mixpanel.LOGGER.warning("HTTP Error 503: Retry #" + str(retries + 1))
                self._send_batch(base_url, endpoint, batch, dataset_id=dataset_id,
                                 dataset_version=dataset_version, retries=retries + 1)
            else:
                Mixpanel.LOGGER.warning("Failed to import batch, dumping to file import_backup.txt")
                with open('import_backup.txt', 'a') as backup:
                    json.dump(batch, backup)
                    backup.write('\n')
        else:
            raise
def delete_old_files(self, date, confirm):
    date_str = date.strftime("%Y-%m-%d")
    dry_run = (
        "" if confirm else
        " (PREVIEW ONLY; use '--confirm-delete' to actually delete these files)"
    )
    print "Deleting files created before %s... %s" %(date_str, dry_run)

    def delete_file(x):
        file_file, file_obj = x
        try:
            res = self.slack.files.delete(file_obj["id"])
            assert_successful(res)
        except Error as e:
            print "Error deleting file %r: %s" %(file_obj["id"], e.message)
            self._error_count += 1
            return
        self._deleted_count += 1
        file_obj["_wayslack_deleted"] = True
        with open_atomic(str(file_file)) as f:
            json.dump(file_obj, f)

    pool = Threadpool(delete_file, queue_size=1, thread_count=10)
    self._deleted_count = 0
    self._skipped_count = 0
    self._error_count = 0
    for dir in self.path.iterdir():
        if dir.name >= date_str:
            continue
        for file_file, file_obj in self._iter_files_in_dir(dir):
            if file_obj.get("_wayslack_deleted"):
                continue
            err, file_path = self.archive.downloader.is_file_missing(file_obj)
            if err:
                self._skipped_count += 1
                if VERBOSE:
                    print "WARNING: %s: %s" %(
                        str(file_file),
                        err,
                    )
                    print "   File:", file_path
                    print "    URL:", file_obj["url_private"]
                continue
            self._deleted_count += 1
            if confirm:
                if (self._deleted_count + self._error_count + self._skipped_count) % 10 == 0:
                    print self._deleted_msg()
                pool.put((file_file, file_obj))
    pool.join()
    print "Deleted files: %s%s" %(self._deleted_count, dry_run)
    if self._skipped_count and self._deleted_count:
        print "Skipped files: %s (this is 'normal'. See: https://stackoverflow.com/q/44742164/71522; use --verbose for more info)" %(self._skipped_count, )
    if self._error_count:
        print "Errors: %s" %(self._error_count, )