The following 50 code examples, extracted from open-source Python projects, illustrate how to use twisted.internet.threads.deferToThread().
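Before the project examples, here is a minimal sketch of the pattern they all share (the function and callback names below are illustrative, not taken from any of the projects): deferToThread() runs a blocking callable in the reactor's thread pool and returns a Deferred that fires in the reactor thread with the callable's result, or errbacks with its exception.

from twisted.internet import reactor, threads

def blocking_work(x, y):
    # Runs in a thread-pool thread; must not touch reactor-only APIs.
    return x + y

def on_result(result):
    print("result:", result)
    reactor.stop()

def on_error(failure):
    failure.printTraceback()
    reactor.stop()

d = threads.deferToThread(blocking_work, 2, 3)
d.addCallback(on_result)
d.addErrback(on_error)
reactor.run()
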
def action(self, user, channel, msg):
    """Called when the bot sees someone do an action."""
    user = user.split('!', 1)[0]
    self.logic.log(colorama.Fore.MAGENTA + "* " + user + colorama.Fore.WHITE + " " + msg)
    if channel == self.nickname:
        f = self.logic.check_update(self.logic.FIRST_TIME_MSG, user, "firsttime.txt")
        u = self.logic.check_update(self.logic.UPDATE_MSG, user, "updates.txt")
        if f:
            self.msg(user, f)
        if u:
            self.msg(user, u)
        d = threads.deferToThread(self.logic.sendpp, msg, user, "np")
        d.addCallback(self.logCommand, user)
        d.addErrback(log.err)
    else:
        pass

def cleanIfIdle(self, path=None):
    # RecordTimer calls this when preparing a recording. That is a
    # nice moment to clean up. It also mentions the path, so mark
    # it as dirty.
    self.markDirty(path)
    if not self.dirty:
        return
    if self.isCleaning:
        print "[Trashcan] Cleanup already running"
        return
    if (self.session is not None) and self.session.nav.getRecordings():
        return
    self.isCleaning = True
    ctimeLimit = time.time() - (config.usage.movielist_trashcan_days.value * 3600 * 24)
    reserveBytes = 1024*1024*1024 * int(config.usage.movielist_trashcan_reserve.value)
    cleanset = self.dirty
    self.dirty = set()
    threads.deferToThread(purge, cleanset, ctimeLimit, reserveBytes).addCallbacks(self.cleanReady, self.cleanFail)

def list(self, raw=False, **kwargs):
    log.msg("LIST called")
    yield self.connect(self._router_url)
    try:
        output = yield self._wamp.call(u"io.timbr.kernel.list")
        try:
            output.remove(self._kernel_key)
        except ValueError:  # kernel key doesn't exist in the list
            pass
    except ApplicationError:
        output = []
    if raw is not True:
        prefix_map = yield threads.deferToThread(self._get_kernel_names, output, details=kwargs.get('details'))
        if prefix_map is not None:
            returnValue(prefix_map)
        else:
            print("Unable to access JUNO_KERNEL_URI, displaying kernel prefixes instead of kernel names")
            returnValue(output)
    else:
        returnValue(output)
    returnValue(output)

def command(self, args, callback_trigger=None):
    exe = (self.executable if self.executable else which('tahoe')[0])
    args = [exe] + ['-d', self.nodedir] + args
    env = os.environ
    env['PYTHONUNBUFFERED'] = '1'
    if sys.platform == 'win32' and getattr(sys, 'frozen', False):
        from twisted.internet.threads import deferToThread
        output = yield deferToThread(
            self._win32_popen, args, env, callback_trigger)
    else:
        protocol = CommandProtocol(self, callback_trigger)
        reactor.spawnProcess(protocol, exe, args=args, env=env)
        output = yield protocol.done
    returnValue(output)

#@inlineCallbacks
#def start_monitor(self):
#    furl = os.path.join(self.nodedir, 'private', 'logport.furl')
#    yield self.command(['debug', 'flogtool', 'tail', furl])

def send(self, res):
    self.buffer.append(res)

    # buffer is full, write to disk
    if len(self.buffer) >= self.chunk_size:
        chunk = self.buffer[:self.chunk_size]
        self.buffer = self.buffer[self.chunk_size:]
        log_path = os.path.join(self.out_dir, "%s-scan.json" % (datetime.datetime.utcnow().isoformat()))

        def write():
            wf = open(log_path, "w")
            try:
                json.dump(chunk, wf, sort_keys=True)
            finally:
                wf.close()

        r = threads.deferToThread(write).chainDeferred(self.current_task)
        self.current_task = None
        return r

    # buffer is not full, return deferred for current batch
    if not self.current_task or self.current_task.called:
        self.current_task = defer.Deferred()
    return self.current_task

def end_flush(self):
    """
    Write buffered contents to disk. There's no need to
    perform this write in a separate thread.
    """
    def flush():
        if len(self.buffer) == 0:
            return defer.succeed
        log_path = os.path.join(self.out_dir, "%s-scan.json" % (datetime.datetime.utcnow().isoformat()))
        wf = open(log_path, "w")
        try:
            json.dump(self.buffer, wf, sort_keys=True)
        finally:
            wf.close()
    return threads.deferToThread(flush)

def getSummedData(self, c, num_images=1):
    '''
    Get the counts with the vertical axis summed over.
    '''
    print 'acquiring: {}'.format(self.getAcquiredData.__name__)
    yield self.lock.acquire()
    try:
        print 'acquired: {}'.format(self.getAcquiredData.__name__)
        images = yield deferToThread(self.camera.get_acquired_data, num_images)
        hbin, vbin, hstart, hend, vstart, vend = self.camera.get_image()
        x_pixels = int((hend - hstart + 1.) / (hbin))
        y_pixels = int(vend - vstart + 1.) / (vbin)
        images = np.reshape(images, (num_images, y_pixels, x_pixels))
        images = images.sum(axis=1)
        images = np.ravel(images, order='C')
        images = images.tolist()
    finally:
        print 'releasing: {}'.format(self.getAcquiredData.__name__)
        self.lock.release()
    returnValue(images)

def waitForKinetic(self, c, timeout=WithUnit(1, 's')):
    '''Waits until the given number of kinetic images are completed'''
    requestCalls = int(timeout['s'] / 0.050)  # number of request calls
    for i in range(requestCalls):
        print 'acquiring: {}'.format(self.waitForKinetic.__name__)
        yield self.lock.acquire()
        try:
            print 'acquired : {}'.format(self.waitForKinetic.__name__)
            status = yield deferToThread(self.camera.get_status)
            # useful for debugging of how many iterations have been
            # completed in case of missed trigger pulses
            a, b = yield deferToThread(self.camera.get_series_progress)
            print a, b
            print status
        finally:
            print 'releasing: {}'.format(self.waitForKinetic.__name__)
            self.lock.release()
        if status == 'DRV_IDLE':
            returnValue(True)
        yield self.wait(0.050)
    returnValue(False)

def mqtt_receive(self, topic=None, payload=None, **kwargs):
    try:
        # Synchronous message processing
        #return self.process_message(topic, payload, **kwargs)

        # Asynchronous message processing
        #deferred = threads.deferToThread(self.process_message, topic, payload, **kwargs)

        # Asynchronous message processing using different thread pool
        deferred = self.thimble.process_message(topic, payload, **kwargs)
        deferred.addErrback(self.mqtt_receive_error, topic, payload)
        return deferred

    except Exception:
        log.failure(u'Processing MQTT message failed. topic={topic}, payload={payload}',
                    topic=topic, payload=payload)

def get_summary(measurement_id):
    """
    Returns a deferred that will fire with the content of the summary
    or will errback with MeasurementInProgress if the measurement has
    not yet finished running.
    """
    measurement_path = FilePath(config.measurements_directory)
    measurement = measurement_path.child(measurement_id)

    if measurement.child("measurements.njson.progress").exists():
        return defer.fail(MeasurementInProgress)

    summary = measurement.child("summary.json")
    anomaly = measurement.child("anomaly")
    if not summary.exists():
        return deferToThread(
            generate_summary,
            measurement.child("measurements.njson").path,
            summary.path,
            anomaly.path
        )

    with summary.open("r") as f:
        return defer.succeed(json.load(f))

def collect(self, config):
    ds0 = config.datasources[0]
    credentials = (ds0.zWBEMUsername, ds0.zWBEMPassword)
    url = '{0}://{1}:{2}'.format(
        'https' if ds0.zWBEMUseSSL else 'http',
        ds0.manageIp,
        ds0.zWBEMPort
    )

    def _inner():
        return WBEMConnection(url, credentials).ExecQuery(
            ds0.params['query_language'],
            ds0.params['query'],
            namespace=ds0.params['namespace'])

    return threads.deferToThread(_inner)

def isLoopbackURL(url):
    """Checks if the specified URL refers to a loopback address.

    :return: True if the URL refers to the loopback interface,
        otherwise False.
    """
    if url is not None:
        if url.hostname is not None:
            is_loopback = yield deferToThread(
                resolves_to_loopback_address, url.hostname)
        else:
            # Empty URL == localhost.
            is_loopback = True
    else:
        # We need to pass is_loopback in, but it is only checked if url
        # is not None. None is the "I don't know and you won't ask"
        # state for this boolean.
        is_loopback = None
    return is_loopback

def refresh(self):
    """Refresh the region controller."""
    # XXX ltrager 2016-05-25 - MAAS doesn't have an RPC method between
    # region controllers. If this method refreshes a foreign region
    # controller the foreign region controller will contain the running
    # region's hardware and networking information.
    if self.system_id != get_maas_id():
        raise NotImplementedError(
            'Can only refresh the running region controller')
    try:
        with NamedLock('refresh'):
            token = yield deferToDatabase(self._get_token_for_controller)
            yield deferToDatabase(self._signal_start_of_refresh)
            sys_info = yield deferToThread(get_sys_info)
            yield deferToDatabase(self._process_sys_info, sys_info)
            yield deferToThread(
                refresh, self.system_id, token.consumer.key, token.key,
                token.secret)
    except NamedLock.NotAvailable:
        # Refresh already running.
        pass

def loseConnection(self, reason=Failure(error.ConnectionDone())):
    """Request that the connection be dropped."""
    if self.disconnecting is None:
        d = self.disconnecting = Deferred()
        d.addBoth(callOut, self.stopReading)
        d.addBoth(callOut, self.cancelHandleNotify)
        d.addBoth(callOut, deferToThread, self.stopConnection)
        d.addBoth(callOut, self.connectionLost, reason)

        def done():
            self.disconnecting = None

        d.addBoth(callOut, done)

        if self.connecting is None:
            # Already/never connected: begin shutdown now.
            self.disconnecting.callback(None)
        else:
            # Still connecting: cancel before disconnect.
            self.connecting.addErrback(suppress, CancelledError)
            self.connecting.chainDeferred(self.disconnecting)
            self.connecting.cancel()

    return self.disconnecting

def evaluate_tag(
        self, system_id, tag_name, tag_definition, tag_nsmap,
        credentials, nodes):
    """evaluate_tag()

    Implementation of
    :py:class:`~provisioningserver.rpc.cluster.EvaluateTag`.
    """
    # It's got to run in a thread because it does blocking IO.
    d = deferToThread(
        evaluate_tag, system_id, nodes, tag_name, tag_definition,
        # Transform tag_nsmap into a format that LXML likes.
        {entry["prefix"]: entry["uri"] for entry in tag_nsmap},
        # Parse the credential string into a 3-tuple.
        convert_string_to_tuple(credentials))
    return d.addCallback(lambda _: {})

def refresh(self, system_id, consumer_key, token_key, token_secret):
    """RefreshRackControllerInfo()

    Implementation of
    :py:class:`~provisioningserver.rpc.cluster.RefreshRackControllerInfo`.
    """
    def _refresh():
        with ClusterConfiguration.open() as config:
            return deferToThread(
                refresh, system_id, consumer_key, token_key,
                token_secret, config.maas_url)

    lock = NamedLock('refresh')
    try:
        lock.acquire()
    except lock.NotAvailable:
        # Refresh is already running, don't do anything
        raise exceptions.RefreshAlreadyInProgress()
    else:
        # Start gathering node results (lshw, lsblk, etc) but don't wait.
        maybeDeferred(_refresh).addBoth(callOut, lock.release).addErrback(
            log.err, 'Failed to refresh the rack controller.')
        return deferToThread(get_sys_info)

def callOutToThread(thing, func, *args, **kwargs):
    """Call out to the given `func` in another thread, but return `thing`.

    For example::

      d = client.fetchSomethingReallyImportant()
      d.addCallback(callOutToThread, watchTheKettleBoil)
      d.addCallback(doSomethingWithReallyImportantThing)

    Use this where you need a side-effect when a :py:class:`~Deferred` is
    fired, but you don't want to clobber the result. Note that the result
    being passed through is *not* passed to the function.

    Note also that if the call-out raises an exception, this will be
    propagated; nothing is done to suppress the exception or preserve the
    result in this case.

    :return: :class:`Deferred`.
    """
    return deferToThread(func, *args, **kwargs).addCallback(lambda _: thing)

def test_probe_and_enlist(self):
    num_servers = 100
    self.configure_vmomi_api(servers=num_servers)
    mock_create_node = self.patch(vmware, 'create_node')
    system_id = factory.make_name('system_id')
    mock_create_node.side_effect = asynchronous(
        lambda *args, **kwargs: system_id)
    mock_commission_node = self.patch(vmware, 'commission_node')

    host = factory.make_hostname()
    username = factory.make_username()
    password = factory.make_username()

    yield deferToThread(
        vmware.probe_vmware_and_enlist,
        factory.make_username(),
        host, username, password,
        accept_all=True)

    self.assertEqual(mock_create_node.call_count, num_servers)
    self.assertEqual(mock_commission_node.call_count, num_servers)

def test_probe_and_enlist_reconfigures_boot_order_if_create_node_ok(self):
    num_servers = 1
    self.configure_vmomi_api(servers=num_servers)
    mock_create_node = self.patch(vmware, 'create_node')
    system_id = factory.make_name('system_id')
    mock_create_node.side_effect = asynchronous(
        lambda *args, **kwargs: system_id)
    mock_reconfigure_vm = self.patch(FakeVmomiVM, 'ReconfigVM_Task')

    # We need to not actually try to commission any nodes...
    self.patch(vmware, 'commission_node')

    host = factory.make_hostname()
    username = factory.make_username()
    password = factory.make_username()

    yield deferToThread(
        vmware.probe_vmware_and_enlist,
        factory.make_username(),
        host, username, password,
        accept_all=True)

    self.assertEqual(mock_reconfigure_vm.call_count, num_servers)

def test_probe_and_enlist_skips_pxe_config_if_create_node_failed(self):
    num_servers = 1
    self.configure_vmomi_api(servers=num_servers)
    mock_create_node = self.patch(vmware, 'create_node')
    mock_create_node.side_effect = asynchronous(
        lambda *args, **kwargs: None)
    mock_reconfigure_vm = self.patch(FakeVmomiVM, 'ReconfigVM_Task')

    # We need to not actually try to commission any nodes...
    self.patch(vmware, 'commission_node')

    host = factory.make_hostname()
    username = factory.make_username()
    password = factory.make_username()

    yield deferToThread(
        vmware.probe_vmware_and_enlist,
        factory.make_username(),
        host, username, password,
        accept_all=True)

    self.assertEqual(mock_reconfigure_vm.call_count, 0)

def test_probe_and_enlist_recs_probes_and_enlists(self):
    user = factory.make_name('user')
    ip, port, username, password, node_id, context = self.make_context()
    domain = factory.make_name('domain')
    macs = [factory.make_mac_address() for _ in range(3)]
    mock_get_nodes = self.patch(RECSAPI, "get_nodes")
    mock_get_nodes.return_value = {node_id: {
        'macs': macs, 'arch': 'amd64'}}
    self.patch(RECSAPI, "set_boot_source")
    mock_create_node = self.patch(recs_module, "create_node")
    mock_create_node.side_effect = asynchronous(lambda *args: node_id)
    mock_commission_node = self.patch(recs_module, "commission_node")

    yield deferToThread(
        probe_and_enlist_recs, user, ip, int(port),
        username, password, True, domain)

    self.expectThat(
        mock_create_node, MockCalledOnceWith(
            macs, 'amd64', 'recs_box', context, domain))
    self.expectThat(
        mock_commission_node, MockCalledOnceWith(node_id, user))

def test_probe_and_enlist_recs_probes_and_enlists_no_commission(self):
    user = factory.make_name('user')
    ip, port, username, password, node_id, context = self.make_context()
    domain = factory.make_name('domain')
    macs = [factory.make_mac_address() for _ in range(3)]
    mock_get_nodes = self.patch(RECSAPI, "get_nodes")
    mock_get_nodes.return_value = {node_id: {
        'macs': macs, 'arch': 'arm'}}
    self.patch(RECSAPI, "set_boot_source")
    mock_create_node = self.patch(recs_module, "create_node")
    mock_create_node.side_effect = asynchronous(lambda *args: node_id)
    mock_commission_node = self.patch(recs_module, "commission_node")

    yield deferToThread(
        probe_and_enlist_recs, user, ip, int(port),
        username, password, False, domain)

    self.expectThat(
        mock_create_node, MockCalledOnceWith(
            macs, 'armhf', 'recs_box', context, domain))
    self.expectThat(
        mock_commission_node, MockNotCalled())

def test_probe_and_enlist_msftocs_probes_and_enlists(self):
    context = make_context()
    user = factory.make_name('user')
    system_id = factory.make_name('system_id')
    domain = factory.make_name('domain')
    macs = [factory.make_mac_address() for _ in range(3)]
    mock_get_blades = self.patch(MicrosoftOCSPowerDriver, "get_blades")
    mock_get_blades.return_value = {'%s' % context['blade_id']: macs}
    self.patch(MicrosoftOCSPowerDriver, "set_next_boot_device")
    mock_create_node = self.patch(msftocs_module, "create_node")
    mock_create_node.side_effect = asynchronous(lambda *args: system_id)
    mock_commission_node = self.patch(msftocs_module, "commission_node")

    yield deferToThread(
        probe_and_enlist_msftocs, user, context['power_address'],
        int(context['power_port']), context['power_user'],
        context['power_pass'], True, domain)

    self.expectThat(
        mock_create_node, MockCalledOnceWith(
            macs, 'amd64', 'msftocs', context, domain))
    self.expectThat(
        mock_commission_node, MockCalledOnceWith(system_id, user))

def power_state_virsh(
        self, power_address, power_id, power_pass=None, **kwargs):
    """Return the power state for the VM using virsh."""
    # Force password to None if blank, as the power control
    # script will send a blank password if one is not set.
    if power_pass == '':
        power_pass = None

    conn = VirshSSH()
    logged_in = yield deferToThread(conn.login, power_address, power_pass)
    if not logged_in:
        raise VirshError('Failed to login to virsh console.')

    state = yield deferToThread(conn.get_machine_state, power_id)
    if state is None:
        raise VirshError('Failed to get domain: %s' % power_id)

    try:
        return VM_STATE_TO_POWER_STATE[state]
    except KeyError:
        raise VirshError('Unknown state: %s' % state)

def scan_bangumi(self):
    """
    dispatch the feed crawling job, this is a synchronized method running on individual thread
    :return:
    """
    logger.info('scan bangumi %s', self.__class__.__name__)
    bangumi_list = yield threads.deferToThread(self.query_bangumi_list)
    index_list = range(len(bangumi_list))
    random.shuffle(index_list)
    for index in index_list:
        bangumi = bangumi_list[index]
        if not self.check_bangumi_status(bangumi):
            episode_list = yield threads.deferToThread(self.query_episode_list, bangumi.id)
            # result is an array of tuple (item, eps_no)
            scan_result = yield threads.deferToThread(self.scan_feed, bangumi, episode_list)
            if scan_result is None:
                continue
            url_eps_list = [
                (download_url, self.__find_episode_by_number(episode_list, eps_no), file_path, file_name)
                for (download_url, eps_no, file_path, file_name) in scan_result
            ]
            # this method may raise exception
            yield threads.deferToThread(self.download_episodes, url_eps_list, bangumi.id)
            yield threads.deferToThread(self.update_bangumi_status, bangumi)

def __add_download(self, video_file_list):
    logger.debug(video_file_list)
    download_url_dict = {}
    for video_file in video_file_list:
        if video_file.download_url not in download_url_dict:
            download_url_dict[video_file.download_url] = []
        download_url_dict[video_file.download_url].append(video_file)

    for download_url, same_torrent_video_file_list in download_url_dict.iteritems():
        first_video_file = same_torrent_video_file_list[0]
        bangumi_path = self.base_path + '/' + str(first_video_file.bangumi_id)
        try:
            torrent_id = yield download_manager.download(first_video_file.download_url, bangumi_path)
            logger.info(torrent_id)
            if torrent_id is None:
                logger.warn('episode %s already in download queue', str(first_video_file.episode_id))
            else:
                yield threads.deferToThread(self.__update_video_file, same_torrent_video_file_list, torrent_id)
        except Exception as error:
            logger.error(error, exc_info=True)
            logger.error('episode %s download failed', str(first_video_file.episode_id))

def _wait_request(self, request, spider):
    try:
        driver = self.queue.get_nowait()
    except:
        driver = webdriver.PhantomJS(**self.options)

    driver.get(request.url)
    # wait until ajax completed
    dfd = threads.deferToThread(self._wait_and_switch, driver)
    dfd.addCallback(self._response, driver, spider)
    return dfd

def process_item(self, item, spider):
    return deferToThread(self._process_item, item, spider)

def initiateOp(self, sock, addr):
    handle = sock.fileno()
    if have_connectex:
        max_addr, family, type, protocol = self.reactor.getsockinfo(handle)
        self.reactor.issueConnectEx(handle, family, addr, self.ovDone, (handle, sock))
    else:
        from twisted.internet.threads import deferToThread
        d = deferToThread(self.threadedThing, sock, addr)
        d.addCallback(self.threadedDone)
        d.addErrback(self.threadedErr)

def getHostByName(self, name, timeout=(1, 3, 11, 45)):
    if timeout:
        timeoutDelay = reduce(operator.add, timeout)
    else:
        timeoutDelay = 60
    userDeferred = defer.Deferred()
    lookupDeferred = threads.deferToThread(socket.gethostbyname, name)
    cancelCall = self.reactor.callLater(
        timeoutDelay, self._cleanup, name, lookupDeferred)
    self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
    lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
    return userDeferred

def testDeferredResult(self):
    d = threads.deferToThread(lambda x, y=5: x + y, 3, y=4)
    d.addCallback(self.assertEquals, 7)
    return d

def testDeferredFailure(self):
    class NewError(Exception):
        pass

    def raiseError():
        raise NewError

    d = threads.deferToThread(raiseError)
    return self.assertFailure(d, NewError)

def testDeferredFailure2(self):
    # set up a condition that causes cReactor to hang. These conditions
    # can also be set by other tests when the full test suite is run in
    # alphabetical order (test_flow.FlowTest.testThreaded followed by
    # test_internet.ReactorCoreTestCase.testStop, to be precise). By
    # setting them up explicitly here, we can reproduce the hang in a
    # single precise test case instead of depending upon side effects of
    # other tests.
    #
    # alas, this test appears to flunk the default reactor too
    d = threads.deferToThread(lambda: None)
    d.addCallback(lambda ign: threads.deferToThread(lambda: 1/0))
    return self.assertFailure(d, ZeroDivisionError)

def pamAuthenticate(service, user, conv):
    return threads.deferToThread(pamAuthenticateThread, service, user, conv)

def symbolscheck(self):
    threads.deferToThread(self.JobTask)
    self.timer.startLongTimer(POLLTIME)

def _run(self):
    from twisted.internet import threads
    from enigma import eTimer
    self.aborted = False
    self.pos = 0
    threads.deferToThread(self.work).addBoth(self.onComplete)
    self.timer = eTimer()
    self.timer.callback.append(self.onTimer)
    self.timer.start(5)

def iconcheck(self):
    try:
        threads.deferToThread(self.JobTask)
    except:
        pass
    self.timer.startLongTimer(30)

def main():
    params, pipes, _ = command.configure()

    def run_shell():
        shell_vars = {
            'p': ParamsProxy(params),
            'm': MeterProxy(pipes),
        }
        code.interact(banner=BANNER, local=shell_vars)

    deferred = threads.deferToThread(run_shell)
    deferred.addCallback(lambda result: reactor.stop())
    reactor.run()

def from_thread(func, *args, **kwargs):
    call = lambda: deferToThread(func, *args, **kwargs)
    return cpu_core_semaphore.run(call)

def camcheck(self):
    global isBusy
    isBusy = True
    threads.deferToThread(self.JobTask)
    self.timer.startLongTimer(POLLTIME)

def iconcheck(self):
    threads.deferToThread(self.JobTask)
    self.timer.startLongTimer(30)

def fetch_async(*args, **kwargs):
    """Retrieve a URL asynchronously.

    @return: A C{Deferred} resulting in the URL content.
    """
    return deferToThread(fetch, *args, **kwargs)

def call_in_thread(self, callback, errback, f, *args, **kwargs):
    """
    Execute a callable object in a new separate thread.

    @param callback: A function to call in case C{f} was successful, it
        will be passed the return value of C{f}.
    @param errback: A function to call in case C{f} raised an exception,
        it will be passed a C{(type, value, traceback)} tuple giving
        information about the raised exception (see L{sys.exc_info}).

    @note: Both C{callback} and C{errback} will be executed in the
        parent thread.
    """
    def on_success(result):
        if callback:
            return callback(result)

    def on_failure(failure):
        exc_info = (failure.type, failure.value, failure.tb)
        if errback:
            errback(*exc_info)
        else:
            logging.error(exc_info[1], exc_info=exc_info)

    deferred = deferToThread(f, *args, **kwargs)
    deferred.addCallback(on_success)
    deferred.addErrback(on_failure)

def run(self):
    if not self._should_run():
        return

    self._monitor.ping()
    deferred = threads.deferToThread(self._perform_rados_call)
    deferred.addCallback(self._handle_usage)
    return deferred

def run(self):
    if not self._should_run():
        return

    self._monitor.ping()
    host = self._get_recon_host()
    deferred = threads.deferToThread(self._perform_recon_call, host)
    deferred.addCallback(self._handle_usage)
    return deferred

def request_with_payload(self, payload):
    resource = DataCollectingResource()
    port = reactor.listenTCP(
        0, server.Site(resource), interface="127.0.0.1")
    self.ports.append(port)
    transport = HTTPTransport(
        None, "http://localhost:%d/" % (port.getHost().port,))
    result = deferToThread(transport.exchange, payload, computer_id="34",
                           exchange_token="abcd-efgh", message_api="X.Y")

    def got_result(ignored):
        try:
            get_header = resource.request.requestHeaders.getRawHeaders
        except AttributeError:
            # For backwards compatibility with Twisted versions
            # without requestHeaders
            def get_header(header):
                return [resource.request.received_headers[header]]

        self.assertEqual(get_header(u"x-computer-id"), ["34"])
        self.assertEqual(get_header("x-exchange-token"), ["abcd-efgh"])
        self.assertEqual(
            get_header("user-agent"),
            ["landscape-client/%s" % (VERSION,)])
        self.assertEqual(get_header("x-message-api"), ["X.Y"])
        self.assertEqual(bpickle.loads(resource.content), payload)

    result.addCallback(got_result)
    return result

def test_ssl_verification_negative(self):
    """
    If the SSL server provides a key which is not verified by the
    specified public key, then the client should immediately end
    the connection without uploading any message data.
    """
    self.log_helper.ignore_errors(PyCurlError)
    r = DataCollectingResource()
    context_factory = DefaultOpenSSLContextFactory(
        BADPRIVKEY, BADPUBKEY)
    port = reactor.listenSSL(0, server.Site(r), context_factory,
                             interface="127.0.0.1")
    self.ports.append(port)
    transport = HTTPTransport(None, "https://localhost:%d/"
                              % (port.getHost().port,), pubkey=PUBKEY)

    result = deferToThread(transport.exchange, "HI", computer_id="34",
                           message_api="X.Y")

    def got_result(ignored):
        self.assertIs(r.request, None)
        self.assertIs(r.content, None)
        self.assertTrue("server certificate verification failed"
                        in self.logfile.getvalue())

    result.addErrback(got_result)
    return result

def onChallenge(self, challenge):
    logger.debug('Received CHALLENGE: %s' % challenge)
    # `sync_session._on_challenge` should resolve `self.on_challenge_defer`
    threads.deferToThread(partial(self._sync_session._on_challenge, challenge))
    return self.on_challenge_defer