我们从 Python 开源项目中提取了以下 28 个代码示例,用于说明如何使用 twisted.internet.protocol.ProcessProtocol()。
def test_requestExec(self):
    """
    An C{exec} request is served through the avatar's ISession adapter.

    The SSHSession object should obtain an ISession adapter for the avatar
    and call C{execCommand} on it with a ProcessProtocol to attach and the
    command line.  The C{b'failure'} request must be refused and leave no
    client attached; the C{b'success'} request must be accepted and attach
    an L{session.SSHSessionProcessProtocol}.
    """
    # Refused request: falsy result, a RuntimeError recorded, no client.
    refused = self.session.requestReceived(b'exec', common.NS(b'failure'))
    self.assertFalse(refused)
    self.assertRequestRaisedRuntimeError()
    self.assertIsNone(self.session.client)

    # Accepted request: the stub session records protocol and command line.
    accepted = self.session.requestReceived(b'exec', common.NS(b'success'))
    self.assertTrue(accepted)
    self.assertSessionIsStubSession()
    self.assertIsInstance(
        self.session.client, session.SSHSessionProcessProtocol)
    self.assertIs(self.session.session.execProtocol, self.session.client)
    self.assertEqual(self.session.session.execCommandLine, b'success')
def test_stderr(self):
    """
    Whatever the spawned process writes to its stderr is delivered to the
    C{errReceived} callback of the C{ProcessProtocol} that was handed to
    C{spawnProcess}.
    """
    value = "42"
    # Child program: write the value to stderr and exit.
    script = "import sys; sys.stderr.write('{0}')".format(value)
    argv = [pyExe, b"-c", networkString(script)]

    p = Accumulator()
    d = p.endedDeferred = defer.Deferred()
    reactor.spawnProcess(
        p, pyExe, argv, env=None, path="/tmp", usePTY=self.usePTY)

    def processEnded(ign):
        # Everything the child wrote to stderr was accumulated in errF.
        self.assertEqual(b"42", p.errF.getvalue())

    return d.addCallback(processEnded)
def test_stdout(self):
    """
    Calling C{closeStdout} on a ProcessProtocol's transport really closes
    the pipe: the child's subsequent write fails and the error text shows
    up in the collected output.
    """
    finished = self.doit(1)

    def verify(errput):
        # The exception name reported by the child depends on the Python
        # major version and on the platform.
        if not _PY3:
            self.assertIn(b'OSError', errput)
        elif runtime.platform.isWindows():
            self.assertIn(b"OSError", errput)
            # presumably the errno of the failed write -- TODO confirm
            self.assertIn(b"22", errput)
        else:
            self.assertIn(b'BrokenPipeError', errput)

        # NOTE(review): nesting of this check is reconstructed from a
        # flattened listing; it is assumed to apply to both Python versions.
        if runtime.platform.getType() != 'win32':
            self.assertIn(b'Broken pipe', errput)

    finished.addCallback(verify)
    return finished
def test_launchWorkerProcesses(self):
    """
    C{launchWorkerProcesses}, given a C{spawnProcess}-like callable, starts
    a python process whose argument is a path that exists on disk.
    """
    spawnedArgv = []
    spawnedEnv = {}

    def fakeSpawnProcess(processProtocol, executable, args=(), env={},
                         path=None, uid=None, gid=None, usePTY=0,
                         childFDs=None):
        # Record what would have been executed instead of spawning anything.
        spawnedArgv.append(executable)
        spawnedArgv.extend(args)
        spawnedEnv.update(env)

    workerProtocols = [ProcessProtocol() for _ in range(4)]
    self.runner.launchWorkerProcesses(
        fakeSpawnProcess, workerProtocols, ["foo"])

    # The executable is repeated as argv[0]; argv[1] is an existing file and
    # argv[2] is the extra argument passed through.
    self.assertEqual(spawnedArgv[0], spawnedArgv[1])
    self.assertTrue(os.path.exists(spawnedArgv[2]))
    self.assertEqual("foo", spawnedArgv[3])

    expectedPythonPath = os.pathsep.join(sys.path)
    self.assertEqual(expectedPythonPath, spawnedEnv["TRIAL_PYTHONPATH"])
def fork(executable, args=(), env={}, path=None, timeout=3600):
    """fork
    Spawn a child process and return a deferred tied to it, with a timeout.

    :param executable: Executable
    :type executable: str.
    :param args: Tuple of arguments
    :type args: tuple.
    :param env: Environment dictionary
    :type env: dict.
    :param path: Passed through to ``reactor.spawnProcess`` as its path
        argument.
    :param timeout: Kill the child process if timeout is exceeded
    :type timeout: int.
    :returns: The deferred handed to the process protocol.
    """
    # NOTE: the shared ``env={}`` default is only passed along, never
    # mutated here.
    finished = defer.Deferred()
    handler = ProcessProtocol(finished, timeout)
    # argv[0] is the executable itself, followed by the caller's arguments.
    argv = (executable,) + tuple(args)
    reactor.spawnProcess(handler, executable, argv, env, path)
    return finished
def test_stdout(self):
    """Closing stdout via C{transport.closeStdout} really closes the pipe."""
    finished = self.doit(1)

    def verify(errput):
        # The child's failed write must have been reported as an OSError.
        self.failIfEqual(errput.find('OSError'), -1)
        if runtime.platform.getType() != 'win32':
            self.failIfEqual(errput.find('Broken pipe'), -1)

    finished.addCallback(verify)
    return finished
def test_stderr(self):
    """Closing stderr via C{transport.closeStderr} really closes the pipe."""
    finished = self.doit(2)

    def verify(errput):
        # With stderr closed the child has nowhere to write its error to,
        # so nothing at all should have been collected.
        self.failUnlessEqual(errput, '')

    finished.addCallback(verify)
    return finished
def childDataReceived(self, childFD, data):
    """
    Publish data received from the child process, then delegate to the
    base class.

    Every C{(pid, sid)} pair in C{self.sptl.monitor_ps} is printed; for the
    entry whose pid equals C{self.pid}, C{data} is published on the
    C{"psmonitor"} topic with that sid as the only eligible receiver.

    @param childFD: The file descriptor the data arrived on; passed through
        to L{protocol.ProcessProtocol.childDataReceived}.
    @param data: The data read from the child process.
    """
    try:
        # ``items()`` and the function-call form of ``print`` work on both
        # Python 2 and Python 3; the formatted string keeps the output
        # identical to the former ``print pid, sid`` statement.
        for pid, sid in self.sptl.monitor_ps.items():
            print("%s %s" % (pid, sid))
            if pid == self.pid:
                self.sptl.publish("psmonitor", data, eligible=[sid])
    except Exception as e:
        # Best effort: a failure while publishing is reported but must not
        # prevent the normal ProcessProtocol dispatch below.
        print(e)
    protocol.ProcessProtocol.childDataReceived(self, childFD, data)
def connectionMade(self):
    """
    ProcessProtocol override.

    Fires C{self.launched} with this protocol, unless it has already been
    called.
    """
    if self.launched.called:
        return
    self.launched.callback(self)
def outReceived(self, data):
    """
    ProcessProtocol override.

    Buffer UTF-8 output from the child in C{self._out} and write every
    complete line to C{sys.stdout}, prefixed with C{self.prefix} and
    C{self.color} and followed by C{Fore.RESET}.  A trailing partial line
    stays buffered until its newline arrives.

    @param data: Bytes read from the child's stdout.
    @raise UnicodeDecodeError: If the stream contains invalid UTF-8.
    """
    import codecs

    # A multi-byte character may be split across two reads, so decode
    # incrementally instead of decoding each chunk on its own.  The decoder
    # is created on first use and kept on the instance.
    decoder = getattr(self, '_out_decoder', None)
    if decoder is None:
        decoder = self._out_decoder = codecs.getincrementaldecoder('utf8')()

    self._out += decoder.decode(data)
    while '\n' in self._out:
        line, _, self._out = self._out.partition('\n')
        sys.stdout.write(self.prefix + self.color + line + Fore.RESET + '\n')
def errReceived(self, data):
    """
    ProcessProtocol override.

    Buffer UTF-8 error output from the child in C{self._err} and write
    every complete line to C{sys.stderr}, prefixed with C{self.prefix} and
    C{self.color} and followed by C{Fore.RESET}.  A trailing partial line
    stays buffered until its newline arrives.

    @param data: Bytes read from the child's stderr.
    @raise UnicodeDecodeError: If the stream contains invalid UTF-8.
    """
    import codecs

    # A multi-byte character may be split across two reads, so decode
    # incrementally instead of decoding each chunk on its own.  The decoder
    # is created on first use and kept on the instance.
    decoder = getattr(self, '_err_decoder', None)
    if decoder is None:
        decoder = self._err_decoder = codecs.getincrementaldecoder('utf8')()

    self._err += decoder.decode(data)
    while '\n' in self._err:
        line, _, self._err = self._err.partition('\n')
        sys.stderr.write(self.prefix + self.color + line + Fore.RESET + '\n')
def __init__(self, protocol, data):
    """
    Remember the originating protocol and the payload to send.

    @type protocol: L{ConchTestForwardingProcess}
    @param protocol: The L{ProcessProtocol} which made this connection.

    @type data: str
    @param data: The data to be sent to the third-party server.
    """
    self.protocol, self.data = protocol, data
def __init__(self, processProtocol):
    """
    Set up the initial state and attach ourselves to the protocol.

    @param processProtocol: a C{ProcessProtocol} to connect to ourself.
    """
    self.data = b''
    self.closed = False
    self.proto = processProtocol
    # Connect only once all attributes exist, because the protocol is handed
    # this object.
    processProtocol.makeConnection(self)
def write(self, data):
    """
    Accept some data and echo it back to our C{ProcessProtocol}, followed
    by a newline.  A null byte in the data makes us disconnect.
    """
    self.data += data
    for chunk in (data, b'\r\n'):
        self.proto.outReceived(chunk)
    if b'\x00' in data:
        # mimic 'exit' for the shell test
        self.loseConnection()
def loseConnection(self):
    """
    If we're asked to disconnect (and we haven't already) shut down the
    C{ProcessProtocol} with a 0 exit code.

    Calling this again after the first disconnect does nothing.
    """
    if self.closed:
        return
    # A real boolean rather than 1.
    self.closed = True
    # Report all three standard streams as closed before the process end.
    self.proto.inConnectionLost()
    self.proto.outConnectionLost()
    self.proto.errConnectionLost()
    self.proto.processEnded(failure.Failure(
        error.ProcessTerminated(0, None, None)))
def test_lookupSubsystem(self):
    """
    A C{subsystem} request makes the SSHSession object fetch the subsystem
    through C{avatar.lookupSubsystem} and attach it as the client.
    """
    payload = common.NS(b'TestSubsystem') + b'data'
    accepted = self.session.requestReceived(b'subsystem', payload)
    self.assertTrue(accepted)

    self.assertIsInstance(self.session.client, protocol.ProcessProtocol)
    # The attached client's transport wraps the avatar's subsystem.
    self.assertIs(self.session.client.transport.proto,
                  self.session.avatar.subsystem)
def test_noCompatibilityLayer(self):
    """
    If no compatibility layer is present, imports of gobject and friends
    are disallowed.

    We do this by running a process where we make sure gi.pygtkcompat
    isn't present.

    @raise SkipTest: On Python 3, or when the reactor has no process
        support.
    @return: A Deferred that fires once the child's stdout has been compared
        with C{b"success"}.
    """
    import os

    if _PY3:
        raise SkipTest("Python3 always has the compatibility layer.")

    from twisted.internet import reactor
    if not IReactorProcess.providedBy(reactor):
        raise SkipTest("No process support available in this reactor.")

    result = Deferred()

    class Stdout(ProcessProtocol):
        # Accumulates everything the child writes to stdout.
        data = b""

        def errReceived(self, err):
            print(err)

        def outReceived(self, data):
            self.data += data

        def processExited(self, reason):
            result.callback(self.data)

    path = FilePath(__file__).sibling(b"process_gireactornocompat.py").path
    pyExe = FilePath(sys.executable)._asBytesPath()
    # Pass in a PYTHONPATH that is the test runner's sys.path, to make sure
    # we're running from a checkout.  os.pathsep keeps the value valid on
    # platforms whose separator is not ":".
    reactor.spawnProcess(Stdout(), pyExe, [pyExe, path],
                         env={"PYTHONPATH": os.pathsep.join(sys.path)})
    result.addCallback(self.assertEqual, b"success")
    return result
def _spawnProcess(self, proto, sibling, *args, **kw):
    """
    Start a child Python process running a C{twisted.test} module and talk
    to it through the given ProcessProtocol.

    @param proto: A L{ProcessProtocol} instance which will be connected to
        the child process.

    @param sibling: The basename of a file containing the Python program
        to run in the child process.

    @param *args: strings which will be passed to the child process on
        the command line as C{argv[2:]}.

    @param **kw: additional arguments to pass to L{reactor.spawnProcess}.

    @return: The L{IProcessTransport} provider for the spawned process.
    """
    # The child is run as ``python -m twisted.test.<sibling>`` and is told
    # which reactor module the parent uses.
    argv = [
        sys.executable,
        b"-m",
        b"twisted.test." + sibling,
        reactor.__class__.__module__,
    ]
    argv += list(args)
    return reactor.spawnProcess(
        proto, sys.executable, argv, env=properEnv, **kw)
def test_interface(self):
    """
    An L{ProcessProtocol} instance provides L{IProcessProtocol}.
    """
    instance = protocol.ProcessProtocol()
    verifyObject(interfaces.IProcessProtocol, instance)
def test_outReceived(self):
    """
    Data delivered to L{ProcessProtocol.childDataReceived} for stdout is
    forwarded to L{ProcessProtocol.outReceived}.
    """
    seen = []

    class OutProtocol(StubProcessProtocol):
        def outReceived(self, data):
            seen.append(data)

    payload = b"bytes"
    # File descriptor 1 is the child's stdout.
    OutProtocol().childDataReceived(1, payload)
    self.assertEqual(seen, [payload])
def makeDeferredWithProcessProtocol():
    """Returns a (`Deferred`, `ProcessProtocol`) tuple.

    The Deferred's `callback()` is called (with None) when the
    `ProcessProtocol` is told the process ended without error.  Otherwise
    its `errback()` is called with the `Failure` reason.
    """
    done = Deferred()
    protocol = ProcessProtocol()

    def processEnded(reason):
        # Anything other than a ProcessDone reason counts as a failure.
        if reason and not reason.check(ProcessDone):
            return done.errback(reason)
        return done.callback(None)

    protocol.processEnded = processEnded
    return done, protocol
def test_wrongArguments(self):
    """
    C{spawnProcess} rejects invalid arguments: command-line arguments and
    environment entries must be string or unicode only, and must not
    contain null bytes.
    """
    exe = sys.executable
    p = protocol.ProcessProtocol()

    invalidEnvs = [
        {"foo": 2},
        {"foo": "egg\0a"},
        {3: "bar"},
        {"bar\0foo": "bar"},
    ]
    invalidArgs = [
        [exe, 2],
        "spam",
        [exe, "foo\0bar"],
    ]

    # Sanity check - this will fail for people who have mucked with
    # their site configuration in a stupid way, but there's nothing we
    # can do about that.
    #
    # If the snowman _does_ encode, most likely Gtk2 is being used and the
    # default system encoding is UTF-8, which can encode anything.  In any
    # case, if implicit unicode -> str conversion works for that string, we
    # can't test that TypeError gets raised instead, so it is left out.
    badUnicode = u'\N{SNOWMAN}'
    try:
        badUnicode.encode(sys.getdefaultencoding())
    except UnicodeEncodeError:
        # Okay, that unicode doesn't encode, put it in as a bad environment
        # key, a bad environment value and a bad argument.
        invalidEnvs.extend([
            {badUnicode: 'value for bad unicode key'},
            {'key for bad unicode value': badUnicode},
        ])
        invalidArgs.append([exe, badUnicode])

    for env in invalidEnvs:
        self.assertRaises(
            TypeError,
            reactor.spawnProcess, p, exe, [exe, "-c", ""], env=env)

    for args in invalidArgs:
        self.assertRaises(
            TypeError,
            reactor.spawnProcess, p, exe, args, env=None)

# NOTE(review): the comment below trails this test in the listing and appears
# to describe code that follows it, which is not visible here.
# Use upper-case so that the environment key test uses an upper case
# name: some versions of Windows only support upper case environment
# variable names, and I think Python (as of 2.5) doesn't use the right
# syscall for lowercase or mixed case names to work anyway.
def test_wrongArguments(self):
    """
    C{spawnProcess} rejects invalid arguments: command-line arguments and
    environment entries must be string or unicode only, and must not
    contain null bytes.
    """
    p = protocol.ProcessProtocol()

    rejectedEnvs = [
        {b"foo": 2},
        {b"foo": b"egg\0a"},
        {3: b"bar"},
        {b"bar\0foo": b"bar"},
    ]
    rejectedArgs = [
        [pyExe, 2],
        b"spam",
        [pyExe, b"foo\0bar"],
    ]

    # Sanity check - this will fail for people who have mucked with
    # their site configuration in a stupid way, but there's nothing we
    # can do about that.
    #
    # If the snowman _does_ encode, most likely Gtk2 is being used and the
    # default system encoding is UTF-8, which can encode anything.  In any
    # case, if implicit unicode -> str conversion works for that string, we
    # can't test that TypeError gets raised instead, so it is left out.
    badUnicode = u'\N{SNOWMAN}'
    try:
        badUnicode.encode(sys.getfilesystemencoding())
    except UnicodeEncodeError:
        # Okay, that unicode doesn't encode, put it in as a bad environment
        # key, a bad environment value and a bad argument.
        rejectedEnvs.extend([
            {badUnicode: 'value for bad unicode key'},
            {'key for bad unicode value': badUnicode},
        ])
        rejectedArgs.append([pyExe, badUnicode])

    for env in rejectedEnvs:
        self.assertRaises(
            TypeError,
            reactor.spawnProcess, p, pyExe, [pyExe, b"-c", b""], env=env)

    for args in rejectedArgs:
        self.assertRaises(
            TypeError,
            reactor.spawnProcess, p, pyExe, args, env=None)
def test_errorInProcessEnded(self):
    """
    Reaping a process removes its reap handler even when the protocol's
    C{processEnded} method raises an exception.
    """
    connected = defer.Deferred()
    ended = defer.Deferred()

    class ErrorInProcessEnded(protocol.ProcessProtocol):
        """
        A protocol that raises an error in C{processEnded}.
        """
        def makeConnection(self, transport):
            connected.callback(transport)

        def processEnded(self, reason):
            # Fire C{ended} from a later reactor iteration, then fail.
            reactor.callLater(0, ended.callback, None)
            raise RuntimeError("Deliberate error")

    # This script runs until we disconnect its transport.
    scriptPath = b"twisted.test.process_echoer"
    childArgv = [pyExe, b"-u", b"-m", scriptPath]

    # Launch the process.
    reactor.spawnProcess(
        ErrorInProcessEnded(), pyExe, childArgv, env=properEnv, path=None)

    # Filled in by cbConnected so checkTerminated can look the pid up later.
    seenPids = []

    def cbConnected(transport):
        seenPids.append(transport.pid)
        # There's now a reap process handler registered.
        self.assertIn(transport.pid, process.reapProcessHandlers)

        # Kill the process cleanly, triggering an error in the protocol.
        transport.loseConnection()

    def checkTerminated(ignored):
        # The exception was logged.
        logged = self.flushLoggedErrors(RuntimeError)
        self.assertEqual(len(logged), 1)
        # The process is no longer scheduled for reaping.
        self.assertNotIn(seenPids[0], process.reapProcessHandlers)

    connected.addCallback(cbConnected)
    ended.addCallback(checkTerminated)
    return ended
def test_closeHandles(self):
    """
    When the process exits, its win32 handles should be properly closed.
    """
    import win32api

    connected = defer.Deferred()
    ended = defer.Deferred()

    class SimpleProtocol(protocol.ProcessProtocol):
        """
        A protocol that fires deferreds when connected and disconnected.
        """
        def makeConnection(self, transport):
            connected.callback(transport)

        def processEnded(self, reason):
            ended.callback(None)

    childArgv = [pyExe, b"-u", b"-c", b"print('hello')"]
    proc = reactor.spawnProcess(SimpleProtocol(), pyExe, childArgv)

    def cbConnected(transport):
        self.assertIs(transport, proc)
        # perform a basic validity test on the handles
        for handle in (proc.hProcess, proc.hThread):
            win32api.GetHandleInformation(handle)
        # And save their values for later
        self.hProcess = proc.hProcess
        self.hThread = proc.hThread

    def checkTerminated(ignored):
        # The attributes on the process object must be reset...
        self.assertIsNone(proc.pid)
        self.assertIsNone(proc.hProcess)
        self.assertIsNone(proc.hThread)
        # ...and the handles must be closed.
        for staleHandle in (self.hProcess, self.hThread):
            self.assertRaises(win32api.error,
                              win32api.GetHandleInformation, staleHandle)

    connected.addCallback(cbConnected)
    ended.addCallback(checkTerminated)
    return defer.gatherResults([connected, ended])