我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 twisted.internet.reactor.spawnProcess()。
def execute(self, args, p, preargs=''):
    """
    Run the command-line client in a child Python interpreter, pointed at
    the port of C{self.server} on 127.0.0.1.

    @param args: text appended to the command line after the host address.
    @param p: the process protocol handed to C{reactor.spawnProcess}; it
        must expose a C{deferred} attribute.
    @param preargs: extra options inserted just before the host address.
    @return: C{p.deferred}.
    @raise unittest.SkipTest: when running on win32.
    """
    if runtime.platformType == 'win32':
        # Call-style raise: the old "raise Class, message" statement is a
        # SyntaxError on Python 3, while this form works on both.
        raise unittest.SkipTest("can't run cmdline client on win32")
    port = self.server.getHost().port
    # "%" binds tighter than "+", so only the option template is formatted
    # with the port; preargs and args are appended verbatim.
    cmd = ('-p %i -l testuser '
           '--known-hosts kh_test '
           '--user-authentications publickey '
           '--host-key-algorithms ssh-rsa '
           '-a -I '
           '-K direct '
           '-i dsa_test '
           '-v ') % port + preargs + \
           ' 127.0.0.1 ' + args
    cmds = _makeArgs(cmd.split())
    log.msg(str(cmds))
    # Give the child the same import path as this process.
    env = os.environ.copy()
    env['PYTHONPATH'] = os.pathsep.join(sys.path)
    reactor.spawnProcess(p, sys.executable, cmds, env=env)
    return p.deferred
def _spawnProcess(self, proto, sibling, *args):
    """
    Run a script that sits next to this file in a child Python interpreter.

    The child's C{PYTHONPATH} is prefixed with the directory containing the
    C{twisted} package, and the module name of the current reactor's class
    is passed as the first script argument.

    @param proto: process protocol handed to C{reactor.spawnProcess}.
    @param sibling: file name of the script, relative to this file's
        directory.
    @param args: extra command-line arguments for the script.
    @return: whatever C{reactor.spawnProcess} returns.
    """
    import twisted

    # Two dirname() calls: twisted/__init__.py -> twisted/ -> its parent.
    twistedParent = os.path.abspath(
        os.path.dirname(os.path.dirname(twisted.__file__)))
    childEnv = dict(os.environ)
    childEnv['PYTHONPATH'] = os.pathsep.join(
        [twistedParent, childEnv.get('PYTHONPATH', '')])

    script = filepath.FilePath(__file__).sibling(sibling).path
    argv = [sys.executable, script, reactor.__class__.__module__]
    argv.extend(args)
    return reactor.spawnProcess(proto, sys.executable, argv, env=childEnv)
def testStdio(self):
    """
    twisted.internet.stdio test.

    Spawns C{process_twisted.py}, writes three chunks to its stdin, closes
    stdin, and checks that the child's accumulated stdout is the
    concatenation of those chunks.

    @return: the protocol's C{endedDeferred} with the check attached.
    """
    exe = sys.executable
    scriptPath = util.sibpath(__file__, "process_twisted.py")
    p = Accumulator()
    d = p.endedDeferred = defer.Deferred()
    env = {"PYTHONPATH": os.pathsep.join(sys.path)}
    reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=env,
                         path=None, usePTY=self.usePTY)
    p.transport.write("hello, world")
    p.transport.write("abc")
    p.transport.write("123")
    p.transport.closeStdin()

    def processEnded(ign):
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(p.outF.getvalue(), "hello, worldabc123",
                         "Output follows:\n"
                         "%s\n"
                         "Error message from process_twisted follows:\n"
                         "%s\n" % (p.outF.getvalue(), p.errF.getvalue()))
    return d.addCallback(processEnded)
def testProcess(self):
    """
    Run C{process_tester.py} in a child interpreter and check the stages
    recorded by the protocol and the child's exit code (23).

    @return: the L{defer.Deferred} assigned to the protocol's C{deferred}
        attribute, with the checks attached as a callback.
    """
    exe = sys.executable
    scriptPath = util.sibpath(__file__, "process_tester.py")
    d = defer.Deferred()
    p = TestProcessProtocol()
    p.deferred = d
    reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None)

    def check(ignored):
        self.assertEqual(p.stages, [1, 2, 3, 4, 5])
        reason = p.reason
        reason.trap(error.ProcessTerminated)
        self.assertEqual(reason.value.exitCode, 23)
        # would .signal be available on non-posix?
        # self.assertEqual(reason.value.signal, None)

        # Best-effort removal of files matching the tester's pattern.  Only
        # the failures this cleanup can reasonably produce are swallowed; a
        # bare "except:" would also hide KeyboardInterrupt and real bugs.
        try:
            import glob
            import process_tester
            for leftover in glob.glob(process_tester.test_file_match):
                os.remove(leftover)
        except (ImportError, AttributeError, OSError):
            pass
    d.addCallback(check)
    return d
def testManyProcesses(self):
    """
    Spawn 50 copies of C{process_tester.py} at once and check that every
    protocol saw all five stages and an exit code of 23.

    @return: a L{defer.DeferredList} over the protocols' deferreds, with
        the checks attached as a callback.
    """
    def _check(results, protocols):
        for p in protocols:
            self.assertEqual(p.stages, [1, 2, 3, 4, 5],
                             "[%d] stages = %s" % (id(p.transport),
                                                   str(p.stages)))
            # test status code
            f = p.reason
            f.trap(error.ProcessTerminated)
            self.assertEqual(f.value.exitCode, 23)

    exe = sys.executable
    scriptPath = util.sibpath(__file__, "process_tester.py")
    args = [exe, "-u", scriptPath]
    protocols = []
    deferreds = []

    # range() instead of the Python 2-only xrange(); for 50 items the
    # difference is irrelevant and this form runs on both major versions.
    for _ in range(50):
        p = TestManyProcessProtocol()
        protocols.append(p)
        reactor.spawnProcess(p, exe, args, env=None)
        deferreds.append(p.deferred)

    deferredList = defer.DeferredList(deferreds, consumeErrors=True)
    deferredList.addCallback(_check, protocols)
    return deferredList
def testAbnormalTermination(self):
    """
    Spawn the system C{false} binary and check that the protocol's reason
    is an L{error.ProcessTerminated} with exit code 1 and no signal.

    @return: the L{defer.Deferred} given to the protocol, with the checks
        attached as a callback.
    @raise RuntimeError: if C{false} is in neither /bin nor /usr/bin.
    """
    if os.path.exists('/bin/false'):
        cmd = '/bin/false'
    elif os.path.exists('/usr/bin/false'):
        cmd = '/usr/bin/false'
    else:
        raise RuntimeError("false not found in /bin or /usr/bin")

    d = defer.Deferred()
    p = TrivialProcessProtocol(d)
    reactor.spawnProcess(p, cmd, ['false'], env=None,
                         usePTY=self.usePTY)

    def check(ignored):
        p.reason.trap(error.ProcessTerminated)
        # Non-deprecated assertion names; assertIsNone states the intent
        # of the None comparison directly.
        self.assertEqual(p.reason.value.exitCode, 1)
        self.assertIsNone(p.reason.value.signal)
    d.addCallback(check)
    return d
def testStderr(self):
    """
    Run C{/bin/ls} from C{/tmp} on a file name assumed not to exist and
    compare what the child wrote to stderr with C{lsOut}.

    NOTE(review): C{lsOut} is not defined in this function; presumably it
    is a module-level reference output -- confirm.

    @return: the protocol's C{endedDeferred} with the check attached.
    @raise RuntimeError: if C{/bin/ls} does not exist.
    """
    # we assume there is no file named ZZXXX..., both in . and in /tmp
    if not os.path.exists('/bin/ls'):
        raise RuntimeError("/bin/ls not found")

    p = Accumulator()
    d = p.endedDeferred = defer.Deferred()
    reactor.spawnProcess(p, '/bin/ls',
                         ["/bin/ls",
                          "ZZXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"],
                         env=None, path="/tmp",
                         usePTY=self.usePTY)

    def processEnded(ign):
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(lsOut, p.errF.getvalue())
    return d.addCallback(processEnded)
def testProcess(self):
    """
    Pipe a short text through the system C{gzip -c} and check that
    decompressing the child's stdout yields the original text.

    @return: the protocol's C{endedDeferred} with the check attached.
    @raise RuntimeError: if C{gzip} is in neither /bin nor /usr/bin.
    """
    if os.path.exists('/bin/gzip'):
        cmd = '/bin/gzip'
    elif os.path.exists('/usr/bin/gzip'):
        cmd = '/usr/bin/gzip'
    else:
        raise RuntimeError("gzip not found in /bin or /usr/bin")

    s = "there's no place like home!\n" * 3
    p = Accumulator()
    d = p.endedDeferred = defer.Deferred()
    reactor.spawnProcess(p, cmd, [cmd, "-c"], env=None, path="/tmp",
                         usePTY=self.usePTY)
    p.transport.write(s)
    p.transport.closeStdin()

    def processEnded(ign):
        f = p.outF
        # Rewind the accumulated output before handing it to GzipFile.
        f.seek(0, 0)
        gf = gzip.GzipFile(fileobj=f)
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(gf.read(), s)
    return d.addCallback(processEnded)
def doit(self, fd):
    """
    Spawn a child that waits for one line on stdin, writes to descriptor
    C{fd} and exits with status 42, then close the parent's end of that
    descriptor right after sending the line.

    The protocol's deferred is expected to errback; a plain callback fails
    the test.

    @param fd: 1 to close the child's stdout, 2 to close its stderr.
    @return: the protocol's C{deferred}.
    @raise RuntimeError: for any other C{fd} (raised after the child has
        been spawned).
    """
    proto = ClosingPipesProcessProtocol(True)
    proto.deferred.addCallbacks(
        callback=lambda _: self.fail("I wanted an errback."),
        errback=self._endProcess, errbackArgs=(proto,))

    childSource = (
        r'raw_input(); import sys, os; os.write(%d, "foo\n"); sys.exit(42)'
        % fd)
    reactor.spawnProcess(
        proto, sys.executable,
        [sys.executable, '-u', '-c', childSource],
        env=None)

    pipes = proto.transport
    pipes.write('go\n')
    if fd == 1:
        pipes.closeStdout()
    elif fd == 2:
        pipes.closeStderr()
    else:
        raise RuntimeError

    # make the buggy case not hang
    pipes.closeStdin()
    return proto.deferred
def execute(self, args, p, preargs = ''):
    """
    Run the system C{ssh} client against the port of C{self.server} on
    127.0.0.1.

    @param args: text appended to the command line after the host address.
    @param p: the process protocol handed to C{reactor.spawnProcess}; it
        must expose a C{deferred} attribute.
    @param preargs: extra options inserted just before the host address.
    @return: C{p.deferred}.
    @raise unittest.SkipTest: if no C{ssh} binary is found under /usr/bin,
        /bin or /usr/local/bin.
    """
    cmdline = ('ssh -2 -l testuser -p %i '
               '-oUserKnownHostsFile=kh_test '
               '-oPasswordAuthentication=no '
               # Always use the RSA key, since that's the one in kh_test.
               '-oHostKeyAlgorithms=ssh-rsa '
               '-a '
               '-i dsa_test ') + preargs + \
               ' 127.0.0.1 ' + args
    port = self.server.getHost().port
    ssh_path = None
    for path in ['/usr', '', '/usr/local']:
        if os.path.exists(path+'/bin/ssh'):
            ssh_path = path+'/bin/ssh'
            break
    if not ssh_path:
        log.msg('skipping test, cannot find ssh')
        # Call-style raise: the old "raise Class, message" statement is a
        # SyntaxError on Python 3, while this form works on both.
        raise unittest.SkipTest('skipping test, cannot find ssh')
    # The port is substituted into the whole assembled command line.
    cmds = (cmdline % port).split()
    reactor.spawnProcess(p, ssh_path, cmds)
    return p.deferred
def execCommand(self, proto, cmd):
    """
    Run C{cmd} through the avatar's shell (C{/bin/sh} when the avatar has
    none) as the avatar's uid/gid, in the avatar's home directory.

    C{SSH_CLIENT} is added to C{self.environ} from the peer and host
    addresses of the underlying transport.  When a PTY was requested
    (C{self.ptyTuple}), PTY ownership is taken before spawning and a utmp
    entry plus terminal modes are applied afterwards.

    @param proto: process protocol handed to C{reactor.spawnProcess}.
    @param cmd: the command string passed to the shell's C{-c} option.
    """
    from twisted.internet import reactor

    avatar = self.avatar
    uid, gid = avatar.getUserGroupId()
    homeDir = avatar.getHomeDir()
    shell = avatar.getShell() or '/bin/sh'
    command = (shell, '-c', cmd)

    tcp = avatar.conn.transport.transport
    peer = tcp.getPeer()
    host = tcp.getHost()
    self.environ['SSH_CLIENT'] = '%s %s %s' % (
        peer.host, peer.port, host.port)

    wantsPty = self.ptyTuple
    if wantsPty:
        self.getPtyOwnership()
    self.pty = reactor.spawnProcess(
        proto, shell, command, self.environ, homeDir, uid, gid,
        usePTY=self.ptyTuple or 0)
    if self.ptyTuple:
        self.addUTMPEntry()
        if self.modes:
            self.setModes()
    # else:
    #     tty.setraw(self.pty.pipes[0].fileno(), tty.TCSANOW)
    self.avatar.conn.transport.transport.setTcpNoDelay(1)
def _win32_popen(self, args, env, callback_trigger=None):
    """Run *args* with subprocess.Popen without opening a console window.

    Each output line (stderr is merged into stdout) is forwarded to
    ``self.line_received`` and accumulated as UTF-8 bytes.

    :param args: argument list for the child process.
    :param env: environment mapping for the child process.
    :param callback_trigger: optional substring; as soon as an output line
        contains it, the child's pid is returned and the child is left
        running.
    :return: the child's pid if the trigger was seen, otherwise the
        collected output as bytes.
    :raises subprocess.CalledProcessError: if the child exits non-zero.
    """
    # This is a workaround to prevent Command Prompt windows from opening
    # when spawning tahoe processes from the GUI on Windows, as Twisted's
    # reactor.spawnProcess() API does not allow Windows creation flags to
    # be passed to subprocesses. By passing 0x08000000 (CREATE_NO_WINDOW),
    # the opening of the Command Prompt window will be suppressed while
    # still allowing access to stdout/stderr. See:
    # https://twistedmatrix.com/pipermail/twisted-python/2007-February/014733.html
    import subprocess
    proc = subprocess.Popen(
        args, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
        universal_newlines=True, creationflags=0x08000000)
    output = BytesIO()
    for line in iter(proc.stdout.readline, ''):
        output.write(line.encode('utf-8'))
        self.line_received(line.rstrip())
        if callback_trigger and callback_trigger in line.rstrip():
            return proc.pid
    # EOF on stdout does not guarantee the child has been reaped yet.  A
    # non-blocking poll() here could leave returncode as None and report a
    # failing command as success, so wait for the real exit status.  All
    # output has already been drained, so this cannot deadlock on the pipe.
    proc.wait()
    if proc.returncode:
        raise subprocess.CalledProcessError(proc.returncode, args)
    return output.getvalue()
def command(self, args, callback_trigger=None):
    """Run a ``tahoe`` sub-command against ``self.nodedir``.

    This is a generator that yields Deferreds and finishes through
    ``returnValue``; presumably it is wrapped by ``inlineCallbacks``
    outside this view -- confirm.

    :param args: list of arguments following ``tahoe -d <nodedir>``.
    :param callback_trigger: passed through to ``_win32_popen`` or
        ``CommandProtocol``.
    :return: (via ``returnValue``) the result of ``_win32_popen`` on a
        frozen win32 build, otherwise the result of ``protocol.done``.
    """
    exe = self.executable or which('tahoe')[0]
    args = [exe, '-d', self.nodedir] + args
    # Work on a copy: assigning into os.environ itself would change the
    # environment of this whole process as a side effect of every call.
    env = os.environ.copy()
    env['PYTHONUNBUFFERED'] = '1'
    if sys.platform == 'win32' and getattr(sys, 'frozen', False):
        from twisted.internet.threads import deferToThread
        output = yield deferToThread(
            self._win32_popen, args, env, callback_trigger)
    else:
        protocol = CommandProtocol(self, callback_trigger)
        reactor.spawnProcess(protocol, exe, args=args, env=env)
        output = yield protocol.done
    returnValue(output)

#@inlineCallbacks
#def start_monitor(self):
#    furl = os.path.join(self.nodedir, 'private', 'logport.furl')
#    yield self.command(['debug', 'flogtool', 'tail', furl])
def spawn_process(executable, args, env, outfile=None):
    """Spawn a process using the Twisted reactor.

    Return a deferred which will be called with process stdout, stderr and
    exit code. If outfile is provided then the stdout and stderr strings
    will be empty.

    The full command line and the environment are logged at INFO level
    before the process is started.

    Note: this is a variant of landscape.lib.twisted_util.spawn_process
    that supports streaming to a file.
    """
    import pprint

    argv = [executable] + list(args)
    logging.info("running {!r}".format(" ".join(argv)))
    logging.info("OS env:\n" + pprint.pformat(env))

    done = Deferred()
    if outfile is not None:
        protocol = StreamToFileProcessProtocol(done, outfile)
    else:
        protocol = AllOutputProcessProtocol(done)
    reactor.spawnProcess(protocol, executable, args=argv, env=env)
    return done
def spider_start(self, kw):
    """Start one crawler subprocess running ``kw['cmd']`` under Python.

    The child runs in ``/srv/ataobao/crawler`` with ``ENV=TEST`` and a
    ``PYTHONPATH`` that starts with ``/srv/ataobao``.

    :param kw: mapping with at least a ``'cmd'`` entry (a space-separated
        command line, without the interpreter); the whole mapping is also
        passed to ``SubProcessProtocol``.
    """
    import os  # local import: this block previously did not use os

    pp = SubProcessProtocol(self, self.factory.spiders, kw)
    args = [sys.executable] + kw['cmd'].split(' ')

    # spawnProcess does not go through a shell, so a literal "$PYTHONPATH"
    # is never expanded; the child would get a bogus path entry with that
    # name. Append the parent's real PYTHONPATH instead, when it has one.
    search_path = ['/srv/ataobao']
    inherited = os.environ.get('PYTHONPATH')
    if inherited:
        search_path.append(inherited)

    reactor.spawnProcess(pp, sys.executable, args=args, env={
        'PYTHONPATH': os.pathsep.join(search_path),
        'ENV': 'TEST',
    }, path='/srv/ataobao/crawler')

    # for i in range(0, kw["process"]):
    #     pp = SubProcessProtocol(self, self.factory.spiders, kw)
    #     args = [sys.executable, "spider_tx.py", kw["spider"], str(kw["threads"])]
    #     args = [sys.executable, "crawler/worker.py", "-w", "shop", "-p", "10"]
    #     args = [sys.executable, "crawler/tbcat.py", "-c", "16"]
    #     print args
    #     reactor.spawnProcess(pp, sys.executable, args=args)
def test_stdin(self):
    """
    C{getPassword} accepts a password from standard input.

    A child interpreter reads a string with C{getPassword} and writes it
    straight back to its stdout; the test checks that the protocol's
    recorded output contains C{(1, b'secret')} and that the child ended
    with L{ProcessDone}.
    """
    childSource = (b'import sys\n'
                   b'from twisted.python.util import getPassword\n'
                   b'sys.stdout.write(getPassword())\n'
                   b'sys.stdout.flush()\n')
    childEnv = {b'PYTHONPATH': os.pathsep.join(sys.path).encode("utf8")}

    proto = PasswordTestingProcessProtocol()
    proto.finished = Deferred()
    reactor.spawnProcess(
        proto, pyExe, [pyExe, b'-c', childSource], env=childEnv)

    def processFinished(result):
        reason, output = result
        reason.trap(ProcessDone)
        self.assertIn((1, b'secret'), output)

    return proto.finished.addCallback(processFinished)
def _spawn(script, outputFD):
    """
    Start a script that is a peer of this test as a subprocess.

    The child gets a bytes-mode environment with C{PYTHONPATH} built from
    C{sys.path}, and receives C{outputFD} both as its last command-line
    argument and as an inherited file descriptor.

    @param script: the module name of the script in this directory (no
        package prefix, no '.py')
    @type script: C{str}

    @param outputFD: file descriptor number mapped to itself in the child.

    @rtype: L{StartStopProcessProtocol}
    """
    interpreter = FilePath(sys.executable).asBytesMode().path
    scriptPath = FilePath(__file__).sibling(script + ".py").asBytesMode().path

    childEnv = bytesEnviron()
    childEnv[b"PYTHONPATH"] = FilePath(
        pathsep.join(sys.path)).asBytesMode().path

    # stdin is written by the parent, stdout/stderr are read by it, and
    # outputFD is passed through unchanged.
    fdMap = {0: "w", 1: "r", 2: "r", outputFD: outputFD}

    sspp = StartStopProcessProtocol()
    reactor.spawnProcess(
        sspp, interpreter,
        [interpreter, scriptPath, intToBytes(outputFD)],
        env=childEnv,
        childFDs=fdMap,
    )
    return sspp
def test_useReactorArgument(self):
    """
    Rendering a L{twcgi.FilteredScript} calls C{spawnProcess} on the
    reactor that was passed to its constructor.
    """
    class FakeReactor:
        """
        Minimal reactor stand-in that remembers if C{spawnProcess} ran.
        """
        called = False

        def spawnProcess(self, *args, **kwargs):
            """
            Record the call; all arguments are accepted and ignored.

            @param args: Positional arguments.
            @param kwargs: Keyword arguments.
            """
            self.called = True

    stubReactor = FakeReactor()
    dummyRequest = DummyRequest(['a', 'b'])
    script = twcgi.FilteredScript("dummy-file", reactor=stubReactor)
    _render(script, dummyRequest)
    self.assertTrue(stubReactor.called)
def test_twisted(self):
    """
    Running C{python -m twisted --help} prints the text of
    L{TwistOptions} followed by a newline.
    """
    interpreter = sys.executable
    proto = Accumulator()
    ended = proto.endedDeferred = defer.Deferred()
    reactor.spawnProcess(
        proto, interpreter,
        [interpreter, '-m', 'twisted', '--help'],
        env=None)
    proto.transport.closeStdin()

    def processEnded(ign):
        # Normalise Windows line endings before comparing.
        actual = proto.outF.getvalue().replace(b'\r\n', b'\n')
        expected = '{}\n'.format(TwistOptions()).encode('utf-8')
        self.assertEqual(actual, expected)

    return ended.addCallback(processEnded)
def test_stdio(self):
    """
    L{twisted.internet.stdio} test.

    The child module C{twisted.test.process_twisted} is fed three chunks
    on stdin; its stdout must equal their concatenation.
    """
    moduleName = b"twisted.test.process_twisted"
    proto = Accumulator()
    ended = proto.endedDeferred = defer.Deferred()
    reactor.spawnProcess(proto, pyExe, [pyExe, b'-u', b"-m", moduleName],
                         env=properEnv, path=None, usePTY=self.usePTY)

    for chunk in (b"hello, world", b"abc", b"123"):
        proto.transport.write(chunk)
    proto.transport.closeStdin()

    def processEnded(ign):
        failureText = ("Output follows:\n"
                       "%s\n"
                       "Error message from process_twisted follows:\n"
                       "%s\n" % (proto.outF.getvalue(),
                                 proto.errF.getvalue()))
        self.assertEqual(proto.outF.getvalue(), b"hello, worldabc123",
                         failureText)

    return ended.addCallback(processEnded)
def test_unsetPid(self):
    """
    The process transport's C{pid} is truthy right after spawning and
    C{None} once the process has ended.

    C{process_echoer} is reused as a child that blocks on stdin, so the
    process only ends after stdin is closed below.
    """
    finished = defer.Deferred()
    proto = TrivialProcessProtocol(finished)
    moduleName = b"twisted.test.process_echoer"
    transport = reactor.spawnProcess(
        proto, pyExe, [pyExe, b'-u', b"-m", moduleName], env=properEnv)
    self.assertTrue(transport.pid)

    def afterProcessEnd(ignored):
        self.assertIsNone(transport.pid)

    finished.addCallback(afterProcessEnd)
    proto.transport.closeStdin()
    return finished
def test_echo(self):
    """
    Spawning a subprocess which echoes its stdin to its stdout via
    L{IReactorProcess.spawnProcess} results in that echoed output being
    delivered to C{outReceived}.
    """
    finished = defer.Deferred()
    proto = EchoProtocol(finished)
    moduleName = b"twisted.test.process_echoer"
    reactor.spawnProcess(
        proto, pyExe, [pyExe, b'-u', b"-m", moduleName], env=properEnv)

    def asserts(ignored):
        self.assertFalse(proto.failure, proto.failure)
        self.assertTrue(hasattr(proto, 'buffer'))
        self.assertEqual(len(proto.buffer), len(proto.s * proto.n))

    def takedownProcess(err):
        # On failure, close stdin on the child's transport and pass the
        # error through unchanged.
        proto.transport.closeStdin()
        return err

    finished.addCallback(asserts)
    finished.addErrback(takedownProcess)
    return finished
def test_abnormalTermination(self):
    """
    When a process terminates with a system exit code set to 1,
    C{processEnded} is called with a L{error.ProcessTerminated} error,
    the C{exitCode} attribute reflecting the system exit code.
    """
    ended = defer.Deferred()
    proto = TrivialProcessProtocol(ended)
    childArgs = [pyExe, b'-c', b'import sys; sys.exit(1)']
    reactor.spawnProcess(proto, pyExe, childArgs, env=None,
                         usePTY=self.usePTY)

    def check(ignored):
        reason = proto.reason
        reason.trap(error.ProcessTerminated)
        self.assertEqual(reason.value.exitCode, 1)
        self.assertIsNone(reason.value.signal)

    ended.addCallback(check)
    return ended
def test_executionError(self):
    """
    Raise an error during execvpe to check error management.

    C{os.execvpe} is temporarily replaced by a function that raises
    C{RuntimeError("Ouch")}; the child's combined output must mention both
    C{Upon execvpe} and C{Ouch}.
    """
    cmd = self.getCommand('false')
    ended = defer.Deferred()
    proto = TrivialProcessProtocol(ended)

    def buggyexecvpe(command, args, environment):
        raise RuntimeError("Ouch")

    def check(ignored):
        combined = b"".join(proto.errData + proto.outData)
        self.assertIn(b"Upon execvpe", combined)
        self.assertIn(b"Ouch", combined)

    realExecvpe = os.execvpe
    os.execvpe = buggyexecvpe
    try:
        reactor.spawnProcess(proto, cmd, [b'false'], env=None,
                             usePTY=self.usePTY)
        ended.addCallback(check)
    finally:
        # Restore the real function as soon as the spawn call has returned.
        os.execvpe = realExecvpe
    return ended
def _mockForkInParentTest(self):
    """
    Assert that in the main process, spawnProcess disables the garbage
    collector, calls fork, closes the pipe file descriptors it created for
    the child process, and calls waitpid.
    """
    self.mockos.child = False
    proto = TrivialProcessProtocol(defer.Deferred())
    reactor.spawnProcess(proto, b'/mock/ouch', [b'ouch'], env=None,
                         usePTY=False)

    # It should close the first read pipe, and the 2 last write pipes
    expectedClosed = {-1, -4, -6}
    self.assertEqual(set(self.mockos.closed), expectedClosed)
    expectedActions = [("fork", False), "waitpid"]
    self.assertEqual(self.mockos.actions, expectedActions)
def test_mockForkErrorGivenFDs(self):
    """
    When C{os.fork} raises an exception and file descriptors have been
    specified with the C{childFDs} argument of L{reactor.spawnProcess},
    those descriptors are not closed.
    """
    self.mockos.raiseFork = OSError(errno.EAGAIN, None)
    proto = TrivialProcessProtocol(None)

    callerOwned = {0: -10, 1: -11, 2: -13}
    self.assertRaises(OSError, reactor.spawnProcess, proto, None,
                      childFDs=callerOwned)
    self.assertEqual(self.mockos.actions, [("fork", False)])
    self.assertEqual(self.mockos.closed, [])

    # We can also put "r" or "w" to let twisted create the pipes
    partlyCreated = {0: "r", 1: -11, 2: -13}
    self.assertRaises(OSError, reactor.spawnProcess, proto, None,
                      childFDs=partlyCreated)
    self.assertEqual(set(self.mockos.closed), {-1, -2})
def test_mockPTYSetUid(self):
    """
    Try creating a PTY process with setting its uid: it's almost the same
    path as the standard path, but with a C{switchUID} call before the
    exec.
    """
    proto = TrivialProcessProtocol(defer.Deferred())
    expectedActions = [
        ('fork', False), 'setsid', ('setuid', 0), ('setgid', 0),
        ('switchuid', 8081, 1234), 'exec', ('exit', 1)]
    try:
        reactor.spawnProcess(proto, b'/mock/ouch', [b'ouch'], env=None,
                             usePTY=True, uid=8081)
    except SystemError:
        self.assertTrue(self.mockos.exited)
        self.assertEqual(self.mockos.actions, expectedActions)
    else:
        self.fail("Should not be here")
def test_mockPTYSetUidInParent(self):
    """
    When spawning a child process with PTY and a UID different from the
    UID of the current process, the current process does not have its UID
    changed.
    """
    self.mockos.child = False
    proto = TrivialProcessProtocol(defer.Deferred())

    # Swap in the dumb PTY process class only for the spawn call itself.
    realPTYProcess = process.PTYProcess
    process.PTYProcess = DumbPTYProcess
    try:
        reactor.spawnProcess(proto, b'/mock/ouch', [b'ouch'], env=None,
                             usePTY=True, uid=8080)
    finally:
        process.PTYProcess = realPTYProcess

    self.assertEqual(self.mockos.actions, [('fork', False), 'waitpid'])
def test_mockWithWaitError(self):
    """
    Test that reapProcess logs errors raised.
    """
    mockos = self.mockos
    mockos.child = False
    mockos.waitChild = (0, 0)

    proto = TrivialProcessProtocol(defer.Deferred())
    proc = reactor.spawnProcess(proto, b'/mock/ouch', [b'ouch'], env=None,
                                usePTY=False)
    self.assertEqual(mockos.actions, [("fork", False), "waitpid"])

    # Make the next waitpid fail, then reap explicitly.
    mockos.raiseWaitPid = OSError()
    proc.reapProcess()
    logged = self.flushLoggedErrors()
    self.assertEqual(len(logged), 1)
    logged[0].trap(OSError)
def test_mockErrorECHILDInReapProcess(self):
    """
    Test that reapProcess doesn't log anything when waitpid raises a
    C{OSError} with errno C{ECHILD}.
    """
    mockos = self.mockos
    mockos.child = False
    mockos.waitChild = (0, 0)

    proto = TrivialProcessProtocol(defer.Deferred())
    proc = reactor.spawnProcess(proto, b'/mock/ouch', [b'ouch'], env=None,
                                usePTY=False)
    self.assertEqual(mockos.actions, [("fork", False), "waitpid"])

    noChild = OSError()
    mockos.raiseWaitPid = noChild
    noChild.errno = errno.ECHILD
    # This should not produce any errors
    proc.reapProcess()
def test_stderr(self):
    """
    Bytes written to stderr by the spawned process are passed to the
    C{errReceived} callback on the C{ProcessProtocol} passed to
    C{spawnProcess}.
    """
    value = "42"
    childSource = networkString(
        "import sys; sys.stderr.write('{0}')".format(value))

    proto = Accumulator()
    ended = proto.endedDeferred = defer.Deferred()
    reactor.spawnProcess(proto, pyExe, [pyExe, b"-c", childSource],
                         env=None, path="/tmp", usePTY=self.usePTY)

    def processEnded(ign):
        self.assertEqual(b"42", proto.errF.getvalue())

    return ended.addCallback(processEnded)
def test_openingTTY(self):
    """
    Write one line to the C{twisted.test.process_tty} child and check
    that, once it has ended, signalling it raises
    L{error.ProcessExitedAlready} and its output is the line twice with
    CRLF line endings.
    """
    moduleName = b"twisted.test.process_tty"
    proto = Accumulator()
    ended = proto.endedDeferred = defer.Deferred()
    reactor.spawnProcess(proto, pyExe, [pyExe, b"-u", b"-m", moduleName],
                         env=properEnv, usePTY=self.usePTY)
    proto.transport.write(b"hello world!\n")

    def processEnded(ign):
        self.assertRaises(
            error.ProcessExitedAlready,
            proto.transport.signalProcess, 'HUP')
        failureText = ("Error message from process_tty "
                       "follows:\n\n%s\n\n" % (proto.outF.getvalue(),))
        self.assertEqual(
            proto.outF.getvalue(),
            b"hello world!\r\nhello world!\r\n",
            failureText)

    return ended.addCallback(processEnded)
def test_encodableUnicodeEnvironment(self):
    """
    Test C{os.environ} (inherited by every subprocess on Windows) that
    contains an ascii-encodable Unicode string. This is different from
    passing Unicode environment explicitly to spawnProcess (which is not
    supported on Python 2).
    """
    key, value = self.goodKey, self.goodValue
    os.environ[key] = value
    # Remove the variable again once the test is over.
    self.addCleanup(operator.delitem, os.environ, self.goodKey)

    proto = GetEnvironmentDictionary.run(reactor, [], properEnv)

    def gotEnvironment(environ):
        expected = self.goodValue.encode('ascii')
        self.assertEqual(environ[self.goodKey.encode('ascii')], expected)

    return proto.getResult().addCallback(gotEnvironment)
def launchWorkerProcesses(self, spawner, protocols, arguments):
    """
    Spawn one worker process per process protocol.

    Every worker runs the C{twisted.trial._dist.workertrial} script with
    the current interpreter, gets two extra pipes for AMP traffic next to
    stdin/stdout/stderr, and inherits a copy of this process' environment.

    @param spawner: A C{IReactorProcess.spawnProcess} implementation.

    @param protocols: An iterable of C{ProcessProtocol} instances.

    @param arguments: Extra arguments passed to the processes.
    """
    workerScript = theSystemPath[
        'twisted.trial._dist.workertrial'].filePath.path
    fdMap = {
        0: 'w',
        1: 'r',
        2: 'r',
        _WORKER_AMP_STDIN: 'w',
        _WORKER_AMP_STDOUT: 'r',
    }

    # Add an environment variable containing the raw sys.path, to be used
    # by subprocesses to make sure it's identical to the parent. See
    # workertrial._setupPath.
    workerEnv = os.environ.copy()
    workerEnv['TRIAL_PYTHONPATH'] = os.pathsep.join(sys.path)

    for protocol in protocols:
        argv = [sys.executable, workerScript]
        argv.extend(arguments)
        spawner(protocol, sys.executable, args=argv, childFDs=fdMap,
                env=workerEnv)
def fork(executable, args=(), env=None, path=None, timeout=3600):
    """fork
    Provides a deferred wrapper function with a timeout function

    :param executable: Executable
    :type executable: str.
    :param args: Tuple of arguments
    :type args: tuple.
    :param env: Environment dictionary; ``None`` (the default) means an
        empty dictionary
    :type env: dict.
    :param path: Passed to ``reactor.spawnProcess`` as its ``path``
        argument
    :param timeout: Kill the child process if timeout is exceeded
    :type timeout: int.
    :returns: the Deferred that was handed to the process protocol
    """
    # None sentinel instead of a shared mutable ``{}`` default; the value
    # passed on when the caller gives nothing is still an empty dict.
    if env is None:
        env = {}
    de = defer.Deferred()
    proc = ProcessProtocol(de, timeout)
    # argv[0] is the executable itself, followed by the caller's arguments.
    reactor.spawnProcess(proc, executable, (executable,)+tuple(args), env,
                         path)
    return de
def test_run_script(self):
    """
    Run the script named in the usage options with the configured
    interpreter; whatever it prints to stdout is taken as the result of
    the test.

    Returns None (after logging an error) when the interpreter cannot be
    found, otherwise the process protocol's deferred.
    """
    processProtocol = ScriptProcessProtocol(self)
    interpreter = self.localOptions['interpreter']

    found = which(interpreter)
    if not found:
        log.err('Unable to find %s executable in PATH.' % interpreter)
        return

    childArgs = [interpreter, self.localOptions['script']]
    childEnv = {'HOME': os.environ['HOME']}
    reactor.spawnProcess(processProtocol, interpreter, args=childArgs,
                         env=childEnv, usePTY=True)

    # Start the reactor ourselves when nothing else is running it.
    if not reactor.running:
        reactor.run()
    return processProtocol.deferred
def spawnProcessAndNullifyStdout(protocol, args):
    """Utility function to spawn a process and redirect stdout to /dev/null.

    Spawns the process with the specified `protocol` in the reactor, with
    the specified list of binary `args`. The child's stdin is attached to
    the null device as well, and its stderr is read by the reactor.
    """
    # childFDs maps the child's descriptors 0 and 1 onto the null device
    # and asks for a readable pipe on descriptor 2.
    with open(os.devnull, "r+b") as sink:
        # The descriptor is closed in this process when the block exits,
        # possibly before the child finishes, but stays open in the child.
        nullFD = sink.fileno()
        reactor.spawnProcess(
            protocol, args[0], args,
            childFDs={0: nullFD, 1: nullFD, 2: 'r'},
            env=select_c_utf8_bytes_locale())
def test__protocol_logs_stderr(self):
    """Lines a child writes to stderr are logged with a service prefix.

    The child is ``sh -c 'exec cat >&2'``, so everything written to its
    stdin comes back on stderr. This is a generator that yields the
    protocol's ``done`` before asserting; presumably it is wrapped by
    ``inlineCallbacks`` outside this view -- confirm.
    """
    logger = self.useFixture(TwistedLoggerFixture())
    ifname = factory.make_name('eth')
    discovery = NeighbourDiscoveryService(ifname, lambda _: None)
    protocol = discovery.createProcessProtocol()
    reactor.spawnProcess(protocol, b"sh", (b"sh", b"-c", b"exec cat >&2"))

    payload = (b"Lines written to stderr are logged\n"
               b"with a prefix, with no exceptions.\n")
    protocol.transport.write(payload)
    protocol.transport.closeStdin()
    yield protocol.done

    expected = (
        "observe-arp[%s]: Lines written to stderr are logged\n"
        "---\n"
        "observe-arp[%s]: with a prefix, with no exceptions."
        % (ifname, ifname))
    self.assertThat(logger.output, Equals(expected))
def test__protocol_logs_stderr(self):
    """Lines a child writes to stderr are logged with a service prefix.

    The child is ``sh -c 'exec cat >&2'``, so everything written to its
    stdin comes back on stderr. This is a generator that yields the
    protocol's ``done`` before asserting; presumably it is wrapped by
    ``inlineCallbacks`` outside this view -- confirm.
    """
    logger = self.useFixture(TwistedLoggerFixture())
    ifname = factory.make_name('eth')
    beaconing = BeaconingService(ifname, lambda _: None)
    protocol = beaconing.createProcessProtocol()
    reactor.spawnProcess(protocol, b"sh", (b"sh", b"-c", b"exec cat >&2"))

    payload = (b"Lines written to stderr are logged\n"
               b"with a prefix, with no exceptions.\n")
    protocol.transport.write(payload)
    protocol.transport.closeStdin()
    yield protocol.done

    expected = (
        "observe-beacons[%s]: Lines written to stderr are logged\n"
        "---\n"
        "observe-beacons[%s]: with a prefix, with no exceptions."
        % (ifname, ifname))
    self.assertThat(logger.output, Equals(expected))
def setUp(self):
    """
    Start C{twisted/conch/stdio.py} in a PTY child process, wired to an
    in-memory terminal, and wait until the first C{>>> } prompt shows up.

    @return: a Deferred gathering the process protocol's C{onConnection}
        and the terminal's expectation of the prompt (falsy entries are
        filtered out).
    """
    from twisted.internet import reactor

    # In-memory terminal emulator: whatever the server writes ends up
    # here, i.e. what a user would have seen on screen.
    terminal = NotifyingExpectableBuffer()

    # Insults client protocol turning bytes from the child into keystroke
    # commands for an ITerminalProtocol.
    insultsClient = insults.ClientProtocol(lambda: terminal)

    # Process protocol forwarding the child's stdout/stderr to the insults
    # client as dataReceived calls and error reports.
    processClient = stdio.TerminalProcessProtocol(insultsClient)

    # The child runs the stdio module with the qualified name of the
    # ITerminalProtocol class that will handle the bytes we send to it.
    childArgs = ["python2.3", stdio.__file__,
                 reflect.qual(self.serverProtocol)]
    childEnv = {"PYTHONPATH": os.pathsep.join(sys.path)}
    clientTransport = reactor.spawnProcess(
        processClient, sys.executable, childArgs,
        env=childEnv, usePTY=True)

    self.recvlineClient = self.testTerminal = terminal
    self.processClient = processClient
    self.clientTransport = clientTransport

    # Wait for both the process protocol and the test terminal to be
    # connected. The former should always happen first, but it doesn't
    # hurt to be safe.
    pending = [processClient.onConnection, terminal.expect(">>> ")]
    return defer.gatherResults(filter(None, pending))
def serviceStarted(self):
    """
    Mark the service as connected and, when C{self.spawn} holds arguments,
    schedule C{reactor.spawnProcess} with them on the next reactor
    iteration.

    The child gets a copy of this process' environment with C{PYTHONPATH}
    built from C{sys.path}.
    """
    spawnArgs = self.spawn
    if spawnArgs:
        childEnv = dict(os.environ)
        childEnv['PYTHONPATH'] = os.pathsep.join(sys.path)
        reactor.callLater(0, reactor.spawnProcess, *spawnArgs, env=childEnv)
    self.connected = 1