The following 46 code examples, extracted from open-source Python projects, illustrate how to use typing.IO.
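Before the extracted examples, here is a minimal sketch of the pattern most of them share (the function and variable names below are illustrative only, not taken from any of the listed projects): typing.IO annotates a generic file-like object, while IO[str] and IO[bytes] narrow the annotation to text or binary streams.

from typing import IO

def count_lines(stream: IO[str]) -> int:
    # Works for any text stream: an open text file, sys.stdin, or io.StringIO.
    return sum(1 for _ in stream)

def copy_stream(src: IO[bytes], dst: IO[bytes], chunk_size: int = 8192) -> None:
    # Works for any pair of binary streams, e.g. open(..., 'rb') and open(..., 'wb').
    while True:
        chunk = src.read(chunk_size)
        if not chunk:
            break
        dst.write(chunk)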
def parse(f: IO[Any]) -> Result:
    """ Parse a shellscript and return a ShellScript
    :param f: TextIOBase handle to the shellscript file
    :return: Result with Ok or Err
    """
    comments = []
    commands = []
    interpreter = ""
    buf = f.readlines()
    for line in buf:
        trimmed = line.strip()
        if trimmed.startswith("#!"):
            interpreter = trimmed
        elif trimmed.startswith("#"):
            comments.append(str(trimmed))
        else:
            # Skip blank lines
            if trimmed:
                commands.append(str(trimmed))
    return Ok(ShellScript(interpreter=interpreter, comments=comments, commands=commands))
def parse_entries(self, file: IO[Any]) -> Result:
    """ Parse fstab entries
    :param file: TextIOWrapper file handle to the fstab
    :return: Result with Ok or Err
    """
    entries = []
    contents = file.readlines()
    for line in contents:
        if line.startswith("#"):
            continue
        parts = line.split()
        if len(parts) != 6:
            continue
        fsck_order = int(parts[5])
        entries.append(FsEntry(
            fs_spec=parts[0],
            mountpoint=os.path.join(parts[1]),
            vfs_type=parts[2],
            mount_options=parts[3].split(","),
            dump=False if parts[4] == "0" else True,
            fsck_order=fsck_order))
    return Ok(entries)
def __init__(self, *, process: subprocess.Popen, encoding: str, tmp_input: FilesystemIPC) -> None:
    self.process = process
    self.stdout_encoding = encoding
    self.iterating = False
    #
    # We need to capture stderr in a background thread to avoid deadlocks.
    # (The problem would occur when dlvhex2 is blocked because the OS buffers on the stderr pipe are full...
    # so we have to constantly read from *both* stdout and stderr.)
    self.stderr_capture_thread = StreamCaptureThread(self.process.stderr)
    self.stderr_capture_thread.start()
    #
    # Set up finalization. Using weakref.finalize seems to work more robustly than using __del__.
    # (One problem that occurred with __del__: It seemed like python was calling __del__ for self.process
    # and its IO streams first, which resulted in ResourceWarnings even though we were closing the streams
    # properly in our __del__ function.)
    self._finalize = weakref.finalize(self, DlvhexLineReader.__close, process, self.stderr_capture_thread, encoding, tmp_input)  # type: ignore
    # Make sure the subprocess will be terminated if it's still running when the python process exits
    self._finalize.atexit = True
def parse(self, file):
    if not isinstance(file, str) and not isinstance(file, IO):
        raise TypeError("file is not str or IO")
    if isinstance(file, str):
        try:
            enc = 'windows-1251'
            with open(file, encoding=enc) as f:
                content = f.readlines()
        except FileNotFoundError:
            raise FileNotFoundError("Not found " + file)
    else:
        content = file.readlines()
    self._data = [x.strip() for x in content]
    self.clear()
    return self
def _read_in_chunks(file_object: IO[bytes], chunk_size: int = 2*MB) -> Generator[bytes, None, None]:
    """Read a file in fixed-size chunks (to minimize memory usage for large files).

    Args:
        file_object: An opened file-like object supporting read().
        chunk_size: Max size (in bytes) of each file chunk.

    Yields:
        File chunks, each of size at most chunk_size.
    """
    while True:
        chunk = file_object.read(chunk_size)
        if chunk:
            yield chunk
        else:
            return  # End of file.
def __init__(
    self,
    _=None,  # type: Optional[Union[AnyStr, typing.Mapping, typing.Sequence, typing.IO]]
):
    self._meta = None
    if _ is not None:
        if isinstance(_, HTTPResponse):
            meta.get(self).url = _.url
        _ = deserialize(_)
        for k, v in _.items():
            try:
                self[k] = v
            except KeyError as e:
                if e.args and len(e.args) == 1:
                    e.args = (
                        r'%s.%s: %s' % (type(self).__name__, e.args[0], json.dumps(_)),
                    )
                raise e
def rawstream(fp):
    # type: (IO[Any]) -> IO[bytes]
    if PY3:
        try:
            return fp.buffer  # type: ignore
        except AttributeError:
            # There might be a BytesIO behind fp.
            pass
    return fp  # type: Optional[IO[bytes]]
def write(s, fp=None):
    # type: (Union[str, bytes], Optional[IO[Any]]) -> None
    """Write s to the binary stream fp (default is stdout).
    """
    efp = fp if fp is not None else sys.stdout
    rawstream(efp).write(bytestr(s))
def outline(s=b'', end=b'\n', fp=None):
    # type: (Union[str, bytes], Union[str, bytes], Optional[IO]) -> None
    write(bytestr(s) + bytestr(end), fp=fp)
def category_print(categories, categorytype, category, s, prefix='', end='\n', fp=None):
    # type: (Set[str], str, str, Union[str, bytes], str, str, Optional[IO]) -> None
    if category not in categories:
        return
    if categorytype == 'info':
        msg = prefix
    else:
        msg = '%s%s_%s: ' % (prefix, categorytype, category)
    if MESSAGE_CATEGORY_FILES is not None:
        logfilename = 'whatstyle_%s_%s.log' % (categorytype, category)
        fp = MESSAGE_CATEGORY_FILES.get(logfilename)
        if fp is None:
            path = os.path.join(tempfile.gettempdir(), logfilename)
            fp = open(path, 'wb')
            MESSAGE_CATEGORY_FILES[logfilename] = fp
    if fp is None and LOGFILE:
        global LOGFILEFP
        if not LOGFILEFP:
            LOGFILEFP = open(LOGFILE, 'wb')
        fp = LOGFILEFP
    if fp is None:
        fp = rawstream(sys.stderr if STDERR_OUTPUT else sys.stdout)
    write(msg, fp=fp)
    write(s, fp=fp)
    if end:
        write(end, fp=fp)
def iprint(category, s, prefix='', end='\n', fp=None):
    # type: (str, AnyStr, str, str, Optional[IO[AnyStr]]) -> None
    category_print(args_info, 'info', category, s, prefix, end, fp=fp)
def reporterror(s, fp=None):
    # type: (str, Optional[IO[AnyStr]]) -> None
    if fp is None:
        fp = rawstream(sys.stderr)  # type: ignore
    reportmessage(s, fp=fp)
def soutline(s='', enc='utf-8', fp=None):
    # type: (str, str, Optional[IO[Any]]) -> None
    data = unescape_ill_surrencode(s, enc=enc)
    write(data + b'\n', fp=fp)
def __init__(self, namespace):
    self.__answers = namespace.answers  # type: IO[Any]
    self.__verbose = namespace.verbose  # type: int
    self.__execute = namespace.execute  # type: bool
def answers(self):
    # type: () -> IO[Any]
    return self.__answers
def __load_answers(gateway, target):
    # type: (AnswersGateway, IO) -> Answers
    return gateway.read_answers_from_file(target)
def write_answers_to_file(self, answers, target_file):
    # type: (Answers, IO) -> None
    raw_answers = {}
    if answers.installer() is not None:
        raw_answers['installer'] = answers.installer().raw_options()
    if answers.fqdn_configuration() is not None:
        raw_answers['fqdn'] = answers.fqdn_configuration().raw_options()
    if answers.csrattrs_configuration() is not None:
        raw_answers['csr-attributes'] = answers.csrattrs_configuration().raw_options()
    yaml = ruamel.yaml.dump(raw_answers, Dumper=ruamel.yaml.RoundTripDumper)
    target_file.write(yaml)
def read_answers_from_file(self, target):
    # type: (IO) -> Answers
    pass
def _run(predictor: Predictor,
         input_file: IO,
         output_file: Optional[IO],
         batch_size: int,
         print_to_console: bool,
         cuda_device: int) -> None:

    def _run_predictor(batch_data):
        if len(batch_data) == 1:
            result = predictor.predict_json(batch_data[0], cuda_device)
            # Batch results return a list of json objects, so in
            # order to iterate over the result below we wrap this in a list.
            results = [result]
        else:
            results = predictor.predict_batch_json(batch_data, cuda_device)

        for model_input, output in zip(batch_data, results):
            string_output = json.dumps(output)
            if print_to_console:
                print("input: ", model_input)
                print("prediction: ", string_output)
            if output_file:
                output_file.write(string_output + "\n")

    batch_json_data = []
    for line in input_file:
        if not line.isspace():
            # Collect batch size amount of data.
            json_data = json.loads(line)
            batch_json_data.append(json_data)
            if len(batch_json_data) == batch_size:
                _run_predictor(batch_json_data)
                batch_json_data = []

    # We might not have a dataset perfectly divisible by the batch size,
    # so tidy up the scraps.
    if batch_json_data:
        _run_predictor(batch_json_data)
def file(self,
         mode: str = 'w+b',
         buffering: int = -1,
         encoding: typing.Optional[str] = None,
         newline: typing.Optional[str] = None,
         suffix: typing.Optional[str] = DEFAULT_SUFFIX,
         prefix: typing.Optional[str] = DEFAULT_PREFIX,
         dir: typing.Optional[str] = None) -> typing.IO:
    """
    Create a new temporary file within the scratch dir.

    This returns the result of :func:`~tempfile.TemporaryFile` which returns a nameless,
    file-like object that will cease to exist once it is closed.

    :param mode: (Optional) mode to open the file with
    :type mode: :class:`~str`
    :param buffering: (Optional) size of the file buffer
    :type buffering: :class:`~int`
    :param encoding: (Optional) encoding to open the file with
    :type encoding: :class:`~str` or :class:`~NoneType`
    :param newline: (Optional) newline argument to open the file with
    :type newline: :class:`~str` or :class:`~NoneType`
    :param suffix: (Optional) filename suffix
    :type suffix: :class:`~str` or :class:`~NoneType`
    :param prefix: (Optional) filename prefix
    :type prefix: :class:`~str` or :class:`~NoneType`
    :param dir: (Optional) relative path to directory within the scratch dir where the file should exist
    :type dir: :class:`~str` or :class:`~NoneType`
    :return: file-like object as returned by :func:`~tempfile.TemporaryFile`
    :rtype: :class:`~_io.BufferedRandom`
    """
    return tempfile.TemporaryFile(mode, buffering, encoding, newline, suffix, prefix, self.join(dir))
def named(self,
          mode: str = 'w+b',
          buffering: int = -1,
          encoding: typing.Optional[str] = None,
          newline: typing.Optional[str] = None,
          suffix: typing.Optional[str] = DEFAULT_SUFFIX,
          prefix: typing.Optional[str] = DEFAULT_PREFIX,
          dir: typing.Optional[str] = None,
          delete: bool = True) -> typing.IO:
    """
    Create a new named temporary file within the scratch dir.

    This returns the result of :func:`~tempfile.NamedTemporaryFile` which returns a named,
    file-like object that will cease to exist once it is closed unless `delete` is set to `False`.

    :param mode: (Optional) mode to open the file with
    :type mode: :class:`~str`
    :param buffering: (Optional) size of the file buffer
    :type buffering: :class:`~int`
    :param encoding: (Optional) encoding to open the file with
    :type encoding: :class:`~str` or :class:`~NoneType`
    :param newline: (Optional) newline argument to open the file with
    :type newline: :class:`~str` or :class:`~NoneType`
    :param suffix: (Optional) filename suffix
    :type suffix: :class:`~str` or :class:`~NoneType`
    :param prefix: (Optional) filename prefix
    :type prefix: :class:`~str` or :class:`~NoneType`
    :param dir: (Optional) relative path to directory within the scratch dir where the file should exist
    :type dir: :class:`~str` or :class:`~NoneType`
    :param delete: (Optional) flag to indicate if the file should be deleted from disk when it is closed
    :type delete: :class:`~bool`
    :return: file-like object as returned by :func:`~tempfile.NamedTemporaryFile`
    :rtype: :class:`~_io.TemporaryFileWrapper`
    """
    return tempfile.NamedTemporaryFile(mode, buffering, encoding, newline, suffix, prefix,
                                       self.join(dir), delete)
def spooled(self,
            max_size: int = 0,
            mode: str = 'w+b',
            buffering: int = -1,
            encoding: typing.Optional[str] = None,
            newline: typing.Optional[str] = None,
            suffix: typing.Optional[str] = DEFAULT_SUFFIX,
            prefix: typing.Optional[str] = DEFAULT_PREFIX,
            dir: typing.Optional[str] = None) -> typing.IO:
    """
    Create a new spooled temporary file within the scratch dir.

    This returns a :class:`~tempfile.SpooledTemporaryFile` which is a specialized object that wraps
    a :class:`StringIO`/:class:`BytesIO` instance that transparently overflows into a file on the disk
    once it reaches a certain size.

    By default, a spooled file will never roll over to disk.

    :param max_size: (Optional) max size before the in-memory buffer rolls over to disk
    :type max_size: :class:`~int`
    :param mode: (Optional) mode to open the file with
    :type mode: :class:`~str`
    :param buffering: (Optional) size of the file buffer
    :type buffering: :class:`~int`
    :param encoding: (Optional) encoding to open the file with
    :type encoding: :class:`~str` or :class:`~NoneType`
    :param newline: (Optional) newline argument to open the file with
    :type newline: :class:`~str` or :class:`~NoneType`
    :param suffix: (Optional) filename suffix
    :type suffix: :class:`~str` or :class:`~NoneType`
    :param prefix: (Optional) filename prefix
    :type prefix: :class:`~str` or :class:`~NoneType`
    :param dir: (Optional) relative path to directory within the scratch dir where the file should exist
    :type dir: :class:`~str` or :class:`~NoneType`
    :return: SpooledTemporaryFile instance
    :rtype: :class:`~tempfile.SpooledTemporaryFile`
    """
    return tempfile.SpooledTemporaryFile(max_size, mode, buffering, encoding, newline, suffix, prefix,
                                         self.join(dir))
def sha1_from_file_object(file_object: typing.IO[bytes]):
    block_size = 65536
    hasher = hashlib.sha1()
    buf = file_object.read(block_size)
    while len(buf) > 0:
        hasher.update(buf)
        buf = file_object.read(block_size)
    file_object.close()
    return hasher.hexdigest()
def __init__(self, candidates: List[int], lock_dir: str) -> None:
    self.candidates = candidates
    self.lock_dir = lock_dir
    self.lock_file = None  # type: Optional[IO[Any]]
    self.lock_file_path = None  # type: Optional[str]
    self.gpu_id = None  # type: Optional[int]
    self._acquired_lock = False
def print_multilines(cls, name: str, value: str, file: IO):
    if value:
        lines = value.split('\n')
        if len(lines) == 1:
            print(" * {name}: {value}".format(name=name, value=value), file=file)
        else:
            print(" * {name}:".format(name=name), file=file)
            for line in lines:
                print("   - {line}".format(line=line), file=file)
def print_leaf(cls, commit: Commit, file: IO) -> None:
    print("* subject: {subject}".format(subject=commit.subject or ''), file=file)
    cls.print_multilines(name='body', value=commit.body, file=file)
    print(" * date: {date}".format(date=datetools.date2str(commit.date)), file=file)
    print(" * author: {author}".format(author=commit.author), file=file)
    print(" * commit: {id}".format(id=commit.id), file=file)
def print_header(self, node: 'Node', file: IO):
    print(
        "{header} {criterion_name}: {name}".format(
            header="#" * (self.depth_level() + 1),
            criterion_name=Commit.property_name(node.criterion),
            name=node.name
        ),
        file=file
    )
    print(file=file)
def __init__(self, view: View, info: Dict[str, Any], api: TwitchAPI, quality: str,
             temp_dir: str = '.') -> None:
    if not TwitchVideo._schema:
        with open('video_info.schema') as json_data:
            TwitchVideo._schema = json.load(json_data)
    self._validate_info(info)

    self.info = info
    self.api = api
    self.quality = quality
    self.temp_dir = temp_dir
    self.view = view

    self.download_done: bool = False
    self.file: Optional[IO[bytes]] = None
def open_file(path: Union[str, IO], mode='rb'):
    if isinstance(path, str):
        file = open(path, mode)
    else:
        file = path
    return file
def save(self, path: Union[str, IO]):
    state = self.__getstate__()
    with open_file(path, 'wb') as outfile:
        pickle.dump(state, outfile)
def load(cls, path: Union[str, IO]) -> 'TorchModel':
    with open_file(path, 'rb') as infile:
        state = pickle.load(infile)
    model = cls.__new__(cls)
    model.__setstate__(state)
    return model

# for using pickle.dump/load directly
def __init__(self, stream: IO[S]) -> None:
    super().__init__(daemon=True)
    self.stream = stream
    self.data = None  # type: S
def write_file(config_dict: Dict[str, Any], config_file: IO[str]) -> None:
    config = configparser.ConfigParser()
    config.read_dict(config_dict)
    config.write(config_file, space_around_delimiters=False)
def create_test_file(self) -> typing.Tuple[typing.IO, bytes]:
    import io
    import secrets
    file_contents = secrets.token_bytes(512)
    test_file: typing.IO = io.BytesIO(file_contents)
    return test_file, file_contents
def _export(deck: db.Deck, handle: IO[Any]) -> None:
    json.dump(
        {
            'name': deck.name,
            'description': deck.description,
            'tags': [{
                'name': tag.name,
                'color': tag.color,
            } for tag in deck.tags],
            'cards': [{
                'id': card.num,
                'question': card.question,
                'answers': card.answers,
                'active': card.is_active,
                'activation_date': card.activation_date,
                'tags': [tag.name for tag in card.tags],
                'user_answers': [{
                    'date': answer.date,
                    'correct': answer.is_correct,
                } for answer in card.user_answers],
            } for card in deck.cards],
        },
        handle,
        default=_json_serializer,
        separators=(',', ':'),
        check_circular=False)
def configure(sources: Iterable[Path], cmd: str, blddir: Path, out: IO[str] = sys.stdout) -> None:
    namer = ObjectFileNamer()
    args = cmd.split()
    fortran_tasks = {str(path): {
        'source': str(path),
        'args': args + [str(blddir/namer(path))]
    } for path in sources}
    json.dump(fortran_tasks, out)
def download_course(course):
    folder_name = os.path.join("drumeo", course.number)
    if not os.path.isdir(folder_name):
        os.makedirs(folder_name)

    details = os.path.join(folder_name, "details.txt")
    if not os.path.isfile(details):
        with open(details, "wt") as file_handle:  # type: IO[str]
            print("course_number: {}".format(course.number), file=file_handle)
            print("course_name: {}".format(course.name), file=file_handle)
            print("course_difficulty: {}".format(course.diff), file=file_handle)
            print("instructor: {}".format(course.instructor), file=file_handle)

    if course.resources is not None:
        download_url(course.resources, os.path.join(folder_name, "resources.zip"))

    for i, (video, quality) in enumerate(course.videos):
        download_video_if_wider(video,
                                os.path.join(folder_name, "{}.mp4".format(i)),
                                width=int(quality))
def open(self, filepath):
    # type: (str) -> IO[str]
    filepath = os.path.normpath(filepath)
    abs_filepath = os.path.join(self._root_dir, filepath)
    if abs_filepath.startswith(self._root_dir):
        return open(abs_filepath)
    else:
        raise PermissionError("Cannot open file \"{}\". Bots may only access "
                              "files in their local directory.".format(abs_filepath))
def generate_and_write(filepaths, file_obj):
    # type: (Iterator[str], IO[str]) -> None
    template = 'include {line}\n'
    lines = map(lambda line: template.format(line=line), filepaths)
    file_obj.writelines(lines)
    file_obj.write('\n')
def process_loop(log):
    # type: (IO[Any]) -> None
    restart_check_count = 0
    last_check_time = time.time()
    while True:
        select.select([zephyr._z.getFD()], [], [], 15)
        try:
            # Fetch notices from the queue until its empty
            while True:
                notice = zephyr.receive(block=False)
                if notice is None:
                    break
                try:
                    process_notice(notice, log)
                except Exception:
                    logger.exception("Error relaying zephyr:")
                    time.sleep(2)
        except Exception:
            logger.exception("Error checking for new zephyrs:")
            time.sleep(1)
            continue

        if time.time() - last_check_time > 15:
            last_check_time = time.time()
            try:
                maybe_restart_mirroring_script()
                if restart_check_count > 0:
                    logger.info("Stopped getting errors checking whether restart is required.")
                    restart_check_count = 0
            except Exception:
                if restart_check_count < 5:
                    logger.exception("Error checking whether restart is required:")
                    restart_check_count += 1

            if options.forward_class_messages:
                try:
                    update_subscriptions()
                except Exception:
                    logger.exception("Error updating subscriptions from Zulip:")
def call_endpoint(self, url=None, method="POST", request=None, longpolling=False, files=None):
    # type: (str, str, Dict[str, Any], bool, List[IO[Any]]) -> Dict[str, Any]
    if request is None:
        request = dict()
    return self.do_api_query(request, API_VERSTRING + url, method=method,
                             longpolling=longpolling, files=files)
def upload_file(self, file):
    # type: (IO[Any]) -> Dict[str, Any]
    '''
        See examples/upload-file for example usage.
    '''
    return self.call_endpoint(
        url='user_uploads',
        files=[file]
    )
def unidump(inbytes: IO[bytes], env: Env) -> None:
    """take a list of bytes and print their Unicode codepoints

    >>> import io
    >>> import sys
    >>> from unidump.env import Env
    >>> _env = Env(linelength=4, output=sys.stdout)
    >>> unidump(io.BytesIO(b'\\x01\\xF0\\x9F\\x99\\xB8ABC'), _env)
          0    0001 1F678 0041 0042    .\U0001F678AB
          7    0043                    C
    >>> unidump(io.BytesIO(b'\\xD7'), _env)
          0    ?D7?                    X
    >>> _env.encoding = 'latin1'
    >>> unidump(io.BytesIO(b'\\xD7'), _env)
          0    00D7                    \u00D7
    """

    byteoffset = 0
    bytebuffer = b''
    current_line = [0, [], '']

    byte = inbytes.read(1)
    while byte:
        byteoffset += 1
        bytebuffer += byte

        try:
            char = bytebuffer.decode(env.encoding)
        except UnicodeDecodeError:
            next_byte = inbytes.read(1)
            if not next_byte or len(bytebuffer) >= 4:
                for i, x in enumerate(bytebuffer):
                    current_line = (
                        fill_and_print(current_line, byteoffset - 4 + i,
                                       '?{:02X}?'.format(x), 'X', env)
                    )
                bytebuffer = b''
            byte = next_byte
            continue
        else:
            current_line = (
                fill_and_print(current_line, byteoffset - len(bytebuffer),
                               '{:04X}'.format(ord(char)), sanitize_char(char), env)
            )
            bytebuffer = b''

        byte = inbytes.read(1)

    print_line(current_line, env)
def _import(handle: IO[Any]) -> None:
    with db.session_scope() as session:
        deck_obj = json.load(handle)
        deck = db.Deck()
        deck.name = deck_obj['name']
        deck.description = deck_obj['description']

        existing_deck = db.try_get_deck_by_name(session, deck.name)
        if existing_deck:
            if not util.confirm(
                    'Are you sure you want to overwrite deck %r?' % deck.name):
                return
            session.delete(existing_deck)
            session.commit()

        tag_dict = {}
        for tag_obj in deck_obj['tags']:
            tag = db.Tag()
            tag.name = tag_obj['name']
            tag.color = tag_obj['color']
            deck.tags.append(tag)
            tag_dict[tag.name] = tag

        for card_obj in deck_obj['cards']:
            card = db.Card()
            card.num = card_obj['id']
            card.question = card_obj['question']
            card.answers = card_obj['answers']
            card.is_active = card_obj['active']
            card.tags = [tag_dict[name] for name in card_obj['tags']]
            for user_answer_obj in card_obj['user_answers']:
                user_answer = db.UserAnswer()
                user_answer.date = parse_date(user_answer_obj['date'])
                user_answer.is_correct = user_answer_obj['correct']
                card.user_answers.append(user_answer)
            if 'activation_date' in card_obj:
                if card_obj['activation_date']:
                    card.activation_date = parse_date(
                        card_obj['activation_date'])
                elif card.user_answers:
                    card.activation_date = sorted(
                        card.user_answers, key=lambda ua: ua.date)[0].date
            card.due_date = scheduler.next_due_date(card)
            deck.cards.append(card)

        session.add(deck)