我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sys.stdin()。
def __init__(self, addr="127.0.0.1", port=4444): """Initialize the socket and initialize pdb.""" # Backup stdin and stdout before replacing them by the socket handle self.old_stdout = sys.stdout self.old_stdin = sys.stdin # Open a 'reusable' socket to let the webapp reload on the same port self.skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) self.skt.bind((addr, port)) self.skt.listen(1) (clientsocket, address) = self.skt.accept() handle = clientsocket.makefile('rw') pdb.Pdb.__init__(self, completekey='tab', stdin=handle, stdout=handle) sys.stdout = sys.stdin = handle
def isEmpty(): """ Return True if no non-whitespace characters remain in standard input. Otherwise return False. """ global _buffer while _buffer.strip() == '': line = sys.stdin.readline() if sys.hexversion < 0x03000000: line = line.decode('utf-8') if line == '': return True _buffer += line return False #-----------------------------------------------------------------------
def hasNextLine(): """ Return True if standard input has a next line. Otherwise return False. """ global _buffer if _buffer != '': return True else: _buffer = sys.stdin.readline() if sys.hexversion < 0x03000000: _buffer = _buffer.decode('utf-8') if _buffer == '': return False return True #-----------------------------------------------------------------------
def readAll(): """ Read and return as a string all remaining lines of standard input. """ global _buffer s = _buffer _buffer = '' for line in sys.stdin: if sys.hexversion < 0x03000000: line = line.decode('utf-8') s += line return s #======================================================================= # For Testing #=======================================================================
def __init__(self, file): self.file = file if file == '': self.infile = sys.stdin elif file.lower().startswith('http://') or file.lower().startswith('https://'): try: if sys.hexversion >= 0x020601F0: self.infile = urllib23.urlopen(file, timeout=5) else: self.infile = urllib23.urlopen(file) except urllib23.HTTPError: print('Error accessing URL %s' % file) print(sys.exc_info()[1]) sys.exit() elif file.lower().endswith('.zip'): try: self.zipfile = zipfile.ZipFile(file, 'r') self.infile = self.zipfile.open(self.zipfile.infolist()[0], 'r', C2BIP3('infected')) except: print('Error opening file %s' % file) print(sys.exc_info()[1]) sys.exit() else: try: self.infile = open(file, 'rb') except: print('Error opening file %s' % file) print(sys.exc_info()[1]) sys.exit() self.ungetted = []
def description_of(lines, name='stdin'): """ Return a string describing the probable encoding of a file or list of strings. :param lines: The lines to get the encoding of. :type lines: Iterable of bytes :param name: Name of file or collection of lines :type name: str """ u = UniversalDetector() for line in lines: u.feed(line) u.close() result = u.result if result['encoding']: return '{0}: {1} with confidence {2}'.format(name, result['encoding'], result['confidence']) else: return '{0}: no result'.format(name)
def main(): args = parse_args() stream = subunit.ByteStreamToStreamResult( sys.stdin, non_subunit_name='stdout') starts = Starts(sys.stdout) outcomes = testtools.StreamToDict( functools.partial(show_outcome, sys.stdout, print_failures=args.print_failures, failonly=args.failonly )) summary = testtools.StreamSummary() result = testtools.CopyStreamResult([starts, outcomes, summary]) result.startTestRun() try: stream.run(result) finally: result.stopTestRun() if count_tests('status', '.*') == 0: print("The test run didn't actually run any tests") return 1 if args.post_fails: print_fails(sys.stdout) print_summary(sys.stdout) return (0 if summary.wasSuccessful() else 1)
def _run(self): with tf.Session() as session: self.io.restore_session(session) inputs = sys.stdin singsen = SingleSentenceData() scounter = SpeedCounter().start() while True: senlen = singsen.read_from_file(sys.stdin, self.io.w2id) if senlen is None: break if senlen < 2: print(-9999) continue o = run_epoch(session, self.test_model, singsen) scounter.next() if self.params.progress and scounter.val % 20 ==0: print("\rLoglikes per secs: %f" % scounter.speed, end="", file=sys.stderr) print("%f" % o)
def default(self, line): if line[:1] == '!': line = line[1:] locals = self.curframe_locals globals = self.curframe.f_globals try: code = compile(line + '\n', '<stdin>', 'single') save_stdout = sys.stdout save_stdin = sys.stdin save_displayhook = sys.displayhook try: sys.stdin = self.stdin sys.stdout = self.stdout sys.displayhook = self.displayhook exec code in globals, locals finally: sys.stdout = save_stdout sys.stdin = save_stdin sys.displayhook = save_displayhook except: t, v = sys.exc_info()[:2] if type(t) == type(''): exc_type_name = t else: exc_type_name = t.__name__ print >>self.stdout, '***', exc_type_name + ':', v
def _test(): """Simple test program to disassemble a file.""" if sys.argv[1:]: if sys.argv[2:]: sys.stderr.write("usage: python dis.py [-|file]\n") sys.exit(2) fn = sys.argv[1] if not fn or fn == "-": fn = None else: fn = None if fn is None: f = sys.stdin else: f = open(fn) source = f.read() if fn is not None: f.close() else: fn = "<stdin>" code = compile(source, fn, "exec") dis(code)
def s_unload(self, *args): """Unload the module. Removes it from the restricted environment's sys.modules dictionary. This method is implicitly called by code executing in the restricted environment. Overriding this method in a subclass is used to change the policies enforced by a restricted environment. Similar to the r_unload() method, but has access to restricted versions of the standard I/O streams sys.stdin, sys.stderr, and sys.stdout. """ return self.s_apply(self.r_unload, args) # Restricted open(...)
def _raw_input(prompt="", stream=None, input=None): # A raw_input() replacement that doesn't save the string in the # GNU readline history. if not stream: stream = sys.stderr if not input: input = sys.stdin prompt = str(prompt) if prompt: stream.write(prompt) stream.flush() # NOTE: The Python C API calls flockfile() (and unlock) during readline. line = input.readline() if not line: raise EOFError if line[-1] == '\n': line = line[:-1] return line
def __call__(self, string): # the special argument "-" means sys.std{in,out} if string == '-': if 'r' in self._mode: return _sys.stdin elif 'w' in self._mode: return _sys.stdout else: msg = _('argument "-" with mode %r') % self._mode raise ValueError(msg) # all other arguments are used as file names try: return open(string, self._mode, self._bufsize) except IOError as e: message = _("can't open '%s': %s") raise ArgumentTypeError(message % (string, e))
def __init__(self, completekey='tab', stdin=None, stdout=None): """Instantiate a line-oriented interpreter framework. The optional argument 'completekey' is the readline name of a completion key; it defaults to the Tab key. If completekey is not None and the readline module is available, command completion is done automatically. The optional arguments stdin and stdout specify alternate input and output file objects; if not specified, sys.stdin and sys.stdout are used. """ import sys if stdin is not None: self.stdin = stdin else: self.stdin = sys.stdin if stdout is not None: self.stdout = stdout else: self.stdout = sys.stdout self.cmdqueue = [] self.completekey = completekey
def expand_args(args, flist): """read names in flist and append to args""" expanded = args[:] if flist: try: if flist == '-': fd = sys.stdin else: fd = open(flist) while 1: line = fd.readline() if not line: break expanded.append(line[:-1]) except IOError: print "Error reading file list %s" % flist raise return expanded
def interact(self): """Interaction function, emulates a very dumb telnet client.""" if sys.platform == "win32": self.mt_interact() return while 1: rfd, wfd, xfd = select.select([self, sys.stdin], [], []) if self in rfd: try: text = self.read_eager() except EOFError: print '*** Connection closed by remote host ***' break if text: sys.stdout.write(text) sys.stdout.flush() if sys.stdin in rfd: line = sys.stdin.readline() if not line: break self.write(line)
def test(): """Small test program""" import sys, getopt try: opts, args = getopt.getopt(sys.argv[1:], 'deut') except getopt.error, msg: sys.stdout = sys.stderr print msg print """usage: %s [-d|-e|-u|-t] [file|-] -d, -u: decode -e: encode (default) -t: encode and decode string 'Aladdin:open sesame'"""%sys.argv[0] sys.exit(2) func = encode for o, a in opts: if o == '-e': func = encode if o == '-d': func = decode if o == '-u': func = decode if o == '-t': test1(); return if args and args[0] != '-': with open(args[0], 'rb') as f: func(f, sys.stdout) else: func(sys.stdin, sys.stdout)
def unified_diff(filename, content2=None): # type: (str, Optional[bytes]) -> Tuple[int, Iterable[str]] """This function prints a unified diff of the contents of filename and the standard input, when used from the command line as follows: echo 123 > d.txt ; echo 456 | ./whatstyle.py --stdindiff d.txt We get this result: --- +++ @@ -1 +1 @@ -123 +456 """ use_stdin = content2 is None if content2 is None: # Read binary input stream stdin = rawstream(sys.stdin) econtent2 = bytestr(stdin.read()) else: econtent2 = content2 exit_code, diff = compute_unified_diff(filename, econtent2, lineterm='') if use_stdin: write('\n'.join(diff)) return exit_code, diff
def main(): parser = argparse.ArgumentParser(description="") parser.add_argument("--mode", default="stdio", help="communication (stdio|tcp)") parser.add_argument("--addr", default=2087, help="server listen (tcp)", type=int) args = parser.parse_args() if args.mode == "stdio": log("Reading on stdin, writing on stdout") s = LangServer(conn=ReadWriter(sys.stdin, sys.stdout)) s.listen() elif args.mode == "tcp": host, addr = "0.0.0.0", args.addr log("Accepting TCP connections on {}:{}".format(host, addr)) ThreadingTCPServer.allow_reuse_address = True s = ThreadingTCPServer((host, addr), LangserverTCPTransport) try: s.serve_forever() finally: s.shutdown()
def __init__(self, fileOrPath, ttFont, progress=None, quiet=None): if fileOrPath == '-': fileOrPath = sys.stdin if not hasattr(fileOrPath, "read"): self.file = open(fileOrPath, "rb") self._closeStream = True else: # assume readable file object self.file = fileOrPath self._closeStream = False self.ttFont = ttFont self.progress = progress if quiet is not None: from fontTools.misc.loggingTools import deprecateArgument deprecateArgument("quiet", "configure logging instead") self.quiet = quiet self.root = None self.contentStack = [] self.stackSize = 0
def main(): if len(sys.argv) == 1: infile = sys.stdin outfile = sys.stdout elif len(sys.argv) == 2: infile = open(sys.argv[1], 'r') outfile = sys.stdout elif len(sys.argv) == 3: infile = open(sys.argv[1], 'r') outfile = open(sys.argv[2], 'w') else: raise SystemExit(sys.argv[0] + " [infile [outfile]]") with infile: try: obj = json.load(infile, object_pairs_hook=json.OrderedDict, use_decimal=True) except ValueError: raise SystemExit(sys.exc_info()[1]) with outfile: json.dump(obj, outfile, sort_keys=True, indent=' ', use_decimal=True) outfile.write('\n')
def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout): """ Process data. :param argv: Command line arguments. :type argv: list or tuple :param ifile: Input data file. :type ifile: file :param ofile: Output data file. :type ofile: file :return: :const:`None` :rtype: NoneType """ if len(argv) > 1: self._process_protocol_v1(argv, ifile, ofile) else: self._process_protocol_v2(argv, ifile, ofile)
def print(self, f=sys.stdin): def pad(x): return x + " " * (4 - len(x)) for r in range(self.mapHeight): s = "" for c in range(self.mapWidth): idx = r * self.mapWidth + c if self.terrain[idx] == Terrain.MOUNTAIN: s += pad('M') elif self.terrain[idx] == Terrain.NEUTRAL_CITY: s += pad('C') elif self.owner[idx] == Owner.NEUTRAL: s += pad('.') elif self.owner[idx] == Owner.OURS: s += pad(str(self.armies[idx])) else: s += pad(str(-self.armies[idx])) print(s, file=f) print("move=%s" % str(self.nextMove), file=f) print("", file=f)
def read_input_features(l, inp=sys.stdin): if isinstance(inp, str): with open(inp, 'r') as f: return read_input_features(f) print("%d samples" % l, file=sys.stderr) xs = np.zeros((l, flen), np.int16) ys = np.zeros((l, n*n*classes), np.int16) i = 0 for line in inp: xs[i, :], ys[i, :] = parse_csv_row_xy(line) i += 1 if i % 10000 == 0: print("%d read from disk" % i, file=sys.stderr) return xs, ys
def testWithIO(inp, out, f): """Calls the function `f` with ``sys.stdin`` changed to `inp` and ``sys.stdout`` changed to `out`. They are restored when `f` returns. This function returns whatever `f` returns. """ import os try: oldin,sys.stdin = sys.stdin,inp oldout,sys.stdout = sys.stdout,out x = f() finally: sys.stdin = oldin sys.stdout = oldout if os.environ.get('PYPNG_TEST_TMP') and hasattr(out,'getvalue'): name = mycallersname() if name: w = open(name+'.png', 'wb') w.write(out.getvalue()) w.close() return x
def _argparse(): argparse = ArgumentParser('bh_tsne Python wrapper') argparse.add_argument('-d', '--no_dims', type=int, default=DEFAULT_NO_DIMS) argparse.add_argument('-p', '--perplexity', type=float, default=DEFAULT_PERPLEXITY) # 0.0 for theta is equivalent to vanilla t-SNE argparse.add_argument('-t', '--theta', type=float, default=DEFAULT_THETA) argparse.add_argument('-r', '--randseed', type=int, default=EMPTY_SEED) argparse.add_argument('-n', '--initial_dims', type=int, default=INITIAL_DIMENSIONS) argparse.add_argument('-v', '--verbose', action='store_true') argparse.add_argument('-i', '--input', type=FileType('r'), default=stdin) argparse.add_argument('-o', '--output', type=FileType('w'), default=stdout) argparse.add_argument('--use_pca', action='store_true') argparse.add_argument('--no_pca', dest='use_pca', action='store_false') argparse.set_defaults(use_pca=DEFAULT_USE_PCA) argparse.add_argument('-m', '--max_iter', type=int, default=DEFAULT_MAX_ITERATIONS) return argparse
def capture_output(data=None): """ with capture_output as (stdout, stderr): some_action() print stdout.getvalue(), stderr.getvalue() """ in_ = BytesIO(data or b"") err = BytesIO() out = BytesIO() old_in = sys.stdin old_err = sys.stderr old_out = sys.stdout sys.stdin = in_ sys.stderr = err sys.stdout = out try: yield (out, err) finally: sys.stdin = old_in sys.stderr = old_err sys.stdout = old_out
def main(): import sys parser = argparse.ArgumentParser(description="pysnappy driver") parser.add_argument("-f", "--file", help="Input file", default=sys.stdin) parser.add_argument("-b", "--bytesize", help="Bitesize for streaming reads", type=int, default=65536) parser.add_argument("-o", "--output", help="Output file", default=sys.stdout) group = parser.add_mutually_exclusive_group() group.add_argument("-c", "--compress", action="store_true") group.add_argument("-d", "--decompress", action="store_false") parser.add_argument("-t", "--framing", help="Framing format", choices=["framing2", "hadoop"], default="framing2") args = parser.parse_args() run(args)
def __call__(self, string): # the special argument "-" means sys.std{in,out} if string == '-': if 'r' in self._mode: return _sys.stdin elif 'w' in self._mode: return _sys.stdout else: msg = _('argument "-" with mode %r' % self._mode) raise ValueError(msg) try: # all other arguments are used as file names if self._bufsize: return open(string, self._mode, self._bufsize) else: return open(string, self._mode) except IOError: err = _sys.exc_info()[1] message = _("can't open '%s': %s") raise ArgumentTypeError(message % (string, err))
def from_command_line(cls, *args, **keys): params = list() for name, param in cls.params(): if name not in keys: params.append((name, param)) bot_name = inspect.getmodulename(inspect.stack()[1][1]) if "ABUSEHELPER_CONF_FROM_STDIN" in os.environ: defaults = dict(pickle.load(sys.stdin)) defaults.setdefault("bot_name", bot_name) added = cls._from_dict(params, **defaults) else: added = cls._from_sys_argv(params, bot_name=bot_name) added.update(keys) return cls(*args, **added)
def get_key(self, block=True, timeout=None): if timeout: check = select.select([sys.stdin], [], [], timeout)[0] if len(check) == 0: return None ret = self.stdscrs[-1].getkey() elif block: ret = self.stdscrs[-1].getkey() else: self.stdscrs[-1].nodelay(1) try: ret = self.stdscrs[-1].getkey() if ret == curses.ERR: ret = None except: ret = None finally: self.stdscrs[-1].nodelay(0) if len(ret) == 1: if ord(ret) < 0x20: ret = "^{}".format(chr(ord(ret) + ord('@'))) elif ord(ret) == 0x7f: ret = "^?" return ret
def shutdown(self): """Revert stdin and stdout, close the socket.""" sys.stdout = self.old_stdout sys.stdin = self.old_stdin self.skt.close() self.set_continue()
def run(): # REVISIT(ivc): current CNI implementation provided by this package is # experimental and its primary purpose is to enable development of other # components (e.g. functional tests, service/LBaaSv2 support) cni_conf = utils.CNIConfig(jsonutils.load(sys.stdin)) args = ['--config-file', cni_conf.kuryr_conf] try: if cni_conf.debug: args.append('-d') except AttributeError: pass config.init(args) config.setup_logging() # Initialize o.vo registry. k_objects.register_locally_defined_vifs() os_vif.initialize() if CONF.cni_daemon.daemon_enabled: runner = cni_api.CNIDaemonizedRunner() else: runner = cni_api.CNIStandaloneRunner(K8sCNIPlugin()) LOG.info("Using '%s' ", runner.__class__.__name__) def _timeout(signum, frame): runner._write_dict(sys.stdout, { 'msg': 'timeout', 'code': k_const.CNI_TIMEOUT_CODE, }) LOG.debug('timed out') sys.exit(1) signal.signal(signal.SIGALRM, _timeout) signal.alarm(_CNI_TIMEOUT) status = runner.run(os.environ, cni_conf, sys.stdout) LOG.debug("Exiting with status %s", status) if status: sys.exit(status)