Python sys 模块,stdin 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用 sys.stdin(注意:sys.stdin 是文件对象,而不是可调用的函数)

项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def __init__(self, addr="127.0.0.1", port=4444):
        """Serve a pdb session to the first client that connects.

        Binds a listening TCP socket to ``(addr, port)``, blocks until one
        client connects, initializes ``pdb.Pdb`` on a read/write file
        wrapper of that connection, and finally points ``sys.stdin`` and
        ``sys.stdout`` at the same wrapper.
        """
        # Keep the real standard streams reachable on the instance before
        # they are replaced by the socket handle.
        self.old_stdin = sys.stdin
        self.old_stdout = sys.stdout

        # SO_REUSEADDR lets the webapp reload and bind the same port again.
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.skt = listener
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        listener.bind((addr, port))
        listener.listen(1)

        # accept() blocks until a client connects.
        connection, _peer = listener.accept()
        stream = connection.makefile('rw')
        pdb.Pdb.__init__(self, completekey='tab', stdin=stream, stdout=stream)
        sys.stdin = stream
        sys.stdout = stream
项目:MIT-CS-lectures    作者:William-Python-King    | 项目源码 | 文件源码
def isEmpty():
    """
    Report whether standard input holds no more non-whitespace text.

    Lines are read from sys.stdin into the module-level buffer until
    the buffer contains something other than whitespace (return False)
    or a read yields the empty string (return True).
    """
    global _buffer
    while True:
        if _buffer.strip() != '':
            return False
        chunk = sys.stdin.readline()
        # Interpreters older than 3.0 hand back bytes; decode as UTF-8.
        if sys.hexversion < 0x03000000:
            chunk = chunk.decode('utf-8')
        if chunk == '':
            return True
        _buffer += chunk

#-----------------------------------------------------------------------
项目:MIT-CS-lectures    作者:William-Python-King    | 项目源码 | 文件源码
def hasNextLine():
    """
    Report whether another line is available on standard input.

    A non-empty module-level buffer counts as an available line.
    Otherwise one line is read from sys.stdin into the buffer, and the
    answer is whether that read produced any text.
    """
    global _buffer
    if _buffer == '':
        # Nothing buffered yet: pull one more line from standard input.
        _buffer = sys.stdin.readline()
        # Interpreters older than 3.0 hand back bytes; decode as UTF-8.
        if sys.hexversion < 0x03000000:
            _buffer = _buffer.decode('utf-8')
    return _buffer != ''

#-----------------------------------------------------------------------
项目:MIT-CS-lectures    作者:William-Python-King    | 项目源码 | 文件源码
def readAll():
    """
    Return everything left on standard input as a single string.

    The result starts with whatever is in the module-level buffer
    (which is then emptied), followed by every remaining line of
    sys.stdin.
    """
    global _buffer
    pieces = [_buffer]
    _buffer = ''
    for raw in sys.stdin:
        # Interpreters older than 3.0 hand back bytes; decode as UTF-8.
        if sys.hexversion < 0x03000000:
            raw = raw.decode('utf-8')
        pieces.append(raw)
    return ''.join(pieces)

#=======================================================================
# For Testing
#=======================================================================
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def __init__(self, file):
        """Open the input selected by ``file`` and start an empty unget list.

        ``file`` is interpreted as follows:

        * ``''``: read from ``sys.stdin``;
        * a name starting with ``http://`` or ``https://`` (any case):
          opened with ``urllib23.urlopen``, using a 5 second timeout when
          ``sys.hexversion`` is at least 0x020601F0 (Python 2.6.1 final);
        * a name ending in ``.zip`` (any case): the first archive member is
          opened with the password produced by ``C2BIP3('infected')``;
        * anything else: opened as a binary file.

        When opening fails, the error is printed and ``sys.exit()`` is
        called.
        """
        self.file = file
        if file == '':
            self.infile = sys.stdin
        else:
            lowered = file.lower()
            if lowered.startswith(('http://', 'https://')):
                # The timeout keyword is only passed on new enough
                # interpreters.
                extra = {}
                if sys.hexversion >= 0x020601F0:
                    extra['timeout'] = 5
                try:
                    self.infile = urllib23.urlopen(file, **extra)
                except urllib23.HTTPError:
                    print('Error accessing URL %s' % file)
                    print(sys.exc_info()[1])
                    sys.exit()
            elif lowered.endswith('.zip'):
                try:
                    self.zipfile = zipfile.ZipFile(file, 'r')
                    first_member = self.zipfile.infolist()[0]
                    self.infile = self.zipfile.open(first_member, 'r', C2BIP3('infected'))
                except:
                    print('Error opening file %s' % file)
                    print(sys.exc_info()[1])
                    sys.exit()
            else:
                try:
                    self.infile = open(file, 'rb')
                except:
                    print('Error opening file %s' % file)
                    print(sys.exc_info()[1])
                    sys.exit()
        self.ungetted = []
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def description_of(lines, name='stdin'):
    """
    Describe the encoding a ``UniversalDetector`` reports for ``lines``.

    Every item of ``lines`` is fed to a fresh detector, which is then
    closed and asked for its result.

    :param lines: The lines to get the encoding of.
    :type lines: Iterable of bytes
    :param name: Label placed at the start of the returned string.
    :type name: str
    :return: ``'<name>: <encoding> with confidence <confidence>'`` when the
        result has a truthy encoding, otherwise ``'<name>: no result'``.
    """
    detector = UniversalDetector()
    for chunk in lines:
        detector.feed(chunk)
    detector.close()
    outcome = detector.result
    if not outcome['encoding']:
        return '{0}: no result'.format(name)
    return '{0}: {1} with confidence {2}'.format(
        name, outcome['encoding'], outcome['confidence'])
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def description_of(lines, name='stdin'):
    """
    Describe the encoding a ``UniversalDetector`` reports for ``lines``.

    Every item of ``lines`` is fed to a fresh detector, which is then
    closed and asked for its result.

    :param lines: The lines to get the encoding of.
    :type lines: Iterable of bytes
    :param name: Label placed at the start of the returned string.
    :type name: str
    :return: ``'<name>: <encoding> with confidence <confidence>'`` when the
        result has a truthy encoding, otherwise ``'<name>: no result'``.
    """
    detector = UniversalDetector()
    for chunk in lines:
        detector.feed(chunk)
    detector.close()
    outcome = detector.result
    if not outcome['encoding']:
        return '{0}: no result'.format(name)
    return '{0}: {1} with confidence {2}'.format(
        name, outcome['encoding'], outcome['confidence'])
项目:networking-huawei    作者:openstack    | 项目源码 | 文件源码
def main():
    """Replay a subunit stream from standard input and report on it.

    The stream is fanned out to a ``Starts`` handler, a per-test outcome
    printer (``show_outcome`` writing to standard output) and a
    ``StreamSummary``.

    Returns 1 when ``count_tests('status', '.*')`` is zero or the summary
    is not successful, otherwise 0.
    """
    options = parse_args()
    source = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    start_tracker = Starts(sys.stdout)
    outcome_printer = functools.partial(
        show_outcome, sys.stdout,
        print_failures=options.print_failures,
        failonly=options.failonly)
    outcome_sink = testtools.StreamToDict(outcome_printer)
    summary = testtools.StreamSummary()
    fanout = testtools.CopyStreamResult([start_tracker, outcome_sink, summary])
    fanout.startTestRun()
    try:
        source.run(fanout)
    finally:
        # Always close the run, even when the stream handling raises.
        fanout.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if options.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    if summary.wasSuccessful():
        return 0
    return 1
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def __init__(self, addr="127.0.0.1", port=4444):
        """Serve a pdb session to the first client that connects.

        Binds a listening TCP socket to ``(addr, port)``, blocks until one
        client connects, initializes ``pdb.Pdb`` on a read/write file
        wrapper of that connection, and finally points ``sys.stdin`` and
        ``sys.stdout`` at the same wrapper.
        """
        # Keep the real standard streams reachable on the instance before
        # they are replaced by the socket handle.
        self.old_stdin = sys.stdin
        self.old_stdout = sys.stdout

        # SO_REUSEADDR lets the webapp reload and bind the same port again.
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.skt = listener
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        listener.bind((addr, port))
        listener.listen(1)

        # accept() blocks until a client connects.
        connection, _peer = listener.accept()
        stream = connection.makefile('rw')
        pdb.Pdb.__init__(self, completekey='tab', stdin=stream, stdout=stream)
        sys.stdin = stream
        sys.stdout = stream
项目:tf_rnnlm    作者:Ubiqus    | 项目源码 | 文件源码
def _run(self):
    """Score sentences read from standard input, printing one value each.

    The session is restored through ``self.io``; sentences are then read
    from ``sys.stdin`` with ``SingleSentenceData.read_from_file`` until it
    returns ``None``. A sentence length below 2 prints ``-9999`` and is
    skipped; otherwise the result of ``run_epoch`` on ``self.test_model``
    is printed with ``%f`` formatting. When ``self.params.progress`` is
    set, the counter's speed is written to standard error each time its
    ``val`` is a multiple of 20.
    """
    with tf.Session() as session:
      self.io.restore_session(session)

      singsen = SingleSentenceData()
      scounter = SpeedCounter().start()
      while True:
        senlen = singsen.read_from_file(sys.stdin, self.io.w2id)
        if senlen is None:
          # No further sentence could be read: stop.
          break
        if senlen < 2:
          # Too short to score; emit the placeholder value instead.
          print(-9999)
          continue

        o = run_epoch(session, self.test_model, singsen)
        scounter.next()
        if self.params.progress and scounter.val % 20 == 0:
          # "\r" with end="" rewrites the same stderr line in place.
          print("\rLoglikes per secs: %f" % scounter.speed, end="", file=sys.stderr)
        print("%f" % o)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def default(self, line):
        """Execute ``line`` as a Python statement in the current frame.

        A single leading ``!`` is stripped first. The line is compiled in
        'single' mode and executed against the current frame's globals and
        locals while ``sys.stdin``, ``sys.stdout`` and ``sys.displayhook``
        are temporarily replaced by this object's own. Any exception raised
        along the way is reported on ``self.stdout`` as
        ``*** <exception type name>: <value>`` instead of propagating.
        """
        if line[:1] == '!': line = line[1:]
        locals = self.curframe_locals
        globals = self.curframe.f_globals
        try:
            code = compile(line + '\n', '<stdin>', 'single')
            # Remember the process-wide streams and display hook so they can
            # be put back no matter how the executed statement ends.
            save_stdout = sys.stdout
            save_stdin = sys.stdin
            save_displayhook = sys.displayhook
            try:
                sys.stdin = self.stdin
                sys.stdout = self.stdout
                sys.displayhook = self.displayhook
                exec code in globals, locals
            finally:
                sys.stdout = save_stdout
                sys.stdin = save_stdin
                sys.displayhook = save_displayhook
        except:
            # Bare except: compile errors and anything the statement raises
            # are reported to the user rather than propagated.
            t, v = sys.exc_info()[:2]
            # The exception "type" may itself be a plain string; in that case
            # it is used directly as the name.
            if type(t) == type(''):
                exc_type_name = t
            else: exc_type_name = t.__name__
            print >>self.stdout, '***', exc_type_name + ':', v
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _test():
    """Simple test program to disassemble a file.

    With no argument, an empty argument or ``-``, the source is read from
    standard input and compiled under the name ``<stdin>``; with one file
    name, that file is read. More than one argument prints a usage message
    to standard error and exits with status 2.
    """
    if sys.argv[1:]:
        if sys.argv[2:]:
            sys.stderr.write("usage: python dis.py [-|file]\n")
            sys.exit(2)
        fn = sys.argv[1]
        if not fn or fn == "-":
            fn = None
    else:
        fn = None
    if fn is None:
        source = sys.stdin.read()
        fn = "<stdin>"
    else:
        # The with-block closes the file even when read() raises.
        with open(fn) as f:
            source = f.read()
    code = compile(source, fn, "exec")
    dis(code)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def s_unload(self, *args):
        """Unload the module through the stream-aware apply wrapper.

        The positional arguments are forwarded, as a tuple, to
        ``self.s_apply`` together with ``self.r_unload``, and whatever that
        call returns is returned.

        Per the class's contract this is the variant of ``r_unload()`` that
        has access to the restricted versions of ``sys.stdin``,
        ``sys.stderr`` and ``sys.stdout``; it is called implicitly by code
        running in the restricted environment and may be overridden in a
        subclass to change the enforced policy.

        """
        apply_with_streams = self.s_apply
        return apply_with_streams(self.r_unload, args)

    # Restricted open(...)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _raw_input(prompt="", stream=None, input=None):
    """Prompt on ``stream`` and return one line read from ``input``.

    A raw_input() replacement that doesn't save the string in the GNU
    readline history. ``stream`` falls back to ``sys.stderr`` and ``input``
    to ``sys.stdin`` when they are false. A single trailing newline is
    removed from the result.

    Raises EOFError when the read returns nothing.
    """
    stream = stream or sys.stderr
    input = input or sys.stdin
    prompt = str(prompt)
    if prompt:
        stream.write(prompt)
        stream.flush()
    # NOTE: The Python C API calls flockfile() (and unlock) during readline.
    line = input.readline()
    if not line:
        raise EOFError
    return line[:-1] if line.endswith('\n') else line
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __call__(self, string):
        """Turn a command-line argument into an open file object.

        ``'-'`` maps to standard input when the mode contains ``'r'`` and to
        standard output when it contains ``'w'``; any other mode raises
        ValueError. Every other argument is opened as a file name with the
        configured mode and buffer size, and an IOError is re-raised as
        ArgumentTypeError.
        """
        if string != '-':
            # all other arguments are used as file names
            try:
                return open(string, self._mode, self._bufsize)
            except IOError as e:
                message = _("can't open '%s': %s")
                raise ArgumentTypeError(message % (string, e))

        # the special argument "-" means sys.std{in,out}
        if 'r' in self._mode:
            return _sys.stdin
        if 'w' in self._mode:
            return _sys.stdout
        msg = _('argument "-" with mode %r') % self._mode
        raise ValueError(msg)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, completekey='tab', stdin=None, stdout=None):
        """Set up the streams and state of a line-oriented interpreter.

        'completekey' is the readline name of the completion key and
        defaults to the Tab key; according to the class's contract,
        completion is automatic when it is not None and the readline module
        is available. 'stdin' and 'stdout' give alternate input and output
        file objects; when one is None the corresponding sys stream is used.

        """
        import sys
        self.stdin = stdin if stdin is not None else sys.stdin
        self.stdout = stdout if stdout is not None else sys.stdout
        self.cmdqueue = []
        self.completekey = completekey
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def expand_args(args, flist):
    """Return a copy of ``args`` extended with the names listed in ``flist``.

    ``flist`` names a file holding one entry per line; ``'-'`` means
    standard input. When ``flist`` is false, a plain copy of ``args`` is
    returned. ``args`` itself is never modified.

    An IOError while opening or reading the list is reported on standard
    output and then re-raised.
    """
    expanded = args[:]
    if not flist:
        return expanded
    try:
        if flist == '-':
            fd = sys.stdin
        else:
            fd = open(flist)
        try:
            for line in iter(fd.readline, ''):
                # Drop only the line terminator: a final line without a
                # newline must keep its last character.
                if line.endswith('\n'):
                    line = line[:-1]
                expanded.append(line)
        finally:
            # Close the list file we opened; standard input stays open.
            if fd is not sys.stdin:
                fd.close()
    except IOError:
        print("Error reading file list %s" % flist)
        raise
    return expanded
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def interact(self):
        """Relay between this connection and the terminal (a very dumb
        telnet client).

        On win32 the work is handed to ``mt_interact()``. Elsewhere the loop
        waits with ``select`` on the connection and ``sys.stdin``: eagerly
        read connection data is written to standard output, and each line
        from standard input is written to the connection. The loop ends
        when the connection raises EOFError or standard input returns an
        empty line.
        """
        if sys.platform == "win32":
            self.mt_interact()
            return
        while True:
            readable, _writable, _errored = select.select(
                [self, sys.stdin], [], [])
            if self in readable:
                try:
                    text = self.read_eager()
                except EOFError:
                    print('*** Connection closed by remote host ***')
                    break
                if text:
                    sys.stdout.write(text)
                    sys.stdout.flush()
            if sys.stdin in readable:
                line = sys.stdin.readline()
                if not line:
                    break
                self.write(line)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def test():
    """Small command-line driver for the encode/decode functions.

    Options: ``-e`` selects encode (the default), ``-d`` and ``-u`` select
    decode, ``-t`` runs ``test1()`` and returns at once. Input is the first
    positional argument opened in binary mode, or standard input when it is
    absent or ``-``; output always goes to standard output. An option
    parsing error prints the usage text to standard error and exits with
    status 2.
    """
    import sys, getopt
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'deut')
    except getopt.error as msg:
        # Route the usage text to standard error.
        sys.stdout = sys.stderr
        print(msg)
        print("""usage: %s [-d|-e|-u|-t] [file|-]
        -d, -u: decode
        -e: encode (default)
        -t: encode and decode string 'Aladdin:open sesame'"""%sys.argv[0])
        sys.exit(2)
    func = encode
    for o, a in opts:
        if o == '-t':
            test1()
            return
        if o == '-e':
            func = encode
        elif o in ('-d', '-u'):
            func = decode
    use_stdin = not args or args[0] == '-'
    if use_stdin:
        func(sys.stdin, sys.stdout)
    else:
        with open(args[0], 'rb') as f:
            func(f, sys.stdout)
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def unified_diff(filename, content2=None):
    # type: (str, Optional[bytes]) -> Tuple[int, Iterable[str]]
    """Compute a unified diff between ``filename`` and ``content2``.

    When ``content2`` is None the second side is read from the binary
    standard input stream and the resulting diff is also written out with
    ``write``. From the command line this looks like:
        echo 123 > d.txt ; echo 456 | ./whatstyle.py --stdindiff d.txt
    which gives:
    ---
    +++
    @@ -1 +1 @@
    -123
    +456

    Returns the ``(exit_code, diff)`` pair produced by
    ``compute_unified_diff``.
    """
    from_stdin = content2 is None
    if from_stdin:
        # Read binary input stream
        econtent2 = bytestr(rawstream(sys.stdin).read())
    else:
        econtent2 = content2
    exit_code, diff = compute_unified_diff(filename, econtent2, lineterm='')
    if from_stdin:
        write('\n'.join(diff))
    return exit_code, diff
项目:coala-langserver    作者:gaocegege    | 项目源码 | 文件源码
def main():
    """Parse the command line and start the language server.

    ``--mode stdio`` (the default) serves over standard input and output;
    ``--mode tcp`` listens on 0.0.0.0 at the port given by ``--addr``
    (default 2087). Any other mode does nothing.
    """
    parser = argparse.ArgumentParser(description="")
    parser.add_argument("--mode", default="stdio",
                        help="communication (stdio|tcp)")
    parser.add_argument("--addr", default=2087,
                        help="server listen (tcp)", type=int)

    args = parser.parse_args()
    mode = args.mode

    if mode == "tcp":
        host, addr = "0.0.0.0", args.addr
        log("Accepting TCP connections on {}:{}".format(host, addr))
        ThreadingTCPServer.allow_reuse_address = True
        server = ThreadingTCPServer((host, addr), LangserverTCPTransport)
        try:
            server.serve_forever()
        finally:
            server.shutdown()
    elif mode == "stdio":
        log("Reading on stdin, writing on stdout")
        server = LangServer(conn=ReadWriter(sys.stdin, sys.stdout))
        server.listen()
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def __init__(self, fileOrPath, ttFont, progress=None, quiet=None):
        """Prepare a reader over ``fileOrPath`` for ``ttFont``.

        ``fileOrPath`` may be ``'-'`` (standard input), an object with a
        ``read`` attribute (used as is, ``_closeStream`` False) or a path
        (opened in binary mode, ``_closeStream`` True). ``quiet`` is
        deprecated: passing anything but None triggers
        ``deprecateArgument`` and stores the value on ``self.quiet``.
        """
        source = sys.stdin if fileOrPath == '-' else fileOrPath
        if hasattr(source, "read"):
            # assume readable file object
            self.file = source
            self._closeStream = False
        else:
            self.file = open(source, "rb")
            self._closeStream = True
        self.ttFont = ttFont
        self.progress = progress
        if quiet is not None:
            from fontTools.misc.loggingTools import deprecateArgument
            deprecateArgument("quiet", "configure logging instead")
            self.quiet = quiet
        self.root = None
        self.contentStack = []
        self.stackSize = 0
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def main():
    """Validate and pretty-print JSON from a file or standard input.

    Usage is ``prog [infile [outfile]]``: missing names fall back to
    standard input and standard output. More arguments, or input that
    fails to parse (ValueError), end the program through SystemExit.
    """
    argv = sys.argv
    argc = len(argv)
    if argc not in (1, 2, 3):
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")
    infile = sys.stdin if argc == 1 else open(argv[1], 'r')
    outfile = open(argv[2], 'w') if argc == 3 else sys.stdout
    with infile:
        try:
            obj = json.load(infile,
                            object_pairs_hook=json.OrderedDict,
                            use_decimal=True)
        except ValueError:
            raise SystemExit(sys.exc_info()[1])
    with outfile:
        json.dump(obj, outfile, sort_keys=True, indent='    ', use_decimal=True)
        outfile.write('\n')
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout):
        """ Dispatch processing to the protocol handler matching ``argv``.

        More than one entry in ``argv`` selects ``_process_protocol_v1``;
        otherwise ``_process_protocol_v2`` is used. Both receive ``argv``,
        ``ifile`` and ``ofile`` unchanged.

        :param argv: Command line arguments.
        :type argv: list or tuple

        :param ifile: Input data file.
        :type ifile: file

        :param ofile: Output data file.
        :type ofile: file

        :return: :const:`None`
        :rtype: NoneType

        """
        if len(argv) > 1:
            handler = self._process_protocol_v1
        else:
            handler = self._process_protocol_v2
        handler(argv, ifile, ofile)
项目:generalscnn    作者:zxqfl    | 项目源码 | 文件源码
def print(self, f=None):
        """Write a text rendering of the map, then the next move, to ``f``.

        Each map row becomes one line of cells padded to four characters:
        ``M`` for a mountain, ``C`` for a neutral city, ``.`` for a
        neutrally owned cell, the army count for our own cells and the
        negated army count for every other cell. The rows are followed by a
        ``move=<nextMove>`` line and an empty line.

        :param f: Writable text stream; defaults to ``sys.stdout``.
        """
        if f is None:
            # The former default, sys.stdin, is not writable, so a call
            # without an argument failed on its first write.
            f = sys.stdout

        def pad(x):
            return x + " " * (4 - len(x))

        # Lines are written with f.write rather than the print built-in so
        # that this function, itself named ``print``, never depends on which
        # ``print`` the name resolves to.
        for r in range(self.mapHeight):
            cells = []
            for c in range(self.mapWidth):
                idx = r * self.mapWidth + c
                if self.terrain[idx] == Terrain.MOUNTAIN:
                    cell = 'M'
                elif self.terrain[idx] == Terrain.NEUTRAL_CITY:
                    cell = 'C'
                elif self.owner[idx] == Owner.NEUTRAL:
                    cell = '.'
                elif self.owner[idx] == Owner.OURS:
                    cell = str(self.armies[idx])
                else:
                    cell = str(-self.armies[idx])
                cells.append(pad(cell))
            f.write("".join(cells) + "\n")
        f.write("move=%s" % str(self.nextMove) + "\n")
        f.write("\n")
项目:generalscnn    作者:zxqfl    | 项目源码 | 文件源码
def read_input_features(l, inp=sys.stdin):
    """Read ``l`` CSV rows of features and targets into two int16 arrays.

    :param l: Number of samples; both arrays are allocated with ``l`` rows.
    :param inp: Iterable of lines, or a path (str) that is opened for
        reading and iterated instead.
    :return: ``(xs, ys)`` where ``xs`` has shape ``(l, flen)`` and ``ys``
        has shape ``(l, n*n*classes)``; row ``i`` is filled from the
        ``i``-th line via ``parse_csv_row_xy``.
    """
    if isinstance(inp, str):
        with open(inp, 'r') as f:
            # Pass the sample count through; previously the file object was
            # passed as ``l`` and the rows were read from the default input.
            return read_input_features(l, f)

    print("%d samples" % l, file=sys.stderr)
    xs = np.zeros((l, flen), np.int16)
    ys = np.zeros((l, n*n*classes), np.int16)

    i = 0
    for line in inp:
        xs[i, :], ys[i, :] = parse_csv_row_xy(line)
        i += 1
        # Progress report on standard error every 10000 rows.
        if i % 10000 == 0:
            print("%d read from disk" % i, file=sys.stderr)
    return xs, ys
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def testWithIO(inp, out, f):
    """Calls the function `f` with ``sys.stdin`` changed to `inp`
    and ``sys.stdout`` changed to `out`.  They are restored when `f`
    returns.  This function returns whatever `f` returns.
    """

    import os

    try:
        oldin,sys.stdin = sys.stdin,inp
        oldout,sys.stdout = sys.stdout,out
        x = f()
    finally:
        sys.stdin = oldin
        sys.stdout = oldout
    if os.environ.get('PYPNG_TEST_TMP') and hasattr(out,'getvalue'):
        name = mycallersname()
        if name:
            w = open(name+'.png', 'wb')
            w.write(out.getvalue())
            w.close()
    return x
项目:aws-waf-security-automation    作者:cerbo    | 项目源码 | 文件源码
def description_of(lines, name='stdin'):
    """
    Describe the encoding a ``UniversalDetector`` reports for ``lines``.

    Every item of ``lines`` is fed to a fresh detector, which is then
    closed and asked for its result.

    :param lines: The lines to get the encoding of.
    :type lines: Iterable of bytes
    :param name: Label placed at the start of the returned string.
    :type name: str
    :return: ``'<name>: <encoding> with confidence <confidence>'`` when the
        result has a truthy encoding, otherwise ``'<name>: no result'``.
    """
    detector = UniversalDetector()
    for chunk in lines:
        detector.feed(chunk)
    detector.close()
    outcome = detector.result
    if not outcome['encoding']:
        return '{0}: no result'.format(name)
    return '{0}: {1} with confidence {2}'.format(
        name, outcome['encoding'], outcome['confidence'])
项目:Splunk_CBER_App    作者:MHaggis    | 项目源码 | 文件源码
def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout):
        """ Dispatch processing to the protocol handler matching ``argv``.

        More than one entry in ``argv`` selects ``_process_protocol_v1``;
        otherwise ``_process_protocol_v2`` is used. Both receive ``argv``,
        ``ifile`` and ``ofile`` unchanged.

        :param argv: Command line arguments.
        :type argv: list or tuple

        :param ifile: Input data file.
        :type ifile: file

        :param ofile: Output data file.
        :type ofile: file

        :return: :const:`None`
        :rtype: NoneType

        """
        if len(argv) > 1:
            handler = self._process_protocol_v1
        else:
            handler = self._process_protocol_v2
        handler(argv, ifile, ofile)
项目:mbin    作者:fanglab    | 项目源码 | 文件源码
def _argparse():
    """Build and return the command-line parser for the bh_tsne wrapper.

    Defaults come from the module-level ``DEFAULT_*``, ``EMPTY_SEED`` and
    ``INITIAL_DIMENSIONS`` constants; input and output default to the
    ``stdin`` and ``stdout`` names in scope.
    """
    parser = ArgumentParser('bh_tsne Python wrapper')
    add = parser.add_argument
    add('-d', '--no_dims', type=int, default=DEFAULT_NO_DIMS)
    add('-p', '--perplexity', type=float, default=DEFAULT_PERPLEXITY)
    # 0.0 for theta is equivalent to vanilla t-SNE
    add('-t', '--theta', type=float, default=DEFAULT_THETA)
    add('-r', '--randseed', type=int, default=EMPTY_SEED)
    add('-n', '--initial_dims', type=int, default=INITIAL_DIMENSIONS)
    add('-v', '--verbose', action='store_true')
    add('-i', '--input', type=FileType('r'), default=stdin)
    add('-o', '--output', type=FileType('w'), default=stdout)
    # --use_pca and --no_pca write the same destination with opposite
    # values.
    add('--use_pca', action='store_true')
    add('--no_pca', dest='use_pca', action='store_false')
    parser.set_defaults(use_pca=DEFAULT_USE_PCA)
    add('-m', '--max_iter', type=int, default=DEFAULT_MAX_ITERATIONS)
    return parser
项目:senf    作者:quodlibet    | 项目源码 | 文件源码
def capture_output(data=None):
    """
    Swap the three standard streams for in-memory byte buffers.

    While the generator is suspended at its yield, ``sys.stdin`` reads
    from ``data`` (empty when ``data`` is false) and ``sys.stdout`` /
    ``sys.stderr`` collect into fresh ``BytesIO`` objects, which are
    yielded as ``(out, err)``. The original streams are put back when the
    generator is resumed or closed.

    Presumably wrapped as a context manager by a decorator outside this
    excerpt, to be used as:

    with capture_output() as (stdout, stderr):
        some_action()
    print(stdout.getvalue(), stderr.getvalue())
    """

    fake_in = BytesIO(data or b"")
    captured_err = BytesIO()
    captured_out = BytesIO()
    saved = (sys.stdin, sys.stderr, sys.stdout)
    sys.stdin, sys.stderr, sys.stdout = fake_in, captured_err, captured_out

    try:
        yield (captured_out, captured_err)
    finally:
        sys.stdin, sys.stderr, sys.stdout = saved
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def __init__(self, addr="127.0.0.1", port=4444):
        """Serve a pdb session to the first client that connects.

        Binds a listening TCP socket to ``(addr, port)``, blocks until one
        client connects, initializes ``pdb.Pdb`` on a read/write file
        wrapper of that connection, and finally points ``sys.stdin`` and
        ``sys.stdout`` at the same wrapper.
        """
        # Keep the real standard streams reachable on the instance before
        # they are replaced by the socket handle.
        self.old_stdin = sys.stdin
        self.old_stdout = sys.stdout

        # SO_REUSEADDR lets the webapp reload and bind the same port again.
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.skt = listener
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        listener.bind((addr, port))
        listener.listen(1)

        # accept() blocks until a client connects.
        connection, _peer = listener.accept()
        stream = connection.makefile('rw')
        pdb.Pdb.__init__(self, completekey='tab', stdin=stream, stdout=stream)
        sys.stdin = stream
        sys.stdout = stream
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def __init__(self, addr="127.0.0.1", port=4444):
        """Serve a pdb session to the first client that connects.

        Binds a listening TCP socket to ``(addr, port)``, blocks until one
        client connects, initializes ``pdb.Pdb`` on a read/write file
        wrapper of that connection, and finally points ``sys.stdin`` and
        ``sys.stdout`` at the same wrapper.
        """
        # Keep the real standard streams reachable on the instance before
        # they are replaced by the socket handle.
        self.old_stdin = sys.stdin
        self.old_stdout = sys.stdout

        # SO_REUSEADDR lets the webapp reload and bind the same port again.
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.skt = listener
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        listener.bind((addr, port))
        listener.listen(1)

        # accept() blocks until a client connects.
        connection, _peer = listener.accept()
        stream = connection.makefile('rw')
        pdb.Pdb.__init__(self, completekey='tab', stdin=stream, stdout=stream)
        sys.stdin = stream
        sys.stdout = stream
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def __init__(self, addr="127.0.0.1", port=4444):
        """Serve a pdb session to the first client that connects.

        Binds a listening TCP socket to ``(addr, port)``, blocks until one
        client connects, initializes ``pdb.Pdb`` on a read/write file
        wrapper of that connection, and finally points ``sys.stdin`` and
        ``sys.stdout`` at the same wrapper.
        """
        # Keep the real standard streams reachable on the instance before
        # they are replaced by the socket handle.
        self.old_stdin = sys.stdin
        self.old_stdout = sys.stdout

        # SO_REUSEADDR lets the webapp reload and bind the same port again.
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.skt = listener
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        listener.bind((addr, port))
        listener.listen(1)

        # accept() blocks until a client connects.
        connection, _peer = listener.accept()
        stream = connection.makefile('rw')
        pdb.Pdb.__init__(self, completekey='tab', stdin=stream, stdout=stream)
        sys.stdin = stream
        sys.stdout = stream
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def __init__(self, addr="127.0.0.1", port=4444):
        """Serve a pdb session to the first client that connects.

        Binds a listening TCP socket to ``(addr, port)``, blocks until one
        client connects, initializes ``pdb.Pdb`` on a read/write file
        wrapper of that connection, and finally points ``sys.stdin`` and
        ``sys.stdout`` at the same wrapper.
        """
        # Keep the real standard streams reachable on the instance before
        # they are replaced by the socket handle.
        self.old_stdin = sys.stdin
        self.old_stdout = sys.stdout

        # SO_REUSEADDR lets the webapp reload and bind the same port again.
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.skt = listener
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        listener.bind((addr, port))
        listener.listen(1)

        # accept() blocks until a client connects.
        connection, _peer = listener.accept()
        stream = connection.makefile('rw')
        pdb.Pdb.__init__(self, completekey='tab', stdin=stream, stdout=stream)
        sys.stdin = stream
        sys.stdout = stream
项目:pysnappy    作者:aeroevan    | 项目源码 | 文件源码
def main():
    """Parse the pysnappy command line and hand the options to ``run``.

    Input and output default to ``sys.stdin`` and ``sys.stdout``; the
    streaming read size defaults to 65536. ``--compress`` and
    ``--decompress`` are mutually exclusive, and the framing format is one
    of ``framing2`` (default) or ``hadoop``.
    """
    import sys
    cli = argparse.ArgumentParser(description="pysnappy driver")
    cli.add_argument("-f", "--file", help="Input file", default=sys.stdin)
    cli.add_argument("-b", "--bytesize", type=int, default=65536,
                     help="Bitesize for streaming reads")
    cli.add_argument("-o", "--output", help="Output file", default=sys.stdout)
    direction = cli.add_mutually_exclusive_group()
    direction.add_argument("-c", "--compress", action="store_true")
    direction.add_argument("-d", "--decompress", action="store_false")
    cli.add_argument("-t", "--framing", help="Framing format",
                     choices=["framing2", "hadoop"], default="framing2")

    run(cli.parse_args())
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def description_of(lines, name='stdin'):
    """
    Describe the encoding a ``UniversalDetector`` reports for ``lines``.

    Every item of ``lines`` is fed to a fresh detector, which is then
    closed and asked for its result.

    :param lines: The lines to get the encoding of.
    :type lines: Iterable of bytes
    :param name: Label placed at the start of the returned string.
    :type name: str
    :return: ``'<name>: <encoding> with confidence <confidence>'`` when the
        result has a truthy encoding, otherwise ``'<name>: no result'``.
    """
    detector = UniversalDetector()
    for chunk in lines:
        detector.feed(chunk)
    detector.close()
    outcome = detector.result
    if not outcome['encoding']:
        return '{0}: no result'.format(name)
    return '{0}: {1} with confidence {2}'.format(
        name, outcome['encoding'], outcome['confidence'])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def description_of(lines, name='stdin'):
    """
    Describe the encoding a ``UniversalDetector`` reports for ``lines``.

    Every item of ``lines`` is fed to a fresh detector, which is then
    closed and asked for its result.

    :param lines: The lines to get the encoding of.
    :type lines: Iterable of bytes
    :param name: Label placed at the start of the returned string.
    :type name: str
    :return: ``'<name>: <encoding> with confidence <confidence>'`` when the
        result has a truthy encoding, otherwise ``'<name>: no result'``.
    """
    detector = UniversalDetector()
    for chunk in lines:
        detector.feed(chunk)
    detector.close()
    outcome = detector.result
    if not outcome['encoding']:
        return '{0}: no result'.format(name)
    return '{0}: {1} with confidence {2}'.format(
        name, outcome['encoding'], outcome['confidence'])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def description_of(lines, name='stdin'):
    """
    Describe the encoding a ``UniversalDetector`` reports for ``lines``.

    Every item of ``lines`` is fed to a fresh detector, which is then
    closed and asked for its result.

    :param lines: The lines to get the encoding of.
    :type lines: Iterable of bytes
    :param name: Label placed at the start of the returned string.
    :type name: str
    :return: ``'<name>: <encoding> with confidence <confidence>'`` when the
        result has a truthy encoding, otherwise ``'<name>: no result'``.
    """
    detector = UniversalDetector()
    for chunk in lines:
        detector.feed(chunk)
    detector.close()
    outcome = detector.result
    if not outcome['encoding']:
        return '{0}: no result'.format(name)
    return '{0}: {1} with confidence {2}'.format(
        name, outcome['encoding'], outcome['confidence'])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def description_of(lines, name='stdin'):
    """
    Describe the encoding a ``UniversalDetector`` reports for ``lines``.

    Every item of ``lines`` is fed to a fresh detector, which is then
    closed and asked for its result.

    :param lines: The lines to get the encoding of.
    :type lines: Iterable of bytes
    :param name: Label placed at the start of the returned string.
    :type name: str
    :return: ``'<name>: <encoding> with confidence <confidence>'`` when the
        result has a truthy encoding, otherwise ``'<name>: no result'``.
    """
    detector = UniversalDetector()
    for chunk in lines:
        detector.feed(chunk)
    detector.close()
    outcome = detector.result
    if not outcome['encoding']:
        return '{0}: no result'.format(name)
    return '{0}: {1} with confidence {2}'.format(
        name, outcome['encoding'], outcome['confidence'])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def description_of(lines, name='stdin'):
    """
    Describe the encoding a ``UniversalDetector`` reports for ``lines``.

    Every item of ``lines`` is fed to a fresh detector, which is then
    closed and asked for its result.

    :param lines: The lines to get the encoding of.
    :type lines: Iterable of bytes
    :param name: Label placed at the start of the returned string.
    :type name: str
    :return: ``'<name>: <encoding> with confidence <confidence>'`` when the
        result has a truthy encoding, otherwise ``'<name>: no result'``.
    """
    detector = UniversalDetector()
    for chunk in lines:
        detector.feed(chunk)
    detector.close()
    outcome = detector.result
    if not outcome['encoding']:
        return '{0}: no result'.format(name)
    return '{0}: {1} with confidence {2}'.format(
        name, outcome['encoding'], outcome['confidence'])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def description_of(lines, name='stdin'):
    """
    Describe the encoding a ``UniversalDetector`` reports for ``lines``.

    Every item of ``lines`` is fed to a fresh detector, which is then
    closed and asked for its result.

    :param lines: The lines to get the encoding of.
    :type lines: Iterable of bytes
    :param name: Label placed at the start of the returned string.
    :type name: str
    :return: ``'<name>: <encoding> with confidence <confidence>'`` when the
        result has a truthy encoding, otherwise ``'<name>: no result'``.
    """
    detector = UniversalDetector()
    for chunk in lines:
        detector.feed(chunk)
    detector.close()
    outcome = detector.result
    if not outcome['encoding']:
        return '{0}: no result'.format(name)
    return '{0}: {1} with confidence {2}'.format(
        name, outcome['encoding'], outcome['confidence'])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def description_of(lines, name='stdin'):
    """
    Describe the encoding a ``UniversalDetector`` reports for ``lines``.

    Every item of ``lines`` is fed to a fresh detector, which is then
    closed and asked for its result.

    :param lines: The lines to get the encoding of.
    :type lines: Iterable of bytes
    :param name: Label placed at the start of the returned string.
    :type name: str
    :return: ``'<name>: <encoding> with confidence <confidence>'`` when the
        result has a truthy encoding, otherwise ``'<name>: no result'``.
    """
    detector = UniversalDetector()
    for chunk in lines:
        detector.feed(chunk)
    detector.close()
    outcome = detector.result
    if not outcome['encoding']:
        return '{0}: no result'.format(name)
    return '{0}: {1} with confidence {2}'.format(
        name, outcome['encoding'], outcome['confidence'])
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def description_of(lines, name='stdin'):
    """
    Return a string describing the probable encoding of a file or
    list of strings.

    :param lines: The lines to get the encoding of.
    :type lines: Iterable of bytes
    :param name: Name of file or collection of lines
    :type name: str
    :returns: ``'<name>: <encoding> with confidence <confidence>'`` when the
        detector reports an encoding, otherwise ``'<name>: no result'``.
    :rtype: str
    """
    u = UniversalDetector()
    for line in lines:
        u.feed(line)
        # ``done`` is set once the detector has reached a verdict; stop
        # consuming ``lines`` instead of reading the rest of the input.
        if u.done:
            break
    u.close()
    result = u.result
    if result['encoding']:
        return '{0}: {1} with confidence {2}'.format(name, result['encoding'],
                                                     result['confidence'])
    return '{0}: no result'.format(name)
项目:zabbix_manager    作者:BillWang139967    | 项目源码 | 文件源码
def __call__(self, string):
        """Convert a command-line argument into an open file object.

        :param string: The argument text; ``'-'`` selects a standard stream,
            anything else is used as a file name.
        :returns: ``sys.stdin`` or ``sys.stdout`` for ``'-'`` (depending on
            whether the mode contains ``'r'`` or ``'w'``), otherwise the
            result of ``open()`` with this object's mode and buffer size.
        :raises ValueError: if ``'-'`` is given and the mode contains
            neither ``'r'`` nor ``'w'``.
        :raises ArgumentTypeError: if opening the file raises ``IOError``.
        """
        # the special argument "-" means sys.std{in,out}
        if string == '-':
            if 'r' in self._mode:
                return _sys.stdin
            if 'w' in self._mode:
                return _sys.stdout
            # Translate the template first and interpolate afterwards;
            # formatting inside _() would look up the already-filled-in
            # text, which never matches a catalog entry.
            msg = _('argument "-" with mode %r') % self._mode
            raise ValueError(msg)

        try:
            # all other arguments are used as file names
            if self._bufsize:
                return open(string, self._mode, self._bufsize)
            return open(string, self._mode)
        except IOError:
            # exc_info() keeps this compatible with interpreters that lack
            # the "except ... as" syntax.
            err = _sys.exc_info()[1]
            message = _("can't open '%s': %s")
            raise ArgumentTypeError(message % (string, err))
项目:sublime-text-3-packages    作者:nickjj    | 项目源码 | 文件源码
def testWithIO(inp, out, f):
    """Calls the function `f` with ``sys.stdin`` changed to `inp`
    and ``sys.stdout`` changed to `out`.  They are restored when `f`
    returns.  This function returns whatever `f` returns.
    """

    import os

    try:
        oldin,sys.stdin = sys.stdin,inp
        oldout,sys.stdout = sys.stdout,out
        x = f()
    finally:
        sys.stdin = oldin
        sys.stdout = oldout
    if os.environ.get('PYPNG_TEST_TMP') and hasattr(out,'getvalue'):
        name = mycallersname()
        if name:
            w = open(name+'.png', 'wb')
            w.write(out.getvalue())
            w.close()
    return x
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def from_command_line(cls, *args, **keys):
        """Build an instance whose parameters come from the environment.

        Parameters already supplied through ``keys`` are not looked up
        again. The remaining ones are taken either from a pickled mapping
        read from ``sys.stdin`` (when ``ABUSEHELPER_CONF_FROM_STDIN`` is
        present in the environment) or from ``cls._from_sys_argv``.

        :param args: Positional arguments passed through to ``cls``.
        :param keys: Explicit parameter values; they override anything
            obtained from stdin or the command line.
        :returns: ``cls(*args, **values)``.
        """
        params = [(name, param) for name, param in cls.params()
                  if name not in keys]

        # The bot name defaults to the module name of the direct caller,
        # so this lookup must stay in this frame.
        bot_name = inspect.getmodulename(inspect.stack()[1][1])

        if "ABUSEHELPER_CONF_FROM_STDIN" in os.environ:
            # SECURITY NOTE(review): unpickling can execute arbitrary code.
            # This is only safe if stdin is always written by a trusted
            # parent process - confirm against the launcher.
            defaults = dict(pickle.load(sys.stdin))
            defaults.setdefault("bot_name", bot_name)
            added = cls._from_dict(params, **defaults)
        else:
            added = cls._from_sys_argv(params, bot_name=bot_name)

        # Explicit keyword arguments win over discovered values.
        added.update(keys)
        return cls(*args, **added)
项目:bittyband    作者:yam655    | 项目源码 | 文件源码
def get_key(self, block=True, timeout=None):
        """Read one key from the top-most screen in ``self.stdscrs``.

        :param block: When true (and no ``timeout`` is given) wait for a
            key; when false, poll once without waiting.
        :param timeout: If truthy, wait on ``sys.stdin`` with
            ``select.select`` for at most this long before reading.
        :returns: The key as returned by ``getkey()``, with single control
            characters rewritten in caret notation (``"^A"``, ``"^?"``),
            or ``None`` when no key was available.
        """
        if timeout:
            ready = select.select([sys.stdin], [], [], timeout)[0]
            if not ready:
                return None
            ret = self.stdscrs[-1].getkey()
        elif block:
            ret = self.stdscrs[-1].getkey()
        else:
            screen = self.stdscrs[-1]
            screen.nodelay(1)
            try:
                ret = screen.getkey()
                if ret == curses.ERR:
                    ret = None
            except curses.error:
                # No key is pending in no-delay mode. Other exceptions
                # (e.g. KeyboardInterrupt) are no longer swallowed.
                ret = None
            finally:
                # Always return the window to blocking reads.
                screen.nodelay(0)
        if ret is None:
            # Nothing was read; len(None) below would raise TypeError.
            return None
        if len(ret) == 1:
            code = ord(ret)
            if code < 0x20:
                # Control characters map to "^@" .. "^_".
                ret = "^{}".format(chr(code + ord('@')))
            elif code == 0x7f:
                ret = "^?"
        return ret
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def shutdown(self):
        """Put the saved stdout/stdin back, close the socket and continue.

        The streams are restored from ``self.old_stdout`` and
        ``self.old_stdin``, then ``self.skt`` is closed and
        ``set_continue()`` is called.
        """
        # Restore stdout first, then stdin, one attribute at a time.
        for stream in ('stdout', 'stdin'):
            setattr(sys, stream, getattr(self, 'old_' + stream))
        listener = self.skt
        listener.close()
        self.set_continue()
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def run():
    """Read the CNI configuration from stdin and run the CNI plugin.

    The configuration is loaded as JSON from ``sys.stdin``; configuration
    and logging are initialised from it, and either a daemonized or a
    standalone runner is chosen based on
    ``CONF.cni_daemon.daemon_enabled``. A ``SIGALRM`` handler is installed
    and an alarm is set with ``_CNI_TIMEOUT`` before the runner is started.
    If the runner returns a truthy status, the process exits with it.
    """
    # REVISIT(ivc): current CNI implementation provided by this package is
    # experimental and its primary purpose is to enable development of other
    # components (e.g. functional tests, service/LBaaSv2 support)
    cni_conf = utils.CNIConfig(jsonutils.load(sys.stdin))
    args = ['--config-file', cni_conf.kuryr_conf]

    # ``debug`` may be missing from the CNI config; a missing attribute is
    # treated the same as debug being off.
    try:
        if cni_conf.debug:
            args.append('-d')
    except AttributeError:
        pass
    config.init(args)
    config.setup_logging()

    # Initialize o.vo registry.
    k_objects.register_locally_defined_vifs()
    os_vif.initialize()

    # Pick the runner implementation from the daemon setting.
    if CONF.cni_daemon.daemon_enabled:
        runner = cni_api.CNIDaemonizedRunner()
    else:
        runner = cni_api.CNIStandaloneRunner(K8sCNIPlugin())
    LOG.info("Using '%s' ", runner.__class__.__name__)

    # SIGALRM handler: report the timeout on stdout, then exit with status 1.
    def _timeout(signum, frame):
        runner._write_dict(sys.stdout, {
            'msg': 'timeout',
            'code': k_const.CNI_TIMEOUT_CODE,
        })
        LOG.debug('timed out')
        sys.exit(1)

    # Arm the alarm before running so the handler fires if the run does not
    # finish within _CNI_TIMEOUT.
    signal.signal(signal.SIGALRM, _timeout)
    signal.alarm(_CNI_TIMEOUT)
    status = runner.run(os.environ, cni_conf, sys.stdout)
    LOG.debug("Exiting with status %s", status)
    # Only a truthy status is turned into a process exit code here.
    if status:
        sys.exit(status)