我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用time.time.sleep()。
def recvall(the_socket, timeout=5): the_socket.setblocking(0) total_data = [] data = "" begin = time() while True: sleep(0.05) if total_data and time()-begin > timeout: break elif time()-begin > timeout*2: break try: data = the_socket.recv(1024) if data: total_data.append(data) begin = time() except Exception: pass return "".join(total_data)
def displayResult(self): fig = plt.figure(101) plt.subplot(221) plt.imshow(np.abs(self.reconstruction),origin='lower') plt.draw() plt.title('Reconstruction Magnitude') plt.subplot(222) plt.imshow(np.angle(self.reconstruction),origin='lower') plt.draw() plt.title('Reconstruction Phase') plt.subplot(223) plt.imshow(np.abs(self.aperture),origin='lower') plt.title("Aperture Magnitude") plt.draw() plt.subplot(224) plt.imshow(np.angle(self.aperture),origin='lower') plt.title("Aperture Phase") plt.draw() fig.canvas.draw() #fig.tight_layout() # display.display(fig) # display.clear_output(wait=True) # time.sleep(.00001)
def do_real_import(self, vsfile, filepath,mdXML,import_tags): """ Make the import call to vidispine, and wait for self._importer_timeout seconds for the job to complete. Raises a VSException representing the job error if the import job fails, or ImportStalled if the timeout occurs :param vsfile: VSFile object to import :param filepath: filepath of the VSFile :param mdXML: compiled metadata XML to import alongside the media :param import_tags: shape tags describing required transcodes :return: None """ import_job = vsfile.importToItem(mdXML, tags=import_tags, priority="LOW", jobMetadata={"gnm_app": "vsingester"}) job_start_time = time.time() close_sent = False while import_job.finished() is False: self.logger.info("\tJob status is %s" % import_job.status()) if time.time() - job_start_time > self._importer_timeout: self.logger.error("\tJob has taken more than {0} seconds to complete, concluding that it must be stalled.".format(self._importer_timeout)) import_job.abort() self.logger.error("\tSent abort signal to job") raise ImportStalled(filepath) if time.time() - job_start_time > self._close_file_timeout and not close_sent: vsfile.setState("CLOSED") sleep(5) import_job.update(noraise=False)
def read_temp(): global temp_f lines = read_temp_raw() while lines[0].strip()[-3:] != 'YES': #time.sleep(0.2) lines = read_temp_raw() equals_pos = lines[1].find('t=') if equals_pos != -1: temp_string = lines[1][equals_pos+2:] temp_c_raw = float(temp_string) / 1000.0 temp_f_raw = temp_c_raw * 9.0 / 5.0 + 32.0 temp_f = "{0:.0f}".format(temp_f_raw) #only whole numbers #_________________________________________________________________ #VARIABLES
def wait(i=10): """ Wraps 'time.sleep()' with logger output """ logger.debug('Waiting %d sec... (%.2fmin)', i, i / 60.0) sleep(i)
def auto_send_find_node(self): wait = 1.0 / self.max_node_qsize while True: try: node = self.nodes.popleft() self.send_find_node((node.ip, node.port), node.nid) except IndexError: pass try: sleep(wait) except KeyboardInterrupt: os._exit(0)
def fetch_torrent(session, ih, timeout): name = ih.upper() url = 'magnet:?xt=urn:btih:%s' % (name,) data = '' params = { 'save_path': '/tmp/downloads/', 'storage_mode': lt.storage_mode_t(2), 'paused': False, 'auto_managed': False, 'duplicate_is_error': True} try: handle = lt.add_magnet_uri(session, url, params) except: return None status = session.status() #print 'downloading metadata:', url handle.set_sequential_download(1) meta = None down_time = time.time() down_path = None for i in xrange(0, timeout): if handle.has_metadata(): info = handle.get_torrent_info() down_path = '/tmp/downloads/%s' % info.name() #print 'status', 'p', status.num_peers, 'g', status.dht_global_nodes, 'ts', status.dht_torrents, 'u', status.total_upload, 'd', status.total_download meta = info.metadata() break time.sleep(1) if down_path and os.path.exists(down_path): os.system('rm -rf "%s"' % down_path) session.remove_torrent(handle) return meta
def _setToSlaveThread(logger,settings, cache, master,url, queue, hostState): if(not master): return socket = zmq.Context.instance().socket(zmq.PUSH) import time while hostState["current"] is None: logger.debug("cannot send to slave, net info: "+ str(hostState["current"])) time.sleep(1) slaveAddress = "tcp://"+hostState["current"].slave.ip + ":"+ str(settings.getSlaveSetPort()) socket.connect(slaveAddress) oldAddress = "tcp://"+hostState["current"].slave.ip + ":"+ str(settings.getSlaveSetPort()) logger.debug("Finally I'm configured") while True: objToSend = queue.get() if(slaveAddress != None): sended = False while( not sended): try: slaveAddress = "tcp://"+hostState["current"].slave.ip + ":"+ str(settings.getSlaveSetPort()) if(slaveAddress != oldAddress): oldAddress = slaveAddress socket = zmq.Context.instance().socket(zmq.PUSH) socket.connect(slaveAddress) logger.debug("Change of slave:" + slaveAddress) socket.send(dumps(Command(SETCOMMAND, objToSend.key, objToSend.value))) if(settings.isVerbose()): logger.debug("sended current key to slave: "+str(objToSend.key) +" to " + str(slaveAddress)) sended = True except Exception as e: logger.warning("error in slave: " + str(e))
def send_post_report(self): """Send post report to a given URL.""" data_post = self.create_post_report() count = 1 time_retry = 60 message_config_command = self.__json_dict['GENERAL']['MESSAGE_CONFIG_COMMAND'] message_config_method = self.__json_dict['GENERAL']['MESSAGE_CONFIG_METHOD'] while count <= 5: request_to_brt = Communications.send_message( Communications(), data_post, message_config_command, message_config_method) self.__logger.info( 'Report sent status: ' + str(request_to_brt.status_code) + ' <===> ' + request_to_brt.reason) print 'Response from server:' attempt_notification = 'Attempt: ' + str(count) print attempt_notification self.__logger.info(attempt_notification) print (request_to_brt.status_code, request_to_brt.reason) self.__logger.info('Server response: ' + str(request_to_brt.status_code) + ' ' + str(request_to_brt.reason)) # this should make the script wait for 60s (1min), 120s (2min), 360s (6min), 1440s (24min), 7200s (2h) time_retry = time_retry * count count = count + 1 if request_to_brt.status_code == 200: self.__logger.info('Sent') break elif request_to_brt.status_code != 200 and count is not 5: attempt_failed_notification = 'The attempt to send report failed. Attempt number ' + \ str(count) + ' will be in: ' + str(time_retry/60) + ' minutes.' print attempt_failed_notification self.__logger.warning(attempt_failed_notification) time.sleep(time_retry) elif count == 5 and request_to_brt.status_code != 200: attempt_failed_notification = 'Last attempt to send report FAILED, please check connectivity to BRT' print attempt_failed_notification self.__logger.critical(attempt_failed_notification) exit(1)
def patch_time(): """Replace :func:`time.sleep` with :func:`gevent.sleep`.""" from gevent.hub import sleep import time patch_item(time, 'sleep', sleep)
def polling_status(self): self.status_obj = UpdateStatus(self.hyp_id, self.hostname, port=self.port, user=self.user) while self.stop is not True: self.status_obj.update_status_hyps_rethink() interval = 0.0 while interval < self.polling_interval: sleep(0.1) interval = interval + 0.1 if self.stop is True: break
def test_clone_of_event( self ): """ checks that all classes with references to Event implement a clone method with ``self`` and ``event`` as only parameters. """ pass # FIXME # TODO check also a clone of another clone # TODO {{{2 #This test can't work with sqlite, because sqlite not support multiusers. #It is recomended to use this in future # def test_visibility_in_thread(self): # "testing visibility public and private events in thread" # class TestThread(threading.Thread): # "thread with random delay" # def __init__(self, test, user_nr): # self.user_nr = user_nr # self.test = test # threading.Thread.__init__(self) # def run(self): # time.sleep(random.randint(0, 100)/100.0) # self.test.user_test_visibility(self.user_nr) # for user_nr in range(USERS_COUNT): # thread = TestThread(self, user_nr) # thread.start() # for second in range(20, 0, -1): # print "wait %d seconds \r" % second, # time.sleep(1) # TODO: test that a notification email is sent to all members of a group # when a new event is added to the group. See class Membership in # events/models.py
def save_table(self, code, date): TR_REQ_TIME_INTERVAL = 4 time.sleep(TR_REQ_TIME_INTERVAL) data_81 = self.wrapper.get_data_opt10081(code, date) time.sleep(TR_REQ_TIME_INTERVAL) data_86 = self.wrapper.get_data_opt10086(code, date) col_86 = ['???', '???', '??(??)', '???', '??', '??', '????', '???', '????', '???', '????', '????', '????', '?????', '?????', '?????', '?????'] data = pd.concat([data_81, data_86.loc[:, col_86]], axis=1) #con = sqlite3.connect("../data/stock.db") try: data = data.loc[data.index > int(self.kiwoom.start_date.strftime("%Y%m%d"))] #orig_data = pd.read_sql("SELECT * FROM '%s'" % code, con, index_col='??').sort_index() orig_data = pd.read_hdf("../data/hdf/%s.hdf" % code, 'day').sort_index() end_date = orig_data.index[-1] orig_data = orig_data.loc[orig_data.index < end_date] data = data.loc[data.index >= end_date] data = pd.concat([orig_data, data], axis=0) except (FileNotFoundError, IndexError) as e: print(e) pass finally: data.index.name = '??' if len(data) != 0: #data.to_sql(code, con, if_exists='replace') data.to_hdf('../data/hdf/%s.hdf'%code, 'day', mode='w')
def balance(self): counter = 0 while (counter < self.__retry_time) and (not self._get_balance()): sleep(self.__retry_interval) counter += 1 return self.__balance # @property
def stock_position(self): counter = 0 while (counter < self.__retry_time) and (not self._get_position()): sleep(self.__retry_interval) counter += 1 return self.__stock_position # @property
def cancel_list(self): counter = 0 while (counter < self.__retry_time) and (not self._get_cancel_list()): sleep(self.__retry_interval) counter += 1 return self.__cancel_list # @property
def entrust_list(self): counter = 0 while (counter < self.__retry_time) and (not self._get_today_entrust()): sleep(self.__retry_interval) counter += 1 return self.__entrust_list # @property
def trade_list(self): counter = 0 while (counter < self.__retry_time) and (not self._get_today_trade()): sleep(self.__retry_interval) counter += 1 return self.__trade_list # ?????
def _memoryMetricatorThread(logger, cache, settings, master, timing): if master: period = settings.getScalePeriod() setScaleDownLevel = settings.getSetScaleDownLevel() if settings.getSetScaleDownLevel() >0 else -float("inf") setScaleUpLevel = settings.getSetScaleUpLevel() if settings.getSetScaleUpLevel() >0 else float("inf") getScaleDownLevel = settings.getGetScaleDownLevel() if settings.getGetScaleDownLevel() >0 else -float("inf") getScaleUpLevel = settings.getGetScaleUpLevel() if settings.getGetScaleUpLevel() >0 else float("inf") logger.debug("Metricator alive, period: "+ str(period) +"s, getThrLevel: [" +str(getScaleDownLevel) +"," + str(getScaleUpLevel)+ "], setThrLevel: [" + str(setScaleDownLevel) + "," + str(setScaleUpLevel) + "]" ) # this channel is necessary to send scale up/down requests internal_channel = InternalChannel(addr='127.0.0.1', port=settings.getIntPort(), logger=logger) internal_channel.generate_internal_channel_client_side() from random import gauss sleep(60) while True: sleep(abs(gauss(period, period/10))) locked = timing["setters"][0].isTransferring() setMean = 1.0 - timing["setters"][0].calcMean() getMean = 0.0 for metr in timing["getters"]: getMean += 1.0 - metr.calcMean() getMean = getMean / settings.getGetterThreadNumber() logger.debug("Working time for setters: " + str(setMean) + ", getters (mean): " + str(getMean) ) # scale up needed if getMean >= getScaleUpLevel or setMean >= setScaleUpLevel and not locked: logger.debug("Requests for scale Up!") # call scale up service ListThread.notify_scale_up(internal_channel) # self.list_communication_thread.notify_scale_up() # scale down needed elif getMean <= getScaleDownLevel and setMean <= setScaleDownLevel and not locked: logger.debug("Requests for scale Down!") # call scale down service ListThread.notify_scale_down(internal_channel) # self.list_communication_thread.notify_scale_down()
def getsearchresult_url(ip, url): # ?????????????? try: html = gethtml(url) if not html: print '??????url:%s' % url return 'error' if html == 'error': # ?????? return 'error' root = etree.HTML(html) lines = root.xpath('//*[@id="b_results"]') # ???? if not lines: return 'error' else: lines = lines[0] urlandtitle = [] for l in lines: url = l.xpath('.//h2/a/@href') title = l.xpath('.//h2/a/text()') if url and title: url = url[0] parser = urlparse(url) netloc = parser.netloc if netloc == 'ip.chinaz.com': pass else: title = title[0] urlandtitle.append({'url': url, 'title': title}) else: pass urls_temp = {} urls_temp = urlandtitle for u in urls_temp: title = u['title'] parser = urlparse(u['url']) url = parser.scheme + '://' + parser.netloc + '/' if url not in check: end.append({'url': url, 'title': title}) check.append(url) next_page = lines.xpath('.//*[@class="sb_pagN"]/@href') # time.sleep(1) if len(next_page) > 0: url = 'https://www.bing.com'+next_page[0] return 0,url,end else: return 1,None,end except Exception, e: print e return 'error'