Python typing 模块,IO 实例源码


# NOTE(review): this copy of the function is damaged and does not parse.
# The three prose lines under the signature were presumably a docstring whose
# quote delimiters were lost, the `if trimmed:` check has no statement under
# it, and the final `Ok(ShellScript(...))` call is cut off after its first
# keyword argument. Restore the missing lines from upstream before use.
def parse(f: IO[Any]) -> Result:
    Parse a shellscript and return a ShellScript
    :param f: TextIOBase handle to the shellscript file
    :return: Result with Ok or Err
    comments = []
    commands = []
    interpreter = ""

    buf = f.readlines()

    for line in buf:
        trimmed = line.strip()
        if trimmed.startswith("#!"):
            interpreter = trimmed
        elif trimmed.startswith("#"):
            # Skip blank lines
            if trimmed:
    return Ok(ShellScript(interpreter=interpreter,
# NOTE(review): this copy of the method is damaged and does not parse.
# The docstring delimiters are missing, both `if` statements inside the loop
# have no body, and the `dump=...` line is the remnant of a call whose opening
# is not visible. `fsck_order` is computed but its consumer is not shown.
# Restore the missing lines from upstream before use.
def parse_entries(self, file: IO[Any]) -> Result:
        Parse fstab entries
        :param file: TextIOWrapper file handle to the fstab
        :return: Result with Ok or Err
        entries = []
        contents = file.readlines()

        for line in contents:
            if line.startswith("#"):
            parts = line.split()
            if len(parts) != 6:
            fsck_order = int(parts[5])
                dump=False if parts[4] == "0" else True,
        return Ok(entries)
def __init__(self, *, process: subprocess.Popen, encoding: str, tmp_input: FilesystemIPC) -> None:
        """Wrap a running subprocess and arrange for its cleanup.

        :param process: the subprocess being wrapped; its stderr is captured in the background.
        :param encoding: stored as ``stdout_encoding`` and handed to the finalizer.
        :param tmp_input: handed to the finalizer together with the process.
        """
        self.process = process
        self.stdout_encoding = encoding
        self.iterating = False
        # stderr has to be drained on a background thread: if nobody reads it, the
        # OS pipe buffer can fill up and block the child, so both stdout and stderr
        # must be read continuously to avoid a deadlock.
        stderr_thread = StreamCaptureThread(self.process.stderr)
        self.stderr_capture_thread = stderr_thread
        # weakref.finalize proved more robust than __del__ here: with __del__, the
        # process and its IO streams appeared to be torn down first, which produced
        # ResourceWarnings even though the streams were being closed properly.
        finalizer = weakref.finalize(self, DlvhexLineReader.__close, process, stderr_thread, encoding, tmp_input)  # type: ignore
        # Also run at interpreter exit, so a still-running subprocess gets terminated.
        finalizer.atexit = True
        self._finalize = finalizer
def parse(self, file):
        """Load lines from *file* into ``self._data`` with surrounding whitespace stripped.

        :param file: either a path (``str``), which is opened with the
            ``windows-1251`` encoding, or an already opened file-like object
            providing ``readlines()``.
        :return: ``self``, so calls can be chained.
        :raises TypeError: if *file* is neither a ``str`` nor a file-like object.
        :raises FileNotFoundError: if *file* is a path that does not exist.
        """
        if isinstance(file, str):
            try:
                with open(file, encoding='windows-1251') as f:
                    content = f.readlines()
            except FileNotFoundError as err:
                raise FileNotFoundError("Not found " + file) from err
        elif hasattr(file, "readlines"):
            # Duck-typed on purpose: real file objects (io.TextIOWrapper,
            # io.StringIO, ...) are not instances of typing.IO, so an
            # isinstance check against typing.IO would reject them.
            content = file.readlines()
        else:
            raise TypeError("file is not str or IO")
        self._data = [x.strip() for x in content]

        return self
def _read_in_chunks(file_object: IO[bytes], chunk_size: int = 2*MB) -> Generator[bytes, None, None]:
    """Read a file in fixed-size chunks (to minimize memory usage for large files).

    Args:
        file_object: An opened file-like object supporting read().
        chunk_size: Max size (in bytes) of each file chunk.

    Yields:
        File chunks, each of size at most chunk_size.
    """
    while True:
        chunk = file_object.read(chunk_size)
        if not chunk:
            return  # End of file.
        yield chunk
# NOTE(review): this copy of the constructor is damaged and does not parse.
# The parameter list is never closed (no `self`, no `):`), the `try:` that
# pairs with `except KeyError` is missing, and the tuple opened at
# `e.args = (` is not closed. Restore the missing lines from upstream.
def __init__(
        _=None,  # type: Optional[Union[AnyStr, typing.Mapping, typing.Sequence, typing.IO]]
        self._meta = None
        if _ is not None:
            if isinstance(_, HTTPResponse):
                meta.get(self).url = _.url
            _ = deserialize(_)
            for k, v in _.items():
                    self[k] = v
                except KeyError as e:
                    if e.args and len(e.args) == 1:
                        e.args = (
                            r'%s.%s: %s' % (type(self).__name__, e.args[0], json.dumps(_)),
                    raise e
# NOTE(review): this copy of the function is damaged and does not parse.
# The `try:` that pairs with `except AttributeError:` is missing, and the
# except branch contains only a comment (its statement was lost).
def rawstream(fp):
    # type: (IO[Any]) -> IO[bytes]
    if PY3:
            return fp.buffer  # type: ignore
        except AttributeError:
            # There might be a BytesIO behind fp.
    return fp  # type: Optional[IO[bytes]]
# NOTE(review): this copy of the function is damaged. The docstring's closing
# quotes are missing and no statement performs the write: `efp` is computed
# but never used in what remains. Restore the missing lines from upstream.
def write(s, fp=None):
    # type: (Union[str, bytes], Optional[IO[Any]]) -> None
    """Write s to the binary stream fp (default is stdout).
    efp = fp if fp is not None else sys.stdout
def outline(s=b'', end=b'\n', fp=None):
    # type: (Union[str, bytes], Union[str, bytes], Optional[IO]) -> None
    """Write ``s`` followed by ``end`` to ``fp``, converting both with ``bytestr`` first."""
    payload = bytestr(s) + bytestr(end)
    write(payload, fp=fp)
def category_print(categories, categorytype, category, s, prefix='', end='\n', fp=None):
    # type: (Set[str], str, str, Union[str, bytes], str, str, Optional[IO]) -> None
    """Write message ``s`` if ``category`` is one of the enabled ``categories``.

    The message is preceded by ``prefix`` for the 'info' category type, and by
    ``<prefix><categorytype>_<category>: `` for every other type; ``end`` is
    written afterwards unless it is empty.

    The destination is chosen in this order:
      1. a per-category log file in the temp directory when
         MESSAGE_CATEGORY_FILES is not None (this overrides a passed ``fp``),
      2. the ``fp`` argument,
      3. the global LOGFILE, opened on first use and cached in LOGFILEFP,
      4. the raw stderr or stdout stream, depending on STDERR_OUTPUT.
    """
    global LOGFILEFP
    if category not in categories:
        # This category is not enabled: emit nothing.
        return
    if categorytype == 'info':
        msg = prefix
    else:
        msg = '%s%s_%s: ' % (prefix, categorytype, category)
    if MESSAGE_CATEGORY_FILES is not None:
        logfilename = 'whatstyle_%s_%s.log' % (categorytype, category)
        fp = MESSAGE_CATEGORY_FILES.get(logfilename)
        if fp is None:
            # First message for this category: create the log file and remember
            # the handle so later messages append to the same stream.
            path = os.path.join(tempfile.gettempdir(), logfilename)
            fp = open(path, 'wb')
            MESSAGE_CATEGORY_FILES[logfilename] = fp
    if fp is None and LOGFILE:
        if not LOGFILEFP:
            LOGFILEFP = open(LOGFILE, 'wb')
        fp = LOGFILEFP
    if fp is None:
        fp = rawstream(sys.stderr if STDERR_OUTPUT else sys.stdout)
    write(msg, fp=fp)
    write(s, fp=fp)
    if end:
        write(end, fp=fp)
def iprint(category, s, prefix='', end='\n', fp=None):
    # type: (str, AnyStr, str, str, Optional[IO[AnyStr]]) -> None
    """Emit ``s`` as an 'info' message of ``category``, filtered by ``args_info``."""
    category_print(args_info, 'info', category, s, prefix=prefix, end=end, fp=fp)
def reporterror(s, fp=None):
    # type: (str, Optional[IO[AnyStr]]) -> None
    """Pass ``s`` to ``reportmessage``, using the raw stderr stream when ``fp`` is None."""
    target = rawstream(sys.stderr) if fp is None else fp  # type: ignore
    reportmessage(s, fp=target)
def soutline(s='', enc='utf-8', fp=None):
    # type: (str, str, Optional[IO[Any]]) -> None
    """Convert ``s`` with ``unescape_ill_surrencode`` and write it plus a newline to ``fp``."""
    encoded = unescape_ill_surrencode(s, enc=enc)
    line = encoded + b'\n'
    write(line, fp=fp)
def __init__(self, namespace):
        """Copy the ``answers``, ``verbose`` and ``execute`` attributes of ``namespace``
        onto private attributes of this object.

        NOTE(review): ``namespace`` is presumably an ``argparse.Namespace``; confirm
        against the caller.
        """
        self.__answers = namespace.answers  # type: IO[Any]
        self.__verbose = namespace.verbose  # type: int
        self.__execute = namespace.execute  # type: bool
def answers(self):
        # type: () -> IO[Any]
        """Return the value stored in the private ``__answers`` attribute."""
        return self.__answers
def __load_answers(gateway, target):
        # type: (AnswersGateway, IO) -> Answers
        """Return the result of ``gateway.read_answers_from_file(target)``."""
        return gateway.read_answers_from_file(target)
def write_answers_to_file(self, answers, target_file):
        # type: (Answers, IO) -> None
        """Serialise ``answers`` to YAML and write the document to ``target_file``.

        Only the parts of ``answers`` that are present (not None) are emitted,
        under the keys 'installer', 'fqdn' and 'csr-attributes'.
        """
        raw_answers = {}
        installer = answers.installer()
        if installer is not None:
            raw_answers['installer'] = installer.raw_options()
        fqdn = answers.fqdn_configuration()
        if fqdn is not None:
            raw_answers['fqdn'] = fqdn.raw_options()
        csrattrs = answers.csrattrs_configuration()
        if csrattrs is not None:
            raw_answers['csr-attributes'] = csrattrs.raw_options()
        document = ruamel.yaml.dump(raw_answers, Dumper=ruamel.yaml.RoundTripDumper)
        # The dumped document has to actually reach the target file; without this
        # the method built the YAML and silently discarded it.
        target_file.write(document)
项目:puppeter    作者:coi-gov-pl    | 项目源码 | 文件源码
# NOTE(review): only the signature and type comment survive in this copy; the
# body is missing, so the definition does not parse. Restore it from upstream.
def read_answers_from_file(self, target):
        # type: (IO) -> Answers
def _run(predictor: Predictor,
         input_file: IO,
         output_file: Optional[IO],
         batch_size: int,
         print_to_console: bool,
         cuda_device: int) -> None:
    """Run ``predictor`` over a JSON-lines input and emit one JSON line per prediction.

    Every non-blank line of ``input_file`` is parsed as JSON. Inputs are grouped
    into batches of ``batch_size``; each batch is predicted, and every result is
    serialised with ``json.dumps`` and then printed (if ``print_to_console``)
    and/or written to ``output_file`` (if given). A final, smaller batch is
    processed when the input is not a multiple of ``batch_size``.
    """

    def _run_predictor(batch_data):
        """Predict one batch and emit its results."""
        if len(batch_data) == 1:
            result = predictor.predict_json(batch_data[0], cuda_device)
            # Batch results return a list of json objects, so in
            # order to iterate over the result below we wrap this in a list.
            results = [result]
        else:
            results = predictor.predict_batch_json(batch_data, cuda_device)

        for model_input, output in zip(batch_data, results):
            string_output = json.dumps(output)
            if print_to_console:
                print("input: ", model_input)
                print("prediction: ", string_output)
            if output_file:
                output_file.write(string_output + "\n")

    batch_json_data = []
    for line in input_file:
        if not line.isspace():
            # Collect batch size amount of data.
            json_data = json.loads(line)
            batch_json_data.append(json_data)
            if len(batch_json_data) == batch_size:
                _run_predictor(batch_json_data)
                batch_json_data = []

    # We might not have a dataset perfectly divisible by the batch size,
    # so tidy up the scraps.
    if batch_json_data:
        _run_predictor(batch_json_data)
def file(self, mode: str = 'w+b', buffering: int = -1, encoding: typing.Optional[str] = None,
             newline: typing.Optional[str] = None, suffix: typing.Optional[str] = DEFAULT_SUFFIX,
             prefix: typing.Optional[str] = DEFAULT_PREFIX, dir: typing.Optional[str] = None) -> typing.IO:
        """
        Create a new temporary file within the scratch dir.

        This returns the result of :func:`~tempfile.TemporaryFile` which returns a nameless, file-like object that
        will cease to exist once it is closed.

        :param mode: (Optional) mode to open the file with
        :type mode: :class:`~str`
        :param buffering: (Optional) size of the file buffer
        :type buffering: :class:`~int`
        :param encoding: (Optional) encoding to open the file with
        :type encoding: :class:`~str` or :class:`~NoneType`
        :param newline: (Optional) newline argument to open the file with
        :type newline: :class:`~str` or :class:`~NoneType`
        :param suffix: (Optional) filename suffix
        :type suffix: :class:`~str` or :class:`~NoneType`
        :param prefix: (Optional) filename prefix
        :type prefix: :class:`~str` or :class:`~NoneType`
        :param dir: (Optional) relative path to directory within the scratch dir where the file should exist
        :type dir: :class:`~str` or :class:`~NoneType`
        :return: file-like object as returned by :func:`~tempfile.TemporaryFile`
        :rtype: :class:`~_io.BufferedRandom` for the default binary mode
        """
        # ``dir`` is passed through self.join() rather than handed to tempfile
        # directly, so the directory actually used is whatever join() returns.
        return tempfile.TemporaryFile(mode, buffering, encoding, newline,
                                      suffix, prefix, self.join(dir))
def named(self, mode: str = 'w+b', buffering: int = -1, encoding: typing.Optional[str] = None,
              newline: typing.Optional[str] = None, suffix: typing.Optional[str] = DEFAULT_SUFFIX,
              prefix: typing.Optional[str] = DEFAULT_PREFIX, dir: typing.Optional[str] = None,
              delete: bool = True) -> typing.IO:
        """
        Create a new named temporary file within the scratch dir.

        This returns the result of :func:`~tempfile.NamedTemporaryFile` which returns a named, file-like object that
        will cease to exist once it is closed unless `delete` is set to `False`.

        :param mode: (Optional) mode to open the file with
        :type mode: :class:`~str`
        :param buffering: (Optional) size of the file buffer
        :type buffering: :class:`~int`
        :param encoding: (Optional) encoding to open the file with
        :type encoding: :class:`~str` or :class:`~NoneType`
        :param newline: (Optional) newline argument to open the file with
        :type newline: :class:`~str` or :class:`~NoneType`
        :param suffix: (Optional) filename suffix
        :type suffix: :class:`~str` or :class:`~NoneType`
        :param prefix: (Optional) filename prefix
        :type prefix: :class:`~str` or :class:`~NoneType`
        :param dir: (Optional) relative path to directory within the scratch dir where the file should exist
        :type dir: :class:`~str` or :class:`~NoneType`
        :param delete: (Optional) flag to indicate if the file should be deleted from disk when it is closed
        :type delete: :class:`~bool`
        :return: file-like object as returned by :func:`~tempfile.NamedTemporaryFile`
        :rtype: :class:`~tempfile._TemporaryFileWrapper`
        """
        # ``dir`` is passed through self.join() rather than handed to tempfile
        # directly, so the directory actually used is whatever join() returns.
        return tempfile.NamedTemporaryFile(mode, buffering, encoding, newline,
                                           suffix, prefix, self.join(dir), delete)
def spooled(self, max_size: int = 0, mode: str = 'w+b', buffering: int = -1,
                encoding: typing.Optional[str] = None, newline: typing.Optional[str] = None,
                suffix: typing.Optional[str] = DEFAULT_SUFFIX, prefix: typing.Optional[str] = DEFAULT_PREFIX,
                dir: typing.Optional[str] = None) -> typing.IO:
        """
        Create a new spooled temporary file within the scratch dir.

        This returns a :class:`~tempfile.SpooledTemporaryFile` which is a specialized object that wraps a
        :class:`StringIO`/:class:`BytesIO` instance that transparently overflows into a file on the disk once it
        reaches a certain size.

        By default, a spooled file will never roll over to disk.

        :param max_size: (Optional) max size before the in-memory buffer rolls over to disk
        :type max_size: :class:`~int`
        :param mode: (Optional) mode to open the file with
        :type mode: :class:`~str`
        :param buffering: (Optional) size of the file buffer
        :type buffering: :class:`~int`
        :param encoding: (Optional) encoding to open the file with
        :type encoding: :class:`~str` or :class:`~NoneType`
        :param newline: (Optional) newline argument to open the file with
        :type newline: :class:`~str` or :class:`~NoneType`
        :param suffix: (Optional) filename suffix
        :type suffix: :class:`~str` or :class:`~NoneType`
        :param prefix: (Optional) filename prefix
        :type prefix: :class:`~str` or :class:`~NoneType`
        :param dir: (Optional) relative path to directory within the scratch dir where the file should exist
        :type dir: :class:`~str` or :class:`~NoneType`
        :return: SpooledTemporaryFile instance
        :rtype: :class:`~tempfile.SpooledTemporaryFile`
        """
        # ``dir`` is passed through self.join() rather than handed to tempfile
        # directly, so the directory actually used is whatever join() returns.
        return tempfile.SpooledTemporaryFile(max_size, mode, buffering, encoding,
                                             newline, suffix, prefix, self.join(dir))
def sha1_from_file_object(file_object: typing.IO[bytes]) -> str:
    """Return the hexadecimal SHA-1 digest of everything left to read in ``file_object``.

    The stream is consumed in 64 KiB blocks, so the whole content never has to
    be held in memory at once. Reading starts at the stream's current position.
    """
    block_size = 65536
    hasher = hashlib.sha1()
    buf = file_object.read(block_size)
    while len(buf) > 0:
        hasher.update(buf)
        buf = file_object.read(block_size)
    return hasher.hexdigest()
def __init__(self, candidates: List[int], lock_dir: str) -> None:
        """Store ``candidates`` and ``lock_dir``; no lock is held after construction.

        ``lock_file``, ``lock_file_path`` and ``gpu_id`` start out as None and
        ``_acquired_lock`` as False.

        NOTE(review): ``candidates`` are presumably the GPU ids to choose from
        (see ``gpu_id``); confirm against the code that acquires the lock.
        """
        self.candidates = candidates
        self.lock_dir = lock_dir
        self.lock_file = None  # type: Optional[IO[Any]]
        self.lock_file_path = None  # type: Optional[str]
        self.gpu_id = None  # type: Optional[int]
        self._acquired_lock = False
def print_multilines(cls, name: str, value: str, file: IO):
        """Print ``value`` under the label ``name`` as an indented bullet.

        A single-line value is printed inline (``* name: value``). A value that
        contains newlines is printed as a ``* name:`` header followed by one
        ``- line`` sub-bullet per line. Nothing is printed for an empty value.

        :param name: label shown before the value.
        :param value: text to print; may contain newlines.
        :param file: stream the output is printed to.
        """
        if not value:
            return
        lines = value.split('\n')
        if len(lines) == 1:
            print("    * {name}: {value}".format(name=name, value=value), file=file)
        else:
            print("    * {name}:".format(name=name), file=file)
            for line in lines:
                print("        - {line}".format(line=line), file=file)
# NOTE(review): this copy of the method is damaged and does not parse. The last
# three `print` calls lost the arguments of their `.format(` / `date2str(`
# calls (each now reads `(, file=file)`). Restore them from upstream.
def print_leaf(cls, commit: Commit, file: IO) -> None:
        print("* subject: {subject}".format(subject=commit.subject or ''), file=file)
        cls.print_multilines(name='body', value=commit.body, file=file)
        print("    * date: {date}".format(date=datetools.date2str(, file=file)
        print("    * author: {author}".format(, file=file)
        print("    * commit: {id}".format(, file=file)
# NOTE(review): this copy of the method is truncated and does not parse. The
# call that receives the formatted string is missing, and the `.format(` call
# is cut off after its first keyword argument. Restore it from upstream.
def print_header(self, node: 'Node', file: IO):
            "{header} {criterion_name}: {name}".format(
                header="#" * (self.depth_level() + 1),
# NOTE(review): the line `self._validate_info(info) = info` is not valid
# Python (assignment to a call). It looks like two statements fused together
# during extraction; restore it from upstream before use.
def __init__(self, view: View, info: Dict[str, Any], api: TwitchAPI, quality: str, temp_dir: str = '.') -> None:
        # The schema is loaded from 'video_info.schema' once and cached on the class.
        if not TwitchVideo._schema:
            with open('video_info.schema') as json_data:
                TwitchVideo._schema = json.load(json_data)
        self._validate_info(info) = info
        self.api = api
        self.quality = quality
        self.temp_dir = temp_dir
        self.view = view

        self.download_done: bool = False
        self.file: Optional[IO[bytes]] = None
def open_file(path: Union[str, IO], mode='rb'):
    """Return an open file object for ``path``.

    :param path: a filesystem path, which is opened with ``mode``, or an
        already opened file-like object, which is returned unchanged.
    :param mode: mode used when ``path`` is a string; ignored otherwise.
    :return: an open file-like object. The caller is responsible for closing it.
    """
    if isinstance(path, str):
        return open(path, mode)
    # Already a file-like object: hand it back untouched.
    return path
def save(self, path: Union[str, IO]):
        """Pickle the result of ``self.__getstate__()`` to ``path``.

        ``path`` is handed to ``open_file`` with mode 'wb'; the resulting file
        object is used as a context manager, so it is closed on exit.
        """
        snapshot = self.__getstate__()
        with open_file(path, 'wb') as sink:
            pickle.dump(snapshot, sink)
项目:sk-torch    作者:mattHawthorn    | 项目源码 | 文件源码
        with open_file(path, 'rb') as infile:
            state = pickle.load(infile)
        model = cls.__new__(cls)
        return model

    # for using pickle.dump/load directly
# NOTE(review): the body line is not valid Python (assignment to a call); it
# looks like several statements fused together during extraction, presumably a
# `super().__init__(daemon=True)` call followed by attribute assignments.
# Restore it from upstream before use.
def __init__(self, stream: IO[S]) -> None:
        super().__init__(daemon=True) = stream = None  # type: S
def write_file(config_dict: Dict[str, Any], config_file: IO[str]) -> None:
    """Write ``config_dict`` to ``config_file`` in INI format.

    :param config_dict: mapping of section name to a mapping of option name to
        value; values are converted to strings by ``ConfigParser.read_dict``.
    :param config_file: writable text stream that receives the output.

    Options are written as ``key=value`` with no spaces around the delimiter.
    """
    config = configparser.ConfigParser()
    # Load the data first; otherwise an empty configuration is written.
    config.read_dict(config_dict)
    config.write(config_file, space_around_delimiters=False)
def write_file(config_dict: Dict[str, Any], config_file: IO[str]) -> None:
    """Write ``config_dict`` to ``config_file`` in INI format.

    :param config_dict: mapping of section name to a mapping of option name to
        value; values are converted to strings by ``ConfigParser.read_dict``.
    :param config_file: writable text stream that receives the output.

    Options are written as ``key=value`` with no spaces around the delimiter.
    """
    config = configparser.ConfigParser()
    # Load the data first; otherwise an empty configuration is written.
    config.read_dict(config_dict)
    config.write(config_file, space_around_delimiters=False)
def write_file(config_dict: Dict[str, Any], config_file: IO[str]) -> None:
    """Write ``config_dict`` to ``config_file`` in INI format.

    :param config_dict: mapping of section name to a mapping of option name to
        value; values are converted to strings by ``ConfigParser.read_dict``.
    :param config_file: writable text stream that receives the output.

    Options are written as ``key=value`` with no spaces around the delimiter.
    """
    config = configparser.ConfigParser()
    # Load the data first; otherwise an empty configuration is written.
    config.read_dict(config_dict)
    config.write(config_file, space_around_delimiters=False)
def create_test_file(self) -> typing.Tuple[typing.IO, bytes]:
        """Build an in-memory binary file filled with 512 random bytes.

        :return: a ``(file, contents)`` pair: an ``io.BytesIO`` positioned at the
            start, and the exact bytes it contains, for comparison in tests.
        """
        import io
        import secrets

        file_contents = secrets.token_bytes(512)
        test_file: typing.IO = io.BytesIO(file_contents)

        return test_file, file_contents
# NOTE(review): only fragments of this function survive and it does not parse.
# What remains looks like the inside of a nested dict/list literal (deck
# description, tags, cards, user answers) passed to a call that takes
# `separators=(',', ':')`; the call itself, several keys and the iterables of
# two comprehensions are missing. Restore it from upstream before use.
def _export(deck: db.Deck, handle: IO[Any]) -> None:
            'description': deck.description,
                'color': tag.color,
            } for tag in deck.tags],
                'id': card.num,
                'question': card.question,
                'answers': card.answers,
                'active': card.is_active,
                'activation_date': card.activation_date,
                'tags': [ for tag in card.tags],
                    'correct': answer.is_correct,
                } for answer in card.user_answers],
            } for card in],
        separators=(',', ':'),
def configure(sources: Iterable[Path],
              cmd: str,
              blddir: Path,
              out: IO[str] = sys.stdout) -> None:
    """Dump a JSON object describing one task per source file to ``out``.

    Each entry is keyed by the source path (as a string) and holds the source
    path again under 'source' and, under 'args', the whitespace-split ``cmd``
    followed by ``str(blddir / namer(path))``, where ``namer`` is an
    ``ObjectFileNamer`` instance shared by all sources.
    """
    namer = ObjectFileNamer()
    base_args = cmd.split()
    tasks = {}
    for src in sources:
        key = str(src)
        tasks[key] = {
            'source': str(src),
            'args': base_args + [str(blddir/namer(src))],
        }
    json.dump(tasks, out)
# NOTE(review): this copy of the function is damaged and does not parse.
# `if not os.path.isdir(folder_name):` has no body, and the "course_name"
# `.format(` call lost its argument. The file name passed for the resources
# download is an empty string, which may also be an extraction loss.
# Restore the missing pieces from upstream before use.
def download_course(course):
    folder_name = os.path.join("drumeo", course.number)
    if not os.path.isdir(folder_name):
    details = os.path.join(folder_name, "details.txt")
    if not os.path.isfile(details):
        with open(details, "wt") as file_handle:  # type: IO[str]
            print("course_number: {}".format(course.number), file=file_handle)
            print("course_name: {}".format(, file=file_handle)
            print("course_difficulty: {}".format(course.diff), file=file_handle)
            print("instructor: {}".format(course.instructor), file=file_handle)
    if course.resources is not None:
        download_url(course.resources, os.path.join(folder_name, ""))
    for i, (video, quality) in enumerate(course.videos):
        download_video_if_wider(video, os.path.join(folder_name, "{}.mp4".format(i)), width=int(quality))
def open(self, filepath):
        # type: (str) -> IO[str]
        """Open ``filepath``, resolved relative to the root directory, for reading.

        :param filepath: path of the file to open, relative to ``self._root_dir``.
        :return: the opened text file.
        :raises PermissionError: if the resolved path lies outside
            ``self._root_dir`` (for example via ``..`` components or an
            absolute path).
        """
        # io.open is the builtin open(); it is spelled this way so the call can
        # never be confused with this method's own name.
        import io

        root_dir = os.path.abspath(self._root_dir)
        # Normalise AFTER joining: normalising only the relative part leaves
        # leading ".." components in place, which would escape the root while
        # the joined string still starts with it.
        abs_filepath = os.path.abspath(os.path.join(root_dir, filepath))
        # Compare against the root plus a trailing separator, so a sibling such
        # as "<root>-other" does not pass a bare prefix test.
        root_prefix = os.path.join(root_dir, '')
        if abs_filepath != root_dir and not abs_filepath.startswith(root_prefix):
            raise PermissionError("Cannot open file \"{}\". Bots may only access "
                                  "files in their local directory.".format(abs_filepath))
        return io.open(abs_filepath)
def generate_and_write(filepaths, file_obj):
    # type: (Iterator[str], IO[str]) -> None
    """Write one ``include <path>`` line to ``file_obj`` for every path in ``filepaths``.

    :param filepaths: iterable of paths to list.
    :param file_obj: writable text stream that receives the lines.
    """
    template = 'include {line}\n'
    # The lines must actually be written out; building them lazily and dropping
    # the iterator would leave the output untouched.
    file_obj.writelines(template.format(line=path) for path in filepaths)
# NOTE(review): this copy of the function is damaged and does not parse.
# Several `try:` lines are missing (every `except Exception:` below lacks its
# partner), the first `while True:` is fused with the tail of a call taking
# `[zephyr._z.getFD()], [], [], 15`, `if notice is None:` has no statement of
# its own, and the calls guarded by the restart check and by
# `options.forward_class_messages` are gone. Restore it from upstream.
def process_loop(log):
    # type: (IO[Any]) -> None
    restart_check_count = 0
    last_check_time = time.time()
    while True:[zephyr._z.getFD()], [], [], 15)
            # Fetch notices from the queue until its empty
            while True:
                notice = zephyr.receive(block=False)
                if notice is None:
                    process_notice(notice, log)
                except Exception:
                    logger.exception("Error relaying zephyr:")
        except Exception:
            logger.exception("Error checking for new zephyrs:")

        if time.time() - last_check_time > 15:
            last_check_time = time.time()
                if restart_check_count > 0:
          "Stopped getting errors checking whether restart is required.")
                    restart_check_count = 0
            except Exception:
                if restart_check_count < 5:
                    logger.exception("Error checking whether restart is required:")
                    restart_check_count += 1

            if options.forward_class_messages:
                except Exception:
                    logger.exception("Error updating subscriptions from Zulip:")
def call_endpoint(self, url=None, method="POST", request=None, longpolling=False, files=None):
        # type: (str, str, Dict[str, Any], bool, List[IO[Any]]) -> Dict[str, Any]
        """Forward a request to ``self.do_api_query`` for the endpoint ``url``.

        ``url`` is appended to ``API_VERSTRING``; a missing ``request`` is
        replaced by a fresh empty dict. ``method``, ``longpolling`` and
        ``files`` are passed through unchanged.
        """
        payload = {} if request is None else request
        return self.do_api_query(payload, API_VERSTRING + url, method=method,
                                 longpolling=longpolling, files=files)
# NOTE(review): this copy of the method is truncated and does not parse. The
# "See examples/upload-file ..." line was presumably part of a docstring whose
# delimiters were lost, and the `self.call_endpoint(` call is cut off before
# its arguments. Restore it from upstream before use.
def upload_file(self, file):
        # type: (IO[Any]) -> Dict[str, Any]
            See examples/upload-file for example usage.
        return self.call_endpoint(
def unidump(inbytes: IO[bytes], env: Env) -> None:
    """take a list of bytes and print their Unicode codepoints

    Bytes are read from ``inbytes`` one at a time and accumulated until they
    decode as one character in ``env.encoding``; that character is then added
    to the current output line. Bytes that cannot be decoded are reported
    individually as ``?XX?``.

    >>> import io
    >>> import sys
    >>> from unidump.env import Env
    >>> _env = Env(linelength=4, output=sys.stdout)
    >>> unidump(io.BytesIO(b'\\x01\\xF0\\x9F\\x99\\xB8ABC'), _env)
          0    0001 1F678 0041 0042    .\U0001F678AB
          7    0043                   C
    >>> unidump(io.BytesIO(b'\\xD7'), _env)
          0    ?D7?                   X
    >>> _env.encoding = 'latin1'
    >>> unidump(io.BytesIO(b'\\xD7'), _env)
          0    00D7                   \u00D7

    """
    byteoffset = 0
    bytebuffer = b''
    current_line = [0, [], '']

    byte = inbytes.read(1)
    while byte:
        byteoffset += 1
        bytebuffer += byte

        try:
            char = bytebuffer.decode(env.encoding)
        except UnicodeDecodeError:
            # The buffer may be an incomplete multi-byte sequence: look at the
            # next byte before giving up on it.
            next_byte = inbytes.read(1)
            if not next_byte or len(bytebuffer) >= 4:
                # End of input, or four bytes collected without a successful
                # decode: report every buffered byte as invalid.
                for i, x in enumerate(bytebuffer):
                    current_line = (
                        fill_and_print(current_line, byteoffset - 4 + i,
                                       '?{:02X}?'.format(x), 'X', env)
                    )
                bytebuffer = b''
            byte = next_byte
            # Keep whatever is still buffered and retry with the byte just read.
            continue
        else:
            current_line = (
                fill_and_print(current_line, byteoffset - len(bytebuffer),
                               '{:04X}'.format(ord(char)), sanitize_char(char),
                               env)
            )

        bytebuffer = b''
        byte = inbytes.read(1)

    print_line(current_line, env)
def _import(handle: IO[Any]) -> None:
    with db.session_scope() as session:
        deck_obj = json.load(handle)

        deck = db.Deck() = deck_obj['name']
        deck.description = deck_obj['description']

        existing_deck = db.try_get_deck_by_name(session,
        if existing_deck:
            if not util.confirm(
                    'Are you sure you want to overwrite deck %r?' %

        tag_dict = {}
        for tag_obj in deck_obj['tags']:
            tag = db.Tag()
   = tag_obj['name']
            tag.color = tag_obj['color']
            tag_dict[] = tag

        for card_obj in deck_obj['cards']:
            card = db.Card()
            card.num = card_obj['id']
            card.question = card_obj['question']
            card.answers = card_obj['answers']
            card.is_active = card_obj['active']
            card.tags = [tag_dict[name] for name in card_obj['tags']]
            for user_answer_obj in card_obj['user_answers']:
                user_answer = db.UserAnswer()
       = parse_date(user_answer_obj['date'])
                user_answer.is_correct = user_answer_obj['correct']
            if 'activation_date' in card_obj:
                if card_obj['activation_date']:
                    card.activation_date = parse_date(
            elif card.user_answers:
                card.activation_date = sorted(
                    card.user_answers, key=lambda ua:[0].date
            card.due_date = scheduler.next_due_date(card)