我们从 Python 开源项目中提取了以下 16 个代码示例，用于说明如何使用 twisted.internet.threads.blockingCallFromThread()。
def push_to_member(self, member: Member, ignore_for_statistics=False) -> None:
    """Push our hashgraph data to the specified member.

    The payload is built while holding ``self.hashgraph.lock`` and the TCP
    connection is then started in the reactor thread; this call blocks until
    the reactor has run the connect request.

    :param member: The receiving member. If ``member.address`` is ``None``
        there is nowhere to connect to and the call returns without doing
        anything.
    :param ignore_for_statistics: When true, the client factory is created
        with ``network=None`` instead of ``network=self``.
    """
    # The connect closure below already tolerates a missing address, but the
    # debug log used to dereference ``member.address`` unconditionally and
    # raised AttributeError first. Bail out before touching it.
    if member.address is None:
        return

    bptc.logger.debug('Push to {}... ({}, {})'.format(
        member.verify_key[:6], member.address.host, member.address.port))

    with self.hashgraph.lock:
        data_string = self.generate_data_string(
            self.hashgraph.me,
            self.hashgraph.get_unknown_events_of(member),
            filter_members_with_address(self.hashgraph.known_members.values()))

    network = None if ignore_for_statistics else self
    factory = PushClientFactory(data_string, network=network, receiver=member)

    def push():
        # Re-check in the reactor thread: the address is read again here and
        # may have been cleared since the check above.
        if member.address is not None:
            reactor.connectTCP(member.address.host, member.address.port, factory)

    threads.blockingCallFromThread(reactor, push)
def api_method(request, method):
    """
    Utility function to create a synchronous wrapper for the JSON-RPC API.

    The returned function looks up ``method`` on ``request.sdata.api`` at call
    time, runs it in the reactor thread via ``blockingCallFromThread`` and
    blocks for the response.

    :param request: Object exposing the API as ``request.sdata.api``.
    :param method: Name of the API attribute to call.
    :return: A function ``method_fn(*args, **kwargs)`` that returns the
        response's ``'result'`` passed through ``m2mstr_object_hook``, or the
        response's ``'error'`` entry when there is no ``'result'`` key.
    """
    def method_fn(*args, **kwargs):
        fn = getattr(request.sdata.api, method)
        res = threads.blockingCallFromThread(reactor, fn, *args, **kwargs)
        # Only the 'result' lookup is guarded: a KeyError raised inside
        # m2mstr_object_hook must not be mistaken for a missing result.
        try:
            result = res['result']
        except KeyError:
            return res['error']
        return m2mstr_object_hook(result)

    return method_fn
def push_to(self, ip, port) -> None:
    """Push our hashgraph data to the given network address.

    The payload is generated while holding ``self.hashgraph.lock``; the TCP
    connect is then issued in the reactor thread and this call blocks until
    the reactor has run it.

    :param ip: Host to connect to.
    :param port: TCP port to connect to.
    """
    hashgraph = self.hashgraph
    with hashgraph.lock:
        members_with_address = filter_members_with_address(
            hashgraph.known_members.values())
        payload = self.generate_data_string(
            hashgraph.me, hashgraph.lookup_table, members_with_address)

    client_factory = PushClientFactory(payload, network=self)
    # connectTCP has to run in the reactor thread.
    threads.blockingCallFromThread(
        reactor, reactor.connectTCP, ip, port, client_factory)
def single_pull(self, ip_text_input, port_text_input):
    """Ask the reactor to open a pull connection to the given client.

    :param ip_text_input: Object whose ``value`` attribute holds the host.
    :param port_text_input: Object whose ``value`` attribute holds the port;
        it is converted with ``int()``.
    """
    host = ip_text_input.value
    port_number = int(port_text_input.value)
    # ``doc`` and ``ready_event`` are not defined in this function; they are
    # resolved from the enclosing scope.
    pull_factory = PullClientFactory(self, doc, ready_event)
    threads.blockingCallFromThread(
        reactor, reactor.connectTCP, host, port_number, pull_factory)
def run(self):
    """Repeatedly connect to ``self.ip``/``self.port`` until stopped.

    Each round waits for ``ready_event`` to be set, clears it, issues a TCP
    connect with ``self.factory`` in the reactor thread, then sleeps for two
    seconds before checking ``self.stopped()`` again.
    """
    while True:
        if self.stopped():
            break
        # Block until the event is signalled, then reset it for the next round.
        ready_event.wait()
        ready_event.clear()
        print('Try to connect...')
        threads.blockingCallFromThread(
            reactor, reactor.connectTCP, self.ip, self.port, self.factory)
        sleep(2.0)
def maybeblockingCallFromThread(self, callable, *args, **kwargs):
    """
    Run C{callable(*args, **kwargs)} in the reactor thread.

    When already in the reactor thread, the callable is invoked directly and
    whatever it returns (typically a Deferred) is handed back unchanged.
    From any other thread, the call is dispatched to the reactor and this
    method blocks on the outcome instead of returning the Deferred.
    """
    if not self.in_reactor_thread():
        # Foreign thread: hand the call to the reactor and wait for it.
        return twisted_threads.blockingCallFromThread(
            self._reactor, callable, *args, **kwargs)
    # Already in the reactor thread: just call it.
    return callable(*args, **kwargs)
def _resolveQuery(self, session, objects, query):
    """Resolve a L{Query}.

    @param session: The L{FluidinfoSession} for the request.
    @param objects: The L{SecureObjectAPI} to use to fetch object IDs.
    @param query: The L{Query} to resolve.
    @raise TNonexistentTag: If the search reports an unknown path or a
        permission-denied path.
    @raise TParseError: If fetching the search result raises a
        L{SearchError}.
    @return: A C{list} of object ID C{str}s that match the query.
    """
    try:
        pending = objects.search([query])
    except UnknownPathError as error:
        session.log.exception(error)
        raise TNonexistentTag(error.paths[0].encode('utf-8'))
    except PermissionDeniedError as error:
        session.log.exception(error)
        # A denied path is reported to the caller as a nonexistent tag.
        deniedPath, _operation = error.pathsAndOperations[0]
        raise TNonexistentTag(deniedPath)

    try:
        # The search result is fetched in the reactor thread; the time spent
        # waiting for it is tracked under 'index-search'.
        with session.timer.track('index-search'):
            matches = blockingCallFromThread(reactor, pending.get)
    except SearchError as error:
        session.log.exception(error)
        raise TParseError(query, error.message)
    return matches[query]
def _execute(self):
    """
    Callback fired when the associated event is set.

    Looks up the C{self._action} method on the wrapped descriptor
    C{self._fd} and calls it in the reactor C{self._reactor}, blocking until
    it finishes. Whatever the action returns or raises is returned or raised
    from here, so that this event handler can be removed from
    C{self._reactor} if appropriate.
    """
    def invokeAction():
        # The attribute lookup is deliberately done inside the reactor
        # thread, together with the call itself.
        handler = getattr(self._fd, self._action)
        return handler()

    return blockingCallFromThread(self._reactor, invokeAction)
def _testBlockingCallFromThread(self, reactorFunc):
    """
    Utility method to test L{threads.blockingCallFromThread}.

    C{reactorFunc} is run through L{threads.blockingCallFromThread} from a
    reactor pool thread, and its outcome is collected.

    @param reactorFunc: The callable handed to
        L{threads.blockingCallFromThread}.
    @return: The Deferred chain started by C{self._waitForThread()}, firing
        with a C{(results, errors)} tuple of lists: C{results} holds the
        returned value on success, C{errors} holds the caught exception
        otherwise.
    """
    waiter = threading.Event()
    results = []
    errors = []

    def cb1(ign):
        def threadedFunc():
            try:
                r = threads.blockingCallFromThread(reactor, reactorFunc)
            except Exception as e:
                errors.append(e)
            else:
                results.append(r)
            # Signal completion on both the success and the error path.
            waiter.set()

        reactor.callInThread(threadedFunc)
        # Wait for the event in another thread so the reactor stays free.
        return threads.deferToThread(waiter.wait, self.getTimeout())

    def cb2(ign):
        # is_set() replaces the deprecated camelCase alias isSet().
        if not waiter.is_set():
            self.fail("Timed out waiting for event")
        return results, errors

    return self._waitForThread().addCallback(cb1).addBoth(cb2)
def test_blockingCallFromThread(self):
    """
    Call a function in the reactor from another thread using
    L{threads.blockingCallFromThread} and check that the value of its
    already-fired Deferred is what comes back.
    """
    def reactorFunc():
        return defer.succeed("foo")

    def checkResult(outcome):
        # outcome[0] is the list of collected results.
        results = outcome[0]
        self.assertEqual(results[0], "foo")

    d = self._testBlockingCallFromThread(reactorFunc)
    d.addCallback(checkResult)
    return d
def test_asyncBlockingCallFromThread(self):
    """
    Same check as for an already-fired Deferred, but here the Deferred
    returned by the reactor function only fires later, via C{callLater}.
    """
    def reactorFunc():
        pending = defer.Deferred()
        reactor.callLater(0.1, pending.callback, "egg")
        return pending

    def checkResult(outcome):
        # outcome[0] is the list of collected results.
        results = outcome[0]
        self.assertEqual(results[0], "egg")

    d = self._testBlockingCallFromThread(reactorFunc)
    d.addCallback(checkResult)
    return d
def test_asyncErrorBlockingCallFromThread(self):
    """
    Check error reporting through L{threads.blockingCallFromThread} when
    the Deferred returned by the reactor function fails later, via
    C{callLater}, rather than being already fired.
    """
    def reactorFunc():
        pending = defer.Deferred()
        reactor.callLater(0.1, pending.errback, RuntimeError("spam"))
        return pending

    def checkError(outcome):
        # outcome[1] is the list of collected exceptions.
        firstError = outcome[1][0]
        self.assertIsInstance(firstError, RuntimeError)
        self.assertEqual(firstError.args[0], "spam")

    d = self._testBlockingCallFromThread(reactorFunc)
    d.addCallback(checkError)
    return d
def default(self, line):
    """Run ``line`` as a command in the reactor thread and print its output.

    ``self._run_command(line)`` is executed in the reactor thread and this
    call blocks for its response. The response's ``stdout`` lines are written
    to standard output and its ``stderr`` lines to standard error, each
    joined with newlines and followed by a trailing newline.

    NOTE(review): presumably the ``cmd.Cmd`` fallback handler for unknown
    commands -- confirm against the enclosing class.
    """
    response = threads.blockingCallFromThread(
        reactor, self._run_command, line)
    # Explicit stream writes instead of Python 2 ``print`` statements, which
    # are a syntax error on Python 3. The emitted text is the same.
    sys.stdout.write('\n'.join(response.stdout) + '\n')
    sys.stderr.write('\n'.join(response.stderr) + '\n')
def captureException(self, **extra):
    """Report the exception currently being handled via the client.

    The keyword arguments are built by ``self.ravenCaptureArguments(**extra)``
    and ``sys.exc_info()`` is read in the calling thread. The call to
    ``self.client.captureException`` itself runs in ``self.reactor``'s thread,
    and this method blocks for its result.

    :return: Whatever ``self.client.captureException`` produces.
    """
    capture_kwargs = self.ravenCaptureArguments(**extra)
    # Read here, in the calling thread, before handing off to the reactor.
    current_exc = sys.exc_info()
    return blockingCallFromThread(
        self.reactor,
        self.client.captureException,
        current_exc,
        **capture_kwargs
    )
def captureMessage(self, message, **extra):
    """Report ``message`` via the client.

    The keyword arguments are built by ``self.ravenCaptureArguments(**extra)``.
    The call to ``self.client.captureMessage`` runs in ``self.reactor``'s
    thread, and this method blocks for its result.

    :param message: The message passed to ``self.client.captureMessage``.
    :return: Whatever ``self.client.captureMessage`` produces.
    """
    capture_kwargs = self.ravenCaptureArguments(**extra)
    return blockingCallFromThread(
        self.reactor,
        self.client.captureMessage,
        message,
        **capture_kwargs
    )
def write(self, data):
    """
    The WSGI I{write} callable returned by the I{start_response} callable.

    Writes the given bytes to the response body, sending the status and
    headers first if they have not been sent yet.

    This will be called in a non-I/O thread.
    """
    # PEP-3333 requires the server or gateway to transmit yielded
    # bytestrings to the client unbuffered, completing the transmission of
    # each bytestring before requesting another one.
    #
    # This method backs both the imperative write() and, indirectly, the
    # more familiar iterable-of-bytestrings mechanism. Writes are scheduled
    # with C{blockingCallFromThread}, which lets exceptions propagate up
    # from the underlying HTTP implementation. That implementation does
    # not, as yet, offer any way to know whether the written data has been
    # transmitted, so this method violates the requirement above.
    #
    # PEP-3333 also says a server may use a different thread to ensure that
    # the block continues to be transmitted while the application produces
    # the next block. Since writes happen in the reactor thread, that
    # suggests this is actually compliant after all.
    #
    # Providing some back-pressure may nevertheless be a Good Thing at some
    # point in the future.
    def sendChunk(headersAlreadySent):
        if not headersAlreadySent:
            self._sendResponseHeaders()
        self.request.write(data)

    try:
        return blockingCallFromThread(
            self.reactor, sendChunk, self.started)
    finally:
        # Mark the response as started even when the write raised.
        self.started = True