我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用twisted.internet.reactor()。
def message_received(self, unwrapped_message): """ message is of type UnwrappedMessage """ delay = self._sys_rand.randint(0, self.max_delay) action = start_action( action_type=u"send delayed message", delay=delay, ) with action.context(): d = deferLater(self.reactor, delay, self.protocol.packet_proxy, unwrapped_message) DeferredContext(d).addActionFinish() self._pending_sends.add(d) def _remove(res, d=d): self._pending_sends.remove(d) return res d.addBoth(_remove)
def test_cannot_listen(self): """ When the program is run with an argument and a listen address specified with a port that we can't listen on (e.g. port 1), a CannotListenError is expected to be logged and the program should stop. """ temp_dir = self.useFixture(TempDir()) yield main(reactor, raw_args=[ temp_dir.path, '--listen', ':1', # A port we can't listen on ]) # Expect a 'certs' directory to be created self.assertThat(os.path.isdir(temp_dir.join('certs')), Equals(True)) # Expect a default certificate to be created self.assertThat(os.path.isfile(temp_dir.join('default.pem')), Equals(True)) # Expect to be unable to listen flush_logged_errors(CannotListenError)
def get_events(self, callbacks): """ Attach to Marathon's event stream using Server-Sent Events (SSE). :param callbacks: A dict mapping event types to functions that handle the event data """ d = self.request('GET', path='/v2/events', unbuffered=True, headers={ 'Accept': 'text/event-stream', 'Cache-Control': 'no-store' }) def handler(event, data): callback = callbacks.get(event) # Deserialize JSON if a callback is present if callback is not None: callback(json.loads(data)) return d.addCallback( sse_content, handler, reactor=self._reactor, **self._sse_kwargs)
def makeService(self, options): """Construct a server using MLLPFactory. :rtype: :py:class:`twisted.application.internet.StreamServerEndpointService` """ from twisted.internet import reactor from txHL7.mllp import IHL7Receiver, MLLPFactory receiver_name = options['receiver'] receiver_class = reflect.namedClass(receiver_name) verifyClass(IHL7Receiver, receiver_class) factory = MLLPFactory(receiver_class()) multi_service = MultiService() for port_number in PORTS: port = "tcp:interface={0}:port={1}".format(HOST, port_number,) endpoint = endpoints.serverFromString(reactor, port) server = internet.StreamServerEndpointService(endpoint, factory) server.setName(u"mllp-{0}-{1}".format(receiver_name, port_number)) multi_service.addService(server) return multi_service
def listen(description, factory, default=None): """Listen on a port corresponding to a description @type description: C{str} @type factory: L{twisted.internet.interfaces.IProtocolFactory} @type default: C{str} or C{None} @rtype: C{twisted.internet.interfaces.IListeningPort} @return: the port corresponding to a description of a reliable virtual circuit server. See the documentation of the C{parse} function for description of the semantics of the arguments. """ from twisted.internet import reactor name, args, kw = parse(description, factory, default) return getattr(reactor, 'listen'+name)(*args, **kw)
def message_received(self, unwrapped_message): """ message is of type UnwrappedMessage """ self._batch.append(unwrapped_message) # [(destination, sphinx_packet) if len(self._batch) >= self.threshold_count: delay = self._sys_rand.randint(0, self.max_delay) action = start_action( action_type=u"send delayed message batch", delay=delay, ) with action.context(): released = self._batch self._batch = [] random.shuffle(released) d = deferLater(self.reactor, delay, self.batch_send, released) DeferredContext(d).addActionFinish() self._pending_batch_sends.add(d) def _remove(res, d=d): self._pending_batch_sends.remove(d) return res d.addBoth(_remove)
def _start_onion_service(self, factory): def progress(percent, tag, message): bar = int(percent / 10) log.debug('[%s%s] %s' % ('#' * bar, '.' * (10 - bar), message)) def setup_complete(port): port = txtorcon.IHiddenService(port) self.uri = "http://%s" % (port.getHost().onion_uri) log.info('I have set up a hidden service, advertised at: %s' % self.uri) log.info('locally listening on %s' % port.local_address.getHost()) def setup_failed(args): log.error('onion service setup FAILED: %r' % args) endpoint = endpoints.serverFromString(reactor, 'onion:80') txtorcon.IProgressProvider(endpoint).add_progress_listener(progress) d = endpoint.listen(factory) d.addCallback(setup_complete) d.addErrback(setup_failed) return d
def connectionMade(self): logger.info('[%s] Connection received from VNC client', self.id) factory = protocol.ClientFactory() factory.protocol = VNCProxyClient factory.vnc_server = self factory.deferrable = defer.Deferred() endpoint = endpoints.clientFromString(reactor, self.factory.vnc_address) def _established_callback(client): if self._broken: client.close() self.vnc_client = client self.flush() def _established_errback(reason): logger.error('[VNCProxyServer] Connection succeeded but could not establish session: %s', reason) self.close() factory.deferrable.addCallbacks(_established_callback, _established_errback) def _connect_errback(reason): logger.error('[VNCProxyServer] Connection failed: %s', reason) self.close() endpoint.connect(factory).addErrback(_connect_errback) self.send_ProtocolVersion_Handshake()
def run(self): """setup the site, start listening on port, setup the looping call to :py:meth:`~.update_active_node` every ``self.poll_interval`` seconds, and start the Twisted reactor""" # get the active node before we start anything... self.active_node_ip_port = self.get_active_node() if self.active_node_ip_port is None: logger.critical("ERROR: Could not get active vault node from " "Consul. Exiting.") raise SystemExit(3) logger.warning("Initial Vault active node: %s", self.active_node_ip_port) site = Site(VaultRedirectorSite(self)) # setup our HTTP(S) listener if self.tls_factory is not None: self.listentls(site) else: self.listentcp(site) # setup the update_active_node poll every POLL_INTERVAL seconds self.add_update_loop() logger.warning('Starting Twisted reactor (event loop)') self.run_reactor()
def se_requester(self): """ While the reactor is polling, we can't make any requests. So have the reactor itself make the request and store the result. """ logger.debug('requester called; spawning process') # since Python is single-threaded and Twisted is just event-based, # we can't do a request and run the redirector from the same script. # Best choice is to used popen to run an external script to do the # redirect. url = 'http://127.0.0.1:%d' % self.cls.bind_port path = os.path.join(os.path.dirname(__file__), 'requester.py') self.poller = subprocess.Popen( [sys.executable, path, url, '/bar/baz', '/vault-redirector-health'], stdout=subprocess.PIPE, universal_newlines=True ) # run a poller loop to check for process stop and get results self.poller_check_task = task.LoopingCall(self.check_request) self.poller_check_task.clock = self.cls.reactor self.poller_check_task.start(0.5) logger.debug('poller_check_task started')
def check_request(self): """ check if the self.poller process has finished; if so, handle results and stop the poller_check_task. If update_active has also already been called, stop the reactor. """ logger.debug('check_request called') if self.poller.poll() is None: logger.debug('poller process still running') return # stop the looping task self.poller_check_task.stop() assert self.poller.returncode == 0 out, err = self.poller.communicate() self.response = out.strip() logger.debug('check_request done; response: %s', self.response) # on python3, this will be binary if not isinstance(self.response, str): self.response = self.response.decode('utf-8') if self.update_active_called: self.stop_reactor()
def makeService(self, options): """ Called by Twisted after having parsed the command-line options. :param options: ``usage.Options`` instance :return: the server instance """ # Configuration is mandatory if options['config'] is None: print 'You need to specify a configuration file via `twistd ldap-proxy -c config.ini`.' sys.exit(1) config = load_config(options['config']) factory = ProxyServerFactory(config) endpoint_string = serverFromString(reactor, config['ldap-proxy']['endpoint']) return internet.StreamServerEndpointService(endpoint_string, factory)
def connect_service_account(self): """ Make a new connection to the LDAP backend server using the credentials of the service account :return: A Deferred that fires a `LDAPClient` instance """ client = yield connectToLDAPEndpoint(reactor, self.proxied_endpoint_string, LDAPClient) if self.use_tls: client = yield client.startTLS() try: yield client.bind(self.service_account_dn, self.service_account_password) except ldaperrors.LDAPException, e: # Call unbind() here if an exception occurs: Otherwise, Twisted will keep the file open # and slowly run out of open files. yield client.unbind() raise e defer.returnValue(client)
def test_twisted(pyi_builder): pyi_builder.test_source( """ # Twisted is an event-driven networking engine. # # The 'reactor' is object that starts the eventloop. # There are different types of platform specific reactors. # Platform specific reactor is wrapped into twisted.internet.reactor module. from twisted.internet import reactor # Applications importing module twisted.internet.reactor might fail # with error like: # # AttributeError: 'module' object has no attribute 'listenTCP' # # Ensure default reactor was loaded - it has method 'listenTCP' to start server. if not hasattr(reactor, 'listenTCP'): raise SystemExit('Twisted reactor not properly initialized.') """)
def __init__(self, domain, username, pw, server, use_ssl, policy_key=0, server_version="14.0", device_type="iPhone", device_id=None, verbose=False): self.use_ssl = use_ssl self.domain = domain self.username = username self.password = pw self.server = server self.device_id = device_id if not self.device_id: self.device_id = str(uuid.uuid4()).replace("-","")[:32] self.server_version = server_version self.device_type = device_type self.policy_key = policy_key self.folder_data = {} self.verbose = verbose self.collection_data = {} clientContext = WebClientContextFactory() self.agent = Agent(reactor, clientContext) self.operation_queue = defer.DeferredQueue() self.queue_deferred = self.operation_queue.get() self.queue_deferred.addCallback(self.queue_full) # Response processing
def test_hpe_create_volume_invalid_provisioning_option(self): name = 'test-create-volume-fake' path = b"/VolumeDriver.Create" body = {u"Name": name, u"Opts": {u"provisioning": u"fake"}} headers = Headers({b"content-type": [b"application/json"]}) body_producer = FileBodyProducer(BytesIO(dumps(body))) agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory()) d = agent.request(b'POST', b"UNIX://localhost" + path, headers, body_producer) d.addCallback(self.checkResponse, json.dumps({ u"Err": "Invalid input received: Must specify a valid " + "provisioning type ['thin', 'full', " + "'dedup'], value 'fake' is invalid."})) d.addCallback(self._remove_volume_callback, name) d.addErrback(self.cbFailed) return d
def test_hpe_create_volume_invalid_option(self): name = 'test-create-volume-fake' path = b"/VolumeDriver.Create" body = {u"Name": name, u"Opts": {u"fake": u"fake"}} headers = Headers({b"content-type": [b"application/json"]}) body_producer = FileBodyProducer(BytesIO(dumps(body))) agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory()) d = agent.request(b'POST', b"UNIX://localhost" + path, headers, body_producer) d.addCallback(self.checkResponse, json.dumps({ u"Err": "create volume failed, error is: fake is not a valid " "option. Valid options are: ['size', 'provisioning', " "'flash-cache']"})) d.addCallback(self._remove_volume_callback, name) d.addErrback(self.cbFailed) return d
def _get_volume_mount_path(self, body, name): # NOTE: body arg is the result from last deferred call. # Python complains about parameter mis-match if you don't include it # In this test, we need it to compare expected results with Path # request # Compare path returned by mount (body) with Get Path request path = b"/VolumeDriver.Path" newbody = {u"Name": name} headers = Headers({b"content-type": [b"application/json"]}) body_producer = FileBodyProducer(BytesIO(dumps(newbody))) agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory()) d = agent.request(b'POST', b"UNIX://localhost" + path, headers, body_producer) d.addCallback(self.checkResponse, body) d.addErrback(self.cbFailed) return d
def _mount_the_volume(self, body, name): # NOTE: body arg is the result from last deferred call. # Python complains about parameter mis-match if you don't include it # Mount the previously created volume path = b"/VolumeDriver.Mount" newbody = {u"Name": name} headers = Headers({b"content-type": [b"application/json"]}) body_producer = FileBodyProducer(BytesIO(dumps(newbody))) agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory()) d = agent.request(b'POST', b"UNIX://localhost" + path, headers, body_producer) d.addCallback(self.getResponse) # If we get a valid response from Path request then we assume # the mount passed. # TODO: Add additonal logic to verify the mountpath d.addCallback(self._get_volume_mount_path, name) return d
def broken_test_hpe_mount_umount_volume(self): name = 'test-mount-volume' path = b"/VolumeDriver.Create" body = {u"Name": name} # Create a volume to be mounted headers = Headers({b"content-type": [b"application/json"]}) body_producer = FileBodyProducer(BytesIO(dumps(body))) agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory()) d = agent.request(b'POST', b"UNIX://localhost" + path, headers, body_producer) d.addCallback(self.checkResponse, json.dumps({u"Err": ''})) d.addErrback(self.cbFailed) # Mount the previously created volume d.addCallback(self._mount_the_volume, name) # UMount the previously created volume d.addCallback(self._unmount_the_volume, name) # Remove the previously created volume d.addCallback(self._remove_volume_callback, name) return d
def test_hpe_get_volume(self): name = 'test-get-volume' path = b"/VolumeDriver.Create" body = {u"Name": name} # Create a volume to be mounted headers = Headers({b"content-type": [b"application/json"]}) body_producer = FileBodyProducer(BytesIO(dumps(body))) agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory()) d = agent.request(b'POST', b"UNIX://localhost" + path, headers, body_producer) d.addCallback(self.checkResponse, json.dumps({u"Err": ''})) d.addErrback(self.cbFailed) # Get the previously created volume expected = {u"Volume": {u"Status": {}, u"Mountpoint": '', u"Name": name}, u"Err": ''} d.addCallback(self._get_volume, name, expected) # Remove the previously created volume d.addCallback(self._remove_volume_callback, name) return d
def broken_test_hpe_list_volume(self): name = 'test-list-volume' path = b"/VolumeDriver.Create" body = {u"Name": name} # Create a volume to be mounted headers = Headers({b"content-type": [b"application/json"]}) body_producer = FileBodyProducer(BytesIO(dumps(body))) agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory()) d = agent.request(b'POST', b"UNIX://localhost" + path, headers, body_producer) d.addCallback(self.checkResponse, json.dumps({u"Err": ''})) d.addErrback(self.cbFailed) # List volumes expected = {u"Err": '', u"Volumes": [{u"Mountpoint": '', u"Name": name}]} d.addCallback(self._list_volumes, name, expected) # Remove the previously created volume d.addCallback(self._remove_volume_callback, name) return d
def testValidOptionsRequest(self): """ Makes sure that a "regular" OPTIONS request doesn't include the CORS specific headers in the response. """ agent = Agent(reactor) headers = Headers({'origin': ['http://localhost']}) response = yield agent.request('OPTIONS', self.uri, headers) # Check we get the correct status. self.assertEqual(http.OK, response.code) # Check we get the correct length self.assertEqual(0, response.length) # Check we get the right headers back self.assertTrue(response.headers.hasHeader('Allow')) self.assertFalse( response.headers.hasHeader('Access-Control-Allow-Origin')) self.assertFalse(response.headers.hasHeader('Access-Control-Max-Age')) self.assertFalse( response.headers.hasHeader('Access-Control-Allow-Credentials')) self.assertFalse( response.headers.hasHeader('Access-Control-Allow-Methods'))
def testViaAgent(self): """ This is a manual check of a POST to /objects which uses L{twisted.web.client.Agent} to make the request. We do not use txFluidDB because we need to check that a Location header is received and that we receive both a 'URI' and an 'id' in the JSON response payload. """ URI = self.txEndpoint.getRootURL() + defaults.httpObjectCategoryName basicAuth = 'Basic %s' % b64encode('%s:%s' % ('testuser1', 'secret')) headers = Headers({'accept': ['application/json'], 'authorization': [basicAuth]}) agent = Agent(reactor) response = yield agent.request('POST', URI, headers) self.assertEqual(http.CREATED, response.code) self.assertTrue(response.headers.hasHeader('location')) d = defer.Deferred() bodyGetter = ResponseGetter(d) response.deliverBody(bodyGetter) body = yield d responseDict = json.loads(body) self.assertIn('URI', responseDict) self.assertIn('id', responseDict)
def testQueryUnicodePath(self): """A query on a non-existent Unicode tag, should 404. Part of the point here is to make sure that no other error occurs due to passing in a Unicode tag path. """ path = u'çóñ/???' query = '%s = "hi"' % path URI = '%s/%s?query=%s' % ( self.endpoint, defaults.httpObjectCategoryName, urllib.quote(query.encode('utf-8'))) headers = Headers({'accept': ['application/json']}) agent = Agent(reactor) response = yield agent.request('GET', URI, headers) self.assertEqual(http.NOT_FOUND, response.code)
def testValidCORSRequest(self): """ Sanity check to make sure we get the valid headers back for a CORS based request. """ agent = Agent(reactor) headers = Headers() # The origin to use in the tests dummy_origin = 'http://foo.com' headers.addRawHeader('Origin', dummy_origin) response = yield agent.request('GET', self.uri, headers) # Check we get the correct status. self.assertEqual(http.OK, response.code) # Check we get the right headers back self.assertTrue( response.headers.hasHeader('Access-Control-Allow-Origin')) self.assertTrue( response.headers.hasHeader('Access-Control-Allow-Credentials')) self.assertTrue( dummy_origin in response.headers.getRawHeaders('Access-Control-Allow-Origin'))
def testVersionGets404(self): """ Version numbers used to be able to be given in API calls, but are no longer supported. """ version = 20100808 URI = '%s/%d/%s/%s' % ( self.endpoint, version, defaults.httpNamespaceCategoryName, defaults.adminUsername) headers = Headers({'accept': ['application/json']}) agent = Agent(reactor) response = yield agent.request('GET', URI, headers) self.assertEqual(http.NOT_FOUND, response.code) # TODO: Add a test for a namespace that we don't have LIST perm on. # although that might be done in permissions.py when that finally gets # added.
def with_config(loop=None): global config if loop is not None: if config.loop is not None and config.loop is not loop: raise RuntimeError( "Twisted has only a single, global reactor. You passed in " "a reactor different from the one already configured " "in txaio.config.loop" ) return _TxApi(config) # NOTE: beware that twisted.logger._logger.Logger copies itself via an # overriden __get__ method when used as recommended as a class # descriptor. So, we override __get__ to just return ``self`` which # means ``log_source`` will be wrong, but we don't document that as a # key that you can depend on anyway :/
def requestWebObject(self): parsed = urlparse.urlparse(self.uri) protocol = parsed[0] host, port = self.extractHostAndPort(parsed, protocol) rest = self.extractQuery(parsed) class_ = self.protocols[protocol] headers = self.getAllHeaders().copy() if 'host' not in headers: headers['host'] = host log.info('Performing {} request for {}'.format(self.method, self.uri)) self.content.seek(0, 0) s = self.content.read() clientFactory = class_(self.method, rest, self.clientproto, headers, s, self) self.reactor.connectTCP(host, port, clientFactory)
def clientConnectionLost(self, connector, reason): """Handle notification from the lower layers of connection loss. If we are shutting down, and twisted sends us the expected type of error, eat the error. Otherwise, log it and pass it along. Also, schedule notification of our subscribers at the next pass through the reactor. """ if self.dDown and reason.check(ConnectionDone): # We initiated the close, this is an expected close/lost log.debug('%r: Connection Closed:%r:%r', self, connector, reason) notifyReason = None # Not a failure else: log.debug('%r: clientConnectionLost:%r:%r', self, connector, reason) notifyReason = reason # Reset our proto so we don't try to send to a down connection self.proto = None # Schedule notification of subscribers self._get_clock().callLater(0, self._notify, False, notifyReason) # Call our superclass's method to handle reconnecting ReconnectingClientFactory.clientConnectionLost( self, connector, reason)
def __init__(self, host, port, state, path): """ @param reactor: An L{IReactorTCP} provider @param host: A hostname, used when connecting @type host: str @param port: The port number, used when connecting @type port: int @param path: A list of relay identities. @type path: list This endpoint will be routed through Tor over a circuit defined by path. """ self.host = host self.port = port self.path = path self.state = state self.or_endpoint = get_orport_endpoint(state)
def start_tor(config): """ Launches tor with random TCP ports chosen for SocksPort and ControlPort, and other options specified by a txtorcon.torconfig.TorConfig instance. Returns a deferred that calls back with a txtorcon.torstate.TorState instance. """ def get_random_tor_ports(): d2 = available_tcp_port(reactor) d2.addCallback(lambda port: config.__setattr__('SocksPort', port)) d2.addCallback(lambda _: available_tcp_port(reactor)) d2.addCallback(lambda port: config.__setattr__('ControlPort', port)) return d2 def launch_and_get_state(ignore): d2 = launch_tor(config, reactor, stdout=sys.stdout) d2.addCallback(lambda tpp: TorState(tpp.tor_protocol).post_bootstrap) return d2 return get_random_tor_ports().addCallback(launch_and_get_state)
def reExec(self): """ Removes pidfile, registers an exec to happen after shutdown, then stops the reactor. """ self.log.warn("SIGHUP received - restarting") try: self.log.info("Removing pidfile: {log_source.pidfilePath}") os.remove(self.pidfilePath) except OSError: pass self.reactor.addSystemEventTrigger( "after", "shutdown", os.execv, sys.executable, [sys.executable] + sys.argv ) self.reactor.stop()
def _getPort(self): from twisted.internet import reactor if self.inherit: port = InheritedSSLPort( self.args[0], self.args[1], self.args[2], reactor ) else: port = MaxAcceptSSLPort( self.args[0], self.args[1], self.args[2], self.backlog, self.interface, self.reactor ) port.startListening() self.myPort = port return port
def __init__(self, reactor, transactionFactory, useWorkerPool=True, disableWorkProcessing=False): """ Initialize a L{ControllerQueue}. @param transactionFactory: a 0- or 1-argument callable that produces an L{IAsyncTransaction} @param useWorkerPool: Whether to use a worker pool to manage load or instead take on all work ourselves (e.g. in single process mode) """ super(ControllerQueue, self).__init__() self.reactor = reactor self.transactionFactory = transactionFactory self.workerPool = WorkerConnectionPool() if useWorkerPool else None self.disableWorkProcessing = disableWorkProcessing self._lastMinPriority = WORK_PRIORITY_LOW self._timeOfLastWork = time.time() self._actualPollInterval = self.queuePollInterval self._inWorkCheck = False self._inOverdueCheck = False
def _overdueCheckLoop(self): """ While the service is running, keep checking for any overdue items. """ self._overdueCheckCall = None if not self.running: returnValue(None) try: yield self._overdueCheck() except Exception as e: log.error("_overdueCheckLoop: {exc}", exc=e) if not self.running: returnValue(None) self._overdueCheckCall = self.reactor.callLater( self.queueOverduePollInterval, self._overdueCheckLoop )
def parseStreamServer(self, reactor, description, **options): # The present endpoint plugin is intended to be used as in the # following for running a streaming protocol over WebSocket over # an underlying stream transport. # # endpoint = serverFromString(reactor, # "autobahn:tcp\:9000\:interface\=0.0.0.0:url=ws\://localhost\:9000:compress=false" # # This will result in `parseStreamServer` to be called will # # description == tcp:9000:interface=0.0.0.0 # # and # # options == {'url': 'ws://localhost:9000', 'compress': 'false'} # # Essentially, we are using the `\:` escape to coerce the endpoint descriptor # of the underlying stream transport into one (first) positional argument. # # Note that the `\:` within "url" is another form of escaping! # opts = _parseOptions(options) endpoint = serverFromString(reactor, description) return AutobahnServerEndpoint(reactor, endpoint, opts)
def start_initial(game): round_data, users_plots = get_round(game) state = 'initial' if round_data is None: game.end_time = timezone.now() game.save() game.broadcast(action='redirect', url=reverse('interactive:exit')) return else: cache.set(game.id, {'state': state, 'round_data': round_data, 'users_plots': users_plots, }) initial(game, round_data, users_plots) task.deferLater(reactor, 1, game_state_checker, game, state, round_data, users_plots).addErrback(twisted_error)
def throttled(func): """Decorator for AgentProxyMixIn.getTable to throttle requests""" def _wrapper(*args, **kwargs): self = args[0] last_request = getattr(self, '_last_request') delay = (last_request + self.throttle_delay) - time.time() setattr(self, '_last_request', time.time()) if delay > 0: _logger.debug("%sss delay due to throttling: %r", delay, self) return deferLater(reactor, delay, func, *args, **kwargs) else: return func(*args, **kwargs) return wraps(func)(_wrapper) # pylint: disable=R0903
def service(description, factory, reactor=None): """ Return the service corresponding to a description. @param description: The description of the listening port, in the syntax described by L{twisted.internet.endpoints.serverFromString}. @type description: C{str} @param factory: The protocol factory which will build protocols for connections to this service. @type factory: L{twisted.internet.interfaces.IProtocolFactory} @rtype: C{twisted.application.service.IService} @return: the service corresponding to a description of a reliable stream server. @see: L{twisted.internet.endpoints.serverFromString} """ if reactor is None: from twisted.internet import reactor svc = StreamServerEndpointService( endpoints.serverFromString(reactor, description), factory) svc._raiseSynchronously = True return svc
def listen(description, factory): """ Listen on a port corresponding to a description. @param description: The description of the connecting port, in the syntax described by L{twisted.internet.endpoints.serverFromString}. @type description: L{str} @param factory: The protocol factory which will build protocols on connection. @type factory: L{twisted.internet.interfaces.IProtocolFactory} @rtype: L{twisted.internet.interfaces.IListeningPort} @return: the port corresponding to a description of a reliable virtual circuit server. @see: L{twisted.internet.endpoints.serverFromString} """ from twisted.internet import reactor name, args, kw = endpoints._parseServer(description, factory) return getattr(reactor, 'listen' + name)(*args, **kw)
def test_storage_dir_required(self): """ When the program is run with no arguments, it should exit with code 2 because there is one required argument. """ with ExpectedException(SystemExit, MatchesStructure(code=Equals(2))): main(reactor, raw_args=[])
def test_storage_dir_provided(self): """ When the program is run with an argument, it should start up and run. The program is expected to fail because it is unable to connect to Marathon. This test takes a while because we have to let txacme go through it's initial sync (registration + issuing of 0 certificates) before things can be halted. """ temp_dir = self.useFixture(TempDir()) yield main(reactor, raw_args=[ temp_dir.path, '--acme', LETSENCRYPT_STAGING_DIRECTORY.asText(), '--marathon', 'http://localhost:28080' # An address we can't reach ]) # Expect a 'certs' directory to be created self.assertThat(os.path.isdir(temp_dir.join('certs')), Equals(True)) # Expect a default certificate to be created self.assertThat(os.path.isfile(temp_dir.join('default.pem')), Equals(True)) # Expect to be unable to connect to Marathon flush_logged_errors(ConnectionRefusedError)
def test_default_reactor(self): """ When default_reactor is passed a reactor it should return that reactor. """ clock = Clock() assert_that(default_reactor(clock), Is(clock))
def test_default_reactor_not_provided(self): """ When default_reactor is not passed a reactor, it should return the default reactor. """ assert_that(default_reactor(None), Is(reactor))
def test_default_client_not_provided(self): """ When default_agent is not passed an agent, it should return a default agent. """ assert_that(default_client(None, reactor), IsInstance(treq_HTTPClient))
def setUp(self): super(TestHTTPClientBase, self).setUp() self.requests = DeferredQueue() self.fake_server = FakeHttpServer(self.handle_request) fake_client = treq_HTTPClient(self.fake_server.get_agent()) self.client = self.get_client(fake_client) # Spin the reactor once at the end of each test to clean up any # cancelled deferreds self.addCleanup(wait0)
def default_reactor(reactor): if reactor is None: from twisted.internet import reactor return reactor
def default_client(client, reactor): """ Set up a default client if one is not provided. Set up the default ``twisted.web.client.Agent`` using the provided reactor. """ if client is None: from twisted.web.client import Agent client = treq_HTTPClient(Agent(reactor)) return client