Python _collections 模块,deque() 实例源码

我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用_collections.deque()

项目:Fanicontrol    作者:tsonntig    | 项目源码 | 文件源码
def __init__(self, name, sensors_list, pwm_path, minPWM, startPWM,
                 lock, pwm_enable, algo):
        self.sensors_list = sensors_list
        # this is outside limits so we force the fan to write a new value
        self.lastvalue = -100
        self.pwm_enable = pwm_enable
        self.pwm_path = pwm_path
        self.name = name
        self.minPWM = int(minPWM)
        self.startPWM = int(startPWM)
        self.sleep = False
        self.lock = int(lock)
        self.alock = 0
        self.pwm_history = deque('9876543210', 10)
        self.logger = logging.getLogger("fanicontrol")
        self.algo = algo
项目:crawl_wechat    作者:songluyi    | 项目源码 | 文件源码
def set_config(self):
        data=deque(open('New_Session.txt','r'),18)
        header={}
        final_request={}
        form_request=[]
        for i in data:
            print(i)
            if ':' in str(i):
                special_data=str(i).split(':')[1].replace('\n','')
                header[str(i).split(':')[0]]=special_data
            else:
                form_request.append(i)
        if 'CONNECT mp.weixin.qq.com' in header and 'Request header' in header and 'Request body' in header :
            list(map(header.pop,['CONNECT mp.weixin.qq.com','Request header','Request body','Request url','Proxy-Connection']))
            #??????????list map??????????
        split_request=form_request[0].replace('GET ','').split('&')
        for j in split_request[1:]:
            final_request[str(j).split('=')[0]]=str(j).split('=')[1]
        final_request[split_request[0].split('?')[1].split('=')[0]]=split_request[0].split('?')[1].split('=')[1]
        return [header,final_request]
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self._waiters = _deque()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def notify(self, n=1):
        """Wake up one or more threads waiting on this condition, if any.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method wakes up at most n of the threads waiting for the condition
        variable; it is a no-op if no threads are waiting.

        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self._waiters = _deque()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def notify(self, n=1):
        """Wake up one or more threads waiting on this condition, if any.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method wakes up at most n of the threads waiting for the condition
        variable; it is a no-op if no threads are waiting.

        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass
项目:VMAttack    作者:anatolikalysch    | 项目源码 | 文件源码
def __init__(self, trace, title='Optimizations', **kwargs):
        # context should be a dictionary containing the backward traced result of each relevant register
        super(OptimizationViewer, self).__init__(title)
        self.orig_trace = trace
        self.trace = deepcopy(trace)
        self.undo_stack = deque([deepcopy(trace), deepcopy(trace), deepcopy(trace)], maxlen=3)
        self.opti_map = dict(zip(optimization_names, optimizations))
        self.order = []
        self.foldable_regs = []
        self.save = kwargs.get('save', None)
项目:VMAttack    作者:anatolikalysch    | 项目源码 | 文件源码
def __init__(self, trace, title='Optimizations (legacy)', **kwargs):
        # context should be a dictionary containing the backward traced result of each relevant register
        super(OptimizationViewer, self).__init__(title)
        self.orig_trace = trace
        self.trace = deepcopy(trace)
        self.undo_stack = deque([deepcopy(trace), deepcopy(trace), deepcopy(trace)], maxlen=3)
        self.opti_map = dict(zip(optimization_names, optimizations))
        self.order = []
        self.foldable_regs = []
        self.save = kwargs.get('save', None)
项目:VMAttack    作者:anatolikalysch    | 项目源码 | 文件源码
def __init__(self, clustered_trace, bb_func, ctx_reg_size, title='Clustering Analysis Result (legacy)', save_func=None):
        # context should be a dictionary containing the backward traced result of each relevant register
        super(ClusterViewer, self).__init__(title)
        self.orig_trace = clustered_trace
        self.trace = deepcopy(self.orig_trace)
        self.bb_func = bb_func
        self.ctx_reg_size = ctx_reg_size
        self.save = save_func
        self.undo_stack = deque([deepcopy(self.trace)], maxlen=3)
项目:VMAttack    作者:anatolikalysch    | 项目源码 | 文件源码
def Restore(self):
        self.undo_stack = deque([deepcopy(self.trace)], maxlen=3)
        self.trace = deepcopy(self.orig_trace)
        self.PopulateModel()
项目:VMAttack    作者:anatolikalysch    | 项目源码 | 文件源码
def __init__(self, clustered_trace, bb_func, ctx_reg_size, title='Clustering Analysis Result', save_func=None):
        # context should be a dictionary containing the backward traced result of each relevant register
        super(ClusterViewer, self).__init__(title)
        self.orig_trace = clustered_trace
        self.trace = deepcopy(self.orig_trace)
        self.bb_func = bb_func
        self.ctx_reg_size = ctx_reg_size
        self.save = save_func
        self.undo_stack = deque([deepcopy(self.trace)], maxlen=3)
项目:hwtLib    作者:Nic30    | 项目源码 | 文件源码
def test_startbit(self):
        u = self.u
        u.cntrl._ag.data.extend([(START, 0), (NOP, 0)])
        u.clk_cnt_initVal._ag.data.append(4)
        self.doSim(600 * Time.ns)

        self.assertEqual(u.i2c._ag.bits, deque([I2cAgent.START]))