我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用time.strftime()。
def _run_test(self, fmt_key):
    """Configure logging from the conversion table entry for *fmt_key*
    and compare the captured log line against time.strftime's output.

    conversions[fmt_key] is (config-snippet, strftime-format[, separator]);
    the optional separator trims a volatile tail (e.g. seconds) from both
    sides before comparing.
    """
    conv = conversions[fmt_key]
    config.strConfig(logcfg + conv[0])
    self.logger = logging.getLogger('')
    self.console = self.logger.handlers[0]
    # Redirect the console handler into a seekable test buffer.
    self.console.stream = self.tfile
    pval = time.strftime(conv[1])
    # NOTE(review): with this call commented out nothing is ever logged,
    # so the captured line is empty — confirm whether it should run.
    # self.logger.info('test1')
    self.tfile.seek(0)
    logline = self.tfile.read()
    logline = logline.strip()
    if len(conv) > 2:
        logline = logline.split(conv[2])[0]
        pval = pval.split(conv[2])[0]
    # assertEquals is a deprecated unittest alias; use assertEqual.
    self.assertEqual(pval, logline)
def debug(msg, level=1, protId=0):
    """Print a timestamped debug line to stderr when the module-level
    DEBUG setting is truthy and at least *level*; protId tags the line
    with a protocol id when non-zero."""
    if not DEBUG:
        return
    if level <= DEBUG:
        prefix = '[%s] DEBUG: ' % time.strftime('%H:%M:%S')
        tag = 'ID: %d ; ' % protId if protId else ''
        print(prefix + tag + msg, file=sys.stderr)

# }}}

# Response Constants {{{
#
# Constants for responses back to the MTA. You should use these actions
# at the end of each callback. If none of these are specified,
# CONTINUE is used as the default
#
def log(log_subsys, log_message, log_type='info', log_data=None): current_time = time.time() # form log entry dictionary log_entry = { 'time' : current_time, 'subsys' : log_subsys, 'type' : log_type, 'message' : log_message, } if log_data is not None: log_dict = dict(log_entry, **log_data) else: log_dict = log_entry if Logger.debug: print("LOG {:s} | {:s}".format(time.strftime("%H:%M:%S", time.localtime(current_time)), log_message)) # attempt to place in queue try: Logger.log_queue.put(log_dict) except Queue.Full as e: sys.stderr.write('Warning: log queue full, discarding message: "{:s}"\n'.format(log_message))
def construct_csr_matrix_from_data_and_nodes(f, nodes, blacklisted_nodes, remove_diag=True):
    """Load gzipped tab-separated interaction data from *f* into a CSR matrix.

    nodes maps node name -> {'idx': position}; blacklisted_nodes are removed
    from the result via filter_nodes.  The diagonal is zeroed unless
    remove_diag is False.
    """
    # print() works on both Python 2 and 3 (the old print statement did not).
    print("GenomeDISCO | " + strftime("%c") + " | processing: Loading interaction data from " + f)
    total_nodes = len(nodes)  # len(nodes.keys()) built a needless key list
    i = []
    j = []
    v = []
    c = 0
    # NOTE(review): on Python 3 gzip.open yields bytes; open with mode 'rt'
    # if this module is ever run under Python 3.
    for line in gzip.open(f):
        items = line.strip().split('\t')
        n1, n2, val = nodes[items[0]]['idx'], nodes[items[1]]['idx'], float(items[2])
        i.append(n1)
        j.append(n2)
        v.append(val)
        c += 1
    csr_m = csr_matrix((v, (i, j)), shape=(total_nodes, total_nodes), dtype=float)
    if remove_diag:
        csr_m.setdiag(0)
    return filter_nodes(csr_m, blacklisted_nodes)
def random_walks_by_chunk_get_score_sparse_matrix(mym1, mym2, tmin, tmax, nonzero_total, chunksize):
    """Walk matrix powers t = 1..tmax, recomputing intermediates for each t.

    Score accumulation is currently disabled (the append is commented out),
    so an empty list is returned.
    """
    scores = []
    n = mym1.shape[0]
    m1_t = mym1.transpose()
    m2_t = mym2.transpose()
    # mat_names is a module-level registry — TODO confirm its scope.
    mat_names[1] = 'mats'
    for t in range(1, (tmax + 1)):
        if t != 1:
            compute_current_matrices(t, mat_names)
        if t >= tmin:
            pass
            # scores.append(1.0 * abs_diff_by_chunk_sparse_matrix(t) / nonzero_total)
        # print() works on Python 2 and 3 (the old print statement did not).
        print('done ' + str(t) + ' ' + strftime("%c"))
    return scores
def random_walks_by_chunk_get_score(mym1, mym2, tmin, tmax, nonzero_total, chunksize):
    """Compute chunked random-walk scores for matrix powers t in [tmin, tmax].

    Dense intermediates are staged in HDF5 files; each power t reuses the
    t-1 product.  Returns the list of normalised absolute differences.
    """
    scores = []
    hdf5_names = {}
    n = mym1.shape[0]
    m1_t = mym1.transpose()
    m2_t = mym2.transpose()
    # write the ms into hdf5s
    # todo: make name more specific
    hdf5_names[1] = 'hdf5s'
    fill_hdf5_with_sparse_by_chunk(mym1, mym2, hdf5_names[1], chunksize)
    for t in range(1, (tmax + 1)):
        if t != 1:
            hdf5_names[t] = 'hdf5s_' + str(t)
            # t=1, t=(t-1) and the new t=t that we want to compute
            multiply_by_chunk(hdf5_names[1], hdf5_names[t - 1], hdf5_names[t], chunksize)
        if t >= tmin:
            scores.append(1.0 * abs_diff_by_chunk(hdf5_names[t], 'm1', 'm2', chunksize) / nonzero_total)
        # print() works on Python 2 and 3 (the old print statement did not).
        print('done ' + str(t) + ' ' + strftime("%c"))
    return scores
def disconnect(self):
    """Hang up the single active RAS dial-up connection, if any.

    Returns True on success; otherwise logs the condition and returns None.
    """
    flag = self.get_conn()
    if len(flag) == 1:
        handle = flag[0][0]
        dialname = str(flag[0][1])
        try:
            win32ras.HangUp(handle)
            # Record the disconnect time (False marks "not connected").
            self.saveData(False, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
            # NOTE(review): the '??' strings below are mojibake (likely
            # Chinese log text); recover the originals from VCS if possible.
            logger.info("??" + dialname + "????")
            return True
        except Exception as e:
            logger.info(dialname + "???????" + str(e.message))
    # disconnect()
    else:
        logger.info("?????????????")
    # ????????
def __init__(self, configuration):
    """Set up the PJFServer: HTTP/HTTPS fuzzing endpoints plus an
    optional request-pool thread for web fuzzing.

    Raises PJFMissingArgument when required settings are absent.
    """
    self.client_queue = multiprocessing.Queue(0)
    self.apply_patch()
    self.logger = self.init_logger()
    # The configuration object implements __contains__ for a *list* of
    # required keys, so this membership test checks them all at once.
    if ["debug", "html", "content_type", "notify", "ports"] not in configuration:
        raise PJFMissingArgument()
    if configuration.debug:
        print("[\033[92mINFO\033[0m] Starting HTTP ({0}) and HTTPS ({1}) built-in server...".format(
            configuration.ports["servers"]["HTTP_PORT"],
            configuration.ports["servers"]["HTTPS_PORT"]
        ))
    # BUG FIX: the original assigned False and then immediately overwrote
    # it with "application/json"; the first assignment was a dead store.
    if not configuration.content_type:
        configuration.content_type = "application/json"
    self.config = configuration
    self.json = PJFFactory(configuration)
    self.https = SSLWSGIRefServer(host="0.0.0.0", port=self.config.ports["servers"]["HTTPS_PORT"])
    self.http = WSGIRefServer(host="0.0.0.0", port=self.config.ports["servers"]["HTTP_PORT"])
    self.httpsd = multiprocessing.Process(target=run, kwargs={"server": self.https, "quiet": True})
    self.httpd = multiprocessing.Process(target=run, kwargs={"server": self.http, "quiet": True})
    if self.config.fuzz_web:
        self.request_checker = Thread(target=self.request_pool, args=())
    self.logger.debug("[{0}] - PJFServer successfully initialized".format(time.strftime("%H:%M:%S")))
def __init__(self, configuration):
    """
    Init the ProcessMonitor server
    """
    self.logger = self.init_logger()
    # The configuration object implements __contains__ for a *list* of
    # required keys; this checks all of them in one membership test.
    if ["debug", "ports", "process_to_monitor"] not in configuration:
        raise PJFMissingArgument()
    self.config = configuration
    self.process = None  # monitored-process handle — presumably set once monitoring starts
    self.finished = False
    self.testcase_count = 0
    if self.config.debug:
        print("[\033[92mINFO\033[0m] Starting process monitoring...")
        print("[\033[92mINFO\033[0m] Starting Testcase Server ({0})...".format(
            self.config.ports["servers"]["TCASE_PORT"]
        ))
    super(PJFProcessMonitor, self).__init__(configuration)
    self.logger.debug("[{0}] - PJFProcessMonitor successfully completed".format(time.strftime("%H:%M:%S")))
def __init__(self, configuration):
    """
    Class that represent a JSON object
    """
    self.logger = self.init_logger()
    # The configuration object implements __contains__ for a *list* of
    # required keys; this checks all of them in one membership test.
    if ["json", "json_file", "strong_fuzz", "parameters",
        "exclude_parameters", "url_encode", "indent", "utf8"] not in configuration:
        raise PJFMissingArgument("Some arguments are missing from PJFFactory object")
    self.config = configuration
    self.mutator = PJFMutation(self.config)
    other = self.config.json
    if not self.config.strong_fuzz:
        # Normalise plain fuzzing input to a dict; wrap lists, reject others.
        if type(other) == dict:
            self.json = other
        elif type(other) == list:
            self.json = {"array": other}
        else:
            raise PJFInvalidType(other, dict)
    else:
        # Strong fuzzing keeps raw file content; otherwise serialise to a string.
        if self.config.json_file:
            self.json = other
        else:
            self.json = json.dumps(other)
    self.logger.debug("[{0}] - PJFFactory successfully initialized".format(time.strftime("%H:%M:%S")))
def mkdir_file(self):
    """Create a timestamped result directory tree and return its path.

    Layout: <log_file>/<YYYY-MM-DD_HH_MM_SSnn>/{log,per,img,status},
    where nn is a random 2-digit suffix keeping same-second runs distinct.
    """
    ini = U.ConfigIni()
    result_file = str(ini.get_ini('test_case', 'log_file'))
    result_file_every = result_file + '/' + \
        time.strftime("%Y-%m-%d_%H_%M_%S{}".format(random.randint(10, 99)),
                      time.localtime(time.time()))
    file_list = [
        result_file,
        result_file_every,
        result_file_every + '/log',
        result_file_every + '/per',
        result_file_every + '/img',
        result_file_every + '/status']
    # result_file is the first entry of file_list, so the original's extra
    # pre-check/mkdir for it was redundant and has been dropped.
    for file_path in file_list:
        if not os.path.exists(file_path):
            os.mkdir(file_path)
    return result_file_every
def imshow_cv(label, im, block=False, text=None, wait=2):
    """Show *im* under *label* and handle keys: space pauses, 'v' enters a
    debug loop ('q' leaves it, 's' saves a timestamped PNG), Esc/'q' exits
    the process."""
    vis = im.copy()
    print_status(vis, text=text)
    window_manager.imshow(label, vis)
    ch = cv2.waitKey(0 if block else wait) & 0xFF
    if ch == ord(' '):
        cv2.waitKey(0)
    if ch == ord('v'):
        print('Entering debug mode, image callbacks active')
        while True:
            ch = cv2.waitKey(10) & 0xFF
            if ch == ord('q'):
                print('Exiting debug mode!')
                break
            if ch == ord('s'):
                fn = 'img-%s.png' % time.strftime("%Y-%m-%d-%H-%M-%S")
                # print() works on Python 2 and 3 (the old print statement did not).
                print('Saving %s' % fn)
                cv2.imwrite(fn, vis)
    elif ch == 27 or ch == ord('q'):
        sys.exit(1)
def main(_):
    """Entry point: merge JSON config into the flag dict, then run
    hyperparameter search, training, or testing."""
    config = flags.FLAGS.__flags.copy()
    config.update(json.loads(config['config']))
    del config['config']
    if config['results_dir'] == '':
        del config['results_dir']
    if config['task'] == 'search':
        # Hyperparameter search cannot be continued, so a new results dir is created.
        config['results_dir'] = os.path.join(results_dir, 'hs', config['model_name']
                                             + time.strftime('_%Y-%m-%d_%H-%M-%S', time.gmtime()))
        hb = Hyperband(config)
        results = hb.run()
    else:
        model = make_model(config)
        if config['task'] == 'train':
            model.train()
        elif config['task'] == 'test':
            model.test()
        else:
            # BUG FIX: '%' previously bound only to the second string literal
            # (which has no placeholder), raising TypeError instead of
            # printing the message.  Concatenate first, then interpolate.
            print('Invalid argument: --task=%s. '
                  'It should be either of {train, test, search}.' % config['task'])
def log(scan_type, host, port, info=''):
    """Mutex-guarded logger for scan results: known scan types print to
    stdout; anything else with *info* is also appended to result.log."""
    mutex.acquire()
    # try/finally guarantees the lock is released even if printing or the
    # file write raises (the original leaked the lock on error).
    try:
        time_str = time.strftime('%X', time.localtime(time.time()))
        if scan_type == 'portscan':
            print("[%s] %s:%d open" % (time_str, host, int(port)))
        elif scan_type == 'discern':
            print("[%s] %s:%d is %s" % (time_str, host, int(port), info))
        elif scan_type == 'active':
            print("[%s] %s active" % (time_str, host))
        elif info:
            # Renamed from 'log' — the original shadowed this function's name.
            line = "[*%s] %s:%d %s %s" % (time_str, host, int(port), scan_type, info)
            print(line)
            # Context manager closes the file even on write failure.
            with open('result.log', 'a') as log_file:
                log_file.write(line + "\r\n")
    finally:
        mutex.release()
def asctime(t=None):
    """Format a struct_time (default: the current local time) as a
    24-character string:

    >>> asctime(time.gmtime(0))
    'Thu Jan  1 00:00:00 1970'

    Unlike the built-in time.asctime(), the day of the month is
    right-justified identically on every platform and Python version.
    See https://github.com/behdad/fonttools/issues/455
    """
    if t is None:
        t = time.localtime()
    clock_and_year = time.strftime("%H:%M:%S %Y", t)
    return "%s %s %2s %s" % (
        DAYNAMES[t.tm_wday], MONTHNAMES[t.tm_mon], t.tm_mday, clock_and_year)
def write_parameter_log(options, args, output_dir):
    """
    Write paramter values to a log file, named by current time.
    """
    log_path = output_dir + '/CLAM_Aligner.Log.' + strftime("%Y%m%d_%H%M") + '.txt'
    entries = [
        'CLAM Re-aligner ' + __version__,
        'Args:\n' + '\n'.join(args),
        'resume: ' + str(options.resume),
        'verbose: ' + str(options.verbose),
        'output_dir: ' + str(options.output_dir),
        'tmp_dir: ' + str(options.tmp_dir),
        'window_size: ' + str(options.window_size),
        'max_multihits: ' + str(options.max_multihits),
        'is_stranded: ' + str(options.is_stranded),
        'max-gap: ' + str(options.max_gaps),
    ]
    with open(log_path, 'w') as log:
        log.write('\n'.join(entries) + '\n')
    return
def write_parameter_log(options, output_dir):
    """
    Write paramter values to a log file, named by current time.
    """
    merge_method_dict = {1: 'narrowPeak', 2: 'broadPeak'}
    correction_method_dict = {1: 'Bonferroni', 2: 'BH_FDR'}
    log_path = output_dir + '/CLAM_Peaker.Parameters.' + strftime("%Y%m%d_%H%M") + '.txt'
    entries = [
        'CLAM Peaker ' + __version__,
        'resume: ' + str(options.resume),
        'verbose: ' + str(options.verbose),
        'output_dir:' + str(options.output_dir),
        'tmp_dir: ' + str(options.tmp_dir),
        'peak_file: ' + str(options.peak_file),
        'is_stranded: ' + str(options.is_stranded),
        'extend: ' + str(options.extend),
        'pval_cutoff: ' + str(options.pval_cutoff),
        'merge_size: ' + str(options.merge_size),
        'max_iter: ' + str(options.max_iter),
        'gtf: ' + str(options.gtf),
        'seed: ' + str(options.seed),
        'merge_method: ' + merge_method_dict[options.merge_method],
        'correction_method: ' + correction_method_dict[options.correction_method],
        'thread: ' + str(options.nb_proc),
    ]
    with open(log_path, 'w') as log:
        log.write('\n'.join(entries) + '\n')
def request(self, method, request_uri, headers, content):
    """Modify the request headers"""
    # End-to-end headers participate in the HMACDigest signature.
    keys = _get_end2end_headers(headers)
    keylist = "".join(["%s " % k for k in keys])
    headers_val = "".join([headers[k] for k in keys])
    created = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    cnonce = _cnonce()
    # Digest input: method, URI, client nonce, server nonce, header values.
    request_digest = "%s:%s:%s:%s:%s" % (method, request_uri, cnonce, self.challenge['snonce'], headers_val)
    request_digest = hmac.new(self.key, request_digest, self.hashmod).hexdigest().lower()
    # Attach the full HMACDigest credential set for the server to verify.
    headers['authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % (
        self.credentials[0],
        self.challenge['realm'],
        self.challenge['snonce'],
        cnonce,
        request_uri,
        created,
        request_digest,
        keylist)
def gethtml(zurl, str_fname):
    """Open *zurl* in a mobile-emulating Chrome, scroll the page, and dump
    the rendered source to '<str_fname>-<YYYYMMDD>.html'."""
    mobileEmulation = {'deviceName': 'Apple iPhone 6'}
    options = webdriver.ChromeOptions()
    options.add_experimental_option('mobileEmulation', mobileEmulation)
    driver = webdriver.Chrome(executable_path='chromedriver.exe', chrome_options=options)
    driver.get(zurl)
    time.sleep(5)  # allow initial page load
    result = []
    # for i in range(0,300):
    # NOTE(review): the '????' strings below are mojibake (likely Chinese);
    # recover the original text from version control if possible.
    for i in range(0, 1):
        print('????' + str(i))
        myscroll(driver)
        time.sleep(2)  # let lazily-loaded content settle after each scroll
    st = time.strftime("%Y%m%d", time.localtime())
    # print(driver.page_source, file=open('itg201703.html', 'w', encoding='utf-8'))
    print(driver.page_source, file=open(str_fname + "-" + st + ".html", 'w', encoding='utf-8'))
    print("?????????")
    print(driver.title)
    driver.quit()
def strftime(config, context, arg, now=None):
    """
    strftime returns the current time (in UTC) converted to the format
    specified by the first argument. The format is specified using
    Python's time.strftime format (
    https://docs.python.org/2/library/time.html#time.strftime).

    Example:
        {"CFPP::Strftime": "%Y%m%d_%H%M%S"} ==> 20060102_220405

    Note: use special care when using this function with CloudFormation's
    "update" functionality. The output of this function will change each
    time cfpp is run.
    """
    _raise_unless_string(context, arg)
    # BUG FIX: a default of now=time.gmtime() in the signature was
    # evaluated once at import time, freezing "now" for the whole process.
    if now is None:
        now = time.gmtime()
    return time.strftime(arg, now)
def make_layout(self, frame, label, labelloc, labelwidth):
    """
    Generate chip with dimensions xdim,ydim
    """
    half_x = self.xdim / 2
    half_y = self.ydim / 2
    box = cad.shapes.Box((-half_x, -half_y), (half_x, half_y),
                         width=self.boxwidth, layer=self.layer_box)
    date = time.strftime("%d/%m/%Y")
    # The label is added 100 um on top of the main cell
    label_grid_chip = cad.shapes.LineLabel(self.name + " " + date,
                                           self.boxwidth,
                                           position=labelloc,
                                           line_width=labelwidth,
                                           layer=self.layer_label)
    if frame == True:
        self.add(box)
    if label == True:
        self.add(label_grid_chip)
def make_wafer(self, wafer_r, frame, label, labelloc, labelwidth):
    """
    Generate wafer with primary flat on the left.  From
    https://coresix.com/products/wafers/ the angle defining the wafer
    flat was estimated as arctan(flat/2 / radius).
    """
    angled = 18
    angle = angled * np.pi / 180
    circ = cad.shapes.Circle((0, 0), wafer_r, width=self.boxwidth,
                             initial_angle=180 + angled,
                             final_angle=360 + 180 - angled,
                             layer=self.layer_box)
    flat_x = -wafer_r * np.cos(angle)
    flat_y = wafer_r * np.sin(angle)
    flat = cad.core.Path([(flat_x, flat_y), (flat_x, -flat_y)],
                         width=self.boxwidth, layer=self.layer_box)
    date = time.strftime("%d/%m/%Y")
    if labelloc == (0, 0):
        labelloc = (-2e3, wafer_r - 1e3)
    # The label is added 100 um on top of the main cell
    label_grid_chip = cad.shapes.LineLabel(self.name + " " + date, 500,
                                           position=labelloc,
                                           line_width=labelwidth,
                                           layer=self.layer_label)
    if frame == True:
        self.add(circ)
        self.add(flat)
    if label == True:
        self.add(label_grid_chip)
def timeheader(timestamp=None):
    """Timestamp header string

    timestamp  - time.struct_time; defaults to the current UTC time.
        (A signature default of time.gmtime() would be evaluated only
        once at import time, hence the None sentinel.)
    return  - timestamp string for the file header
    """
    if timestamp is None:
        timestamp = time.gmtime()
    assert isinstance(timestamp, time.struct_time), 'Unexpected type of timestamp'
    # ATTENTION: MPE pool timestamp [prefix] intentionally differs a bit from the
    # benchmark timestamp to easily find/filter each of them
    return time.strftime('# ----- %Y-%m-%d %H:%M:%S ' + '-'*30, timestamp)

# Limit the amount of memory consumption by worker processes.
# NOTE:
# - requires import of psutils
# - automatically reduced to the RAM size if the specidied limit is larger
def outputjson():
    """Serialise every Article row to 'page_list_<YYYYMMDD>.txt' as a JSON
    array and render the admin output page."""
    site_info = site_get()
    tempdict = {}
    info_list = Article.query.filter_by().all()
    values = []
    for item in info_list:
        tempdict = item.__dict__
        # Strip SQLAlchemy's internal state before serialising.
        del tempdict["_sa_instance_state"]
        value = json.dumps(tempdict, cls=CJsonEncoder)
        values.append(value)
    # BUG FIX: the old "[" + concat + [:-2] slicing emitted the invalid
    # document "]" when there were no articles; join is safe for any count.
    tempjson = "[" + ",\n".join(values) + "]"
    filename = 'page_list_' + str(time.strftime("%Y%m%d")) + '.txt'
    # Context manager closes the file even if the write raises.
    with open(filename, 'w') as output:
        output.write(tempjson)
    flash(u'?????????????')
    # locals() is handed to the template, so local names above are kept
    # compatible with the original implementation.
    return render_template('admin/output.html', **locals())
def get_merged_nodes():
    """Parse merged.dmp into {old_taxid: new_taxid} and log completion."""
    merged = {}
    # Context managers close both files deterministically (the original
    # relied on explicit close() calls, leaking handles on error).
    with open(args.infile_mergeddmp_path, 'r') as mergeddmp:
        for curr_line in mergeddmp:
            # Split once per line instead of twice.
            fields = curr_line.split('|')
            merged[fields[0].strip()] = fields[1].strip()
    with open(args.logfile_path, 'a') as log_file:
        log_file.write('get_merged_nodes() finished ' +
                       strftime("%H:%M:%S on %d-%m-%Y", localtime()) + '\n')
    return merged

#################################################
def get_deleted_nodes():
    """Parse delnodes.dmp into {old_taxid: True} and log completion."""
    deleted = {}
    # Context managers close both files deterministically (the original
    # relied on explicit close() calls, leaking handles on error).
    with open(args.infile_delnodesdmp_path, 'r') as delnodesdmp:
        for curr_line in delnodesdmp:
            deleted[curr_line.split('|')[0].strip()] = True
    with open(args.logfile_path, 'a') as log_file:
        log_file.write('get_deleted_nodes() finished ' +
                       strftime("%H:%M:%S on %d-%m-%Y", localtime()) + '\n')
    return deleted

#################################################
def save_liquids(self):
    """Save every liquid object in the scene into its owning WMO group,
    reporting per-liquid progress and the total elapsed time."""
    start_time = time.time()
    for liquid_obj in self.bl_scene_objects.liquids:
        print("\nSaving liquid: <<{}>>".format(liquid_obj.name))
        # A liquid must be bound to a WMO group to be saved.
        if not liquid_obj.WowLiquid.WMOGroup:
            print("WARNING: Failed saving liquid: <<{}>>".format(liquid_obj.name))
            continue
        group_obj = bpy.context.scene.objects[liquid_obj.WowLiquid.WMOGroup]
        group_index = group_obj.WowWMOGroup.GroupID
        group = self.groups[group_index]
        group.save_liquid(liquid_obj)
        print("Done saving liquid: <<{}>>".format(liquid_obj.name))
    # Elapsed time formatted as minutes/seconds via gmtime of the delta.
    print("\nDone saving liquids. "
          "\nTotal saving time: ", time.strftime("%M minutes %S seconds", time.gmtime(time.time() - start_time)))
def open_game_resources(wow_path):
    """Open game resources and store links to them in memory"""
    print("\nProcessing available game resources of client: " + wow_path)
    start_time = time.time()
    if WoWFileData.is_wow_path_valid(wow_path):
        # Data packages are either MPQ archive files or loose patch folders.
        data_packages = WoWFileData.list_game_data_paths(os.path.join(wow_path, "Data\\"))
        resource_map = []
        for package in data_packages:
            if os.path.isfile(package):
                # Tuple flag True marks an opened MPQ archive.
                resource_map.append((mpyq.MPQArchive(package, listfile=False), True))
                print("\nLoaded MPQ: " + os.path.basename(package))
            else:
                # Tuple flag False marks a plain folder patch path.
                resource_map.append((package, False))
                print("\nLoaded folder patch: " + os.path.basename(package))
        print("\nDone initializing data packages.")
        print("Total loading time: ", time.strftime("%M minutes %S seconds", time.gmtime(time.time() - start_time)))
        return resource_map
    else:
        print("\nPath to World of Warcraft is empty or invalid. Failed to load game data.")
        return None
def log(self, message, *args):
    """Write '<timestamp>[.mmm] <name> - <message>' to self.stream;
    *args*, when given, are %-interpolated into message.  Millisecond
    precision is controlled by self.log_ms."""
    now = time.time()
    if args:
        message = message % args
    stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(now))
    if self.log_ms:
        millis = 1000 * (now - int(now))
        self.stream.write('%s.%03d %s - %s\n' % (stamp, millis, self.name, message))
    else:
        self.stream.write('%s %s - %s\n' % (stamp, self.name, message))
def get_requirements():
    """Return the import preamble (as a literal source block) that
    generated stub scripts are expected to carry."""
    preamble = '''
import os
import re
import sys
import math
import time
import socket
import base64
import shutil
import ctypes
import socket
import struct
import zipfile
import datetime
import requests
import StringIO
import platform
import threading
import subprocess
from Crypto import Random
from Crypto.Cipher import AES
from mss import ScreenshotError
from time import strftime, sleep
from contextlib import contextmanager
from base64 import b64decode as INFO
from zlib import decompress as SEC
from st_utils import *
from st_protocol import *
from st_encryption import *
'''
    return preamble
def current_time():
    """Current UTC time rendered with the application's Res.time_format."""
    now_utc = gmtime()
    return strftime(Res.time_format, now_utc)
def save_rdd_contents(rdd):
    """Persist the RDD as a text file whose name embeds a timestamp and
    the RDD's id."""
    stamp = time.strftime("%Y-%m-%d-%H-%M-%S")
    file_name = "/vagrant_home/uniq_metrics" + '-' + stamp + '-' + str(rdd.id) + '.log'
    rdd.saveAsTextFile(file_name)
def saveLights(self):
    """Serialise every LightWidget's light into a JSON preset
    (lightFile_<MMDD>.json) inside the manager directory."""
    # The properties dictionary will hold all the light properties to save down
    properties = {}
    # First lets get all the light widgets that exist in our manager
    for lightWidget in self.findChildren(LightWidget):
        # For each widget we can get its' light object
        light = lightWidget.light
        # Then we need to get its transform node
        transform = light.getTransform()
        # Key by the transform's name; store the attributes needed to
        # recreate the light later.
        properties[str(transform)] = {
            'translate': list(transform.translate.get()),
            'rotation': list(transform.rotate.get()),
            'lightType': pm.objectType(light),
            'intensity': light.intensity.get(),
            'color': light.color.get()
        }
    # We fetch the light manager directory to save in
    directory = self.getDirectory()
    # time.strftime('%m%d') gives e.g. 0701 for July 1st, so the file is
    # named like lightFile_0701.json in our directory
    lightFile = os.path.join(directory, 'lightFile_%s.json' % time.strftime('%m%d'))
    # Write the properties out as readable JSON
    with open(lightFile, 'w') as f:
        json.dump(properties, f, indent=4)
    # A helpful logger call tells us where the file was saved to.
    logger.info('Saving file to %s' % lightFile)
def sec_to_text(ts):
    """Render POSIX timestamp *ts* as 'YYYY-MM-DD HH:MM:SS -0000' (UTC)."""
    utc_struct = time.gmtime(ts)
    return time.strftime('%Y-%m-%d %H:%M:%S -0000', utc_struct)
def showUser(user, fullInfo):
    """Print a formatted USER INFO section for *user*; fullInfo adds the
    upload limit (MiB) and its expiry date."""
    def emit(key, value):
        # Skip empty fields entirely.
        if value:
            printLine("%s : %s" % (key.ljust(16, " "), value))

    separator("#", "USER INFO")
    emit('Username', user.username)
    emit('Name', user.name)
    emit('Email', user.email)
    if fullInfo:
        limit = (int(user.accounting.uploadLimit) / 1024 / 1024)
        endlimit = time.gmtime(user.accounting.uploadLimitEnd / 1000)
        emit('Upload limit', "%.2f" % limit)
        emit('Upload limit end', time.strftime("%d.%m.%Y", endlimit))
def printDate(timestamp):
    """Format a millisecond UNIX timestamp as 'DD.MM.YYYY'.

    Author @ash-2000 https://github.com/ash-2000 added the 13-digit guard
    to avoid a crash on Python 2.7 (pull request #260).
    """
    if len(str(timestamp)) == 13:
        # 13 digits means milliseconds: strip to seconds exactly once.
        # BUG FIX: the old code stripped the last three digits and then
        # divided by 1000 again, collapsing every such date to early 1970.
        seconds = int(str(timestamp)[0:-3])
    else:
        seconds = timestamp / 1000
    return datetime.date.strftime(datetime.date.fromtimestamp(seconds), "%d.%m.%Y")
def print_and_log(text, error=False):
    """Echo *text* to stdout and record it, prefixed with a timestamp in
    LOG_TIME_FORMAT, at error or info level."""
    print(text)
    entry = time.strftime(LOG_TIME_FORMAT) + text
    emit = logging.error if error else logging.info
    emit(entry)
def proccessComments():
    """Delete this bot account's comments whose score has fallen below the
    module-level deleteThreshold, logging each deletion."""
    me = str(reddit.user.me())
    for comment in reddit.redditor(me).comments.new(limit=None):
        # Keep comments at or above the threshold.
        if comment.score >= deleteThreshold:
            continue
        comment.delete()
        permalink = "http://www.reddit.com" + comment.permalink() + "/"
        print("Deleting comment: " + permalink)
        logging.info(time.strftime("%Y/%m/%d %H:%M:%S ") + "Deleting comment: " + permalink)
def datetime(self, asstruct=False):
    """Current local time: a formatted string by default, or a dict of
    calendar fields (plus 'tz' and 'str') when asstruct is True."""
    if not asstruct:
        return time.strftime('%Y-%m-%d %X %Z')
    d = time.localtime()
    return {
        'year': d.tm_year,
        'mon': d.tm_mon,
        'mday': d.tm_mday,
        'hour': d.tm_hour,
        'min': d.tm_min,
        'sec': d.tm_sec,
        'tz': time.strftime('%Z', d),
        'str': time.strftime('%Y-%m-%d %X', d),
    }
def cpustat(self, fullstat=False):
    """Parse /proc/stat into CPU counter dicts.

    fullstat=False: only the aggregate 'cpu' row, as {'used','idle','all'}.
    fullstat=True: all ten kernel columns per row plus 'all', with
    per-core rows collected under 'cpus'.  Boot time ('btime') is
    reported as a formatted local timestamp.
    """
    cpustat = {}
    # REF: http://www.kernel.org/doc/Documentation/filesystems/proc.txt
    fname = ('used', 'idle')
    full_fname = ('user', 'nice', 'system', 'idle', 'iowait', 'irq',
                  'softirq', 'steal', 'guest', 'guest_nice')
    cpustat['cpus'] = []
    with open('/proc/stat', 'r') as f:
        for line in f:
            if line.startswith('cpu'):
                fields = line.strip().split()
                name = fields[0]
                # Without fullstat only the aggregate 'cpu' row matters.
                if not fullstat and name != 'cpu':
                    continue
                stat = [int(i) for i in fields[1:]]
                statall = sum(stat)
                if fullstat:
                    # Older kernels emit fewer than 10 columns; pad with zeros.
                    while len(stat) < 10:
                        stat.append(0)
                    stat = dict(zip(full_fname, stat))
                else:
                    # Column 4 is idle; everything else counts as 'used'.
                    stat = dict(zip(fname, [statall - stat[3], stat[3]]))
                stat['all'] = statall
                if name == 'cpu':
                    cpustat['total'] = stat
                else:
                    cpustat['cpus'].append(stat)
            elif line.startswith('btime'):
                btime = int(line.strip().split()[1])
                cpustat['btime'] = time.strftime('%Y-%m-%d %X %Z',
                                                 time.localtime(btime))
    return cpustat
def format_duration(self, duration):
    """Render *duration* seconds as 'HH:MM:SS', or '??:??:??' when the
    total is not yet known."""
    # NOTE(review): 'and' binds tighter than 'or', so this reads as
    # ((duration <= 0) and self.max is None) or (self.cur == self.min).
    # Confirm that grouping is intended.
    if (duration <= 0) and self.max is None or self.cur == self.min:
        result = '??:??:??'
    #elif duration < 1:
    #    result = '--:--:--'
    else:
        # %H wraps at 24 hours, so durations of a day or more lose the
        # day component.
        result = time.strftime('%H:%M:%S', time.gmtime(duration))
    return result
def update_headers(self, resp):
    """Heuristic freshness: when a cacheable response carries no explicit
    expiry, grant 10% of (date - last-modified), capped at 24h, and
    return a synthetic 'expires' header (RFC 7234 §4.2.2-style)."""
    headers = resp.headers

    # An explicit expiry wins; nothing to synthesise.
    if 'expires' in headers:
        return {}

    # Only 'public' (or absent) cache-control is eligible.
    if 'cache-control' in headers and headers['cache-control'] != 'public':
        return {}

    if resp.status not in self.cacheable_by_default_statuses:
        return {}

    # Both anchors are required for the heuristic.
    if 'date' not in headers or 'last-modified' not in headers:
        return {}

    date = calendar.timegm(parsedate_tz(headers['date']))
    last_modified = parsedate(headers['last-modified'])
    if date is None or last_modified is None:
        return {}

    now = time.time()
    current_age = max(0, now - date)
    delta = date - calendar.timegm(last_modified)
    # 10% of the time since last modification, at most one day.
    freshness_lifetime = max(0, min(delta / 10, 24 * 3600))
    if freshness_lifetime <= current_age:
        return {}

    expires = date + freshness_lifetime
    return {'expires': time.strftime(TIME_FMT, time.gmtime(expires))}
def tags(self):
    """Build the version suffix: tag_build (when set) followed by
    '-YYYYMMDD' when tag_date is enabled."""
    parts = []
    if self.tag_build:
        parts.append(self.tag_build)
    if self.tag_date:
        parts.append(time.strftime("-%Y%m%d"))
    return ''.join(parts)