我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 os.isatty()。
def create_logger():
    """Configure the root logger and return this module's logger.

    The root logger is set to INFO and receives one ``StreamHandler``.
    Records are colourised with ``colorlog`` when ``HAVE_COLORLOG`` is true
    and file descriptor 2 is a terminal; otherwise a plain formatter is used.

    Returns:
        The logger named after this module (``__name__``).
    """
    line_layout = '%(asctime)s - %(levelname)-8s - %(message)s'
    timestamp_layout = '%Y-%m-%d %H:%M:%S'

    root = logging.getLogger()  # root logger
    root.setLevel(logging.INFO)

    want_colour = HAVE_COLORLOG and os.isatty(2)
    if not want_colour:
        formatter = logging.Formatter(line_layout, timestamp_layout)
    else:
        palette = {
            'DEBUG': 'reset',
            'INFO': 'reset',
            'WARNING': 'bold_yellow',
            'ERROR': 'bold_red',
            'CRITICAL': 'bold_red',
        }
        formatter = colorlog.ColoredFormatter(
            '%(log_color)s' + line_layout, timestamp_layout, log_colors=palette)

    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    root.addHandler(handler)
    return logging.getLogger(__name__)
def __init__(self, fmt=None, datefmt="%Y-%m-%d %H:%M:%S",
             exceptionfmt="Traceback (most recent call last):\n%(traceback)s\n%(classname)s: %(message)s"):
    """Store the format strings, expanding or stripping colour codes.

    Colour placeholders in *fmt* and *exceptionfmt* are expanded from
    ``SHELL_COLORS`` via ``Template.safe_substitute``. When stdout is not a
    terminal, the resulting escape sequences are removed again.

    Args:
        fmt: Record format string.
        datefmt: Timestamp format string.
        exceptionfmt: Format string used for exceptions.
    """
    # set standard Formatter values
    self.datefmt = datefmt

    # set colors
    self._fmt = Template(fmt).safe_substitute(SHELL_COLORS)
    self._exceptionfmt = Template(exceptionfmt).safe_substitute(SHELL_COLORS)

    # do we have a tty? A stream without a usable descriptor (fileno()
    # missing or raising) is treated as "not a terminal".
    try:
        self.useColors = bool(
            sys.stdout.isatty() or os.isatty(sys.stdout.fileno()))
    except (AttributeError, OSError, ValueError):
        self.useColors = False

    # remove all colors
    if not self.useColors:
        # ESC is written as a normal string escape and the regex part as a
        # raw string, which yields the same pattern as before without the
        # invalid "\[" / "\d" string escapes.
        colour_code = re.compile("\033" r"\[(\d+|;)+m")
        self._fmt = colour_code.sub("", self._fmt)
        self._exceptionfmt = colour_code.sub("", self._exceptionfmt)
def getecho(self):
    '''Report whether terminal echo is enabled on ``self.fd``.

    Returns True when the ECHO flag is set and False otherwise; the result
    is also stored in ``self.echo``. Child applications that expect a
    password often turn echo off. See waitnoecho().

    Not supported on platforms where ``isatty()`` returns False: an EINVAL
    from ``tcgetattr`` is re-raised as IOError with an explanatory message,
    any other ``termios.error`` propagates unchanged.
    '''
    try:
        tty_attrs = termios.tcgetattr(self.fd)
    except termios.error as exc:
        if exc.args[0] != errno.EINVAL:
            raise
        note = 'getecho() may not be called on this platform'
        raise IOError(exc.args[0], '%s: %s.' % (exc.args[1], note))

    local_flags = tty_attrs[3]
    self.echo = bool(local_flags & termios.ECHO)
    return self.echo
def clear_screen(self):
    """Try to clear the terminal screen (if stdout is a terminal).

    On first use ``self.clear_method`` is chosen: ``"clear"`` when stdout is
    a terminal, ``"delimiter"`` otherwise. If running the ``clear`` command
    fails or exits non-zero, the method is downgraded to ``"newlines"``.

    * ``"newlines"`` prints ``self.screen_height() + 1`` newlines.
    * ``"delimiter"`` prints a line of 78 dashes.
    """
    if self.clear_method is None:
        try:
            is_terminal = os.isatty(sys.stdout.fileno())
        except Exception:
            # stdout may be replaced by an object without a descriptor
            is_terminal = False
        self.clear_method = "clear" if is_terminal else "delimiter"

    if self.clear_method == "clear":
        try:
            retcode = subprocess.call("clear")
        except Exception:
            retcode = 1
        if retcode != 0:
            self.clear_method = "newlines"

    # Single-argument, parenthesised print works both as a Python 2
    # statement and as a Python 3 function call.
    if self.clear_method == "newlines":
        print("\n" * (self.screen_height() + 1))
    elif self.clear_method == "delimiter":
        print(78 * "-")
def tty_size(self, fd):
    """Get the tty size.

    Args:
        fd: File object (it must provide ``fileno()``) that should be the
            stdout stream of the TTY.

    Returns:
        A tuple ``(rows, cols)`` of ints, or None when *fd* is not a
        terminal or the size cannot be determined.
    """
    try:
        if not os.isatty(fd.fileno()):
            return None
    except (AttributeError, OSError, ValueError):
        # no usable descriptor: the size cannot be determined
        return None

    try:
        # TIOCGWINSZ fills four unsigned shorts; the first two are the
        # row and column counts.
        raw = fcntl.ioctl(fd, termios.TIOCGWINSZ,
                          struct.pack('HHHH', 0, 0, 0, 0))
        rows, cols = struct.unpack('HHHH', raw)[:2]
        return (rows, cols)
    except Exception:
        pass

    # Fall back to the environment, converting to int so both code paths
    # return the same types.
    try:
        return (int(os.environ['LINES']), int(os.environ['COLUMNS']))
    except (KeyError, ValueError):
        return None
def __init__(self, options):
    """Build the checker list and initialise state from *options*.

    Checkers named in ``options.disable_checker`` are removed; an unknown
    name raises ``Exception``. The remaining checkers are configured with
    ``configure_all(options)``.
    """
    available = create_checkers(self._add_issue)
    self.options = options

    # Drop every checker the caller asked to disable.
    for disabled_name in self.options.disable_checker:
        match = self._find_checker_by_name(available, disabled_name)
        if match is None:
            raise Exception('Unknown checker {!r}'.format(disabled_name))
        available.remove(match)

    for active in available:
        active.configure_all(options)
    self.checkers = available
    self._issues = []

    include_dirs = options.include_dir
    self.include_dirs = [] if include_dirs is None else include_dirs

    self.verbose = options.verbose
    # colour output only when stdout is a terminal
    self.colors = os.isatty(sys.stdout.fileno())
    self.sources = {}
    self._included_file_cache = IncludedFileCache()
def colored(msg, color=None, background=None, style=None, force=False):
    """
    Return the colored version of a string *msg*.

    *color*'s: red, green, yellow, blue, pink, cyan, white.
    *background*'s: none, red, green, yellow, blue, pink, cyan, grey.
    *style*'s: none, bright, underline; a single value or a tuple, list or
    set of values.

    Unless *force* is *True*, the *msg* string is returned unchanged in case
    the output is not a tty, or when that cannot be determined.
    """
    if not force:
        try:
            is_terminal = os.isatty(sys.stdout.fileno())
        except Exception:
            # e.g. stdout replaced by an object without a file descriptor;
            # KeyboardInterrupt/SystemExit are deliberately not caught
            return msg
        if not is_terminal:
            return msg

    # unknown names fall back to the "white" / "none" entries
    color = colors.get(color, colors["white"])
    background = backgrounds.get(background, backgrounds["none"])

    if not isinstance(style, (tuple, list, set)):
        style = (style,)
    style = ";".join(str(styles.get(s, styles["none"])) for s in style)

    return "\033[{};{};{}m{}\033[0m".format(style, background, color, msg)
def container_service_log_handler(message):
    """Write one JSON-encoded container log message to stdout or stderr.

    *message* is parsed as JSON. Its ``"log"`` text goes to stderr when
    ``"streamType"`` is ``"stderr"`` and to stdout otherwise. When a
    ``"source"`` is present the text is prefixed with ``"<source> | "`` and,
    if the target stream is a terminal, coloured via ``AnsiColor.color_it``.

    This handler is best-effort: a malformed message or a failing write is
    ignored rather than raised.
    """
    try:
        record = json.loads(message)
        out = sys.stderr if record.get("streamType", None) == "stderr" else sys.stdout

        log = record["log"]
        source = record.get("source", None)
        if source:
            log = " | ".join([source, log])
            # NOTE(review): the one-line original does not show whether
            # colouring was nested under the source check; it is nested
            # here because color_it() is given the source - confirm.
            try:
                is_terminal = os.isatty(out.fileno())
            except (AttributeError, OSError, ValueError):
                # A stream without a descriptor is not a terminal; the
                # log line must still be written.
                is_terminal = False
            if is_terminal:
                log = AnsiColor.color_it(log, source)

        out.write(log)
        out.flush()
    except Exception:
        # best-effort logging: never let a bad message break the caller
        pass
def print_warning(msg, pos=None):
    """
    Print a warning for the user on stderr.

    Nothing is printed when ``verbosity_level`` is below
    ``VERBOSITY_WARNING``. If *pos* is truthy, its string form is put in
    front of *msg*. The line is printed in yellow when stderr is a terminal
    on a POSIX system. The progress display is hidden while printing.
    """
    if verbosity_level < VERBOSITY_WARNING:
        return

    location = str(pos) + ": " if pos else ""
    text = " nmlc warning: " + location + msg

    colourise = sys.stderr.isatty() and os.name == 'posix'
    if colourise:
        text = "\033[33m" + text + "\033[0m"

    hide_progress()
    print(text, file=sys.stderr)
    show_progress()
def getecho(self):
    '''Return the terminal echo mode of the child (``self.child_fd``).

    True means echo is on, False means it is off; the value is cached in
    ``self.echo`` as well. Child applications that are waiting for a
    password often switch ECHO off. See waitnoecho().

    Not supported on platforms where ``isatty()`` returns False. In that
    case an EINVAL from ``tcgetattr`` surfaces as IOError; other
    ``termios.error`` failures are re-raised as they are.
    '''
    try:
        attr = termios.tcgetattr(self.child_fd)
    except termios.error as err:
        code = err.args[0]
        if code == errno.EINVAL:
            detail = '%s: %s.' % (
                err.args[1], 'getecho() may not be called on this platform')
            raise IOError(code, detail)
        raise

    echo_enabled = (attr[3] & termios.ECHO) != 0
    self.echo = echo_enabled
    return echo_enabled
def _is_daemonized(self): """Return boolean indicating if the current process is running as a daemon. The criteria to determine the `daemon` condition is to verify if the current pid is not the same as the one that got used on the initial construction of the plugin *and* the stdin is not connected to a terminal. The sole validation of the tty is not enough when the plugin is executing inside other process like in a CI tool (Buildbot, Jenkins). """ if (self._original_pid != os.getpid() and not os.isatty(sys.stdin.fileno())): return True else: return False
def _check_stream(self): """Determines which output stream (stdout, stderr, or custom) to use""" if self.stream: try: if self.stream == 1 and os.isatty(sys.stdout.fileno()): self._stream_out = sys.stdout.write self._stream_flush = sys.stdout.flush elif self.stream == 2 and os.isatty(sys.stderr.fileno()): self._stream_out = sys.stderr.write self._stream_flush = sys.stderr.flush # a fix for IPython notebook "IOStream has no fileno." except UnsupportedOperation: if self.stream == 1: self._stream_out = sys.stdout.write self._stream_flush = sys.stdout.flush elif self.stream == 2: self._stream_out = sys.stderr.write self._stream_flush = sys.stderr.flush else: if self.stream is not None and hasattr(self.stream, 'write'): self._stream_out = self.stream.write self._stream_flush = self.stream.flush else: print('Warning: No valid output stream.')
def _print_msg(self, msg, tid=None, single=0): """Display message to the screen and to the log file.""" if single and self._empty_msg: # Display only a single empty line return tidmsg_l = '' if tid is None else _test_map[tid] self.write_log(tidmsg_l + msg) if self.isatty: tidmsg_s = _test_map_c.get(tid, tidmsg_l) if tid == HEAD: msg = VT_HL + VT_BOLD + msg + VT_NORM elif tid == INFO: msg = VT_BLUE + VT_BOLD + msg + VT_NORM elif tid in [PASS, FAIL]: msg = VT_BOLD + msg + VT_NORM else: tidmsg_s = tidmsg_l print tidmsg_s + msg sys.stdout.flush() if len(msg) > 0: self._empty_msg = 0 self._disp_msgs += 1 else: self._empty_msg = 1
def run( self ):
    "Run our cmdloop(), catching KeyboardInterrupt"
    finished = False
    while not finished:
        try:
            # Make sure no nodes are still waiting
            for node in self.mn.values():
                while node.waiting:
                    info( 'stopping', node, '\n' )
                    node.sendInt()
                    node.waitOutput()
            if self.isatty():
                quietRun( 'stty echo sane intr ^C' )
            self.cmdloop()
            # cmdloop() returned normally: leave the retry loop
            finished = True
        except KeyboardInterrupt:
            # Output a message - unless it's also interrupted
            # pylint: disable=broad-except
            try:
                output( '\nInterrupt\n' )
            except Exception:
                pass
            # pylint: enable=broad-except
def isatty(stream):
    """Return whether *stream* is connected to a terminal.

    The stream's own ``isatty()`` is tried first. If that is missing or
    fails, the descriptor from ``stream.fileno()`` is passed to
    ``os.isatty``. When no descriptor can be obtained the answer is False.
    """
    try:
        return stream.isatty()
    except Exception:
        # fall through to the descriptor-based check; KeyboardInterrupt
        # and SystemExit are not swallowed
        pass

    try:
        fileno = stream.fileno()
    except Exception:
        return False
    return os.isatty(fileno)
def supports_ansi_escape_codes(fd):
    """Returns whether the output device is capable of interpreting ANSI
    escape codes when :func:`print_` is used.

    A real terminal always qualifies. On Windows a pipe whose name matches
    the cygwin/msys pty naming scheme qualifies as well.

    Args:
        fd (int): file descriptor (e.g. ``sys.stdout.fileno()``)
    Returns:
        `bool`
    """
    if os.isatty(fd):
        return True
    if not is_win:
        return False

    # Check for cygwin/msys terminal
    handle = winapi._get_osfhandle(fd)
    is_pipe = (
        handle != winapi.INVALID_HANDLE_VALUE
        and winapi.GetFileType(handle) == winapi.FILE_TYPE_PIPE
    )
    if not is_pipe:
        return False

    pty_name = "^\\\\(cygwin|msys)-[a-z0-9]+-pty[0-9]+-(from|to)-master$"
    return re.match(pty_name, _get_file_name_for_handle(handle)) is not None
def setup_logger():
    """Setup a colored logger if available and possible.

    The ``clikraken`` logger gets two handlers: records below WARNING go to
    stdout, WARNING and above go to stderr. Each handler uses the coloured
    formatter only when ``colorlog`` is available and *its own* stream is a
    terminal, so redirecting one stream does not put colour codes into it.

    Returns:
        The configured ``clikraken`` logger.
    """
    logger = logging.getLogger('clikraken')
    logger.setLevel(logging.DEBUG)

    # log to stdout from DEBUG to INFO
    ch_out = logging.StreamHandler(sys.stdout)
    ch_out.setLevel(logging.DEBUG)
    ch_out.addFilter(LessThanFilter(logging.WARNING))

    # WARNING and above goes to stderr
    ch_err = logging.StreamHandler(sys.stderr)
    ch_err.setLevel(logging.WARNING)

    format_str = '{levelname:8} - {message}'

    # basic formatter
    plain = logging.Formatter(fmt=format_str, style='{')

    # colored formatter
    if have_colorlog:
        colored = colorlog.ColoredFormatter(fmt='{log_color}' + format_str, style='{')
    else:
        colored = plain

    # only use the colored formatter for a handler whose stream is a
    # terminal (fd 1 for stdout, fd 2 for stderr)
    for handler, fd in ((ch_out, 1), (ch_err, 2)):
        handler.setFormatter(colored if os.isatty(fd) else plain)
        logger.addHandler(handler)

    return logger
def EventQueue(fd, encoding):
    """Build an ``EncodedQueue`` from the compiled keymap for *fd*.

    Starts from ``general_keycodes()``. When *fd* is a terminal, its erase
    character (``VERASE`` from the terminal attributes) is mapped to
    ``backspace``.
    """
    keycodes = general_keycodes()
    if os.isatty(fd):
        control_chars = tcgetattr(fd)[6]
        erase_char = control_chars[VERASE]
        keycodes[erase_char] = unicode('backspace')
    compiled = keymap.compile_keymap(keycodes)
    trace('keymap {k!r}', k=compiled)
    return EncodedQueue(compiled, encoding)
def isatty(self):
    '''Return True if ``self.child_fd`` is open and connected to a
    tty(-like) device, else False.

    On SVR4-style platforms implementing streams, such as SunOS and HP-UX,
    the child pty may not appear as a terminal device. This means methods
    such as setecho(), setwinsize(), getwinsize() may raise an IOError.
    '''
    descriptor = self.child_fd
    return os.isatty(descriptor)
def getecho(self):
    '''Return the terminal echo mode, as reported by ``self.ptyproc``.

    The result is True if echo is on and False if echo is off. Child
    applications that are expecting you to enter a password often set ECHO
    False. See waitnoecho().

    Not supported on platforms where ``isatty()`` returns False.
    '''
    pty_process = self.ptyproc
    return pty_process.getecho()
def setecho(self, state):
    '''Switch the terminal echo mode on or off via ``self.ptyproc``.

    Anything the child sent before the echo mode changes is lost, so make
    sure the input buffer is empty before calling setecho(). For example,
    the following works as expected::

        p = pexpect.spawn('cat') # Echo is on by default.
        p.sendline('1234') # We expect to see this twice from the child...
        p.expect(['1234']) # ... once from the tty echo...
        p.expect(['1234']) # ... and again from cat itself.
        p.setecho(False) # Turn off tty echo
        p.sendline('abcd') # We will see this only once (echoed by cat).
        p.sendline('wxyz') # We will see this only once (echoed by cat)
        p.expect(['abcd'])
        p.expect(['wxyz'])

    The following WILL NOT WORK because the lines sent before the setecho
    will be lost::

        p = pexpect.spawn('cat')
        p.sendline('1234')
        p.setecho(False) # Turn off tty echo
        p.sendline('abcd') # We will see this only once (echoed by cat).
        p.sendline('wxyz') # We will see this only once (echoed by cat)
        p.expect(['1234'])
        p.expect(['1234'])
        p.expect(['abcd'])
        p.expect(['wxyz'])

    Not supported on platforms where ``isatty()`` returns False.
    '''
    pty_process = self.ptyproc
    return pty_process.setecho(state)
def isatty(self):
    '''Return True if ``self.fd`` is open and connected to a tty(-like)
    device, else False.

    On SVR4-style platforms implementing streams, such as SunOS and HP-UX,
    the child pty may not appear as a terminal device. This means methods
    such as setecho(), setwinsize(), getwinsize() may raise an IOError.
    '''
    descriptor = self.fd
    return os.isatty(descriptor)
def callback(_log, exit_on_error=False):
    """Helper to call the appropriate logging level.

    Args:
        _log: Mapping with the keys ``"level"`` and ``"message"``.
        exit_on_error: For level ``EXCEPTION``, log the error and exit even
            when stdout is not a terminal.

    Raises:
        RuntimeError: Level is ``EXCEPTION`` and stdout is not a terminal
            (with *exit_on_error* false) or stdout has no usable file
            descriptor.
        SystemExit: Level is ``EXCEPTION`` and stdout is a terminal or
            *exit_on_error* is true; the message is logged as an error
            first.

    Unknown levels are ignored.
    """
    log = logging.getLogger("iocage")
    level = _log["level"]
    message = _log["message"]

    if level == 'EXCEPTION':
        try:
            interactive = os.isatty(sys.stdout.fileno())
        except (AttributeError, OSError, ValueError):
            # They are lacking the fileno object, or it is unusable
            # (io.UnsupportedOperation is both an OSError and a ValueError).
            raise RuntimeError(message)

        if not interactive and not exit_on_error:
            raise RuntimeError(message)
        log.error(message)
        raise SystemExit(1)

    # VERBOSE and NOTICE use the numeric levels 15 and 25
    numeric_levels = {
        'CRITICAL': logging.CRITICAL,
        'ERROR': logging.ERROR,
        'WARNING': logging.WARNING,
        'INFO': logging.INFO,
        'DEBUG': logging.DEBUG,
        'VERBOSE': 15,
        'NOTICE': 25,
    }
    if level in numeric_levels:
        log.log(numeric_levels[level], message)
def is_a_tty():
    """Return True when stdout is a terminal.

    Any failure while obtaining or testing the descriptor counts as
    "not a terminal".
    """
    try:
        stdout_fd = sys.stdout.fileno()
        answer = os.isatty(stdout_fd)
    except Exception:
        answer = False
    return answer
def build(containers, service_names, **kwargs):
    """Create a ``LogPrinter`` for *containers* and start it.

    Presenters are built for *service_names*; they are monochrome when file
    descriptor 1 (stdout) is not a terminal. Extra keyword arguments are
    passed on to ``LogPrinter``.
    """
    stdout_is_tty = os.isatty(1)
    presenters = build_log_presenters(service_names, not stdout_is_tty)
    LogPrinter(containers, presenters, **kwargs).start()
def get_terminal_height(fd=1):
    """
    Returns height of terminal if it is a tty, 50 otherwise

    :param fd: file descriptor (default: 1=stdout)
    """
    if not os.isatty(fd):
        # fixed fallback for pipes and files
        return 50
    return os.get_terminal_size(fd).lines
def get_terminal_width(fd=1):
    """
    Returns width of terminal if it is a tty, 50 otherwise

    :param fd: file descriptor (default: 1=stdout)
    """
    if not os.isatty(fd):
        # fixed fallback for pipes and files
        return 50
    return os.get_terminal_size(fd).columns
def _check_interactive(*descriptors): for desc in descriptors: try: if not isatty(desc.fileno()): return False except Exception: # Anything broken we are going to pretend this is not # interactive return False return True # pragma: no cover
def _provide_processing_entry_feedback(self, entry): self.processed_entries += 1 if os.isatty(sys.stdout.fileno()): if self.processed_entries > 1: sys.stdout.write("\033[F") sys.stdout.write("\r\033[KProcessing entry {}/{} ({})...\n".format( self.processed_entries, self.expected_entries, sanitise(entry) ))
def do_var_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
    """Ask the user for the value of *varname* and return it as text.

    Prompting happens only when the original stdin is a terminal; otherwise
    a warning is issued. With *confirm* the value is asked twice until both
    entries match. A falsy answer is replaced by *default* when a default
    is given. With *encrypt* the value is passed through ``do_encrypt``.
    """
    result = None

    if not sys.__stdin__.isatty():
        self.warning("Not prompting as we are not in interactive mode")
    else:
        ask = self.prompt

        if not prompt:
            msg = 'input for %s: ' % varname
        elif default is not None:
            msg = "%s [%s]: " % (prompt, default)
        else:
            msg = "%s: " % prompt

        if not confirm:
            result = ask(msg, private)
        else:
            while True:
                result = ask(msg, private)
                second = ask("confirm " + msg, private)
                if result == second:
                    break
                self.display("***** VALUES ENTERED DO NOT MATCH ****")

    # if result is false and default is not None
    if not result and default is not None:
        result = default

    if encrypt:
        # Circular import because encrypt needs a display class
        from ansible.utils.encrypt import do_encrypt
        result = do_encrypt(result, encrypt, salt_size, salt)

    # handle utf-8 chars
    return to_text(result, errors='surrogate_or_strict')
def _set_column_width(self): if os.isatty(0): tty_size = unpack('HHHH', fcntl.ioctl(0, TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1] else: tty_size = 0 self.columns = max(79, tty_size)
def main(server, eventHandler, params):
    """Run the ncurses UI, which needs stdout to be a terminal.

    Without a terminal a FATAL message is printed and the function returns.
    Anything raised while the UI runs is printed as a traceback instead of
    being propagated.
    """
    stdout_is_tty = os.isatty(sys.stdout.fileno())
    if not stdout_is_tty:
        print("FATAL: Unable to run 'ncurses' UI without a TTY.")
        return

    ui = NCursesUI()
    try:
        curses.wrapper(ui.main, server, eventHandler, params)
    except BaseException:
        # same reach as a bare ``except:``: report everything
        import traceback
        traceback.print_exc()
def link(source, target):
    """link(source, target) -> None

    Attempt to hard link the source file to the target file name.
    On OS/2, this creates a complete copy of the source file.

    Raises:
        OSError: *source* is a terminal device (``EXDEV``), *target*
            already exists, or reading/writing fails. A partially written
            target is removed again.
    """
    # O_BINARY only exists on some platforms; fall back to no extra flag.
    binary = getattr(os, 'O_BINARY', 0)

    src_fd = os.open(source, os.O_RDONLY | binary)
    try:
        if os.isatty(src_fd):
            raise OSError(errno.EXDEV, 'Cross-device link')
        data = os.read(src_fd, 1024)

        dst_fd = os.open(
            target, os.O_WRONLY | binary | os.O_CREAT | os.O_EXCL)
        try:
            try:
                while data:
                    os.write(dst_fd, data)
                    data = os.read(src_fd, 1024)
            finally:
                os.close(dst_fd)
        except OSError:
            # do not leave a truncated copy behind
            os.unlink(target)
            raise
    finally:
        # the source descriptor is released on every path
        os.close(src_fd)
def test_isatty(self):
    """os.isatty() must report False for an invalid file descriptor."""
    if not hasattr(os, "isatty"):
        return
    bad_fd = support.make_bad_fd()
    self.assertEqual(os.isatty(bad_fd), False)
def setUp(self):
    """Arm a 10 second alarm and remember the previous SIGALRM handler."""
    # isatty() and close() can hang on some platforms. Set an alarm
    # before running the test to make sure we don't hang forever.
    previous_handler = signal.signal(signal.SIGALRM, self.handle_sig)
    self.old_alarm = previous_handler
    signal.alarm(10)
def handle_sig(self, sig, frame):
    """Signal handler: fail the running test because isatty() hung."""
    reason = "isatty hung"
    self.fail(reason)
def setup_tty(self):
    """Put stdin into raw mode when it is a terminal.

    The previous terminal attributes are saved in ``self.old_settings``.
    Nothing happens when stdin is not a terminal.
    """
    stdin = sys.stdin
    if not os.isatty(stdin.fileno()):
        return
    LOG.debug('putting tty into raw mode')
    self.old_settings = termios.tcgetattr(stdin)
    tty.setraw(stdin)
def restore_tty(self):
    """Restore the terminal attributes saved in ``self.old_settings``.

    Nothing happens when stdin is not a terminal.
    """
    stdin = sys.stdin
    if not os.isatty(stdin.fileno()):
        return
    LOG.debug('restoring tty configuration')
    termios.tcsetattr(stdin, termios.TCSADRAIN, self.old_settings)