我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用threading.thread()。
def __start_monitoring(self, stdout, stderr=None): """Start monitoring threads. **used internally**""" program = self.program name = "thread-{:x}".format(program.pid) # create monitoring threads + coroutines if stderr: res = process.monitorPipe(self.taskQueue, (stdout,program.stdout),(stderr,program.stderr), name=name) else: res = process.monitorPipe(self.taskQueue, (stdout,program.stdout), name=name) res = map(None, res) # attach a method for injecting data into a monitor for t,q in res: t.send = q.send threads,senders = zip(*res) # update threads for destruction later self.__threads.update(threads) # set things off for t in threads: t.start()
def monitor(send, pipe, blocksize=1, daemon=True, name=None): """Spawn a thread that reads `blocksize` bytes from `pipe` and dispatches it to `send` For every single byte, `send` is called. The thread is named according to the `name` parameter. Returns the monitoring threading.thread instance """ def shuffle(send, pipe): while not pipe.closed: data = pipe.read(blocksize) if len(data) == 0: # pipe.read syscall was interrupted. so since we can't really # determine why (cause...y'know..python), stop dancing so # the parent will actually be able to terminate us break map(send,data) return if name: monitorThread = threading.Thread(target=shuffle, name=name, args=(send,pipe)) else: monitorThread = threading.Thread(target=shuffle, args=(send,pipe)) monitorThread.daemon = daemon return monitorThread
def validate(self, proxy_scanner, expected_num=20, queue_timeout=3, val_timeout=5): """Target function of validation threads Args: proxy_scanner: A ProxyScanner object. expected_num: Max number of valid proxies to be scanned. queue_timeout: Timeout for getting a proxy from the queue. val_timeout: An integer passed to `is_valid` as argument `timeout`. """ while self.proxy_num() < expected_num: try: candidate_proxy = proxy_scanner.proxy_queue.get( timeout=queue_timeout) except queue.Empty: if proxy_scanner.is_scanning(): continue else: break addr = candidate_proxy['addr'] protocol = candidate_proxy['protocol'] ret = self.is_valid(addr, protocol, val_timeout) if self.proxy_num() >= expected_num: self.logger.info('Enough valid proxies, thread {} exit.' .format(threading.current_thread().name)) break if ret['valid']: self.add_proxy(Proxy(addr, protocol)) self.logger.info('{} ok, {:.2f}s'.format(addr, ret[ 'response_time'])) else: self.logger.info('{} invalid, {}'.format(addr, ret['msg']))
def is_scanning(self): """Return whether at least one scanning thread is alive""" for t in self.scan_threads: if t.is_alive(): return True return False
def scan(self): """Start a thread for each registered scan function to scan proxy lists""" self.logger.info('{0} registered scan functions, starting {0} threads ' 'to scan candidate proxy lists...' .format(len(self.scan_funcs))) for i in range(len(self.scan_funcs)): t = threading.Thread( name=self.scan_funcs[i].__name__, target=self.scan_funcs[i], kwargs=self.scan_kwargs[i]) t.daemon = True self.scan_threads.append(t) t.start()
def onkeypress(self,evt): if evt.key==' ': # spacebar pressed if self.text_h.get_gid()=='notrec': # start recording data... # change colour of dot self.scat.set_facecolors('red') plt.draw() # set gid to recording to flag recording state self.text_h.set_gid('rec') if self.acq_info['acq_time'] != 'inf': # timed acquisition - start timer self.acq_timer.start() self.text_h.set_text('Timed acquisition') else: # manual acq # change instructions self.text_h.set_text(self.text_stop) # set thread to store data in queue lock.acquire() thd.storeflag = True lock.release() elif self.text_h.get_gid()=='rec': # stop recording if self.acq_info['acq_time'] != 'inf': # timed acq - do nothing pass else: # recording data, manual acq lock.acquire() thd.storeflag = False thd.runflag = False lock.release() plt.close() else: print('error in onkeypress - unrecognised text_h gid') # callback function for timer
def t_event(self): # stop thread queuing data and stop it running lock.acquire() thd.storeflag = False thd.runflag = False lock.release() self.acq_timer.remove_callback(self.t_event) plt.close() # ~~~~~~~~~~~~~~~ # MAIN ROUTINE # ~~~~~~~~~~~~~~~ # to suppress the annoying warning
def write(self, data): """Write `data` directly to program's stdin""" if self.running and not self.program.stdin.closed: if self.updater and self.updater.is_alive(): return self.program.stdin.write(data) raise IOError("Unable to write to stdin for process {:d}. Updater thread has prematurely terminated.".format(self.id)) raise IOError("Unable to write to stdin for process. {:s}.".format(self.__format_process_state()))
def __stop_monitoring(self): """Cleanup monitoring threads""" P = self.program if P.poll() is None: raise RuntimeError("Unable to stop monitoring while process {!r} is still running.".format(P)) # stop the update thread self.eventWorking.clear() # forcefully close pipes that still open, this should terminate the monitor threads # also, this fixes a resource leak since python doesn't do this on subprocess death for p in (P.stdin,P.stdout,P.stderr): while p and not p.closed: try: p.close() except: pass continue # join all monitoring threads map(operator.methodcaller('join'), self.threads) # now spin until none of them are alive while len(self.threads) > 0: for th in self.threads[:]: if not th.is_alive(): self.__threads.discard(th) del(th) continue # join the updater thread, and then remove it self.taskQueue.put(None) self.updater.join() assert not self.updater.is_alive() self.__updater = None return
def __repr__(self): cls = self.__class__ state = 'paused' if self.ev_unpaused.is_set(): state = 'running' if self.ev_terminating.is_set(): state = 'terminated' if not self.thread.is_alive(): state = 'dead' res = tuple(self.state) return "<class '{:s}'> {:s} Queue:{:d} Results:{:d}".format('.'.join(('internal',__name__,cls.__name__)), state, len(res), self.result.unfinished_tasks)
def __start(self): cls = self.__class__ logging.debug("{:s}.start : Starting execution queue thread. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self.thread)) self.ev_terminating.clear(), self.ev_unpaused.clear() self.thread.daemon = True return self.thread.start()
def __stop(self): cls = self.__class__ logging.debug("{:s}.stop : Terminating execution queue thread. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self.thread)) if not self.thread.is_alive(): cls = self.__class__ logging.warn("{:s}.stop : Execution queue has already been terminated. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self)) return self.ev_unpaused.set(), self.ev_terminating.set() self.queue.acquire() self.queue.notify_all() self.queue.release() return self.thread.join()
def start(self): '''Start to dispatch callables in the execution queue.''' cls = self.__class__ if not self.thread.is_alive(): logging.fatal("{:s}.start : Unable to resume an already terminated execution queue. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self)) return False logging.info("{:s}.start : Resuming execution queue. :{!r}".format('.'.join(('internal',__name__,cls.__name__)), self.thread)) res, _ = self.ev_unpaused.is_set(), self.ev_unpaused.set() self.queue.acquire() self.queue.notify_all() self.queue.release() return not res
def stop(self): '''Pause the execution queue.''' cls = self.__class__ if not self.thread.is_alive(): logging.fatal("{:s}.stop : Unable to pause an already terminated execution queue. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self)) return False logging.info("{:s}.stop : Pausing execution queue. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self.thread)) res, _ = self.ev_unpaused.is_set(), self.ev_unpaused.clear() self.queue.acquire() self.queue.notify_all() self.queue.release() return res
def __run__(self): cls = self.__class__ consumer = self.__consume(self.ev_terminating, self.queue, self.state) executor = self.__dispatch(self.lock); next(executor) logging.debug("{:s}.running : Execution queue is now running. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self.thread)) while not self.ev_terminating.is_set(): # check if we're allowed to execute if not self.ev_unpaused.is_set(): self.ev_unpaused.wait() # pull a callable out of the queue logging.debug("{:s}.running : Waiting for an item.. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self.thread)) self.queue.acquire() item = next(consumer) self.queue.release() if not self.ev_unpaused.is_set(): self.ev_unpaused.wait() # check if we're terminating if self.ev_terminating.is_set(): break # now we can execute it logging.debug("{:s}.running : Executing {!r} asynchronously. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), item, self.thread)) res, err = executor.send(item) # and stash our result logging.debug("{:s}.running : Received result {!r} from {!r}. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), (res,err), item, self.thread)) self.result.put((item,res,err)) return # FIXME: figure out how to match against a bounds
def stop(self): """ Stop a looping thread """ self._stop.set()
def kill(self): """Kill the main association thread loop.""" self._kill = True self.is_established = False while not self.dul.stop_dul(): time.sleep(0.001) self.ae.cleanup_associations()
def __start_updater(self, daemon=True, timeout=0): """Start the updater thread. **used internally**""" import Queue def task_exec(emit, data): if hasattr(emit,'send'): res = emit.send(data) res and P.write(res) else: emit(data) def task_get_timeout(P, timeout): try: emit,data = P.taskQueue.get(block=True, timeout=timeout) except Queue.Empty: _,_,tb = sys.exc_info() P.exceptionQueue.put(StopIteration,StopIteration(),tb) return () return emit,data def task_get_notimeout(P, timeout): return P.taskQueue.get(block=True) task_get = task_get_timeout if timeout > 0 else task_get_notimeout def update(P, timeout): P.eventWorking.wait() while P.eventWorking.is_set(): res = task_get(P, timeout) if not res: continue emit,data = res try: task_exec(emit,data) except StopIteration: P.eventWorking.clear() except: P.exceptionQueue.put(sys.exc_info()) finally: P.taskQueue.task_done() continue return self.__updater = updater = threading.Thread(target=update, name="thread-%x.update"% self.id, args=(self,timeout)) updater.daemon = daemon updater.start() return updater