我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用_collections.deque()。
def __init__(self, name, sensors_list, pwm_path, minPWM, startPWM, lock, pwm_enable, algo): self.sensors_list = sensors_list # this is outside limits so we force the fan to write a new value self.lastvalue = -100 self.pwm_enable = pwm_enable self.pwm_path = pwm_path self.name = name self.minPWM = int(minPWM) self.startPWM = int(startPWM) self.sleep = False self.lock = int(lock) self.alock = 0 self.pwm_history = deque('9876543210', 10) self.logger = logging.getLogger("fanicontrol") self.algo = algo
def set_config(self): data=deque(open('New_Session.txt','r'),18) header={} final_request={} form_request=[] for i in data: print(i) if ':' in str(i): special_data=str(i).split(':')[1].replace('\n','') header[str(i).split(':')[0]]=special_data else: form_request.append(i) if 'CONNECT mp.weixin.qq.com' in header and 'Request header' in header and 'Request body' in header : list(map(header.pop,['CONNECT mp.weixin.qq.com','Request header','Request body','Request url','Proxy-Connection'])) #??????????list map?????????? split_request=form_request[0].replace('GET ','').split('&') for j in split_request[1:]: final_request[str(j).split('=')[0]]=str(j).split('=')[1] final_request[split_request[0].split('?')[1].split('=')[0]]=split_request[0].split('?')[1].split('=')[1] return [header,final_request]
def __init__(self, lock=None): if lock is None: lock = RLock() self._lock = lock # Export the lock's acquire() and release() methods self.acquire = lock.acquire self.release = lock.release # If the lock defines _release_save() and/or _acquire_restore(), # these override the default implementations (which just call # release() and acquire() on the lock). Ditto for _is_owned(). try: self._release_save = lock._release_save except AttributeError: pass try: self._acquire_restore = lock._acquire_restore except AttributeError: pass try: self._is_owned = lock._is_owned except AttributeError: pass self._waiters = _deque()
def notify(self, n=1): """Wake up one or more threads waiting on this condition, if any. If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised. This method wakes up at most n of the threads waiting for the condition variable; it is a no-op if no threads are waiting. """ if not self._is_owned(): raise RuntimeError("cannot notify on un-acquired lock") all_waiters = self._waiters waiters_to_notify = _deque(_islice(all_waiters, n)) if not waiters_to_notify: return for waiter in waiters_to_notify: waiter.release() try: all_waiters.remove(waiter) except ValueError: pass
def __init__(self, trace, title='Optimizations', **kwargs): # context should be a dictionary containing the backward traced result of each relevant register super(OptimizationViewer, self).__init__(title) self.orig_trace = trace self.trace = deepcopy(trace) self.undo_stack = deque([deepcopy(trace), deepcopy(trace), deepcopy(trace)], maxlen=3) self.opti_map = dict(zip(optimization_names, optimizations)) self.order = [] self.foldable_regs = [] self.save = kwargs.get('save', None)
def __init__(self, trace, title='Optimizations (legacy)', **kwargs): # context should be a dictionary containing the backward traced result of each relevant register super(OptimizationViewer, self).__init__(title) self.orig_trace = trace self.trace = deepcopy(trace) self.undo_stack = deque([deepcopy(trace), deepcopy(trace), deepcopy(trace)], maxlen=3) self.opti_map = dict(zip(optimization_names, optimizations)) self.order = [] self.foldable_regs = [] self.save = kwargs.get('save', None)
def __init__(self, clustered_trace, bb_func, ctx_reg_size, title='Clustering Analysis Result (legacy)', save_func=None): # context should be a dictionary containing the backward traced result of each relevant register super(ClusterViewer, self).__init__(title) self.orig_trace = clustered_trace self.trace = deepcopy(self.orig_trace) self.bb_func = bb_func self.ctx_reg_size = ctx_reg_size self.save = save_func self.undo_stack = deque([deepcopy(self.trace)], maxlen=3)
def Restore(self): self.undo_stack = deque([deepcopy(self.trace)], maxlen=3) self.trace = deepcopy(self.orig_trace) self.PopulateModel()
def __init__(self, clustered_trace, bb_func, ctx_reg_size, title='Clustering Analysis Result', save_func=None): # context should be a dictionary containing the backward traced result of each relevant register super(ClusterViewer, self).__init__(title) self.orig_trace = clustered_trace self.trace = deepcopy(self.orig_trace) self.bb_func = bb_func self.ctx_reg_size = ctx_reg_size self.save = save_func self.undo_stack = deque([deepcopy(self.trace)], maxlen=3)
def test_startbit(self): u = self.u u.cntrl._ag.data.extend([(START, 0), (NOP, 0)]) u.clk_cnt_initVal._ag.data.append(4) self.doSim(600 * Time.ns) self.assertEqual(u.i2c._ag.bits, deque([I2cAgent.START]))