我们从Python开源项目中,提取了以下34个代码示例,用于说明如何使用twisted.internet.reactor.listenUNIX()。
def listen_unix(factory, config): ''' Internal implementation that opens unix socket listeners for factory based on config. See listen() for details. ''' # upgrade config to a list if it has been passed as a single item config = _listify(config) # now process the list listeners = [] for item in config: if not validate_connection_config(item): # warn but skip invalid configs continue if 'unix' in item: path = item['unix'] listeners.append(reactor.listenUNIX(path, factory)) return _unlistify(listeners)
def setService(self, service): log.msg('setting client server to %s' % service) transport.SSHClientTransport.setService(self, service) if service.name != 'ssh-userauth' and self.factory.d: d = self.factory.d self.factory.d = None d.callback(None) if service.name == 'ssh-connection': # listen for UNIX if not self.factory.options['nocache']: user = self.factory.userAuthObject.user peer = self.transport.getPeer() filename = os.path.expanduser("~/.conch-%s-%s-%i" % (user, peer.host, peer.port)) try: u = unix.SSHUnixServerFactory(service) try: os.unlink(filename) except OSError: pass self.unixServer = reactor.listenUNIX(filename, u, mode=0600, wantPID=1) except Exception, e: log.msg('error trying to listen on %s' % filename) log.err(e)
def testPeerBind(self): """assert the remote endpoint (getPeer) on the receiving end matches the local endpoint (bind) on the connecting end, for unix sockets""" filename = self.mktemp() peername = self.mktemp() f = Factory(self, filename, peername=peername) l = reactor.listenUNIX(filename, f) self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self._sock.bind(peername) self._sock.connect(filename) d = f.deferred def done(x): self._addPorts(l) self._sock.close() del self._sock return x d.addBoth(done) return d
def testPIDFile(self): filename = self.mktemp() f = Factory(self, filename) l = reactor.listenUNIX(filename, f, mode = 0600, wantPID=1) self.failUnless(lockfile.isLocked(filename + ".lock")) tcf = TestClientFactory(self, filename) c = reactor.connectUNIX(filename, tcf, checkPID=1) d = defer.gatherResults([f.deferred, tcf.deferred]) def _portStuff(ignored): self._addPorts(l, c.transport, tcf.protocol.transport, f.protocol.transport) return self.cleanPorts(*self.ports) def _check(ignored): self.failIf(lockfile.isLocked(filename + ".lock"), 'locked') d.addCallback(_portStuff) d.addCallback(_check) return d
def loopbackUNIX(server, client, noisy=True): """Run session between server and client protocol instances over UNIX socket.""" path = tempfile.mktemp() from twisted.internet import reactor f = policies.WrappingFactory(protocol.Factory()) serverWrapper = _FireOnClose(f, server) f.noisy = noisy f.buildProtocol = lambda addr: serverWrapper serverPort = reactor.listenUNIX(path, f) clientF = LoopbackClientFactory(client) clientF.noisy = noisy reactor.connectUNIX(path, clientF) d = clientF.deferred d.addCallback(lambda x: serverWrapper.deferred) d.addCallback(lambda x: serverPort.stopListening()) return d
def test_peerBind(self): """ The address passed to the server factory's C{buildProtocol} method and the address returned by the connected protocol's transport's C{getPeer} method match the address the client socket is bound to. """ filename = self.mktemp() peername = self.mktemp() serverFactory = MyServerFactory() connMade = serverFactory.protocolConnectionMade = defer.Deferred() unixPort = reactor.listenUNIX(filename, serverFactory) self.addCleanup(unixPort.stopListening) unixSocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.addCleanup(unixSocket.close) unixSocket.bind(peername) unixSocket.connect(filename) def cbConnMade(proto): expected = address.UNIXAddress(peername) self.assertEqual(serverFactory.peerAddresses, [expected]) self.assertEqual(proto.transport.getPeer(), expected) connMade.addCallback(cbConnMade) return connMade
def test_socketLocking(self): """ L{IReactorUNIX.listenUNIX} raises L{error.CannotListenError} if passed the name of a file on which a server is already listening. """ filename = self.mktemp() serverFactory = MyServerFactory() unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True) self.assertRaises( error.CannotListenError, reactor.listenUNIX, filename, serverFactory, wantPID=True) def stoppedListening(ign): unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True) return unixPort.stopListening() return unixPort.stopListening().addCallback(stoppedListening)
def _reprTest(self, serverFactory, factoryName): """ Test the C{__str__} and C{__repr__} implementations of a UNIX port when used with the given factory. """ filename = self.mktemp() unixPort = reactor.listenUNIX(filename, serverFactory) connectedString = "<%s on %r>" % (factoryName, filename) self.assertEqual(repr(unixPort), connectedString) self.assertEqual(str(unixPort), connectedString) d = defer.maybeDeferred(unixPort.stopListening) def stoppedListening(ign): unconnectedString = "<%s (not listening)>" % (factoryName,) self.assertEqual(repr(unixPort), unconnectedString) self.assertEqual(str(unixPort), unconnectedString) d.addCallback(stoppedListening) return d
def test_reprWithClassicFactory(self): """ The two string representations of the L{IListeningPort} returned by L{IReactorUNIX.listenUNIX} contains the name of the classic factory class being used and the filename on which the port is listening or indicates that the port is not listening. """ class ClassicFactory: def doStart(self): pass def doStop(self): pass # Sanity check self.assertIsInstance(ClassicFactory, types.ClassType) return self._reprTest( ClassicFactory(), "twisted.test.test_unix.ClassicFactory")
def test_reprWithNewStyleFactory(self): """ The two string representations of the L{IListeningPort} returned by L{IReactorUNIX.listenUNIX} contains the name of the new-style factory class being used and the filename on which the port is listening or indicates that the port is not listening. """ class NewStyleFactory(object): def doStart(self): pass def doStop(self): pass # Sanity check self.assertIsInstance(NewStyleFactory, type) return self._reprTest( NewStyleFactory(), "twisted.test.test_unix.NewStyleFactory")
def testDumber(self): filename = self.mktemp() f = Factory(self, filename) l = reactor.listenUNIX(filename, f) tcf = TestClientFactory(self, filename) c = reactor.connectUNIX(filename, tcf) d = defer.gatherResults([f.deferred, tcf.deferred]) d.addCallback(lambda x : self._addPorts(l, c.transport, tcf.protocol.transport, f.protocol.transport)) return d
def testMode(self): filename = self.mktemp() f = Factory(self, filename) l = reactor.listenUNIX(filename, f, mode = 0600) self.assertEquals(stat.S_IMODE(os.stat(filename)[0]), 0600) tcf = TestClientFactory(self, filename) c = reactor.connectUNIX(filename, tcf) self._addPorts(l, c.transport)
def _uncleanSocketTest(self, callback): self.filename = self.mktemp() source = ("from twisted.internet import protocol, reactor\n" "reactor.listenUNIX(%r, protocol.ServerFactory(), wantPID=True)\n") % (self.filename,) env = {'PYTHONPATH': os.pathsep.join(sys.path)} d = utils.getProcessOutput(sys.executable, ("-u", "-c", source), env=env) d.addCallback(callback) return d
def testUncleanServerSocketLocking(self): def ranStupidChild(ign): # If this next call succeeds, our lock handling is correct. p = reactor.listenUNIX(self.filename, Factory(self, self.filename), wantPID=True) return p.stopListening() return self._uncleanSocketTest(ranStupidChild)
def testRepr(self): filename = self.mktemp() f = Factory(self, filename) p = reactor.listenUNIX(filename, f) self.failIf(str(p).find(filename) == -1) def stoppedListening(ign): self.failIf(str(p).find(filename) != -1) return defer.maybeDeferred(p.stopListening).addCallback(stoppedListening)
def autostart(reason, **kwargs): if reason == 0: from twisted.internet import reactor try: os.remove("/tmp/hotplug.socket") except OSError: pass factory = Factory() factory.protocol = Hotplug reactor.listenUNIX("/tmp/hotplug.socket", factory)
def setUp(self): super(MethodCallFunctionalTest, self).setUp() self.methods = ["method"] self.object = DummyObject() self.object.method = lambda word: word.capitalize() self.socket = self.mktemp() self.server = MethodCallServerFactory(self.object, self.methods) self.client = MethodCallClientFactory(reactor) self.port = reactor.listenUNIX(self.socket, self.server)
def autostart(reason, **kwargs): if reason == 0: print "[Hotplug] starting hotplug handler" from twisted.internet import reactor import os try: os.remove("/tmp/hotplug.socket") except OSError: pass factory = Factory() factory.protocol = Hotplug reactor.listenUNIX("/tmp/hotplug.socket", factory)
def test_dumber(self): """ L{IReactorUNIX.connectUNIX} can be used to connect a client to a server started with L{IReactorUNIX.listenUNIX}. """ filename = self.mktemp() serverFactory = MyServerFactory() serverConnMade = defer.Deferred() serverFactory.protocolConnectionMade = serverConnMade unixPort = reactor.listenUNIX(filename, serverFactory) self.addCleanup(unixPort.stopListening) clientFactory = MyClientFactory() clientConnMade = defer.Deferred() clientFactory.protocolConnectionMade = clientConnMade reactor.connectUNIX(filename, clientFactory) d = defer.gatherResults([serverConnMade, clientConnMade]) def allConnected(args): serverProtocol, clientProtocol = args # Incidental assertion which may or may not be redundant with some # other test. This probably deserves its own test method. self.assertEqual(clientFactory.peerAddresses, [address.UNIXAddress(filename)]) clientProtocol.transport.loseConnection() serverProtocol.transport.loseConnection() d.addCallback(allConnected) return d
def test_pidFile(self): """ A lockfile is created and locked when L{IReactorUNIX.listenUNIX} is called and released when the Deferred returned by the L{IListeningPort} provider's C{stopListening} method is called back. """ filename = self.mktemp() serverFactory = MyServerFactory() serverConnMade = defer.Deferred() serverFactory.protocolConnectionMade = serverConnMade unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True) self.assertTrue(lockfile.isLocked(filename + ".lock")) # XXX This part would test something about the checkPID parameter, but # it doesn't actually. It should be rewritten to test the several # different possible behaviors. -exarkun clientFactory = MyClientFactory() clientConnMade = defer.Deferred() clientFactory.protocolConnectionMade = clientConnMade reactor.connectUNIX(filename, clientFactory, checkPID=1) d = defer.gatherResults([serverConnMade, clientConnMade]) def _portStuff(args): serverProtocol, clientProto = args # Incidental assertion which may or may not be redundant with some # other test. This probably deserves its own test method. self.assertEqual(clientFactory.peerAddresses, [address.UNIXAddress(filename)]) clientProto.transport.loseConnection() serverProtocol.transport.loseConnection() return unixPort.stopListening() d.addCallback(_portStuff) def _check(ignored): self.assertFalse(lockfile.isLocked(filename + ".lock"), 'locked') d.addCallback(_check) return d
def _uncleanSocketTest(self, callback): self.filename = self.mktemp() source = networkString(( "from twisted.internet import protocol, reactor\n" "reactor.listenUNIX(%r, protocol.ServerFactory()," "wantPID=True)\n") % (self.filename,)) env = {b'PYTHONPATH': FilePath( os.pathsep.join(sys.path)).asBytesMode().path} pyExe = FilePath(sys.executable).asBytesMode().path d = utils.getProcessValue(pyExe, (b"-u", b"-c", source), env=env) d.addCallback(callback) return d
def run(self): """ Start the server and listen on host:port """ f = None unix_prefix = 'unix://' if self.http_enabled: rpc = JsonRpcHttpResource() rpc.rpcprocessor = self.rpcprocessor rpc.tls_client_auth_enabled = self.tls_client_auth_enabled if self.http_basic_auth_enabled: checker = PasswordChecker(self.passwdCheckFunction) realm = HttpPasswordRealm(rpc) p = portal.Portal(realm, [checker]) realm_name = 'Reflect RPC' if sys.version_info.major == 2: realm_name = realm_name.encode('utf-8') credentialFactory = BasicCredentialFactory(realm_name) rpc = HTTPAuthSessionWrapper(p, [credentialFactory]) root = RootResource(rpc) f = server.Site(root) else: f = JsonRpcProtocolFactory(self.rpcprocessor, self.tls_client_auth_enabled) if self.tls_enabled: if not self.tls_client_auth_enabled: reactor.listenSSL(self.port, f, self.cert.options(), interface=self.host) else: reactor.listenSSL(self.port, f, self.cert.options(self.client_auth_ca), interface=self.host) else: if self.host.startswith(unix_prefix): path = self.host[len(unix_prefix):] reactor.listenUNIX(path, f, backlog=self.unix_socket_backlog, mode=self.unix_socket_mode, wantPID=self.unix_socket_want_pid) else: reactor.listenTCP(self.port, f, interface=self.host) if self.host.startswith(unix_prefix): print("Listening on %s" % (self.host)) else: print("Listening on %s:%d" % (self.host, self.port)) reactor.run()