The following 50 code examples, extracted from open source Python projects, illustrate how to use threading.Condition().
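Before the project snippets, here is a minimal, self-contained sketch (not taken from any of the projects below) of the wait/notify pattern that most of these examples build on; the item list and the producer/consumer function names are illustrative only.

import threading

# Shared state protected by the Condition's underlying lock.
items = []
condition = threading.Condition()

def producer():
    for i in range(5):
        with condition:          # acquires the lock
            items.append(i)
            condition.notify()   # wake one waiting consumer

def consumer():
    for _ in range(5):
        with condition:
            while not items:     # re-check the predicate after every wakeup
                condition.wait()
            print(items.pop(0))

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t2.start(); t1.start()
t1.join(); t2.join()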
def __init__(self, porttype):
    """
    Instantiates a new object responsible for writing data from the port
    into an array.

    It is important to notice that the porttype is a BULKIO__POA type and
    not a BULKIO type.  The reason is because it is used to generate a
    Port class that will be returned when getPort() is invoked.  The
    returned class is the one acting as a server and therefore must be a
    Portable Object Adapter rather than a simple BULKIO object.

    Inputs:
        <porttype>        The BULKIO__POA data type
    """
    StreamMgr.__init__(self)
    self.port_type = porttype
    self.sri = bulkio_helpers.defaultSRI
    self.data = []
    self.timestamps = []
    self.gotEOS = False
    self.breakBlock = False
    self.port_lock = threading.Lock()
    self.port_cond = threading.Condition(self.port_lock)
def __init__(self, thread_sleep=0.1, parent=None, storeMessages=False):
    self.consumer_lock = threading.Lock()
    threading.Thread.__init__(self)
    self._terminateMe = False
    self._pauseMe = True
    self.state = threading.Condition()
    self.setDaemon(True)
    self.actionQueue = Queue.Queue()
    self.thread_sleep = thread_sleep
    self._messages = {}
    self._allMsg = []
    self._connections = {}
    self.consumers = {}
    self.supplier_admin = self.SupplierAdmin_i(self)
    self._parent_comp = parent
    self._storeMessages = storeMessages
    self._storedMessages = []
    self.startPort()
def __init__(self, maxsize=0):
    self.maxsize = maxsize
    self._init(maxsize)
    # mutex must be held whenever the queue is mutating.  All methods
    # that acquire mutex must release it before returning.  mutex
    # is shared between the three conditions, so acquiring and
    # releasing the conditions also acquires and releases mutex.
    self.mutex = _threading.Lock()
    # Notify not_empty whenever an item is added to the queue; a
    # thread waiting to get is notified then.
    self.not_empty = _threading.Condition(self.mutex)
    # Notify not_full whenever an item is removed from the queue;
    # a thread waiting to put is notified then.
    self.not_full = _threading.Condition(self.mutex)
    # Notify all_tasks_done whenever the number of unfinished tasks
    # drops to zero; thread waiting to join() is notified to resume
    self.all_tasks_done = _threading.Condition(self.mutex)
    self.unfinished_tasks = 0
def __init__(self, seeds=None, replica_set_name=None, pool_class=None,
             pool_options=None, monitor_class=None, condition_class=None,
             local_threshold_ms=LOCAL_THRESHOLD_MS,
             server_selection_timeout=SERVER_SELECTION_TIMEOUT):
    """Represent MongoClient's configuration.

    Take a list of (host, port) pairs and optional replica set name.
    """
    self._seeds = seeds or [('localhost', 27017)]
    self._replica_set_name = replica_set_name
    self._pool_class = pool_class or pool.Pool
    self._pool_options = pool_options or PoolOptions()
    self._monitor_class = monitor_class or monitor.Monitor
    self._condition_class = condition_class or threading.Condition
    self._local_threshold_ms = local_threshold_ms
    self._server_selection_timeout = server_selection_timeout
    self._direct = (len(self._seeds) == 1 and not replica_set_name)
def __init__(self, host, host_distance, session):
    self.host = host
    self.host_distance = host_distance
    self._session = weakref.proxy(session)
    self._lock = Lock()
    # this is used in conjunction with the connection streams. Not using the
    # connection lock because the connection can be replaced in the lifetime
    # of the pool.
    self._stream_available_condition = Condition(self._lock)
    self._is_replacing = False

    if host_distance == HostDistance.IGNORED:
        log.debug("Not opening connection to ignored host %s", self.host)
        return
    elif host_distance == HostDistance.REMOTE and not session.cluster.connect_to_remote_hosts:
        log.debug("Not opening connection to remote host %s", self.host)
        return

    log.debug("Initializing connection for host %s", self.host)
    self._connection = session.cluster.connection_factory(host.address)
    self._keyspace = session.keyspace
    if self._keyspace:
        self._connection.set_keyspace_blocking(self._keyspace)
    log.debug("Finished initializing connection for host %s", self.host)
def __init__(self, host, host_distance, session):
    self.host = host
    self.host_distance = host_distance
    self._session = weakref.proxy(session)
    self._lock = RLock()
    self._conn_available_condition = Condition()

    log.debug("Initializing new connection pool for host %s", self.host)
    core_conns = session.cluster.get_core_connections_per_host(host_distance)
    self._connections = [session.cluster.connection_factory(host.address)
                         for i in range(core_conns)]

    self._keyspace = session.keyspace
    if self._keyspace:
        for conn in self._connections:
            conn.set_keyspace_blocking(self._keyspace)

    self._trash = set()
    self._next_trash_allowed_at = time.time()
    self.open_count = core_conns
    log.debug("Finished initializing new connection pool for host %s", self.host)
def _wait_for_conn(self, timeout):
    start = time.time()
    remaining = timeout

    while remaining > 0:
        # wait on our condition for the possibility that a connection
        # is usable
        self._await_available_conn(remaining)

        # self.shutdown() may trigger the above Condition
        if self.is_shutdown:
            raise ConnectionException("Pool is shutdown")

        conns = self._connections
        if conns:
            least_busy = min(conns, key=lambda c: c.in_flight)
            with least_busy.lock:
                if least_busy.in_flight < least_busy.max_request_id:
                    least_busy.in_flight += 1
                    return least_busy, least_busy.get_request_id()

        remaining = timeout - (time.time() - start)

    raise NoConnectionsAvailable()
def __init__(self, maxsize=0):
    try:
        import threading
    except ImportError:
        import dummy_threading as threading
    self._init(maxsize)
    # mutex must be held whenever the queue is mutating.  All methods
    # that acquire mutex must release it before returning.  mutex
    # is shared between the three conditions, so acquiring and
    # releasing the conditions also acquires and releases mutex.
    self.mutex = threading.Lock()
    # Notify not_empty whenever an item is added to the queue; a
    # thread waiting to get is notified then.
    self.not_empty = threading.Condition(self.mutex)
    # Notify not_full whenever an item is removed from the queue;
    # a thread waiting to put is notified then.
    self.not_full = threading.Condition(self.mutex)
    # Notify all_tasks_done whenever the number of unfinished tasks
    # drops to zero; thread waiting to join() is notified to resume
    self.all_tasks_done = threading.Condition(self.mutex)
    self.unfinished_tasks = 0
def __init__(self, ufc, node, iomap, cmd_pend_size=2, timeout=1):
    self.ports = {
        'cmd_async':  {'dir': 'in',  'type': 'topic',   'callback': self.cmd_async_cb},
        'cmd_sync':   {'dir': 'in',  'type': 'service', 'callback': self.cmd_sync_cb},
        'report':     {'dir': 'out', 'type': 'topic'},
        'status':     {'dir': 'out', 'type': 'topic'},  # report lost, etc...
        'service':    {'dir': 'in',  'type': 'service', 'callback': self.service_cb},
        'packet_in':  {'dir': 'in',  'type': 'topic',   'callback': self.packet_in_cb},
        'packet_out': {'dir': 'out', 'type': 'topic'},
    }

    self.node = node
    self.logger = logging.getLogger('uf.' + node.replace('/', '.'))
    self.cmd_pend = {}
    self.cmd_pend_size = cmd_pend_size
    self.cmd_pend_c = threading.Condition()
    self.timeout = timeout
    self.cnt_lock = _thread.allocate_lock()
    self.cnt = 1  # no reply if cnt == 0, FIXME
    ufc.node_init(node, self.ports, iomap)
def __init__(self, port='/dev/ttyAMA0', baudrate=115200, verbose=True, connected=None):
    self.verbose = verbose
    self.version = None
    self.connectedCV = threading.Condition()
    self.responseQueue = queue.Queue()
    self.port = serial.Serial(port, baudrate=baudrate)
    self._transactionLock = threading.Lock()

    self.tx = self.transmitThreadClass(self.port, verbose=self.verbose)
    self.rx = self.receiverThreadClass(self.port, callback=self._receive, verbose=self.verbose)
    self.rx.start()
    self.tx.start()

    if connected is None:
        self.connected = True
        self.connected = self._testForExistingConnection()
    else:
        self.connected = connected

    if self.verbose:
        if self.connected:
            print("Already connected to gimbal, version %s" % self.version)
        else:
            print("Waiting for gimbal to power on")
def __init__(self, opt, mturk_agent_ids, is_test=False):
    """Create an MTurkManager using the given setup opts and a list of
    agent_ids that will participate in each conversation
    """
    self.opt = opt
    self.server_url = None
    self.topic_arn = None
    self.port = 443
    self.task_group_id = None
    self.run_id = None
    self.mturk_agent_ids = mturk_agent_ids
    self.task_files_to_copy = None
    self.is_sandbox = opt['is_sandbox']
    self.worker_pool_change_condition = threading.Condition()
    self.onboard_function = None
    self.num_conversations = opt['num_conversations']
    self.required_hits = math.ceil(
        self.num_conversations * len(self.mturk_agent_ids) * HIT_MULT
    )
    self.socket_manager = None
    self.is_test = is_test
    self._init_logs()

# Helpers and internal manager methods #
def main():
    integers = []
    condition = threading.Condition()

    # Our Publisher
    pub1 = Publisher(integers, condition)
    pub1.start()

    # Our Subscribers
    sub1 = Subscriber(integers, condition)
    sub2 = Subscriber(integers, condition)
    sub1.start()
    sub2.start()

    ## Joining our Threads
    pub1.join()
    sub1.join()
    sub2.join()
def __init__(self, host: str = 'localhost', port: int = 8125,
             prefix: str = '', maxudpsize: Any = _sentinel) -> None:
    '''
    Create a new client.

    :param host: Host of the statsd server.
    :param port: Port of the statsd server, 8125 by default.
    :param prefix: String that will be prefixed to any stat description.
    :param maxudpsize: Ignored in this implementation.
    '''
    self._prefix = prefix + '.' if prefix else ''
    self._server_addr = (socket.gethostbyname(host), port)
    self._queue = cystatsd.MetricCollector()
    self._queue_cv = threading.Condition()
    if maxudpsize is not self._sentinel:
        warnings.warn('Fastatsd client doesn\'t support maxudpsize')
    self._start_sender_thread()
def __init__(self, address, lock=None, logger=None):
    self.address = address
    self.lock = lock
    self.logger = logger
    self._cv = threading.Condition()
    self.__sendlock = None
    self.__no_more_request = False
    self.set_event_time()
    self.handler = None
    self.auth = None
    self.proxy = False
    self.initialize_connection()
    self._closed = False
    self.backend = False
    self.ac_in_buffer = b''
    self.incoming = []
    self.producer_fifo = self.fifo_class()
    asyncore.dispatcher.__init__(self)
def __init__(self, outport, sequence, start_time=time.time(),
             allow_updates=False, channel=0, offset=0.0):
    self._outport = outport
    self._channel = channel
    self._offset = offset

    # Set of notes (pitches) that are currently on.
    self._open_notes = set()
    # Lock for serialization.
    self._lock = threading.RLock()
    # A control variable to signal when the sequence has been updated.
    self._update_cv = threading.Condition(self._lock)
    # The queue of mido.Message objects to send, sorted by ascending time.
    self._message_queue = deque()
    # An event that is set when `stop` has been called.
    self._stop_signal = threading.Event()

    # Initialize message queue.
    # We first have to allow "updates" to set the initial sequence.
    self._allow_updates = True
    self.update_sequence(sequence, start_time=start_time)
    # We now make whether we allow updates dependent on the argument.
    self._allow_updates = allow_updates

    super(MidiPlayer, self).__init__()
def __init__(self, handover_dict, handover_cond):
    """
    Parameters:

      handover_dict (dict): Dictionary for handing over the notification
        header and message from this listener thread to the receiver
        thread. Must initially be an empty dictionary.

      handover_cond (threading.Condition): Condition object for handing
        over the notification from this listener thread to the receiver
        thread. Must initially be a new threading.Condition object.
    """

    # Sync variables for thread-safe handover between listener thread and
    # receiver thread:
    self._handover_dict = handover_dict  # keys: headers, message
    self._handover_cond = handover_cond

    # Wait timeout to honor keyboard interrupts after this time:
    self._wait_timeout = 10.0  # seconds
def __init__(self, method, url, headers, body, response, wfile):
    self.method = method
    self.wfile = wfile
    self.url = url
    self.headers = headers
    self.body = body
    self.response = response
    self.keep_running = True
    self.blocked = False
    self.lock = threading.Lock()
    self.waiter = threading.Condition(self.lock)
    self.data_list = {}  # begin => payload
    self.data_size = 0
    self.req_begin = 0
    self.req_end = 0
    self.wait_begin = 0
def __init__(self,
             queue_capacity,
             requeue_capacity=None,
             queue_factory=Queue.Queue,
             get_time=time.time):
    """Initialize a ReQueue instance.

    Args:
      queue_capacity: The number of items that can be put in the ReQueue.
      requeue_capacity: The number of items that can be reput in the ReQueue.
      queue_factory: Used for dependency injection.
      get_time: Used for dependency injection.
    """
    if requeue_capacity is None:
        requeue_capacity = queue_capacity

    self.get_time = get_time
    self.queue = queue_factory(queue_capacity)
    self.requeue = queue_factory(requeue_capacity)
    self.lock = threading.Lock()
    self.put_cond = threading.Condition(self.lock)
    self.get_cond = threading.Condition(self.lock)
def sync_fetch(self, task):
    '''Synchronization fetch, usually used in xmlrpc thread'''
    if not self._running:
        return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

    wait_result = threading.Condition()
    _result = {}

    def callback(type, task, result):
        wait_result.acquire()
        _result['type'] = type
        _result['task'] = task
        _result['result'] = result
        wait_result.notify()
        wait_result.release()

    wait_result.acquire()
    self.ioloop.add_callback(self.fetch, task, callback)
    while 'result' not in _result:
        wait_result.wait()
    wait_result.release()
    return _result['result']
def setUp(self):
    super(KazooElectionTests, self).setUp()
    self.path = "/" + uuid.uuid4().hex
    self.condition = threading.Condition()

    # election contenders set these when elected. The exit event is set by
    # the test to make the leader exit.
    self.leader_id = None
    self.exit_event = None

    # tests set this before the event to make the leader raise an error
    self.raise_exception = False

    # set by a worker thread when an unexpected error is hit.
    # better way to do this?
    self.thread_exc_info = None
def __init__(self, sr=22050, backend='sounddevice'):
    """
    :param int sr: samplerate used - all sounds added to the sampler will
        automatically be resampled if needed (this can be a CPU-consuming
        task, so try to use sounds with identical sampling rates if possible).
    :param str backend: backend used for playing sound. Can be either
        'sounddevice' or 'dummy'.
    """
    self.sr = sr
    self.sounds = []

    self.chunks = Queue(1)
    self.chunk_available = Condition()

    if backend == 'dummy':
        from .dummy_stream import DummyStream
        self.BackendStream = DummyStream
    elif backend == 'sounddevice':
        from sounddevice import OutputStream
        self.BackendStream = OutputStream
    else:
        raise ValueError("Backend can either be 'sounddevice' or 'dummy'")

    # TODO: use a process instead?
    self.play_thread = Thread(target=self.run)
    self.play_thread.daemon = True
    self.play_thread.start()
def __init__(self, *, log_level='ERROR'):
    self.log_level = log_level
    self.udp_sock = None
    self._search_lock = threading.RLock()

    self.search_results = {}  # map name to (time, address)
    self.unanswered_searches = {}  # map search id (cid) to name

    self.listeners = weakref.WeakSet()

    self.broadcaster = ca.Broadcaster(our_role=ca.CLIENT)
    self.broadcaster.log.setLevel(self.log_level)
    self.command_bundle_queue = queue.Queue()
    self.command_cond = threading.Condition()

    self.selector = SelectorThread()
    self.command_thread = threading.Thread(target=self.command_loop,
                                           daemon=True)
    self.command_thread.start()
def __init__(self, marathon, verify_interval, cccls):
    """Class init.

    Starts a thread that waits for Marathon events, then configures BIG-IP
    based on the Marathon state
    """
    self.__marathon = marathon
    # appId -> MarathonApp
    self.__apps = dict()
    self.__cccls = cccls
    self.__verify_interval = verify_interval

    self.__condition = threading.Condition()
    self.__thread = threading.Thread(target=self.do_reset)
    self.__pending_reset = False
    self.__thread.daemon = True
    self.__thread.start()
    self.__timer = None

    self._backoff_timer = 1
    self._max_backoff_time = 128

    # Fetch the base data
    self.reset_from_tasks()
def __init__(self):
    self.help_msg = \
        u"H: ????\n" \
        u"L: ???????\n" \
        u"M: ????\n" \
        u"N: ???\n" \
        u"U: ????\n" \
        u"R: ????\n" \
        u"S: ????\n" \
        u"T: ????\n" \
        u"G: ????\n" \
        u"E: ??\n"
    self.con = threading.Condition()
    self.myNetease = MyNetease()
    self.playlist = self.myNetease.get_top_songlist()  # ???????
    self.mp3 = None
    t = threading.Thread(target=self.play)
    t.start()
def test_thread_safe(self):
    thread_count = 4
    repeats = 1000

    latch = [threading.Condition(), thread_count]

    def thread_main(latch):
        for _ in range(repeats):
            self.assertEqual("False", self.conn.test_service.bool_to_string(False))
            self.assertEqual(12345, self.conn.test_service.string_to_int32("12345"))
        with latch[0]:
            latch[1] -= 1
            if latch[1] <= 0:
                latch[0].notifyAll()

    for i in range(thread_count):
        t = threading.Thread(target=thread_main, args=(latch,))
        t.daemon = True
        t.start()

    with latch[0]:
        while latch[1] > 0:
            latch[0].wait(10)
    self.assertEqual(0, latch[1])
def __init__(self):
    self.message = None
    self.__event = threading.Event()
    self.__cond = threading.Condition()
    self.__mail_queue = Queue(100)
def __init__(self, queue, counter):
    self.queue = queue
    self.counter = counter
    self.checkers = {}
    self.queues = {}
    self.events = {}
    self.line = None
    self.__event = threading.Event()
    self.__cond = threading.Condition()
def __init__(self, input_stream=None, porttype=None, rem_file=False):
    """
    Instantiates a new object responsible for writing data from the port
    into a file.  The file name is given by the input_stream variable.

    It is important to notice that the porttype is a BULKIO__POA type and
    not a BULKIO type.  The reason is because it is used to generate a
    Port class that will be returned when getPort() is invoked.  The
    returned class is the one acting as a server and therefore must be a
    Portable Object Adapter rather than a simple BULKIO object.

    Inputs:
        <input_stream>    The X-Midas file to generate
        <porttype>        The BULKIO__POA data type
        <rem_file>        Removes the input_stream if present
    """
    if input_stream != None and os.path.isfile(input_stream):
        os.remove(input_stream)

    self.port_type = porttype
    self.outFile = input_stream
    self.port_lock = threading.Lock()
    self.eos_cond = threading.Condition(self.port_lock)
    self.gotEOS = False
    self.header = None
    self.done = False
    self._firstPacket = True
def waitAllPacketsSent(self, timeout=None):
    """
    Wait until all of the packets queued on this source have been pushed
    to all connected ports. If timeout is given, it should be the
    maximum number of seconds to wait before giving up.
    """
    self._packetsSentCond.acquire()
    try:
        # Assume no spurious signals will occur, so we can defer to the
        # timeout handling of Python's Condition object.
        if self._packetsPending > 0:
            self._packetsSentCond.wait(timeout)
    finally:
        self._packetsSentCond.release()
def __init__(self):
    self._recv_disconnect = True
    self.logger = logging.getLogger("ossie.events.Subscriber.Receiver")
    self._lock = threading.Lock()
    self._cond = threading.Condition(self._lock)
def __init__(self, name, logger=None, sriCompare=sri.compare,
             newSriCallback=None, sriChangeCallback=None, maxsize=100,
             PortTransferType=_TYPE_):
    self.name = name
    self.logger = logger
    self.queue = collections.deque()
    self._maxSize = maxsize
    self.port_lock = threading.Lock()
    self._not_full = threading.Condition(self.port_lock)
    self._not_empty = threading.Condition(self.port_lock)
    self._breakBlock = False
    self.stats = InStats(name, PortTransferType)
    self.blocking = False
    self.sri_cmp = sriCompare
    self.newSriCallback = newSriCallback
    self.sriChangeCallback = sriChangeCallback
    self.sriDict = {}  # key=streamID, value=StreamSRI

    if logger == None:
        self.logger = logging.getLogger("redhawk.bulkio.input." + name)

    _cmpMsg = "DEFAULT"
    _newSriMsg = "EMPTY"
    _sriChangeMsg = "EMPTY"
    if sriCompare != sri.compare:
        _cmpMsg = "USER_DEFINED"
    if newSriCallback:
        _newSriMsg = "USER_DEFINED"
    if sriChangeCallback:
        _sriChangeMsg = "USER_DEFINED"

    if self.logger:
        self.logger.debug("bulkio::InPort CTOR port:" + str(name) +
                          " Blocking/MaxInputQueueSize " + str(self.blocking) + "/" + str(maxsize) +
                          " SriCompare/NewSriCallback/SriChangeCallback " +
                          _cmpMsg + "/" + _newSriMsg + "/" + _sriChangeMsg)
def __init__(self, proxy=None):
    self.connected = False
    self._proxy = proxy
    self._recreate_socket()

    # Support for multi-threading advantages and safety
    self.cancelled = Event()  # Has the read operation been cancelled?
    self.delay = 0.1  # Read delay when there was no data available
    self._lock = Lock()
    self._buffer = []
    self._read_thread = Thread(target=self._reading_thread, daemon=True)
    self._cv = Condition()  # Condition Variable
def _after_fork(self):
    debug('Queue._after_fork()')
    self._notempty = threading.Condition(threading.Lock())
    self._buffer = collections.deque()
    self._thread = None
    self._jointhread = None
    self._joincancelled = False
    self._closed = False
    self._close = None
    self._send = self._writer.send
    self._recv = self._reader.recv
    self._poll = self._reader.poll
def __init__(self, maxsize=0):
    Queue.__init__(self, maxsize)
    self._unfinished_tasks = Semaphore(0)
    self._cond = Condition()
def __init__(self, cache, callback):
    self._cond = threading.Condition(threading.Lock())
    self._job = job_counter.next()
    self._cache = cache
    self._ready = False
    self._callback = callback
    cache[self._job] = self
def __init__(self, cache):
    self._cond = threading.Condition(threading.Lock())
    self._job = job_counter.next()
    self._cache = cache
    self._items = collections.deque()
    self._index = 0
    self._length = None
    self._unsorted = {}
    cache[self._job] = self
def __init__(self, parties):
    """Create a barrier, initialised to 'parties' threads."""
    self.cond = threading.Condition(threading.Lock())
    self.parties = parties
    # Indicates the number of waiting parties.
    self.waiting = 0
    # generation is needed to deal with spurious wakeups. If self.cond.wait()
    # wakes up for other reasons, generation will force it go back to wait().
    self.generation = 0
    self.broken = False
def __init__(self, value=1):
    if value < 0:
        raise ValueError("semaphore initial value must be >= 0")
    self._cond = threading.Condition(threading.Lock())
    self._value = value
def __init__(self, show_state_steps, show_dynamic_steps):
    super(Monitor, self).__init__()
    self._processor = SaltEventProcessor()
    self._processor.add_listener(Monitor.DeepSeaEventListener(self))
    self._show_state_steps = show_state_steps
    self._show_dynamic_steps = show_dynamic_steps
    self._running_stage = None
    self._monitor_listeners = []
    self._event_lock = threading.Lock()
    self._event_cond = threading.Condition(self._event_lock)
    self._event_buffer = []
    self._running = False
    self._stage_steps = {}
def __init__(self, session, statements_and_params):
    self.session = session
    self._enum_statements = enumerate(iter(statements_and_params))
    self._condition = Condition()
    self._fail_fast = False
    self._results_queue = []
    self._current = 0
    self._exec_count = 0
    self._exec_depth = 0
def __init__(self, conn):
    super(MultiplexedInputStream, self).__init__(conn)

    # Arbitrates access to this InputStream (it's used simultaneously
    # by a Request and its owning Connection object).
    lock = threading.RLock()

    # Notifies Request thread that there is new data available.
    self._lock = threading.Condition(lock)