我们从 Python 开源项目中提取了以下 12 个代码示例,用于说明如何使用 datetime.time.sleep()。
def restart(bot, update):
    """Announce a restart in the chat, then re-exec the current program.

    Nothing happens unless ``from_admin(update)`` is truthy. Otherwise a
    'Restarting <name>…' reply is sent (the name is ``bot.first_name`` in
    title case) and the running process is replaced, via ``os.execl``, by a
    fresh interpreter started with the same ``sys.argv``.
    """
    if from_admin(update):
        import os
        import sys
        import time

        display_name = bot.first_name.title()
        update.message.reply_text('Restarting {}…'.format(display_name))

        # Short pause before the process image is replaced.
        # NOTE(review): presumably gives the reply time to go out - confirm.
        time.sleep(0.2)

        interpreter = sys.executable
        os.execl(interpreter, interpreter, *sys.argv)
def test_today(self):
    """Check that ``today()`` agrees with ``fromtimestamp(time.time())``.

    Up to three rounds are tried, 0.1 s apart. If no round produces two
    identical values, the last pair must still lie within half a second
    of each other for the test to pass.
    """
    import time

    half_second = timedelta(seconds=0.5)

    for _attempt in range(3):
        first = self.theclass.today()
        second = self.theclass.fromtimestamp(time.time())
        if first == second:
            break
        # A mismatch is not necessarily a bug. Legitimate causes:
        #   1. midnight passed between the today() and time() calls;
        #   2. time() is so fine-grained that two reads never coincide;
        #   3. time() is coarse and today() ran just before a tick of
        #      the clock's resolution;
        #   4. the system clock was adjusted between the two calls.
        # Whatever the cause, pause briefly and try another round.
        time.sleep(0.1)

    # Either a round matched or none did. In the latter case assume
    # cause 2 and accept values that are less than 0.5 s apart.
    agrees = first == second
    if not agrees:
        agrees = abs(second - first) < half_second
    self.assertTrue(agrees)
def test_today(self):
    """Verify that ``today()`` behaves like ``fromtimestamp(time.time())``.

    The comparison is attempted at most three times with a 0.1 s pause
    after every failed attempt. When the final pair still differs, it is
    checked with ``assertAlmostEqual`` using a half-second ``delta``.
    """
    import time

    attempts_left = 3
    while attempts_left > 0:
        attempts_left -= 1
        snapshot = self.theclass.today()
        stamp = time.time()
        replay = self.theclass.fromtimestamp(stamp)
        if snapshot == replay:
            break
        # Differences can be perfectly legitimate:
        #   1. the date rolled over at midnight between the two calls;
        #   2. the platform clock resolution is so fine that the same
        #      value is never observed twice;
        #   3. the platform clock is coarse and today() was called right
        #      before a resolution boundary;
        #   4. somebody changed the system clock in between.
        # So wait a moment and retry.
        time.sleep(0.1)

    # No exact match after all retries: treat it as case 2 and only
    # require the two values to be within half a second.
    if snapshot != replay:
        self.assertAlmostEqual(replay, snapshot, delta=timedelta(seconds=0.5))
def __lsdir(self, path):
    """Return the directory entries to report for *path*.

    The result always starts with '.' and '..'. What follows depends on
    the path:

    * ``"/"`` while ``self.root`` is one of the ``self.BDIRS`` values:
      ``self.root`` is reset to ``self.BDIRS['loc']`` and the keys of
      ``self.snapshots`` are listed.
    * the first path component is a key of ``self.snapshots``, or the
      path is ``"./"``: the entries of ``self.__realpath(path)`` are
      listed.
    * anything else: ``self.root`` is reset to ``self.BDIRS['loc']`` and
      *path* itself is listed with ``os.listdir``.

    Note that this lookup mutates ``self.root`` in the first and last case.

    Raises:
        IndexError: if *path* contains no '/' at all.
        OSError: propagated from ``os.listdir``.
    """
    if DEBUG:
        # Single-argument call form prints the same text on Python 2 and 3.
        print("ls dir")

    # Component right after the first '/', e.g. 'snap' for '/snap/etc'.
    ss = path.split('/')[1]
    dirents = ['.', '..']

    # Earlier, stricter form of the root test, kept for reference:
    #if path == "/" and (self.root.strip('/') == self.BDIRS['loc'].strip('/') or self.root == self.BDIRS['ext'].strip('/')):
    if path == "/" and (self.root in self.BDIRS.values()):
        self.root = self.BDIRS['loc']
        dirents.extend(self.snapshots.keys())
    elif ss in self.snapshots or path == "./":
        dirents.extend(os.listdir(self.__realpath(path)))
    else:
        self.root = self.BDIRS['loc']
        dirents.extend(os.listdir(path))
    return dirents

#    def mythread(self):
#
#        """
#        The beauty of the FUSE python implementation is that with the python interp
#        running in foreground, you can have threads
#        """
#        print "mythread: started"
#        while 1:
#            time.sleep(120)
#            print "mythread: ticking"
def main():
    """Replay 'database.log' through ``run_log`` and save the records.

    Every line of the log is handed to ``run_log`` together with the
    timestamp returned for the previous line. Results other than ``-1.``
    are collected and finally appended to 'database.log-test2'. Progress
    is reported through a ``tqdm`` bar in batches of 19 processed lines.
    """
    log_file = 'database.log'
    max_results = 40000   # also used as the progress-bar total
    progress_step = 19    # lines processed between two progress updates

    results = []
    with open(log_file) as in_, tqdm(total=max_results) as pbar:
        pending = 0  # lines processed since the last progress update
        last_timestamp = datetime.utcnow()
        for l_ in in_:
            # Test the cap before calling run_log, so no requests are issued
            # for a line whose result would be thrown away anyway.
            # NOTE(review): '>' lets the list grow to max_results + 1
            # entries; kept as in the original - confirm whether '>=' was
            # intended.
            if len(results) > max_results:
                break

            res, last_timestamp = run_log(l_, last_timestamp)

            pending += 1
            if pending == progress_step:
                pbar.update(pending)
                pending = 0
                sys.stdout.flush()

            # run_log signals "nothing to record" with -1.
            if res != -1.:
                results.append(res)

        # Push the last partial batch so the bar reflects every line read.
        if pending:
            pbar.update(pending)

    # Append mode: records from earlier runs are kept.
    with open(log_file + '-test2', 'a') as out:
        for entry in results:
            if entry is not None:
                out.write(str(entry))
def run_log(query_line, last_timestamp):
    """Benchmark the single GET request found in one access-log line.

    The URL is extracted from *query_line* with the pattern
    ``"GET (.*?) HTTP``. When exactly one URL is found it is issued 11
    times through ``run_http_request``; the first timing is reported as
    the cold time and the mean of the remaining ten as the warm time.

    Args:
        query_line: One line of the log.
        last_timestamp: ``datetime`` of the previous query; used to
            compute the number of seconds elapsed since then.

    Returns:
        A ``(record, timestamp)`` tuple. ``record`` is a tab-separated,
        newline-terminated string holding the cleaned query, the list
        ``[seconds since UTC midnight, seconds since last_timestamp]``,
        the warm mean, the cold time and the result size. It is ``-1.``
        when the line does not contain exactly one request or the
        benchmark failed. ``timestamp`` is the UTC time taken after the
        last completed request, or at function entry if none completed.
    """
    runs_per_query = 11  # one cold run followed by the warm runs

    url_ = re.findall('"GET (.*?) HTTP', query_line)
    last_timestamp_new = datetime.utcnow()
    if len(url_) != 1:
        return (-1., last_timestamp_new)

    request_url = url_[0]
    query_times = []
    result_size = 0
    try:
        utcnow = datetime.utcnow()
        midnight_utc = datetime.combine(utcnow.date(), time(0))
        delta_last_query = (utcnow - last_timestamp).total_seconds()
        for _ in range(runs_per_query):
            response, exec_time = run_http_request(request_url)
            query_times.append(exec_time)
            last_timestamp_new = datetime.utcnow()
        timestamp_query = (last_timestamp_new - midnight_utc).total_seconds()
        # Only the response of the final run is inspected for its size.
        result_size = get_result_size(response.json())
    except Exception:
        # Best effort: any ordinary error marks the whole line as failed.
        # KeyboardInterrupt / SystemExit are deliberately NOT caught here,
        # so a long replay can still be interrupted.
        exec_time = -1

    # exec_time holds the value of the last run only, so an earlier run
    # that returned -1 does not abort the measurement.
    # NOTE(review): presumably run_http_request reports failure as -1 -
    # confirm against its implementation.
    if exec_time == -1 or len(query_times) != runs_per_query:
        return (-1., last_timestamp_new)

    cold_exec_time = query_times[0]
    warm_times = query_times[1:]
    warm_mean = np.mean(warm_times, dtype=np.float64)
    time_vec = [timestamp_query, delta_last_query]
    query_clean = cleanup_query(request_url)
    res = str(
        query_clean + '\t'
        + str(time_vec) + '\t'
        + str(warm_mean) + '\t'
        + str(cold_exec_time) + '\t'
        + str(result_size) + '\n'
    )
    return (res, last_timestamp_new)