我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用time.asctime()。
def flush_queue(self): # check number of messages in queue num_log_entries = self.log_queue.qsize() if num_log_entries > 0: # open the log file with open(self.log_full_filename, 'ab') as log_file: for i in range(num_log_entries): log_entry = self.log_queue.get() # append extra log information current_time = log_entry['time'] current_time_str = time.asctime(time.localtime(current_time)) log_entry['localtime'] = current_time_str # log the message as a JSON string if isPython3: log_file.write(bytes(json.dumps(log_entry) + "\n", 'UTF-8')) else: log_file.write(json.dumps(log_entry) + "\n")
def maker(name): my_level = getattr(logging, name.upper()) if name != 'exception' else logging.ERROR altname = (name if name != 'exception' else 'error').upper() def _log(self, msg, *args, **kwargs): exc = kwargs.pop('exc_info', None) or name == 'exception' tb = ('\n' + traceback.format_exc().strip()) if exc else '' if args: try: msg = msg % args except: self.exception( "Exception raised while formatting message:\n%s\n%r", msg, args) msg += tb # todo: check level before printing if self.level <= my_level: print("%s %s %s"%(time.asctime(), altname, msg)) _log.__name__ = name return _log
def _install_message(self, message): """Format a message and blindly write to self._file.""" from_line = None if isinstance(message, str) and message.startswith('From '): newline = message.find('\n') if newline != -1: from_line = message[:newline] message = message[newline + 1:] else: from_line = message message = '' elif isinstance(message, _mboxMMDFMessage): from_line = 'From ' + message.get_from() elif isinstance(message, email.message.Message): from_line = message.get_unixfrom() # May be None. if from_line is None: from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime()) start = self._file.tell() self._file.write(from_line + os.linesep) self._dump_message(message, self._file, self._mangle_from_) stop = self._file.tell() return (start, stop)
def asctime(t=None): """ Convert a tuple or struct_time representing a time as returned by gmtime() or localtime() to a 24-character string of the following form: >>> asctime(time.gmtime(0)) 'Thu Jan 1 00:00:00 1970' If t is not provided, the current time as returned by localtime() is used. Locale information is not used by asctime(). This is meant to normalise the output of the built-in time.asctime() across different platforms and Python versions. In Python 3.x, the day of the month is right-justified, whereas on Windows Python 2.7 it is padded with zeros. See https://github.com/behdad/fonttools/issues/455 """ if t is None: t = time.localtime() s = "%s %s %2s %s" % ( DAYNAMES[t.tm_wday], MONTHNAMES[t.tm_mon], t.tm_mday, time.strftime("%H:%M:%S %Y", t)) return s
def calc_output_tuple(self): utc_secs = self.__utc_secs utc_tuple = secs_to_tuple(utc_secs) year = utc_tuple[0] self.is_dst = False timezone = self.__output_timezone output_secs = utc_secs - TIME_ZONE_OFFSETS_TO_UTC[timezone] if timezone != UNIVERSAL: dst_start = calc_DST_start_utc_secs(year, timezone) dst_end = calc_DST_end_utc_secs(year, timezone) if (utc_secs >= dst_start and utc_secs <= dst_end): output_secs += HOUR_SECONDS self.is_dst = True self.__output_tuple = secs_to_tuple(output_secs) self.db_output = time.asctime(self.__output_tuple) return self
def main(): return ''' <style> h1 { font-size:150px; color: #ada; font-family: cursive; text-align: center; margin: 70px 30px; font-weight: normal; } p { color: #9a9; font-size:20px; text-align: right; } </style> <h1>Paper<br>Melody</h1> <p>%s</p> ''' % time.asctime()
def __init__(self): super(BanyanPub, self).__init__(process_name='Banyan publisher') print('Publishing 100000 messages.') time.sleep(.3) for x in range(0, 100000): payload = {'msg': x} self.publish_payload(payload, 'test') localtime = time.asctime(time.localtime(time.time())) print('Task completed at: ', localtime) super(BanyanPub, self).clean_up() sys.exit(0) # instantiate this class
def download_files(msg): user_remark = itchat.search_friends(userName=msg['FromUserName'])['RemarkName'] or itchat.search_friends( userName=msg['FromUserName'])['NickName'] if msg['FromUserName'] == itchat.originInstance.storageClass.userName: user_remark = '?' msg_time = t.asctime(t.localtime(t.time())) if msg['ToUserName'] == 'filehelper': print('\033[33m{time} \033[34m{user_from}\033[0m ????? \033[34m{user_to}\033[0m: <{file_name}>'.format( time=msg_time, user_from=user_remark, user_to='?', file_name=msg['FileName'])) # ???? else: user_to = itchat.search_friends(userName=msg['ToUserName'])['RemarkName'] or itchat.search_friends( userName=msg['ToUserName'])['NickName'] print('\033[33m{time} \033[34m{user_from}\033[0m ????? \033[35m{user_to}\033[0m: <{file_name}>'.format( time=msg_time, user_from=user_remark, user_to=user_to, file_name=msg['FileName'])) # ???? else: msg_time = t.asctime(t.localtime(t.time())) print('\033[33m{time} \033[35m{user_from}\033[0m ????? \033[34m?\033[0m: <{file_name}>'.format( time=msg_time, user_from=user_remark, file_name=msg['FileName'])) # ???? try: msg['Text']('./wechat_files/file/{file_name}'.format(file_name=msg['FileName'])) except (TimeoutError, ConnectionError, ConnectionAbortedError): print('\033[31m???????\033[0m') else: print("Unexpected error:", sys.exc_info()[0]) pass
def prog(self):#Show progress nb_batches_total=(self.params['nb_epoch'] if not kv-1 else self.params['epochs'])*self.params['nb_sample']/self.params['batch_size'] nb_batches_epoch=self.params['nb_sample']/self.params['batch_size'] prog_total=(self.t_batches/nb_batches_total if nb_batches_total else 0)+0.01 prog_epoch=(self.c_batches/nb_batches_epoch if nb_batches_epoch else 0)+0.01 if self.t_epochs: now=time.time() t_mean=float(sum(self.t_epochs)) / len(self.t_epochs) eta_t=(now-self.train_start)*((1/prog_total)-1) eta_e=t_mean*(1-prog_epoch) t_end=time.asctime(time.localtime(now+eta_t)) e_end=time.asctime(time.localtime(now+eta_e)) m='\nTotal:\nProg:'+str(prog_total*100.)[:5]+'%\nEpoch:'+str(self.epoch[-1])+'/'+str(self.stopped_epoch)+'\nETA:'+str(eta_t)[:8]+'sec\nTrain will be finished at '+t_end+'\nCurrent epoch:\nPROG:'+str(prog_epoch*100.)[:5]+'%\nETA:'+str(eta_e)[:8]+'sec\nCurrent epoch will be finished at '+e_end self.t_send(m) print(m) else: now=time.time() eta_t=(now-self.train_start)*((1/prog_total)-1) eta_e=(now-self.train_start)*((1/prog_epoch)-1) t_end=time.asctime(time.localtime(now+eta_t)) e_end=time.asctime(time.localtime(now+eta_e)) m='\nTotal:\nProg:'+str(prog_total*100.)[:5]+'%\nEpoch:'+str(len(self.epoch))+'/'+str(self.stopped_epoch)+'\nETA:'+str(eta_t)[:8]+'sec\nTrain will be finished at '+t_end+'\nCurrent epoch:\nPROG:'+str(prog_epoch*100.)[:5]+'%\nETA:'+str(eta_e)[:8]+'sec\nCurrent epoch will be finished at '+e_end self.t_send(m) print(m)
def on_train_begin(self, logs={}): self.epoch=[] self.t_epochs=[] self.t_batches=0 self.logs_batches={} self.logs_epochs={} self.train_start=time.time() self.localtime = time.asctime( time.localtime(self.train_start) ) self.mesg = 'Train started at: '+self.localtime self.t_send(self.mesg) self.stopped_epoch = (self.params['epochs'] if kv-1 else self.params['nb_epoch']) #============================================================================== #============================================================================== #============================================================================== # #==============================================================================
def start_webserver(self): def webthread_start(): print time.asctime(), "Server Starts - %s:%s" % (self.server, self.port) self.web_server = CodemapHTTPServer( (self.server, self.port), CodemapHTTPRequestHandler) self.web_server.set_codemap(self) self.seq_dict.update({self.uid: []}) try: self.web_server.serve_forever() except KeyboardInterrupt: pass self.web_server.server_close() print time.asctime(), "Server Stops - %s:%s" % (self.server, self.port) if self.thread_http is not None: pass self.thread_http = threading.Thread(target=webthread_start) self.thread_http.daemon = True self.thread_http.start()
def Log(text=None): """Make an entry in the default log file.""" now = str(time.asctime(time.localtime(time.time()))) if not text: text = "-" entry = "%s: %s\r"%(now, text) path = os.path.join(os.getcwd(), "Logs") new = 0 if not os.path.exists(path): os.makedirs(path) new = 1 log = os.path.join(path, "log.txt") f = open(log, 'a') if new: f.write("# log file for FL\r") f.write(entry) f.close()
def initaliseLogger(): global log log = logging.getLogger("pySimReader") #~ log.setLevel(logging.INFO) log.setLevel(logging.WARNING) # Define the handler and formmatter myLogHandler = logging.FileHandler("pySimReader.log", "w") # Attach the formatter to the handler and the handler to the log myLogHandler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)-5s : %(message)s", "%H:%M:%S")) log.addHandler(myLogHandler) log.info("Started pySimReader: %s" % (time.asctime())) ##################################################################################### # This class will be the main windows for the pySIM application # #####################################################################################
def convertTimestamp(self, ts): # 2050107034146B self.timetuple = [0,0,0,0,0,0,0,0,0] self.timetuple[0] = int(ts[0]) + int(ts[1]) * 10 if self.timetuple[0] >= 80: # Convert to previous century, hopefully no one uses this after 2079 ;) self.timetuple[0] += 1900 else: # Convert to current century self.timetuple[0] += 2000 #~ print ts self.timetuple[1] = int(ts[2]) + int(ts[3]) * 10 self.timetuple[2] = int(ts[4]) + int(ts[5]) * 10 self.timetuple[3] = int(ts[6]) + int(ts[7]) * 10 self.timetuple[4] = int(ts[8]) + int(ts[9]) * 10 self.timetuple[5] = int(ts[10]) + int(ts[11]) * 10 self.timetuple[6] = calendar.weekday(self.timetuple[0], self.timetuple[1], self.timetuple[2]) return time.asctime(self.timetuple)
def run(args): print time.asctime(time.localtime(time.time())) if args.inferrer == 'emb': inferrer = CsEmbeddingInferrer(args.vocabfile, args.ignoretarget, args.contextmath, args.embeddingpath, args.embeddingpathc, args.testfileconll, args.bow_size, 10) print "Using CsEmbeddingInferrer" elif args.inferrer == 'lstm': inferrer = Context2vecInferrer(args.lstm_config, args.ignoretarget, args.contextmath, 10) print "Using Context2vecInferrer" else: raise Exception("Unknown inferrer type: " + args.inferrer) print time.asctime(time.localtime(time.time())) run_test(inferrer) print "Finished" print time.asctime(time.localtime(time.time()))
def wait_for_new_inst(module, connection, group_name, wait_timeout, desired_size, prop): # make sure we have the latest stats after that last loop. as_group = connection.get_all_groups(names=[group_name])[0] props = get_properties(as_group) log.debug("Waiting for {0} = {1}, currently {2}".format(prop, desired_size, props[prop])) # now we make sure that we have enough instances in a viable state wait_timeout = time.time() + wait_timeout while wait_timeout > time.time() and desired_size > props[prop]: log.debug("Waiting for {0} = {1}, currently {2}".format(prop, desired_size, props[prop])) time.sleep(10) as_group = connection.get_all_groups(names=[group_name])[0] props = get_properties(as_group) if wait_timeout <= time.time(): # waiting took too long module.fail_json(msg = "Waited too long for new instances to become viable. %s" % time.asctime()) log.debug("Reached {0}: {1}".format(prop, desired_size)) return props
def cli_list(ctx): """List payment channels and their information.""" urls = ctx.obj['client'].list() if ctx.obj['json']: print(json.dumps({'result': urls})) elif len(urls) == 0: print("No payment channels exist.") else: print() for url in urls[::-1]: print(COLORS['blue'] + url + COLORS['reset']) # Get channel status status = ctx.obj['client'].status(url) print(" {:<16}{}".format("Status", format_state((str(status.state))))) print(" {:<16}{}".format("Balance", status.balance)) print(" {:<16}{}".format("Deposit", status.deposit)) print(" {:<16}{}".format("Created", time.asctime(time.localtime(status.creation_time)))) print(" {:<16}{}".format("Expires", format_expiration_time(status.expiration_time))) print(" {:<16}{}".format("Deposit txid", status.deposit_txid)) print(" {:<16}{}".format("Spend txid", status.spend_txid)) print()
def main(): import time ticks=time.time() print ' ???????',ticks print '---------' localtime=time.localtime(time.time()) print ' ??????',localtime print '---------' localtime=time.asctime(time.localtime(time.time())) print '??????',localtime print '---------' print time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print time.strftime("%a %b %d %H:%M:%S %Y", time.localtime()) a=time.strftime("%a %b %d %H:%M:%S %Y", time.localtime()) print time.mktime(time.strptime(a,"%a %b %d %H:%M:%S %Y")) print '---------' import calendar cal=calendar.month(2017,7) print cal
def train(self, n_iterations): print("Training network...") # Recover losses from previous run if (self.option == 'continue'): with open("{0}_{1}_loss.json".format(self.root, self.depth), 'r') as f: losses = json.load(f) else: losses = [] self.checkpointer = ModelCheckpoint(filepath="{0}_{1}_weights.hdf5".format(self.root, self.depth), verbose=1, save_best_only=True) self.history = LossHistory(self.root, self.depth, losses, {'name': '{0}_{1}'.format(self.root, self.depth), 'init_t': time.asctime()}) self.reduce_lr = LearningRateScheduler(self.learning_rate) self.metrics = self.model.fit_generator(self.training_generator(), self.batchs_per_epoch_training, epochs=n_iterations, callbacks=[self.checkpointer, self.history, self.reduce_lr], validation_data=self.validation_generator(), validation_steps=self.batchs_per_epoch_validation) self.history.finalize()
def get_response(url,cookies='',headers='',params='', stream=False, mtt=20, wt=2, st=0.25): '???????????' max_try_times = mtt # ?????? wait_time = wt # ???????? sleep_time = st # ?????? #print('[%s][INFO] Start trying to connect ...' % time.asctime()[11:19]) for times in range(1,max_try_times+1): # print('[%s][INFO] The %s time try begin ...' % (time.asctime()[11:19], times)) try: response = requests.get(url, timeout=wait_time, cookies=cookies, headers=headers, params=params, stream=stream) # print('[%s][INFO] The %s time try success!' % (time.asctime()[11:19], times)) break except: #traceback.print_exc() if times < max_try_times: # print('[%s][WARN] The %s time try failed!' % (time.asctime()[11:19], times)) time.sleep(sleep_time) continue else: print('[%s][ERROR] The last try failed at last , exit pro ...' % time.asctime()[11:19]) traceback.print_exc() exit() # print('[%s][INFO] Successfully get the response!' % time.asctime()[11:19]) response.encoding = 'u8' return response
def retrieve(imgurl,imgpath): '??????????????????????' max_try_times = 5 # ?????? wait_time = 15 # ???????? sleep_time = 3 # ?????? import socket socket.setdefaulttimeout(wait_time) #print('[%s][INFO] Start trying to connect ...' % time.asctime()[11:19]) for times in range(1,max_try_times+1): # print('[%s][INFO] The %s time try begin ...' % (time.asctime()[11:19], times)) try: urllib.urlretrieve(imgurl,imgpath) # print('[%s][INFO] The %s time try success!' % (time.asctime()[11:19], times)) return True except: if times < max_try_times: # print('[%s][WARN] The %s time try failed!' % (time.asctime()[11:19], times)) time.sleep(sleep_time) continue else: print('[%s][ERROR] The last try failed at last , pass ...' % time.asctime()[11:19]) break return False # print('[%s][INFO] Successfully get the response!' % time.asctime()[11:19])
def save_img2(imgurl, imgpath, headers='', params=''): '??????????????????????' max_try_times = 10 # ?????? sleep_time = 0 # ?????? print('[%s][INFO] Start trying to download ...' % time.asctime()[11:19]) for times in range(1,max_try_times+1): # print('[%s][INFO] The %s time try begin ...' % (time.asctime()[11:19], times)) try: # __save_img2(imgurl, dirpath, imgname, headers, params) response = get_response(imgurl, headers=headers,params=params, stream=False, mtt=10, wt=15, st=2) img = Image.open(StringIO(response.content)) img.save(imgpath) img.close() # print('[%s][INFO] The %s time try success!' % (time.asctime()[11:19], times)) return True except: traceback.print_exc() if times < max_try_times: print('[%s][WARN][IMG] The %s time try failed!' % (time.asctime()[11:19], times)) time.sleep(sleep_time) continue else: print('[%s][ERROR] The last try failed at last , pass ...' % time.asctime()[11:19]) break return False
def lastupdate(): ''' Return the date of the last rpm package update/installation. CLI Example: .. code-block:: bash salt '*' rpmpck.lastupdate ''' installtime = lambda rpm_date: time.strptime(rpm_date, "%c") cmd = ['rpm', '-qa', '--queryformat', r'%{INSTALLTID:date}\n'] out = __salt__['cmd.run'](cmd, output_loglevel='trace', python_shell=False).splitlines() last = max(installtime(rpm_date) for rpm_date in out) return time.asctime(last)
def buildtime(): ''' Return the build date and time. CLI Example: .. code-block:: bash salt '*' rpmpck.buildtime ''' installtime = lambda rpm_date: time.strptime(rpm_date, "%c") cmd = ['rpm', '-qa', '--queryformat', r'%{INSTALLTID:date}\n'] out = __salt__['cmd.run'](cmd, output_loglevel='trace', python_shell=False).splitlines() first = min(installtime(rpm_date) for rpm_date in out) return time.asctime(first)
def lastupdate(): ''' Return the date of the last rpm package update/installation. CLI Example: .. code-block:: bash salt '*' rpmlibpkg.lastupdate ''' installdate = lambda h: h.sprintf("%{INSTALLTID:date}") installptime = lambda h: time.strptime(installdate(h), "%c") ts = rpm.TransactionSet() mi = ts.dbMatch() last = max(installptime(h) for h in mi) return time.asctime(last)
def buildtime(): ''' Return the build date and time. CLI Example: .. code-block:: bash salt '*' rpmlibpkg.buildtime ''' installdate = lambda h: h.sprintf("%{INSTALLTID:date}") installptime = lambda h: time.strptime(installdate(h), "%c") ts = rpm.TransactionSet() mi = ts.dbMatch() first = min(installptime(h) for h in mi) return time.asctime(first)
def error_log(self, error, station_name=None): if os.path.isfile('error_log.txt'): file = open('error_log.txt', 'a') else: file = open('error_log.txt', 'w') if not station_name: station_name = 'NO-STATION-NAME' # Create time stamp time_stamp = time.asctime(time.localtime(time.time())) # Form error message 'Station == Time == Error' error_msg = str(station_name) + '\t' + time_stamp + '\t' + str(error) + '\n' file.write(error_msg) file.close()
def add_acl(ip): ''' add an entry to the ACL. look at success or not of the commands :param ip: :return: ''' UPDATE_ACL_COMMANDS = """ ip access-list extended %s no deny ip any any remark %s permit ip any host %s deny ip any any """ localtime = time.asctime(time.localtime(time.time())) remark = "Added %s @%s" % (ip, localtime) responses = configure(UPDATE_ACL_COMMANDS % (ACLNAME, remark, ip)) success = reduce(lambda x, y: x and y, [r.success for r in responses]) status = "Success" if success else "Fail" log("adding IP: %s to ACL: status: %s" % (ip, status), 5)
def process(re_table, apply_change): exec_commands = [] re_table.Reset() output = cli("show int | inc Last in|line pro") #print (output) localtime = time.asctime(time.localtime(time.time())) description = "description PCIShutdown: %s" % localtime fsm_results = re_table.ParseText(output) for interface in fsm_results: # skip non Ethernet if is_idle(interface[2], interface[3]) and ("Ethernet" in interface[0]): if interface[1] != "administratively": if apply_change: exec_commands.extend(['interface ' + interface[0], description, 'shutdown']) else: print("(testmode) would have SHUT %s (%s,%s)" % (interface[0], interface[2], interface[3])) print('Commands to run:') print(exec_commands) apply_commands(exec_commands)
def compute(i, n, task=None): import time yield task.sleep(n) raise StopIteration((i, task.location, time.asctime())) # result of 'compute' is current time # client (local) task submits computations
def compute(n, task=None): import time yield task.sleep(n) raise StopIteration(time.asctime()) # result of 'compute' is current time
def _worker_thread_loop(self): while self.go: try: try: (x, regmod_path) = self.queue.get(timeout=0.5) except Queue.Empty: continue # TODO -- could comment this out if you want CSV data to feed into something print "--> Attempting for %s" % regmod_path # Go Grab it if we think we have something! sensor_id = x.env.endpoint.SensorId hostname = x.env.endpoint.SensorHostName # TODO -- this could use some concurrency and work queues because we could wait a while for # each of these to get established and retrieve the value # Establish our CBLR session if necessary! lrh = self._create_lr_session_if_necessary(sensor_id) data = lrh.get_registry_value(regmod_path) print "%s,%s,%d,%s,%s,%s" % ( time.asctime(), hostname, sensor_id, x.header.process_path, regmod_path, data.get('value_data', "") if data else "<UNKNOWN>") # TODO -- could *do something* here, like if it is for autoruns keys then go check the signature status # of the binary at the path pointed to, and see who wrote it out, etc except: traceback.print_exc()
def generate_cookie(name, securekey): #print (">> generate cookie for %s" % name) content = { 'name':name, 'login-time': time.asctime() } text = json.dumps(content) part1 = base64.b64encode(text.encode('ascii')) part2 = hashlib.md5( (text+securekey).encode('ascii') ).hexdigest() # part1 is binary(ascii) and part2 is str(utf-8) cookie = str(part1, encoding='utf-8') +"."+ part2 #print ("cookie : %s" % cookie) return cookie
def print_io(inputs, outputs): if inputs: print(time.asctime(), "Inputs:", inputs) if outputs: print(time.asctime(), "Outputs:", outputs) if not inputs and not outputs: print(time.asctime(), "No inputs/outputs?") #--------------------------- graph traversal stuff ---------------------------
def set_from(self, from_, time_=None): """Set "From " line, formatting and appending time_ if specified.""" if time_ is not None: if time_ is True: time_ = time.gmtime() from_ += ' ' + time.asctime(time_) self._from = from_
def timestampToString(value): return asctime(time.gmtime(max(0, value + epoch_diff)))
def add(self, eventclass="note", message="Your message here"): "Compose a log entry from the elements passed to add() and append it to the list of events." import time event = self.datetime() + ' ' + eventclass + ' ' + message if ( self.debug == "on" ): print 'DEBUG: Adding', event, 'to log', self.name self.modified = time.asctime(time.localtime(time.time())) self.events.append(event) return
def datetime(self): "Generate the date/time stamp used in our log entries" import time datestamp = time.asctime(time.localtime(time.time())) return datestamp