我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sys.__stderr__()。
def doItConsolicious(opt): # reclaim stdout/stderr from log sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ if opt['zipfile']: print 'Unpacking documentation...' for n in zipstream.unzipIter(opt['zipfile'], opt['ziptargetdir']): if n % 100 == 0: print n, if n % 1000 == 0: print print 'Done unpacking.' if opt['compiledir']: print 'Compiling to pyc...' import compileall compileall.compile_dir(opt["compiledir"]) print 'Done compiling.'
def get_terminal_size(fallback=(80, 24)): """ Return tuple containing columns and rows of controlling terminal, trying harder than shutil.get_terminal_size to find a tty before returning fallback. Theoretically, stdout, stderr, and stdin could all be different ttys that could cause us to get the wrong measurements (instead of using the fallback) but the much more common case is that IO is piped. """ for stream in [sys.__stdout__, sys.__stderr__, sys.__stdin__]: try: # Make WINSIZE call to terminal data = fcntl.ioctl(stream.fileno(), TIOCGWINSZ, b"\x00\x00\00\x00") except (IOError, OSError): pass else: # Unpack two shorts from ioctl call lines, columns = struct.unpack("hh", data) break else: columns, lines = fallback return columns, lines
def become_daemon(self, root_dir='/'): if os.fork() != 0: # launch child and ... os._exit(0) # kill off parent os.setsid() os.chdir(root_dir) os.umask(0) if os.fork() != 0: # fork again so we are not a session leader os._exit(0) sys.stdin.close() sys.__stdin__ = sys.stdin sys.stdout.close() sys.stdout = sys.__stdout__ = _NullDevice() sys.stderr.close() sys.stderr = sys.__stderr__ = _NullDevice() for fd in range(1024): try: os.close(fd) except OSError: pass
def do_complete(self, code, cursor_pos): #print('completing on', repr(code), file=sys.__stderr__) code = code[:cursor_pos] m = re.search(r'(\w+\.)*(\w+)?$', code) if m: prefix = m.group() #print('prefix', repr(prefix), file=sys.__stderr__) if '.' in prefix: obj, prefix = prefix.rsplit('.') names = self._eval('dir({})'.format(obj)) else: names = self._eval('dir()') #print('names', names, file=sys.__stderr__) matches = [n for n in names if n.startswith(prefix)] return {'matches': matches, 'cursor_start': cursor_pos - len(prefix), 'cursor_end': cursor_pos, 'metadata': {}, 'status': 'ok'} else: return {'matches': [], 'cursor_start': cursor_pos, 'cursor_end': cursor_pos, 'metadata': {}, 'status': 'ok'}
def handle_error(self, request, client_address): """Override TCPServer method Error message goes to __stderr__. No error message if exiting normally or socket raised EOF. Other exceptions not handled in server code will cause os._exit. """ try: raise except SystemExit: raise except: erf = sys.__stderr__ print('\n' + '-'*40, file=erf) print('Unhandled server exception!', file=erf) print('Thread: %s' % threading.current_thread().name, file=erf) print('Client Address: ', client_address, file=erf) print('Request: ', repr(request), file=erf) traceback.print_exc(file=erf) print('\n*** Unrecoverable, server exiting!', file=erf) print('-'*40, file=erf) os._exit(0) #----------------- end class RPCServer --------------------
def putmessage(self, message): self.debug("putmessage:%d:" % message[0]) try: s = pickle.dumps(message) except pickle.PicklingError: print("Cannot pickle:", repr(message), file=sys.__stderr__) raise s = struct.pack("<i", len(s)) + s while len(s) > 0: try: r, w, x = select.select([], [self.sock], []) n = self.sock.send(s[:BUFSIZE]) except (AttributeError, TypeError): raise IOError("socket no longer exists") except socket.error: raise else: s = s[n:]
def quit(self, *args): # restore stdout before signaling the run thread to exit! # it can get stuck in trying to dump to the redirected text message area sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ try: self.display.quit() except: pass self.root.destroy() print "* All done!" # sys.exit() # force quit! return # ----------------------------------------------------------
def idle_showwarning( message, category, filename, lineno, file=None, line=None): """Show Idle-format warning (after replacing warnings.showwarning). The differences are the formatter called, the file=None replacement, which can be None, the capture of the consequence AttributeError, and the output of a hard-coded prompt. """ if file is None: file = warning_stream try: file.write(idle_formatwarning( message, category, filename, lineno, line=line)) file.write(">>> ") except (AttributeError, IOError): pass # if file (probably __stderr__) is invalid, skip warning.
def manage_socket(address): for i in range(3): time.sleep(i) try: server = MyRPCServer(address, MyHandler) break except socket.error as err: print>>sys.__stderr__,"IDLE Subprocess: socket error: "\ + err.args[1] + ", retrying...." else: print>>sys.__stderr__, "IDLE Subprocess: Connection to "\ "IDLE GUI failed, exiting." show_socket_error(err, address) global exit_now exit_now = True return server.handle_request() # A single request only
def handle_error(self, request, client_address): """Override TCPServer method Error message goes to __stderr__. No error message if exiting normally or socket raised EOF. Other exceptions not handled in server code will cause os._exit. """ try: raise except SystemExit: raise except: erf = sys.__stderr__ print>>erf, '\n' + '-'*40 print>>erf, 'Unhandled server exception!' print>>erf, 'Thread: %s' % threading.currentThread().getName() print>>erf, 'Client Address: ', client_address print>>erf, 'Request: ', repr(request) traceback.print_exc(file=erf) print>>erf, '\n*** Unrecoverable, server exiting!' print>>erf, '-'*40 os._exit(0) #----------------- end class RPCServer --------------------
def set_exception(self, exception): " Mark the future done and set an exception. " if self._done: raise InvalidStateError('Future result has been set already.') self._exception = exception self._done = True if self.done_callbacks: self._call_callbacks() else: # When an exception is set on a 'Future' object, but there # is no callback set to handle it, print the exception. # -- Uncomment for debugging. -- # import traceback, sys # print(''.join(traceback.format_stack()), file=sys.__stderr__) # print('Uncollected error: %r' % (exception, ), file=sys.__stderr__) pass
def _rmtree(path): def _rmtree_inner(path): for name in os.listdir(path): fullname = os.path.join(path, name) try: mode = os.lstat(fullname).st_mode except OSError as exc: print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc), file=sys.__stderr__) mode = 0 if stat.S_ISDIR(mode): _waitfor(_rmtree_inner, fullname, waitall=True) os.rmdir(fullname) else: os.unlink(fullname) _waitfor(_rmtree_inner, path, waitall=True) _waitfor(os.rmdir, path)
def test_is_enabled(self): orig_stderr = sys.stderr try: # regrtest may replace sys.stderr by io.StringIO object, but # faulthandler.enable() requires that sys.stderr has a fileno() # method sys.stderr = sys.__stderr__ was_enabled = faulthandler.is_enabled() try: faulthandler.enable() self.assertTrue(faulthandler.is_enabled()) faulthandler.disable() self.assertFalse(faulthandler.is_enabled()) finally: if was_enabled: faulthandler.enable() else: faulthandler.disable() finally: sys.stderr = orig_stderr
def __init__(self, _path='log.log', _mode='w', _print=True, _size=50): '''???? @param _path : Log????? @type _path : <str> @param _mode : Log?????????'w'???????????'a+'??????log?size?? @type _mode : <str> @param _print : ??????? @type _print : <bool> @param _size : ??Log????????: MB???50MB????????Log????????Log??????????????????????????Log???? @type _size : <int> ''' self._path = os.path.abspath(_path) self._mode = _mode self._print = _print self._size = _size if os.path.exists(self._path) and os.path.getsize(self._path) / 1024 / 1024 > self._size: os.remove(self._path) self._log_io = file(self._path, self._mode) sys.stdout = __StdToIO__(sys.__stdout__, self._log_io, self._print) sys.stderr = __StdToIO__(sys.__stderr__, self._log_io, self._print)
def __enter__(self): self.sys = sys # save previous stdout/stderr self.saved_streams = saved_streams = sys.__stdout__, sys.__stderr__ self.fds = fds = [s.fileno() for s in saved_streams] self.saved_fds = map(os.dup, fds) # flush any pending output for s in saved_streams: s.flush() # open surrogate files if self.combine: null_streams = [open(self.outfiles[0], self.mode, 0)] * 2 if self.outfiles[0] != os.devnull: # disable buffering so output is merged immediately sys.stdout, sys.stderr = map(os.fdopen, fds, ['w']*2, [0]*2) else: null_streams = [open(f, self.mode, 0) for f in self.outfiles] self.null_fds = null_fds = [s.fileno() for s in null_streams] self.null_streams = null_streams # overwrite file objects and low-level file descriptors map(os.dup2, null_fds, fds)
def serve_forever(self): ''' Run the server forever ''' self.stop_event = threading.Event() process.current_process()._manager_server = self try: accepter = threading.Thread(target=self.accepter) accepter.daemon = True accepter.start() try: while not self.stop_event.is_set(): self.stop_event.wait(1) except (KeyboardInterrupt, SystemExit): pass finally: if sys.stdout != sys.__stdout__: util.debug('resetting stdout, stderr') sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ sys.exit(0)
def idle_showwarning( message, category, filename, lineno, file=None, line=None): """Show Idle-format warning (after replacing warnings.showwarning). The differences are the formatter called, the file=None replacement, which can be None, the capture of the consequence AttributeError, and the output of a hard-coded prompt. """ if file is None: file = warning_stream try: file.write(idle_formatwarning( message, category, filename, lineno, line=line)) file.write(">>> ") except (AttributeError, OSError): pass # if file (probably __stderr__) is invalid, skip warning.
def manage_socket(address): for i in range(3): time.sleep(i) try: server = MyRPCServer(address, MyHandler) break except OSError as err: print("IDLE Subprocess: OSError: " + err.args[1] + ", retrying....", file=sys.__stderr__) socket_error = err else: print("IDLE Subprocess: Connection to " "IDLE GUI failed, exiting.", file=sys.__stderr__) show_socket_error(socket_error, address) global exit_now exit_now = True return server.handle_request() # A single request only
def manage_socket(address): for i in range(3): time.sleep(i) try: server = MyRPCServer(address, MyHandler) break except socket.error, err: print>>sys.__stderr__,"IDLE Subprocess: socket error: "\ + err.args[1] + ", retrying...." else: print>>sys.__stderr__, "IDLE Subprocess: Connection to "\ "IDLE GUI failed, exiting." show_socket_error(err, address) global exit_now exit_now = True return server.handle_request() # A single request only
def putmessage(self, message): self.debug("putmessage:%d:" % message[0]) try: s = pickle.dumps(message) except pickle.PicklingError: print >>sys.__stderr__, "Cannot pickle:", repr(message) raise s = struct.pack("<i", len(s)) + s while len(s) > 0: try: r, w, x = select.select([], [self.sock], []) n = self.sock.send(s[:BUFSIZE]) except (AttributeError, TypeError): raise IOError, "socket no longer exists" except socket.error: raise else: s = s[n:]
def restore_default_configuration(): """ Restores the sys.stdout and the sys.stderr buffer streams to their default values without regard to what step has currently overridden their values. This is useful during cleanup outside of the running execution block """ def restore(target, default_value): if target == default_value: return default_value if not isinstance(target, RedirectBuffer): return target try: target.active = False target.close() except Exception: pass return default_value sys.stdout = restore(sys.stdout, sys.__stdout__) sys.stderr = restore(sys.stderr, sys.__stderr__)
def disable(step: 'projects.ProjectStep'): # Restore the print buffer sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ stdout_interceptor = step.report.stdout_interceptor if stdout_interceptor: stdout_interceptor.active = False stdout_interceptor.close() step.report.stdout_interceptor = None stderr_interceptor = step.report.stderr_interceptor if stderr_interceptor: stderr_interceptor.active = False stderr_interceptor.close() step.report.stderr_interceptor = None
def test_restore(self): """ should restore from an enabled state """ support.create_project(self, 'percy') support.add_step(self) project = cd.project.internal_project step = project.steps[0] redirection.enable(step) self.assertIsInstance(sys.stdout, redirection.RedirectBuffer) self.assertIsInstance(sys.stderr, redirection.RedirectBuffer) redirection.restore_default_configuration() self.assertNotIsInstance(sys.stdout, redirection.RedirectBuffer) self.assertNotIsInstance(sys.stderr, redirection.RedirectBuffer) self.assertEqual(sys.stdout, sys.__stdout__) self.assertEqual(sys.stderr, sys.__stderr__)
def test_enable_disable(self): """ should properly enable and disable redirection """ support.create_project(self, 'tonks') support.add_step(self) project = cd.project.internal_project step = project.steps[0] redirection.enable(step) self.assertIsInstance(sys.stdout, redirection.RedirectBuffer) self.assertIsInstance(sys.stderr, redirection.RedirectBuffer) redirection.disable(step) self.assertNotIsInstance(sys.stdout, redirection.RedirectBuffer) self.assertNotIsInstance(sys.stderr, redirection.RedirectBuffer) self.assertEqual(sys.stdout, sys.__stdout__) self.assertEqual(sys.stderr, sys.__stderr__)
def run_tests(src_file='tests.scm'): """Run a read-eval loop that reads from src_file and collects outputs.""" sys.stderr = sys.stdout = io.StringIO() # Collect output to stdout and stderr reader = None try: reader = TestReader(open(src_file).readlines(), sys.stdout) src = Buffer(tokenize_lines(reader)) def next_line(): src.current() return src read_eval_print_loop(next_line, create_global_frame()) except BaseException as exc: sys.stderr = sys.__stderr__ if reader: print("Tests terminated due to unhandled exception " "after line {0}:\n>>>".format(reader.line_number), file=sys.stderr) raise finally: sys.stdout = sys.__stdout__ # Revert stdout sys.stderr = sys.__stderr__ # Revert stderr summarize(reader.output, reader.expected_output)
def fix_windows_stdout_stderr(): """ Processes can't write to stdout/stderr on frozen windows apps because they do not exist here if process tries it anyway we get a nasty dialog window popping up, so we redirect the streams to a dummy see https://github.com/jopohl/urh/issues/370 """ if hasattr(sys, "frozen") and sys.platform == "win32": try: sys.stdout.write("\n") sys.stdout.flush() except: class DummyStream(object): def __init__(self): pass def write(self, data): pass def read(self, data): pass def flush(self): pass def close(self): pass sys.stdout, sys.stderr, sys.stdin = DummyStream(), DummyStream(), DummyStream() sys.__stdout__, sys.__stderr__, sys.__stdin__ = DummyStream(), DummyStream(), DummyStream()
def mute(): """Redirect `sys.stdout` and `sys.stderr` to /dev/null, silencent them. Decorator example:: @mock.mute def test_foo(self): something() Context example:: with mock.mute(): something() """ prev_out, prev_err = sys.stdout, sys.stderr prev_rout, prev_rerr = sys.__stdout__, sys.__stderr__ devnull = open(os.devnull, 'w') mystdout, mystderr = devnull, devnull sys.stdout = sys.__stdout__ = mystdout sys.stderr = sys.__stderr__ = mystderr try: yield finally: sys.stdout = prev_out sys.stderr = prev_err sys.__stdout__ = prev_rout sys.__stderr__ = prev_rerr
def silence_streams(func): def wrapper(*args, **kwargs): # save stderr save_streams = sys.__stderr__ save_streams.flush() # file descriptor for stderr is 2 save_stderr = os.dup(2) # silence null_fd = os.open(os.devnull, os.O_RDWR) os.dup2(null_fd, 2) # uncomment the line to test if stderr is silenced # 1/0 try: bak_file_path = func(*args, **kwargs) sys.stderr = save_streams os.dup2(save_stderr, 2) return bak_file_path except: sys.stderr = save_streams os.dup2(save_stderr, 2) return wrapper
def shutdown(self, c): ''' Shutdown this process ''' try: try: util.debug('manager received shutdown message') c.send(('#RETURN', None)) if sys.stdout != sys.__stdout__: util.debug('resetting stdout, stderr') sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ util._run_finalizers(0) for p in active_children(): util.debug('terminating a child process of manager') p.terminate() for p in active_children(): util.debug('terminating a child process of manager') p.join() util._run_finalizers() util.info('manager exiting with exitcode 0') except: import traceback traceback.print_exc() finally: exit(0)
def _force_disconnect(): while True: try: await asyncio.sleep(random.randrange(24, 64)) print('test > Force disconnect') await client.ws.close() except Exception as ex: print("Exception at exit: {}".format(ex), sys.__stderr__)
def init(ctx): global LOGFILE filename = os.path.abspath(LOGFILE) try: os.makedirs(os.path.dirname(os.path.abspath(filename))) except OSError: pass if hasattr(os, 'O_NOINHERIT'): fd = os.open(LOGFILE, os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT) fileobj = os.fdopen(fd, 'w') else: fileobj = open(LOGFILE, 'w') old_stderr = sys.stderr # sys.stdout has already been replaced, so __stdout__ will be faster #sys.stdout = log_to_file(sys.stdout, fileobj, filename) #sys.stderr = log_to_file(sys.stderr, fileobj, filename) def wrap(stream): if stream.isatty(): return ansiterm.AnsiTerm(stream) return stream sys.stdout = log_to_file(wrap(sys.__stdout__), fileobj, filename) sys.stderr = log_to_file(wrap(sys.__stderr__), fileobj, filename) # now mess with the logging module... for x in Logs.log.handlers: try: stream = x.stream except AttributeError: pass else: if id(stream) == id(old_stderr): x.stream = sys.stderr
def init(ctx): global LOGFILE filename = os.path.abspath(LOGFILE) try: os.makedirs(os.path.dirname(os.path.abspath(filename))) except OSError: pass if hasattr(os, 'O_NOINHERIT'): fd = os.open(LOGFILE, os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT) fileobj = os.fdopen(fd, 'w') else: fileobj = open(LOGFILE, 'w') old_stderr = sys.stderr # sys.stdout has already been replaced, so __stdout__ will be faster #sys.stdout = log_to_file(sys.stdout, fileobj, filename) #sys.stderr = log_to_file(sys.stderr, fileobj, filename) sys.stdout = log_to_file(sys.__stdout__, fileobj, filename) sys.stderr = log_to_file(sys.__stderr__, fileobj, filename) # now mess with the logging module... for x in Logs.log.handlers: try: stream = x.stream except AttributeError: pass else: if id(stream) == id(old_stderr): x.stream = sys.stderr
def quiet(): # save stdout/stderr # Jupyter doesn't support setting it back to # sys.__stdout__ and sys.__stderr__ _sys_stdout = sys.stdout _sys_stderr = sys.stderr # Divert stdout and stderr to devnull sys.stdout = sys.stderr = open(os.devnull, "w") try: yield finally: # Revert back to standard stdout/stderr sys.stdout = _sys_stdout sys.stderr = _sys_stderr
def readVcf(inFile, logDebug): log.info("reading the VCF file") ## We read only one sample from the VCF file if logDebug: vcf = allel.read_vcf(inFile, samples = [0], fields = '*') else: sys.stderr = StringIO.StringIO() vcf = allel.read_vcf(inFile, samples = [0], fields = '*') #vcf = vcfnp.variants(inFile, cache=False).view(np.recarray) #vcfD = vcfnp.calldata_2d(inFile, cache=False).view(np.recarray) sys.stderr = sys.__stderr__ (snpCHR, snpsREQ) = parseChrName(vcf['variants/CHROM']) try: snpGT = allel.GenotypeArray(vcf['calldata/GT']).to_gt()[snpsREQ, 0] except AttributeError: die("input VCF file doesnt have required GT field") snpsREQ = snpsREQ[np.where(snpGT != './.')[0]] snpGT = allel.GenotypeArray(vcf['calldata/GT']).to_gt()[snpsREQ, 0] if 'calldata/PL' in sorted(vcf.keys()): snpWEI = np.copy(vcf['calldata/PL'][snpsREQ, 0]).astype('float') snpWEI = snpWEI/(-10) snpWEI = np.exp(snpWEI) else: snpBinary = parseGT(snpGT) snpWEI = np.ones((len(snpsREQ), 3)) ## for homo and het snpWEI[np.where(snpBinary != 0),0] = 0 snpWEI[np.where(snpBinary != 1),2] = 0 snpWEI[np.where(snpBinary != 2),1] = 0 snpCHR = snpCHR[snpsREQ] DPmean = np.mean(vcf['calldata/DP'][snpsREQ,0]) snpPOS = np.array(vcf['variants/POS'][snpsREQ]) return (DPmean, snpCHR, snpPOS, snpGT, snpWEI)