The following code examples, collected from open-source Python projects, illustrate how to use humanize.naturalsize().
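Before the project excerpts, a minimal sketch of the call itself may help. The byte value below is arbitrary, and the outputs shown in comments assume a recent release of the humanize package, whose naturalsize() accepts the binary, gnu and format keyword arguments that appear throughout the examples.

import humanize

# Decimal (SI) units by default: 1 kB = 1000 bytes.
print(humanize.naturalsize(3000000))                  # e.g. '3.0 MB'

# Binary (IEC) units: 1 KiB = 1024 bytes.
print(humanize.naturalsize(3000000, binary=True))     # e.g. '2.9 MiB'

# Compact GNU-style suffixes (single letter, no space).
print(humanize.naturalsize(3000000, gnu=True))        # e.g. '2.9M'

# Control precision with a printf-style format string.
print(humanize.naturalsize(3000000, format='%.3f'))   # e.g. '3.000 MB'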
def size():
    cache_dir = cfg.cache.dir
    source_dir = cfg.source.dir
    cache_size = 0
    for f in cache_dir.walkfiles():
        cache_size += f.size
    source_size = 0
    for f in source_dir.walkfiles():
        source_size += f.size
    print("{Style.BRIGHT}Cache: {Style.RESET_ALL} {}".format(
        humanize.naturalsize(cache_size, binary=True), Style=Style
    ))
    print("{Style.BRIGHT}Source:{Style.RESET_ALL} {}".format(
        humanize.naturalsize(source_size, binary=True), Style=Style
    ))
def _prompt_artifact_selection(self, service_name, artifact_key, deployment_repo, env, artifacts):
    current_image = deployment_repo['tfvars'].get(artifact_key, env)
    io.info('found artifacts for "%s/%s"' % (self.config.get('dockerhub')['organization'], service_name,))
    table_data = [
        ('id', 'tag name (* = current)', 'created at', 'size',),
    ]
    for i, artifact in enumerate(artifacts, 1):
        created_at = datetime.strptime(artifact['last_updated'], '%Y-%m-%dT%H:%M:%S.%fZ')
        created_at = pretty_print_datetime(created_at)
        image_size = humanize.naturalsize(artifact['full_size'])
        image_name = artifact['name']
        if image_name in current_image:
            # indicate the current artifact.
            image_name += ' *'
        table_data.append((str(i), image_name, created_at, image_size,))
    io.print_table(table_data, 'recent artifacts')
    # Handle the case where the selected artifact is the current artifact.
    selected_artifact = io.collect_input('select the artifact you want to use [q]:', artifacts)
    if selected_artifact and selected_artifact['name'] in current_image:
        io.err('selected artifact is already the current active artifact')
        return None
    return selected_artifact
def __init__(self, part: Dict[str, Any], core: ApartCore, main_view: 'MainView'):
    Gtk.Box.__init__(self)
    self.part = part
    self.core = core
    self.main_view = main_view
    self.add(key_and_val('Name', self.name()))
    self.add(key_and_val('Type', self.part.get('fstype', 'unknown')))
    self.add(key_and_val('Label', self.part.get('label', 'none')))
    self.add(key_and_val('Size', humanize.naturalsize(self.part['size'], binary=True)))
    self.clone_button = Gtk.Button("Clone", halign=Gtk.Align.END)
    self.restore_button = Gtk.Button("Restore", halign=Gtk.Align.END)
    if self.is_mounted():
        self.clone_button.set_sensitive(False)
        self.clone_button.set_tooltip_text('Partition is currently mounted')
        self.restore_button.set_sensitive(False)
        self.restore_button.set_tooltip_text('Partition is currently mounted')
    else:
        self.clone_button.connect('clicked', lambda b: self.main_view.show_new_clone())
        self.restore_button.connect('clicked', lambda b: self.main_view.show_new_restore())
    buttons = Gtk.Box(hexpand=True, halign=Gtk.Align.END)
    buttons.add(self.clone_button)
    buttons.add(self.restore_button)
    self.add(buttons)
    main_view.connect('notify::visible-child', self.on_main_view_change)
def __init__(self, resp, description=None):
    """
    Wrapper for BigQuery table resources, mainly for calculating/parsing job
    statistics into human-readable formats for logging.

    :param resp: Dictionary representation of a table resource.
    :type resp: dictionary
    :param description: Optional string descriptor for table.
    """
    assert isinstance(resp, dict)
    assert resp['kind'].split('#')[-1] == 'table'
    self.resp = resp
    if description is not None:
        self.description = description.strip().title()
    try:
        setattr(self, 'row_count', int(self.resp['numRows']))
    except (KeyError, TypeError):
        pass
    try:
        setattr(self, 'size', humanize.naturalsize(int(self.resp['numBytes'])))
    except (KeyError, TypeError):
        pass
def load_resp(self, resp, is_download):
    """
    Loads json response from API.

    :param resp: Response from API
    :type resp: dictionary
    :param is_download: Calculates time taken based on 'updated' field in
        response if upload, and based on stop time if download
    :type is_download: boolean
    """
    assert isinstance(resp, dict)
    setattr(self, 'resp', resp)
    setattr(self, 'size', humanize.naturalsize(int(resp['size'])))
    if is_download:
        updated_at = datetime.now(UTC)
    else:
        updated_at = UTC.localize(datetime.strptime(resp['updated'], '%Y-%m-%dT%H:%M:%S.%fZ'))
    setattr(self, 'time_taken', dict(zip(
        ('m', 's'),
        divmod((updated_at - getattr(self, 'start_time')).seconds
               if updated_at > getattr(self, 'start_time') else 0, 60)
    )))
    setattr(self, 'full_path', 'gs://%s/%s' % (resp['bucket'], resp['name']))
def build_objects(bucket):
    '''Build list of object tuples to render as table.

    Object structure: (name, url, size)
    '''
    objects = []
    data_dir = 'data/{}'.format(bucket)
    walk_dir = os.path.realpath('..') + '/' + data_dir
    for root, dirs, files in os.walk(walk_dir):
        for file_name in files:
            objects.append((
                file_name,
                data_dir + '/' + file_name,
                naturalsize(os.path.getsize(walk_dir + '/' + file_name))
            ))
    return objects
def _handle_http_url(self, url, headers):
    logging.debug("handle_http_url(url={}, headers={})".format(url, headers))
    before = time.time()
    start_offset = self.output.tell()
    for piece in self._stream(url, headers):
        self.output.write(piece)
    duration = time.time() - before
    # On Windows we seem to get 0 values for duration. Just round up to one second.
    # Rates over intervals less than this are meaningless anyway.
    duration = max(1, duration)
    size = self.output.tell() - start_offset
    rate = (size / (2**20)) / duration
    logging.info("Downloaded {} chunk in {:.2f} seconds @ {:.2f} MiB/s".format(
        humanize.naturalsize(size, binary=True), duration, rate))
def _format_stats(stats):
    formatted = []
    for k, v in stats.items():
        if k.endswith('_size') or k.endswith('_bytes'):
            v = naturalsize(v, binary=True)
        elif k == 'cpu_used':
            k += '_msec'
            v = '{0:,}'.format(int(v))
        else:
            v = '{0:,}'.format(int(v))
        formatted.append((k, v))
    return tabulate(formatted)
def download(
    url,
    local_file,
    file_size: int = None,
    hexdigest: str = None,
    title: str = '',
):
    LOGGER.info('downloading {} -> {}'.format(url, local_file))
    if not title:
        title = f'Downloading {url.split("/")[-1]}'
    Progress.start(
        title=title,
        label=f'Downloading to: {local_file}',
    )

    def _progress_hook(data):
        label = 'Time left: {} ({}/{})'.format(
            data['time'],
            humanize.naturalsize(data['downloaded']),
            humanize.naturalsize(data['total'])
        )
        Progress.set_label(label)
        Progress.set_value(data['downloaded'] / data['total'] * 100)

    # def hook(data):
    #     # I.progress_set_value(int(float(data['percent_complete'])))
    #     Progress().set_value(int(float(data['percent_complete'])))

    time.sleep(1)
    dl = Downloader(
        url=url,
        filename=local_file,
        progress_hooks=[_progress_hook],
        content_length=file_size,
        hexdigest=hexdigest
    )
    return dl.download()
def __init__(
    self,
    version: str,
    branch: str,
    download_url: str,
    remote_file_size: int,
    remote_file_name: str,
):
    self._version = version
    self._branch = branch
    self._download_url = download_url
    self._remote_file_size = remote_file_size
    self._remote_file_name = remote_file_name
    self._human_file_size = humanize.naturalsize(remote_file_size)
def parse_results_rarbg(response_json):
    global results_rarbg
    if error_detected_rarbg == False:
        for post in response_json['torrent_results']:
            res = {}
            res['name'] = post['title']
            res['link'] = post['info_page']
            temp_size = humanize.naturalsize(post['size'], binary=True, format='%.2f')
            s1 = temp_size.split('.')
            if(len(s1[0]) == 4):
                res['size'] = humanize.naturalsize(post['size'], binary=True, format='%.0f')
            elif(len(s1[1]) == 3):
                res['size'] = humanize.naturalsize(post['size'], binary=True, format='%.1f')
            else:
                res['size'] = temp_size
            #res['time'] = Implement later
            res['seeders'] = post['seeders']
            res['leechers'] = post['leechers']
            try:
                res['ratio'] = format((float(res['seeders']) / float(res['leechers'])), '.1f')
            except ZeroDivisionError:
                res['ratio'] = float('inf')
            res['magnet'] = post['download']
            results_rarbg.append(res)
    else:
        print "----------- " + colored.green('RARBG') + " -----------"
        print " [No results found] "
        return []
    return results_rarbg
def get(self):
    session = Session()
    service_count = session.query(Service).count()
    incidents = session.query(Incident).filter()
    session.close()
    memory_stats = psutil.virtual_memory()
    self.render("incidents.html",
                version=__version__,
                max_memory=humanize.naturalsize(memory_stats.total),
                memory_used=humanize.naturalsize(memory_stats.used),
                service_count=service_count,
                cpu_current=psutil.cpu_percent(),
                memory_percent=(memory_stats.used / memory_stats.total) * 100,
                incidents=incidents)
def setProgress(self, data):
    """Set the progress of the bar."""
    # TODO: What is the data structure in case of a patch?
    try:
        text = _('Downloading a new version: Total file size {}, Time remaining {}.')
        text = text.format(humanize.naturalsize(data['total']), data['time'])
        self.setLabelText(text)
        self.setValue(int(float(data['percent_complete']) * 10))
    except Exception as e:
        module_logger.exception("message")
def setProgress(self, data):
    """Set the progress of the bar."""
    # TODO: What is the data structure in case of a patch?
    module_logger.info("Progress {}".format(data))
    try:
        text = _('Downloading required files...: Total file size {}, Time remaining {}.')
        text = text.format(humanize.naturalsize(data['total']), data['time'])
        self.setLabelText(text)
        self.setValue(int(float(data['percent_complete']) * 5))
    except Exception as e:
        module_logger.exception("message")
def humanize_size_filter(dt: int, fmt=None):
    """Format a byte count as a human-readable size, localized to zh_CN via humanize's i18n support."""
    humanize.i18n.activate('zh_CN', path='etc/humanize')
    return humanize.naturalsize(dt)
def get(self, request, imsi=None):
    """Handles GET requests."""
    user_profile = UserProfile.objects.get(user=request.user)
    network = user_profile.network
    try:
        subscriber = Subscriber.objects.get(imsi=imsi, network=network)
    except Subscriber.DoesNotExist:
        return HttpResponseBadRequest()
    # Set the context with various stats.
    context = {
        'networks': get_objects_for_user(request.user, 'view_network', klass=Network),
        'currency': CURRENCIES[network.subscriber_currency],
        'user_profile': user_profile,
        'subscriber': subscriber,
    }
    try:
        context['created'] = subscriber.usageevent_set.order_by('date')[0].date
    except IndexError:
        context['created'] = None
    # Set usage info (SMS sent, total call duration, data usage).
    sms_kinds = ['free_sms', 'outside_sms', 'incoming_sms', 'local_sms',
                 'local_recv_sms', 'error_sms']
    context['num_sms'] = subscriber.usageevent_set.filter(kind__in=sms_kinds).count()
    call_kinds = ['free_call', 'outside_call', 'incoming_call', 'local_call',
                  'local_recv_call', 'error_call']
    calls = subscriber.usageevent_set.filter(kind__in=call_kinds)
    context['number_of_calls'] = len(calls)
    context['voice_sec'] = sum([call.voice_sec() for call in calls])
    gprs_events = subscriber.usageevent_set.filter(kind='gprs')
    up_bytes = sum([g.uploaded_bytes for g in gprs_events])
    down_bytes = sum([g.downloaded_bytes for g in gprs_events])
    context['up_bytes'] = humanize.naturalsize(up_bytes)
    context['down_bytes'] = humanize.naturalsize(down_bytes)
    context['total_bytes'] = humanize.naturalsize(up_bytes + down_bytes)
    # Render template.
    template = get_template('dashboard/subscriber_detail/info.html')
    html = template.render(context, request)
    return HttpResponse(html)
def generate_gprs_events(start_timestamp, end_timestamp):
    """Create GPRS events from data in the GPRS DB.

    Records that were generated between the specified timestamps will become
    events.  One event is created per IMSI (not one event per record).

    Args:
        start_timestamp: seconds since epoch
        end_timestamp: seconds since epoch
    """
    gprs_db = gprs_database.GPRSDB()
    # First organize the records by IMSI.
    sorted_records = {}
    for record in gprs_db.get_records(start_timestamp, end_timestamp):
        if record['imsi'] not in sorted_records:
            sorted_records[record['imsi']] = []
        sorted_records[record['imsi']].append(record)
    # Now analyze all records that we have for each IMSI.
    for imsi in sorted_records:
        up_bytes = sum([r['uploaded_bytes_delta'] for r in sorted_records[imsi]])
        down_bytes = sum([r['downloaded_bytes_delta'] for r in sorted_records[imsi]])
        # Do not make an event if the byte deltas are unchanged.
        if up_bytes == 0 and down_bytes == 0:
            continue
        # For now, GPRS is free for subscribers.
        cost = 0
        reason = 'gprs_usage: %s uploaded, %s downloaded' % (
            humanize.naturalsize(up_bytes), humanize.naturalsize(down_bytes))
        timespan = int(end_timestamp - start_timestamp)
        events.create_gprs_event(imsi, cost, reason, up_bytes, down_bytes, timespan)
def natural_content_length(self):
    return humanize.naturalsize(self.content_length)
def __init__(self, final_message: Dict, progress_view: 'ProgressAndHistoryView', core: ApartCore,
             z_options: List[str]):
    FinishedJob.__init__(self, final_message, progress_view, core,
                         icon_name='object-select-symbolic',
                         forget_on_rerun=False,
                         z_options=z_options)
    self.image_size = key_and_val('Image size',
                                  humanize.naturalsize(self.msg['image_size'], binary=True))
    self.filename = key_and_val('Image file', extract_filename(self.msg['destination']))
    self.source_uuid = None
    if self.msg.get('source_uuid'):
        self.source_uuid = key_and_val('Partition uuid', self.msg['source_uuid'])
    self.stats = Gtk.VBox()
    for stat in [self.filename, self.image_size, self.source_uuid, self.duration]:
        if stat:
            self.stats.add(stat)
    self.stats.get_style_context().add_class('finished-job-stats')
    self.stats.show_all()
    self.extra.add(self.stats)
    self.delete_image_btn = Gtk.Button.new_from_icon_name('user-trash-full-symbolic',
                                                          Gtk.IconSize.SMALL_TOOLBAR)
    self.delete_image_btn.set_tooltip_text(DELETE_TIP)
    self.delete_image_btn.show_all()
    self.delete_image_btn.connect('clicked', self.delete_image)
    self.buttons.add(self.delete_image_btn)
    self.buttons.reorder_child(self.delete_image_btn, 0)
def _size(*values):
    """ Print summed size humanized. """
    value = sum(values)
    return humanize.naturalsize(value, binary=True) if value else '-'
def run(self, args, config, storage, remotes):
    table_lines = [('<b>NAME</b>', '<b>TYPE</b>', '<b>LAST</b>', '<b>NEXT</b>', '<b>LAST SIZE</b>')]
    for remote in sorted(remotes.list(), key=lambda x: x.name):
        latest_ref = '%s/latest' % remote.name
        latest_backup = storage.get_backup(latest_ref)
        latest_date_text = '-'
        next_date_text = '-'
        size = '-'
        if latest_backup is None:
            if remote.scheduler is not None:
                next_date_text = '<color fg=yellow>now</color>'
        else:
            size_total = sum(latest_backup.stats.get(x, 0) for x in STATS_TOTAL)
            size_new = sum(latest_backup.stats.get(x, 0) for x in STATS_NEW)
            size = '%s (+%s)' % (humanize.naturalsize(size_total, binary=True),
                                 humanize.naturalsize(size_new, binary=True))
            latest_date_text = latest_backup.start_date.humanize()
            if remote.scheduler is not None and remote.scheduler['enabled']:
                next_date = latest_backup.start_date + datetime.timedelta(
                    seconds=remote.scheduler['interval'] * 60)
                if next_date > arrow.now():
                    next_date_text = '<color fg=green>%s</color>' % next_date.humanize()
                else:
                    next_date_text = '<color fg=red>%s</color>' % next_date.humanize()
        table_lines.append((remote.name, remote.type, latest_date_text, next_date_text, size))
    printer.table(table_lines)
def run(self, args, config, storage, remotes):
    count, size = gc(storage, delete=not args.dry_run)
    if count:
        printer.p('Done. Deleted {n} objects, total size: {s}',
                  n=count, s=humanize.naturalsize(size, binary=True))
    else:
        printer.p('Done. Nothing to delete.')
def _get_info(self):
    result = ''
    if (
        self.kind != HistoryItemKind.FILE and
        self.kind != HistoryItemKind.IMAGE and
        not self.content_type
    ):
        if self.kind != HistoryItemKind.LINK:
            result = '%i chars, %i lines' % (len(self.raw), self.n_lines)
        return result
    if self.n_lines > 1:
        result += _('%s items') % self.n_lines
    else:
        try:
            size = os.path.getsize(self.raw.strip())
        except FileNotFoundError:
            result += _('No such file or directory')
        else:
            result += humanize.naturalsize(size, gnu=True)
    if self._content_type:
        result += ', Type: %s' % self._content_type
    return result
def hr_size(self):
    return humanize.naturalsize(self.size)
def format_speed(value):
    return "%s/s" % naturalsize(value)
def _parse_job(self):
    try:
        setattr(self, 'time_taken', dict(zip(
            ('m', 's'),
            divmod(
                (
                    datetime.utcfromtimestamp(float(self.resp['statistics']['endTime']) / 1000) -
                    datetime.utcfromtimestamp(float(self.resp['statistics']['creationTime']) / 1000)
                ).seconds,
                60)
        )))
    except KeyError:
        pass
    if self.job_type == 'load':
        try:
            setattr(self, 'size',
                    humanize.naturalsize(int(self.resp['statistics']['load']['inputFileBytes'])))
        except (KeyError, TypeError):
            pass
    elif self.job_type == 'query':
        try:
            setattr(self, 'size',
                    humanize.naturalsize(int(self.resp['statistics']['query']['totalBytesProcessed'])))
        except (KeyError, TypeError):
            pass
    if self.job_type == 'load':
        try:
            setattr(self, 'row_count', int(self.resp['statistics'][self.job_type]['outputRows']))
        except (KeyError, TypeError):
            pass
    elif self.job_type == 'query':
        try:
            setattr(self, 'row_count', int(self.resp['totalRows']))
        except (KeyError, TypeError):
            pass
def load_resp(self, resp, is_download=False):
    """
    Loads json response from API.

    :param resp: Response from API
    :type resp: dictionary
    :param is_download: Calculates time taken based on 'modifiedTime' field in
        response if upload, and based on stop time if download
    :type is_download: boolean
    """
    assert isinstance(resp, dict)
    setattr(self, 'resp', resp)
    try:
        setattr(self, 'size', humanize.naturalsize(int(resp['size'])))
    except KeyError:
        pass
    if is_download:
        updated_at = datetime.now(UTC)
    else:
        updated_at = UTC.localize(datetime.strptime(resp['modifiedTime'], '%Y-%m-%dT%H:%M:%S.%fZ'))
    setattr(self, 'time_taken', dict(zip(
        ('m', 's'),
        divmod((updated_at - getattr(self, 'start_time')).seconds
               if updated_at > getattr(self, 'start_time') else 0, 60)
    )))
def ts_file(self, path):
    full_path = os.path.normpath(os.path.join(TS_DIR, './%s' % path))
    inspector = Inspector.get_inspector(full_path)
    parent = inspector.get_file_paths()['web_dir']
    metadata = inspector.get_metadata()
    video_streams = []
    audio_streams = []
    other_streams = []
    if metadata not in range(1, 5) and 'streams' in metadata:
        for index, stream in enumerate(metadata['streams']):
            if 'codec_type' in stream and stream['codec_type'] == 'video':
                video_streams.append(index)
                try:
                    if 'avg_frame_rate' in stream:
                        stream['avg_frame_rate_norm'] = eval(stream['avg_frame_rate'])
                except ZeroDivisionError:
                    pass
            elif 'codec_type' in stream and stream['codec_type'] == 'audio':
                audio_streams.append(index)
            else:
                other_streams.append(index)
    if 'format' in metadata and 'size' in metadata['format']:
        metadata['format']['size_hr'] = humanize.naturalsize(metadata['format']['size'], gnu=True)
    return render_template('ts_file.html',
                           inspector=inspector,
                           screenshot=inspector.get_screenshot(),
                           metadata=metadata,
                           parent=parent,
                           job_status=JobStatus(),
                           video_streams=video_streams,
                           audio_streams=audio_streams,
                           other_streams=other_streams)
def verify_size(mbtiles_file, max_size, scheme):
    mbtiles = MBTiles(mbtiles_file, scheme)
    for tile in mbtiles.tiles_by_size(max_size):
        print('{}/{}/{}\t{}'.format(tile.z, tile.x, tile.y, humanize.naturalsize(tile.size)))
def bytes_to_site_size(self, byte_num):
    humanized = humanize.naturalsize(byte_num, format='%.2f', binary=True)
    if 'MiB' in humanized or 'KiB' in humanized:
        humanized = humanize.naturalsize(byte_num, format='%d', binary=True)
    return humanized
def bytes_to_site_size(self, byte_num):
    humanized = humanize.naturalsize(byte_num, format='%.2f', binary=True)
    if 'KiB' in humanized:
        humanized = humanize.naturalsize(byte_num, format='%d', binary=True)
    return humanized
def ap_list(host):
    hosts = service.expand_host(host[0])

    def _calc_load(x):
        return x / 65535

    @coroutine
    def _run():
        rows = []
        aps = yield service.create_multiple_ap(hosts)
        details = yield service.ap_list(aps)
        header_out('name, host, #clients, loadavg, mem, uptime')
        for ap in details:
            row = []
            row.append(_val(ap, 'board.hostname'))
            row.append(_val(ap, 'host'))
            row.append('%s' % (_val(ap, 'num_clients')))
            row.append('%.2f / %.2f / %.2f' % (
                _val(ap, 'system.load.0', _calc_load),
                _val(ap, 'system.load.1', _calc_load),
                _val(ap, 'system.load.2', _calc_load)))
            row.append('%s / %s' % (_val(ap, 'system.memory.free', naturalsize),
                                    _val(ap, 'system.memory.total', naturalsize)))
            row.append('%s' % (_val(ap, 'system.uptime', naturaltime)))
            rows.append(', '.join(row))
        out('\n'.join(sorted(rows, cmp_host)))

    IOLoop.instance().run_sync(_run)
def eval_command(self, args):
    tw = TabWriter()
    if args["digests"]:
        tw.padding = [3, 10, 3, 8, 8]
        fm = self.digestsTemplate
        tw.writeln("REPOSITORY\tTAG\tDIGEST\tIMAGE ID\tCREATED\tSIZE")
    elif args["format"] is None:
        tw.padding = [3, 10, 8, 8]
        fm = self.defaultTemplate
        tw.writeln("REPOSITORY\tTAG\tIMAGE ID\tCREATED\tSIZE")
    else:
        fm = args["format"]
    self.settings[self.name] = ""
    del args["digests"]
    del args["format"]
    args["filters"] = dict(args["filters"]) if args["filters"] else None
    nodes = self.client.images(**args)
    for node in nodes:
        try:
            node["Repository"], node["Tag"] = node["RepoTags"][0].split(":")
        except TypeError:
            node["Repository"] = node["RepoDigests"][0].split('@', 2)[0]
            node["Tag"] = "<none>"
        node["Digest"] = node["RepoDigests"][0].split('@', 2)[1] if node["RepoDigests"] \
            else '<' + str(node["RepoDigests"]) + '>'
        node["Id"] = node["Id"].split(":")[1][:12]
        node["Created"] = arrow.get(node["Created"]).humanize()
        node["Size"] = humanize.naturalsize(node["VirtualSize"])
        tw.writeln(pystache.render(fm, node))
    self.settings[self.name] = str(tw)
def download(url, filename, progress_data=None, session=None, silent=False):
    """
    Initiate a file download and display the progress

    Args:
        url(str): Download URL
        filename(str): Path to save the file to
        progress_data(dict): Static information to display above the progress bar
        session(Session): An optional download session to use
        silent(bool): Download the file, but don't print any output

    Returns:
    """
    # Set up our requests session and make sure the filepath exists
    session = session or Session()
    os.makedirs(os.path.dirname(filename), 0o755, True)

    # Test the connection
    response = session.head(url, allow_redirects=True)  # type: Response
    response.raise_for_status()

    # Get some information about the file we are downloading
    filesize = naturalsize(response.headers.get('content-length', 0))
    filetype = response.headers.get('content-type', 'Unknown')

    # Format the information output
    info_lines = [
        click.style('Saving to: ', bold=True) + filename,
        click.style('File type: ', bold=True) + filetype,
        click.style('File size: ', bold=True) + filesize
    ]

    if progress_data:
        for key, value in progress_data.items():
            info_lines.append('{key} {value}'.format(key=click.style(key + ':', bold=True), value=value))

    # Print the static information now
    click.echo()
    for line in info_lines:
        click.echo(line)

    # Now let's make the real download request
    response = session.get(url, allow_redirects=True)  # type: Response

    # Process the download
    with open(filename, 'wb') as file:
        length = int(response.headers.get('content-length', 0))
        with click.progressbar(response.iter_content(1024), (length / 1024)) as progress:
            for chunk in progress:
                if chunk:
                    file.write(chunk)
                    file.flush()
def export_tar(tree, storage, output, compression=None):
    """ Export a tree in tar format. """
    mode = 'w'
    if compression in ('gz', 'bz2', 'xz'):
        mode += ':' + compression
    with tarfile.open(output, mode) as tar:
        for fullname, item in walk_tree(storage, tree):
            payload = None
            info = tarfile.TarInfo()
            info.name = fullname.decode('utf-8', 'ignore')
            if item.type == 'blob':
                payload = storage.get_blob(item.ref).blob
                info.type = tarfile.REGTYPE
                info.size = item['size']
                printer.verbose('Adding to {out}: <b>{fn}</b> ({size})',
                                out=output, fn=fullname.decode('utf-8', errors='ignore'),
                                size=humanize.naturalsize(item['size'], binary=True))
            elif item.type == 'tree':
                info.type = tarfile.DIRTYPE
                printer.verbose('Adding to {out}: <b>{fn}</b> (directory)',
                                out=output, fn=fullname.decode('utf-8', errors='ignore'))
            else:
                if item['filetype'] == 'link':
                    info.type = tarfile.SYMTYPE
                    info.linkname = item['link']
                    printer.verbose('Adding to {out}: <b>{fn}</b> (link to {link})',
                                    out=output, fn=fullname.decode('utf-8', errors='ignore'),
                                    link=item['link'].decode('utf-8', errors='replace'))
                elif item['filetype'] == 'fifo':
                    info.type = tarfile.FIFOTYPE
                    printer.verbose('Adding to {out}: <b>{fn}</b> (fifo)',
                                    out=output, fn=fullname.decode('utf-8', errors='ignore'))
                else:
                    continue  # Ignore unknown file types
            # Set optional attributes:
            info.mode = item.get('mode')
            info.uid = item.get('uid')
            info.gid = item.get('gid')
            info.mtime = item.get('mtime')
            # Add the item into the tar file:
            tar.addfile(info, payload)
def export_directory(tree, storage, output):
    """ Export a tree in a directory. """
    os.mkdir(output)
    for fullname, item in walk_tree(storage, tree):
        outfullname = os.path.join(output.encode('utf-8'), fullname.lstrip(b'/'))
        if item.type == 'blob':
            blob = storage.get_blob(item.ref).blob
            with open(outfullname, 'wb') as fout:
                shutil.copyfileobj(blob, fout)
            printer.verbose('Exporting to {out}: <b>{fn}</b> ({size})',
                            out=output, fn=fullname.decode('utf-8', errors='replace'),
                            size=humanize.naturalsize(item['size'], binary=True))
        elif item.type == 'tree':
            os.mkdir(outfullname)
            printer.verbose('Exporting to {out}: <b>{fn}</b> (directory)',
                            out=output, fn=fullname.decode('utf-8', errors='replace'))
        else:
            if item['filetype'] == 'link':
                os.symlink(item['link'], outfullname)
                printer.verbose('Exporting to {out}: <b>{fn}</b> (link to {link})',
                                out=output, fn=fullname.decode('utf-8', errors='replace'),
                                link=item['link'].decode('utf-8', errors='replace'))
            elif item['filetype'] == 'fifo':
                os.mkfifo(outfullname)
                printer.verbose('Exporting to {out}: <b>{fn}</b> (fifo)',
                                out=output, fn=fullname.decode('utf-8', errors='replace'))
            else:
                continue  # Ignore unknown file types
        try:
            if 'mode' in item:
                try:
                    os.chmod(outfullname, item['mode'], follow_symlinks=False)
                except SystemError:
                    pass  # Workaround follow_symlinks not implemented in Python 3.5 (bug?)
            if 'uid' in item or 'gid' in item:
                os.chown(outfullname, item.get('uid', -1), item.get('gid', -1), follow_symlinks=False)
        except PermissionError:
            printer.p('<color fg=yellow><b>Warning:</b> unable to set attributes on {fn}</color>',
                      fn=fullname.decode('utf-8', errors='replace'))
def server_info():
    """Return server statistics."""
    started = time()
    proc = Process()
    process_size = proc.memory_info()
    stats = []
    memory = virtual_memory()
    stats.append(
        'Memory: {used} used of {total} ({percent}%)'.format(
            used=naturalsize(memory.used),
            total=naturalsize(memory.total),
            percent=memory.percent
        )
    )
    bt = boot_time()
    uptime = time() - bt
    stats.append(
        'Server Uptime: {delta} since {booted}'.format(
            delta=format_timedelta(timedelta(seconds=uptime)),
            booted=ctime(bt)
        )
    )
    if server.server.started is not None:
        stats.append(
            'Process Uptime: {} since {}'.format(
                format_timedelta(datetime.utcnow() - server.server.started),
                server.server.started.ctime()
            )
        )
    stats.append(
        'OS Version: {} ({})'.format(platform(), architecture()[0])
    )
    stats.append(
        '{type} Version: {version}'.format(type=python_implementation(), version=version)
    )
    stats.append('Number Of Threads: %d' % proc.num_threads())
    stats.append('Process Memory:')
    stats.append('Real: %s' % naturalsize(process_size.rss))
    stats.append('Virtual: %s' % naturalsize(process_size.vms))
    stats.append('Percent: %.2f' % proc.memory_percent())
    stats.append('Statistics generated in %.2f seconds.' % (time() - started))
    return '\n'.join(stats)
def eval_command(self, args):
    try:
        stats = []
        containers = args["containers"]
        del args["containers"]
        args["decode"] = True
        for container in containers:
            args["container"] = container
            stats.append(self.client.stats(**args))
        """
        for line in self.client.stats(**args):
            for iterElement in list(json_iterparse.json_iterparse(line)):
                # self.output(iterElement, args)
                print iterElement
        """
        clear()
        put_cursor(0, 0)
        print pprint_things(
            "CONTAINER\tCPU %\tMEM USAGE / LIMIT\tMEM %\tNET I/O\tBLOCK I/O\tPIDS"),
        while True:
            y = 1
            for stat in stats:
                put_cursor(0, y)
                y += 1
                tmp = next(stat)
                tmp["Id"] = tmp["id"][:12]
                tmp["Cpu"] = (tmp["cpu_stats"]["cpu_usage"]["total_usage"] /
                              tmp["cpu_stats"]["system_cpu_usage"])
                tmp["MemUsage"] = humanize.naturalsize(tmp["memory_stats"]["usage"])
                tmp["Limit"] = humanize.naturalsize(tmp["memory_stats"]["limit"])
                tmp["Mem"] = (tmp["memory_stats"]["usage"] / tmp["memory_stats"]["limit"])
                tmp["NetInput"] = humanize.naturalsize(tmp["networks"]["eth0"]["rx_bytes"])
                tmp["NetOutput"] = humanize.naturalsize(tmp["networks"]["eth0"]["tx_bytes"])
                tmp["Pids"] = tmp["pids_stats"]["current"]
                print pprint_things(pystache.render(self.defaultTemplate, tmp))
    except KeyboardInterrupt:
        put_cursor(0, y)
        colorama.deinit()
        raise KeyboardInterrupt
    put_cursor(0, y)
    colorama.deinit()
    self.settings[self.name] = "\r"