我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用io.TextIOWrapper()。
def generate_info(): tickets_archive_path = ROOT_DIR_PATH.joinpath('tickets.zip') ensure_data_file(tickets_archive_path, DATA_FILE_INFO['TICKETS_URL']) with zipfile.ZipFile(str(tickets_archive_path)) as zf: for name in zf.namelist(): stem, ext = os.path.splitext(name) if ext != '.csv': continue with zf.open(name) as f: # Zipfile only opens file in binary mode, but csv only accepts # text files, so we need to wrap this. # See <https://stackoverflow.com/questions/5627954>. textfile = io.TextIOWrapper(f, encoding='utf8', newline='') for row in csv.DictReader(textfile): yield Registration(row)
def run(self, *args, **options): p_path = os.path.join('/proc', str(os.getppid()), 'cmdline') with open(p_path, 'rb') as f: p_cmdline = f.read().split(b'\x00') p = None if b'runserver' not in p_cmdline: self.stdout.write("Starting webpack-dev-server...") p = webpack_dev_server() wrapper = io.TextIOWrapper(p.stdout, line_buffering=True) first_line = next(wrapper) webpack_host = first_line.split()[-1] print(webpack_host) super().run(**options) if p: p.kill() p.wait()
def open_str(self, name: str, encoding='utf8'): """Open a file in unicode mode or raise FileNotFoundError. The return value is a StringIO in-memory buffer. """ with self: # File() calls with the VPK object we need directly. if isinstance(name, VPKFile): file = name else: try: file = self._ref[name] except KeyError: raise FileNotFoundError(name) # Wrap the data to treat it as bytes, then # wrap that to decode and clean up universal newlines. return io.TextIOWrapper(io.BytesIO(file.read()), encoding)
def popen(cmd, mode="r", buffering=-1): if not isinstance(cmd, str): raise TypeError("invalid cmd type (%s, expected string)" % type(cmd)) if mode not in ("r", "w"): raise ValueError("invalid mode %r" % mode) if buffering == 0 or buffering is None: raise ValueError("popen() does not support unbuffered streams") import subprocess, io if mode == "r": proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, bufsize=buffering) return _wrap_close(io.TextIOWrapper(proc.stdout), proc) else: proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, bufsize=buffering) return _wrap_close(io.TextIOWrapper(proc.stdin), proc) # Helper for popen() -- a proxy for a file whose close waits for the process
def main(): parser = argparse.ArgumentParser() parser.add_argument('-c', '--config', help='Config file') args = parser.parse_args() if args.config: with open(args.config) as input: config = json.load(input) else: config = {} if not config.get('disable_collection', False): logger.info('Sending version information to stitchdata.com. ' + 'To disable sending anonymous usage data, set ' + 'the config parameter "disable_collection" to true') threading.Thread(target=send_usage_stats).start() input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8') state = persist_lines(config.get('delimiter', ','), config.get('quotechar', '"'), input) emit_state(state) logger.debug("Exiting normally")
def backport_makefile(self, mode="r", buffering=None, encoding=None, errors=None, newline=None): """ Backport of ``socket.makefile`` from Python 3.5. """ if not set(mode) <= set(["r", "w", "b"]): raise ValueError( "invalid mode %r (only r, w, b allowed)" % (mode,) ) writing = "w" in mode reading = "r" in mode or not writing assert reading or writing binary = "b" in mode rawmode = "" if reading: rawmode += "r" if writing: rawmode += "w" raw = SocketIO(self, rawmode) self._makefile_refs += 1 if buffering is None: buffering = -1 if buffering < 0: buffering = io.DEFAULT_BUFFER_SIZE if buffering == 0: if not binary: raise ValueError("unbuffered streams must be binary") return raw if reading and writing: buffer = io.BufferedRWPair(raw, raw, buffering) elif reading: buffer = io.BufferedReader(raw, buffering) else: assert writing buffer = io.BufferedWriter(raw, buffering) if binary: return buffer text = io.TextIOWrapper(buffer, encoding, errors, newline) text.mode = mode return text
def handle_display_options(self, option_order): """If there were any non-global "display-only" options (--help-commands or the metadata display options) on the command line, display the requested info and return true; else return false. """ import sys if six.PY2 or self.help_commands: return _Distribution.handle_display_options(self, option_order) # Stdout may be StringIO (e.g. in tests) import io if not isinstance(sys.stdout, io.TextIOWrapper): return _Distribution.handle_display_options(self, option_order) # Don't wrap stdout if utf-8 is already the encoding. Provides # workaround for #334. if sys.stdout.encoding.lower() in ('utf-8', 'utf8'): return _Distribution.handle_display_options(self, option_order) # Print metadata in UTF-8 no matter the platform encoding = sys.stdout.encoding errors = sys.stdout.errors newline = sys.platform != 'win32' and '\n' or None line_buffering = sys.stdout.line_buffering sys.stdout = io.TextIOWrapper( sys.stdout.detach(), 'utf-8', errors, newline, line_buffering) try: return _Distribution.handle_display_options(self, option_order) finally: sys.stdout = io.TextIOWrapper( sys.stdout.detach(), encoding, errors, newline, line_buffering)
def gettext_popen_wrapper(args, os_err_exc_type=CommandError, stdout_encoding="utf-8"): """ Makes sure text obtained from stdout of gettext utilities is Unicode. """ # This both decodes utf-8 and cleans line endings. Simply using # popen_wrapper(universal_newlines=True) doesn't properly handle the # encoding. This goes back to popen's flaky support for encoding: # https://bugs.python.org/issue6135. This is a solution for #23271, #21928. # No need to do anything on Python 2 because it's already a byte-string there. manual_io_wrapper = six.PY3 and stdout_encoding != DEFAULT_LOCALE_ENCODING stdout, stderr, status_code = popen_wrapper(args, os_err_exc_type=os_err_exc_type, universal_newlines=not manual_io_wrapper) if manual_io_wrapper: stdout = io.TextIOWrapper(io.BytesIO(stdout), encoding=stdout_encoding).read() if six.PY2: stdout = stdout.decode(stdout_encoding) return stdout, stderr, status_code
def anyfile(infile, mode='r', encoding="utf8"): ''' return a file handler with the support for gzip/zip comppressed files if infile is a two value tuple, then first one is the compressed file; the second one is the actual filename in the compressed file. e.g., ('a.zip', 'aa.txt') ''' if isinstance(infile, tuple): infile, rawfile = infile[:2] else: rawfile = os.path.splitext(infile)[0] filetype = os.path.splitext(infile)[1].lower() if filetype == '.gz': import gzip in_f = io.TextIOWrapper(gzip.GzipFile(infile, 'r'),encoding=encoding) elif filetype == '.zip': import zipfile in_f = io.TextIOWrapper(zipfile.ZipFile(infile, 'r').open(rawfile, 'r'),encoding=encoding) else: in_f = open(infile, mode, encoding=encoding) return in_f
def open_str(self, name: str, encoding='utf8'): """Return a string buffer for a 'file'. This performs universal newlines conversion. The encoding argument is ignored for files which are originally text. """ # We don't need this, but it should match other filesystems. self._check_open() try: filename, data = self._mapping[self._clean_path(name)] except KeyError: raise FileNotFoundError(name) if isinstance(data, bytes): # Decode on the fly, with universal newlines. return io.TextIOWrapper( io.BytesIO(data), encoding=encoding, ) else: # None = universal newlines mode directly. # No encoding is needed obviously. return io.StringIO(data, newline=None)
def read_cases(file): open = try_open_zip(file) if not open: file.seek(0) open = try_open_tar(file) if not open: raise FormatError(file, 'not a zip file or tar file') try: config = TextIOWrapper(open('config.ini'), encoding='utf-8', errors='replace') return read_legacy_cases(config, open) except FileNotFoundError: pass try: config = open('config.yaml') return read_yaml_cases(config, open) except FileNotFoundError: pass raise FormatError('config file not found')
def load_source(self): """Load the source for the specified file.""" if self.filename in self.STDIN_NAMES: self.filename = 'stdin' if sys.version_info[0] < 3: self.source = sys.stdin.read() else: self.source = TextIOWrapper(sys.stdin.buffer, errors='ignore').read() else: # Could be a Python 2.7 StringIO with no context manager, sigh. # with tokenize_open(self.filename) as fd: # self.source = fd.read() handle = tokenize_open(self.filename) self.source = handle.read() handle.close()
def __write_merged_imports(self, file: TextIOWrapper, imports: list): """ Sorts and write the given imports to the file, adding a blank line between each group f -- the file object (needs write permission) imports -- the list of imports """ sorted_imports = self.sort_imports(imports) previous_group = -1 for imp in sorted_imports: group = self.get_import_group(imp) if not (group == previous_group): if not (previous_group == -1): file.write("\n") previous_group = group file.write(imp)
def source_to_unicode(txt, errors='replace', skip_encoding_cookie=True): """Converts a bytes string with python source code to unicode. Unicode strings are passed through unchanged. Byte strings are checked for the python source file encoding cookie to determine encoding. txt can be either a bytes buffer or a string containing the source code. """ if isinstance(txt, str): return txt if isinstance(txt, bytes): buffer = BytesIO(txt) else: buffer = txt try: encoding, _ = detect_encoding(buffer.readline) except SyntaxError: encoding = "ascii" buffer.seek(0) text = TextIOWrapper(buffer, encoding, errors=errors, line_buffering=True) text.mode = 'r' if skip_encoding_cookie: return u"".join(strip_encoding_cookie(text)) else: return text.read()
def popen(cmd, mode="r", buffering=-1): if not isinstance(cmd, str): raise TypeError("invalid cmd type (%s, expected string)" % type(cmd)) if mode not in ("r", "w"): raise ValueError("invalid mode %r" % mode) if buffering == 0 or buffering == None: raise ValueError("popen() does not support unbuffered streams") import subprocess, io if mode == "r": proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, bufsize=buffering) return _wrap_close(io.TextIOWrapper(proc.stdout), proc) else: proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, bufsize=buffering) return _wrap_close(io.TextIOWrapper(proc.stdin), proc) # Helper for popen() -- a proxy for a file whose close waits for the process
def test_fileobj_readlines(self): self.tar.extract("ustar/regtype", TEMPDIR) tarinfo = self.tar.getmember("ustar/regtype") with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: lines1 = fobj1.readlines() fobj = self.tar.extractfile(tarinfo) try: fobj2 = io.TextIOWrapper(fobj) lines2 = fobj2.readlines() self.assertTrue(lines1 == lines2, "fileobj.readlines() failed") self.assertTrue(len(lines2) == 114, "fileobj.readlines() failed") self.assertTrue(lines2[83] == "I will gladly admit that Python is not the fastest running scripting language.\n", "fileobj.readlines() failed") finally: fobj.close()
def test_add(self): # Add copies of a sample message keys = [] keys.append(self._box.add(self._template % 0)) self.assertEqual(len(self._box), 1) keys.append(self._box.add(mailbox.Message(_sample_message))) self.assertEqual(len(self._box), 2) keys.append(self._box.add(email.message_from_string(_sample_message))) self.assertEqual(len(self._box), 3) keys.append(self._box.add(io.BytesIO(_bytes_sample_message))) self.assertEqual(len(self._box), 4) keys.append(self._box.add(_sample_message)) self.assertEqual(len(self._box), 5) keys.append(self._box.add(_bytes_sample_message)) self.assertEqual(len(self._box), 6) with self.assertWarns(DeprecationWarning): keys.append(self._box.add( io.TextIOWrapper(io.BytesIO(_bytes_sample_message)))) self.assertEqual(len(self._box), 7) self.assertEqual(self._box.get_string(keys[0]), self._template % 0) for i in (1, 2, 3, 4, 5, 6): self._check_sample(self._box[keys[i]])
def open(self, mode='r', encoding=None): """Return file-like object Args: mode (str): access mode (only reading modes are supported) encoding (str): text decoding method for text access (default: system default) Returns: io.BytesIO OR io.TextIOWrapper: buffer accessing the file as bytes or characters """ access_type = self._get_access_type(mode) if access_type == 't' and encoding is not None and encoding != self.encoded_with: warnings.warn('Attempting to decode %s as "%s", but encoding is declared as "%s"' % (self, encoding, self.encoded_with)) if encoding is None: encoding = self.encoded_with buffer = io.BytesIO(self._contents) if access_type == 'b': return buffer else: return io.TextIOWrapper(buffer, encoding=encoding)
def open(filename): """Open a file in read only mode using the encoding detected by detect_encoding(). """ buffer = _builtin_open(filename, 'rb') try: encoding, lines = detect_encoding(buffer.readline) buffer.seek(0) text = TextIOWrapper(buffer, encoding, line_buffering=True) text.mode = 'r' return text except: buffer.close() raise
def load(self, file): try: tree = json.load(io.TextIOWrapper(file, encoding='utf8')) tree = Audit.SCHEMA.validate(tree) self.__artifact = Artifact.fromData(tree["artifact"]) self.__references = { r["artifact-id"] : Artifact.fromData(r) for r in tree["references"] } except schema.SchemaError as e: raise ParseError("Invalid audit record: " + str(e)) except ValueError as e: raise ParseError("Invalid json: " + str(e)) self.__validate()
def save(self, file): tree = { "artifact" : self.__artifact.dump(), "references" : [ a.dump() for a in self.__references.values() ] } try: with gzip.open(file, 'wb', 6) as gzf: json.dump(tree, io.TextIOWrapper(gzf, encoding='utf8')) except OSError as e: raise BuildError("Cannot write audit: " + str(e))
def make_a_snapshot(file_name, output_name, delay=DEFAULT_DELAY): file_type = output_name.split('.')[-1] pixel_ratio = 2 shell_flag = False if sys.platform == 'win32': shell_flag = True __actual_delay_in_ms = int(delay * 1000) # add shell=True and it works on Windows now. proc_params = [ PHANTOMJS_EXE, os.path.join(get_resource_dir('phantomjs'), 'snapshot.js'), file_name.replace('\\', '/'), file_type, str(__actual_delay_in_ms), str(pixel_ratio) ] proc = subprocess.Popen( proc_params, stdout=subprocess.PIPE, shell=shell_flag) if PY2: content = proc.stdout.read() content = content.decode('utf-8') else: content = io.TextIOWrapper(proc.stdout, encoding="utf-8").read() content_array = content.split(',') if len(content_array) != 2: raise Exception("No snapshot taken by phantomjs. " + "Please make sure it is installed " + "and available on your path") base64_imagedata = content_array[1] imagedata = decode_base64(base64_imagedata.encode('utf-8')) if file_type in ['pdf', 'gif']: save_as(imagedata, output_name, file_type) elif file_type in ['png', 'jpeg']: save_as_png(imagedata, output_name) else: raise Exception(NOT_SUPPORTED_FILE_TYPE % file_type)
def _wrap_reader_for_text(fp, encoding): if isinstance(fp.read(0), bytes): fp = io.TextIOWrapper(io.BufferedReader(fp), encoding) return fp
def _wrap_writer_for_text(fp, encoding): try: fp.write('') except TypeError: fp = io.TextIOWrapper(fp, encoding) return fp
def handle_display_options(self, option_order): """If there were any non-global "display-only" options (--help-commands or the metadata display options) on the command line, display the requested info and return true; else return false. """ import sys if sys.version_info < (3,) or self.help_commands: return _Distribution.handle_display_options(self, option_order) # Stdout may be StringIO (e.g. in tests) import io if not isinstance(sys.stdout, io.TextIOWrapper): return _Distribution.handle_display_options(self, option_order) # Don't wrap stdout if utf-8 is already the encoding. Provides # workaround for #334. if sys.stdout.encoding.lower() in ('utf-8', 'utf8'): return _Distribution.handle_display_options(self, option_order) # Print metadata in UTF-8 no matter the platform encoding = sys.stdout.encoding errors = sys.stdout.errors newline = sys.platform != 'win32' and '\n' or None line_buffering = sys.stdout.line_buffering sys.stdout = io.TextIOWrapper( sys.stdout.detach(), 'utf-8', errors, newline, line_buffering) try: return _Distribution.handle_display_options(self, option_order) finally: sys.stdout = io.TextIOWrapper( sys.stdout.detach(), encoding, errors, newline, line_buffering) # Install it throughout the distutils
def __init__(self, port=None): """Opens a connection to a Removinator controller on the specified serial port. If a port is not specified, an attempt will be made to auto-discover the Removinator controller. :param port: port that the Removinator is connected to :type port: str :raises: :exc:`ConnectError` :ivar last_result: The result output from the last executed command :ivar last_response: The response output from the last executed command :ivar port: The port that the Removinator is connected to """ self.port = port self.last_result = '' self.last_response = '' # Attempt to discover the Removinator controller if a # port was not specified. if port is None: port = _discover_removinator() # Open a connection to the Removinator controller. try: self.connection = serial.Serial( port, 9600, serial.EIGHTBITS, serial.PARITY_NONE, serial.STOPBITS_ONE, 1) self.sio = io.TextIOWrapper(io.BufferedRWPair(self.connection, self.connection)) except serial.SerialException as e: raise ConnectError('Unable to open connection to Removinator ' 'controller on port {0}'.format(port))
def readlines(filename): """Read the source code.""" try: with open(filename, 'rb') as f: (coding, lines) = tokenize.detect_encoding(f.readline) f = TextIOWrapper(f, coding, line_buffering=True) return [l.decode(coding) for l in lines] + f.readlines() except (LookupError, SyntaxError, UnicodeError): # Fall back if file encoding is improperly declared with open(filename, encoding='latin-1') as f: return f.readlines()
def stdin_get_value(): return TextIOWrapper(sys.stdin.buffer, errors='ignore').read()
def __init__(self, stream, encoding, errors, **extra): self._stream = stream = _FixupStream(stream) io.TextIOWrapper.__init__(self, stream, encoding, errors, **extra) # The io module is a place where the Python 3 text behavior # was forced upon Python 2, so we need to unbreak # it to look like Python 2.
def write(self, x): if isinstance(x, str) or is_bytes(x): try: self.flush() except Exception: pass return self.buffer.write(str(x)) return io.TextIOWrapper.write(self, x)
def handle_display_options(self, option_order): """If there were any non-global "display-only" options (--help-commands or the metadata display options) on the command line, display the requested info and return true; else return false. """ import sys if PY2 or self.help_commands: return _Distribution.handle_display_options(self, option_order) # Stdout may be StringIO (e.g. in tests) import io if not isinstance(sys.stdout, io.TextIOWrapper): return _Distribution.handle_display_options(self, option_order) # Don't wrap stdout if utf-8 is already the encoding. Provides # workaround for #334. if sys.stdout.encoding.lower() in ('utf-8', 'utf8'): return _Distribution.handle_display_options(self, option_order) # Print metadata in UTF-8 no matter the platform encoding = sys.stdout.encoding errors = sys.stdout.errors newline = sys.platform != 'win32' and '\n' or None line_buffering = sys.stdout.line_buffering sys.stdout = io.TextIOWrapper( sys.stdout.detach(), 'utf-8', errors, newline, line_buffering) try: return _Distribution.handle_display_options(self, option_order) finally: sys.stdout = io.TextIOWrapper( sys.stdout.detach(), encoding, errors, newline, line_buffering) # Install it throughout the distutils
def handle_display_options(self, option_order): """If there were any non-global "display-only" options (--help-commands or the metadata display options) on the command line, display the requested info and return true; else return false. """ import sys if six.PY2 or self.help_commands: return _Distribution.handle_display_options(self, option_order) # Stdout may be StringIO (e.g. in tests) import io if not isinstance(sys.stdout, io.TextIOWrapper): return _Distribution.handle_display_options(self, option_order) # Don't wrap stdout if utf-8 is already the encoding. Provides # workaround for #334. if sys.stdout.encoding.lower() in ('utf-8', 'utf8'): return _Distribution.handle_display_options(self, option_order) # Print metadata in UTF-8 no matter the platform encoding = sys.stdout.encoding errors = sys.stdout.errors newline = sys.platform != 'win32' and '\n' or None line_buffering = sys.stdout.line_buffering sys.stdout = io.TextIOWrapper( sys.stdout.detach(), 'utf-8', errors, newline, line_buffering) try: return _Distribution.handle_display_options(self, option_order) finally: sys.stdout = io.TextIOWrapper( sys.stdout.detach(), encoding, errors, newline, line_buffering) # Install it throughout the distutils
def begin(): if _isatty: global _pager global _file _pager = subprocess.Popen(['less', '-F', '-R', '-S', '-X', '-K'], stdin=subprocess.PIPE, stdout=sys.stdout) _file = io.TextIOWrapper(_pager.stdin, 'UTF-8')
def read_gzip(filepath): # handles buffering # TODO: profile whether this is fast. with gzip.open(filepath, 'rb') as f: # leave in binary mode (default), let TextIOWrapper decode with io.BufferedReader(f, buffer_size=2**18) as g: # 256KB buffer with io.TextIOWrapper(g) as h: # bytes -> unicode yield h