Python typing 模块,BinaryIO 实例源码

我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用typing.BinaryIO(作为类型注解)。

项目:Odin    作者:HackSoftware    | 项目源码 | 文件源码
def create_binary_problem(*,
                          language: str='',
                          test_type: int=GraderBinaryProblem.OUTPUT_CHECKING,
                          file_type: int=GraderBinaryProblem.BINARY,
                          solution: BinaryIO=None,
                          test: BinaryIO=None,
                          extra_options: Dict=None) -> GraderBinaryProblem:
    """Validate the arguments and persist a new ``GraderBinaryProblem``.

    All arguments are keyword-only.

    :param language: language identifier stored on the problem.
    :param test_type: test type stored on the problem; defaults to
        ``GraderBinaryProblem.OUTPUT_CHECKING``.
    :param file_type: file type stored on the problem; defaults to
        ``GraderBinaryProblem.BINARY``.
    :param solution: binary file object stored as the solution.
    :param test: binary file object stored as the test.
    :param extra_options: extra options stored on the problem; ``None``
        (the default) means an empty dict.
    :return: the created ``GraderBinaryProblem``.

    Whatever ``run_create_problem_service_validation`` raises for invalid
    ``language``/``test_type``/``file_type`` propagates to the caller.
    """
    # Build a fresh dict per call. A ``{}`` default in the signature is a
    # single object shared by every call, so mutating it through one created
    # instance would leak into all later calls.
    if extra_options is None:
        extra_options = {}

    run_create_problem_service_validation(language=language,
                                          test_type=test_type,
                                          file_type=file_type)

    return GraderBinaryProblem.objects.create(
        language=language,
        test_type=test_type,
        file_type=file_type,
        solution=solution,
        test=test,
        extra_options=extra_options
    )
项目:Odin    作者:HackSoftware    | 项目源码 | 文件源码
def create_competition_test(*,
                            existing_test: Test=None,
                            task: CompetitionTask,
                            language: ProgrammingLanguage,
                            extra_options: Dict=None,
                            code: str=None,
                            file: BinaryIO=None):
    """Create and save a ``CompetitionTest`` for ``task``.

    When ``existing_test`` is ``None`` a new ``Test`` is first built from
    ``language``, ``extra_options``, ``code`` and ``file``, validated with
    ``full_clean()`` and saved. The new ``CompetitionTest`` then copies
    ``language``, ``extra_options``, ``code`` and ``file`` from that test and
    references it through its ``test`` attribute.

    :param existing_test: test to copy from; created when omitted.
    :param task: competition task the new test belongs to.
    :param language: language for a newly created ``Test``.
    :param extra_options: options for a newly created ``Test``; ``None``
        (the default) means an empty dict.
    :param code: source code for a newly created ``Test``.
    :param file: binary file for a newly created ``Test``.
    :return: the saved ``CompetitionTest``.
    """
    # Build a fresh dict per call; a ``{}`` default in the signature would be
    # one object shared by every ``Test`` created through this function.
    if extra_options is None:
        extra_options = {}

    new_test = CompetitionTest(task=task)
    if existing_test is None:
        existing_test = Test(language=language, extra_options=extra_options, code=code, file=file)
        existing_test.full_clean()
        existing_test.save()

    new_test.language = existing_test.language
    new_test.extra_options = existing_test.extra_options
    new_test.code = existing_test.code
    new_test.file = existing_test.file

    new_test.test = existing_test
    new_test.save()

    return new_test
项目:Odin    作者:HackSoftware    | 项目源码 | 文件源码
def create_test_for_task(*,
                         existing_test: Test=None,
                         task: IncludedTask,
                         language: ProgrammingLanguage,
                         extra_options: Dict=None,
                         code: str=None,
                         file: BinaryIO=None):
    """Create and save an ``IncludedTest`` for ``task``.

    When ``existing_test`` is ``None`` a new ``Test`` is first built from
    ``language``, ``extra_options``, ``code`` and ``file``, validated with
    ``full_clean()`` and saved. The new ``IncludedTest`` then copies
    ``language``, ``extra_options``, ``code`` and ``file`` from that test and
    references it through its ``test`` attribute.

    :param existing_test: test to copy from; created when omitted.
    :param task: included task the new test belongs to.
    :param language: language for a newly created ``Test``.
    :param extra_options: options for a newly created ``Test``; ``None``
        (the default) means an empty dict.
    :param code: source code for a newly created ``Test``.
    :param file: binary file for a newly created ``Test``.
    :return: the saved ``IncludedTest``.
    """
    # Build a fresh dict per call; a ``{}`` default in the signature would be
    # one object shared by every ``Test`` created through this function.
    if extra_options is None:
        extra_options = {}

    new_test = IncludedTest(task=task)
    if existing_test is None:
        existing_test = Test(language=language, extra_options=extra_options, code=code, file=file)
        existing_test.full_clean()
        existing_test.save()

    new_test.language = existing_test.language
    new_test.extra_options = existing_test.extra_options
    new_test.code = existing_test.code
    new_test.file = existing_test.file

    new_test.test = existing_test
    new_test.save()

    return new_test
项目:bit-torrent    作者:borzunov    | 项目源码 | 文件源码
def _iter_files(self, offset: int, data_length: int) -> Iterable[Tuple[BinaryIO, int, int]]:
        if offset < 0 or offset + data_length > self._download_info.total_size:
            raise IndexError('Data position out of range')

        # Find rightmost file which start offset less than or equal to `offset`
        index = bisect_right(self._offsets, offset) - 1

        while data_length != 0:
            file_start_offset = self._offsets[index]
            file_end_offset = self._offsets[index + 1]
            file_pos = offset - file_start_offset
            bytes_to_operate = min(file_end_offset - offset, data_length)

            descriptor = self._descriptors[index]
            yield descriptor, file_pos, bytes_to_operate

            offset += bytes_to_operate
            data_length -= bytes_to_operate
            index += 1
项目:acton    作者:chengsoonong    | 项目源码 | 文件源码
def read_bytes_from_buffer(n: int, buffer: BinaryIO) -> bytes:
    """Reads n bytes from a buffer, blocking until all bytes are received.

    Parameters
    ----------
    n
        How many bytes to read.
    buffer
        Which buffer to read from.

    Returns
    -------
    bytes
        Exactly n bytes.

    Raises
    ------
    EOFError
        If the buffer stops producing data before n bytes were received.
    """
    chunks = []
    received = 0
    while received < n:
        chunk = buffer.read(n - received)
        if not chunk:
            # An empty read means the stream is exhausted; retrying would
            # spin forever without ever reaching n bytes.
            raise EOFError(
                'buffer ended after {} of {} bytes'.format(received, n))
        chunks.append(chunk)
        received += len(chunk)

    # Join once at the end instead of repeatedly growing a bytes object.
    b = b''.join(chunks)
    assert len(b) == n
    return b
项目:aiomongo    作者:ZeoAlliance    | 项目源码 | 文件源码
async def put(self, data: Union[bytes, str, BinaryIO], **kwargs) -> GridIn:
        """Put data in GridFS as a new file.

        This is a coroutine (it awaits the file's ``write`` and ``close``),
        so it must be declared ``async def`` and awaited by the caller.

        Equivalent to doing::

          f = GridIn(collection, **kwargs)
          try:
              await f.write(data)
          finally:
              await f.close()

        `data` can be either an instance of :class:`bytes` or a file-like
        object providing a :meth:`read` method. If an `encoding` keyword
        argument is passed, `data` can also be a :class:`str` instance,
        which will be encoded as `encoding` before being written. Any
        keyword arguments will be passed through to the created file - see
        :meth:`~gridfs.grid_file.GridIn` for possible arguments. Returns the
        ``"_id"`` of the created file.

        If the ``"_id"`` of the file is manually specified, it must
        not already exist in GridFS. Otherwise
        :class:`~gridfs.errors.FileExists` is raised.

        :Parameters:
          - `data`: data to be written as a file.
          - `**kwargs` (optional): keyword arguments for file creation
        """
        grid_file = GridIn(self.__collection, **kwargs)
        try:
            await grid_file.write(data)
        finally:
            # Always close, even when the write failed.
            await grid_file.close()

        # NOTE(review): the return annotation says GridIn, but the value
        # returned is the file's ``_id`` -- confirm which one callers expect.
        return grid_file._id
项目:wifimitm    作者:mvondracek    | 项目源码 | 文件源码
def __init__(self):
        self.logging_level = None  # type: Optional[int]
        self.phishing_enabled = None  # type: Optional[bool]
        self.capture_file = None  # type: Optional[BinaryIO]
        self.essid = None  # type: Optional[str]
        self.interface = None  # type: Optional[WirelessInterface]

        self.parser = self.init_parser()  # type: argparse.ArgumentParser
项目:wifimitm    作者:mvondracek    | 项目源码 | 文件源码
def __init__(self, interface: WirelessInterface, capture_file: Optional[BinaryIO] = None):
        """
        Set up the initial state and start ``dumpcap`` on the given interface.

        :type capture_file: Optional[BinaryIO]
        :param capture_file: file for writing packet capture

        :type interface: WirelessInterface
        :param interface: wireless interface for capture
        """
        self.state = self.State.STARTED
        self.flags = self.__initial_flags()
        self.stats = self.__initial_stats()

        self.interface = interface  # type: WirelessInterface
        self.capture_file = capture_file
        # Without a `capture_file`, dumpcap creates its own capture file in /tmp; the path is filled in later,
        # during `self.update`.
        self.tmp_capture_file_path = None

        command = ['dumpcap', '-i', self.interface.name]
        if self.capture_file:
            # With a `capture_file`, dumpcap writes raw packet data to its stdout ('-w -'), and that stdout is
            # redirected into the provided file.
            command.extend(['-w', '-'])
            output_target = self.capture_file
        else:
            output_target = None
        super().__init__(command, stdout=output_target)
项目:fuse-3ds    作者:ihaveamac    | 项目源码 | 文件源码
def get_seed(f: BinaryIO, program_id: int):
    """Look up the seed stored for ``program_id`` in a seeddb.bin stream.

    The stream is rewound, an entry count is decoded from its first two bytes
    with ``util.readle``, and that many 0x20-byte entries are read starting at
    offset 0x10. An entry matches when its first 8 bytes equal ``program_id``
    encoded as 8 little-endian bytes; bytes 0x8-0x18 of that entry are
    returned.

    Raises ``NCCHSeedException`` when no entry matches.
    """
    wanted_title_id = program_id.to_bytes(8, 'little')

    f.seek(0)
    total_entries = util.readle(f.read(2))

    f.seek(16)
    for _ in range(total_entries):
        record = f.read(32)
        if record[:8] != wanted_title_id:
            continue
        return record[8:24]

    raise NCCHSeedException("missing seed for {:016X} from seeddb.bin".format(program_id))
项目:Odin    作者:HackSoftware    | 项目源码 | 文件源码
def create_gradable_solution(*,
                             task: CompetitionTask,
                             participant: CompetitionParticipant,
                             code: str=None,
                             file: BinaryIO=None) -> Solution:
    """Create a ``Solution`` for ``task`` from either ``code`` or ``file``.

    Exactly one of ``code`` and ``file`` must be given, and the task must
    already have at least one ``CompetitionTest``; otherwise
    ``ValidationError`` is raised. The solution is created with ``status=6``.
    """
    has_code = code is not None
    has_file = file is not None

    if has_code and has_file:
        raise ValidationError("Provide either code or a file, not both!")
    if not (has_code or has_file):
        raise ValidationError("Provide either code or a file!")

    task_has_tests = CompetitionTest.objects.filter(task=task).exists()
    if not task_has_tests:
        raise ValidationError("This task does not have tests yet")

    # Exactly one of the two payloads is present from here on.
    if has_code:
        return Solution.objects.create(
            task=task,
            participant=participant,
            code=code,
            status=6
        )

    return Solution.objects.create(
        task=task,
        participant=participant,
        file=file,
        status=6
    )
项目:aiomongo    作者:ZeoAlliance    | 项目源码 | 文件源码
async def write(self, data: Union[bytes, str, BinaryIO]) -> None:
        """Write data to the file. There is no return value.

        This is a coroutine (it awaits the internal flush helpers), so it
        must be declared ``async def`` and awaited by the caller.

        `data` can be either a string of bytes or a file-like object
        (implementing :meth:`read`). If the file has an
        :attr:`encoding` attribute, `data` can also be a
        :class:`str` instance, which will be encoded as
        :attr:`encoding` before being written.

        Due to buffering, the data may not actually be written to the
        database until the :meth:`close` method is called. Raises
        :class:`ValueError` if this file is already closed. Raises
        :class:`TypeError` if `data` is not an instance of
        :class:`bytes`, a file-like object, or an instance of
        :class:`str`. :class:`str` data is only allowed if the file has
        an :attr:`encoding` attribute.

        :Parameters:
          - `data`: string of bytes or file-like object to be written
            to the file
        """
        if self._closed:
            raise ValueError('cannot write to a closed file')

        try:
            # file-like
            read = data.read
        except AttributeError:
            # string
            if not isinstance(data, (str, bytes)):
                raise TypeError('can only write strings or file-like objects')
            if isinstance(data, str):
                try:
                    data = data.encode(self.encoding)
                except AttributeError:
                    raise TypeError('must specify an encoding for file in '
                                    'order to write %s' % (str.__name__,))
            read = BytesIO(data).read

        if self._buffer.tell() > 0:
            # Make sure to flush only when _buffer is complete
            space = self.chunk_size - self._buffer.tell()
            if space:
                try:
                    to_write = read(space)
                except BaseException:
                    # Same reach as the previous bare ``except:`` -- clean up
                    # on any failure, then re-raise unchanged.
                    # NOTE(review): if ``abort`` is a coroutine in this class
                    # it needs to be awaited here -- confirm.
                    self.abort()
                    raise
                self._buffer.write(to_write)
                if len(to_write) < space:
                    return  # EOF or incomplete
            await self.__flush_buffer()
        to_write = read(self.chunk_size)
        # Full chunks go straight out; a short trailing read stays buffered.
        while to_write and len(to_write) == self.chunk_size:
            await self.__flush_data(to_write)
            to_write = read(self.chunk_size)
        self._buffer.write(to_write)
项目:zeus    作者:getsentry    | 项目源码 | 文件源码
def dispatch(
        self,
        path: str,
        method: str,
        data: dict=None,
        files: Mapping[str, BinaryIO]=None,
        json: dict=None,
        request=None,
        tenant=True,
    ) -> Response:
        """Issue an internal API call through the app's test client.

        The call goes to ``/api/<path>`` with the given ``method``.

        :param path: API path; leading slashes are stripped.
        :param method: HTTP method passed to the test client.
        :param data: form data to send.
        :param files: mapping of field name to file object; when given the
            request is sent as ``multipart/form-data``.
        :param json: object serialised with ``dumps`` and sent as
            ``application/json``; must not be combined with ``data``.
        :param request: when given, ``data`` and ``files`` are taken from it
            and ``json``/``data``/``files`` must not be passed.
        :param tenant: tenant placed in the WSGI environ under
            ``zeus.tenant``; ``True`` means ``auth.get_current_tenant()``.
        :return: the response, when it is a 2xx with JSON content type.
        :raises ApiError: on a non-2xx status or a non-JSON content type.
        """
        if request:
            assert not json
            assert not data
            assert not files
            data = request.data
            files = request.files
            json = None

        if tenant is True:
            tenant = auth.get_current_tenant()

        if json:
            assert not data
            data = dumps(json)
            content_type = 'application/json'
        elif files:
            if not data:
                data = {}
            # `request` is optional: callers may pass `files` directly, in
            # which case there are no form fields to merge in.
            if request:
                for key, value in request.form.items():
                    data[key] = value
            # NOTE(review): a caller-supplied non-empty `data` dict is
            # updated in place here -- confirm callers do not reuse it.
            for key, value in files.items():
                data[key] = value
            content_type = 'multipart/form-data'
        else:
            content_type = None

        with current_app.test_client() as client:
            response = client.open(
                path='/api/{}'.format(path.lstrip('/')),
                method=method,
                content_type=content_type,
                data=data,
                environ_overrides={
                    'zeus.tenant': tenant,
                }
            )
        if not (200 <= response.status_code < 300):
            raise ApiError(
                text=response.get_data(as_text=True),
                code=response.status_code,
            )
        if response.headers['Content-Type'] != 'application/json':
            raise ApiError(
                text='Request returned invalid content type: {}'.format(
                    response.headers['Content-Type']),
                code=response.status_code,
            )
        return response
项目:Odin    作者:HackSoftware    | 项目源码 | 文件源码
def create_course(*,
                  name: str,
                  start_date: datetime,
                  end_date: datetime,
                  repository: str,
                  facebook_group: str=None,
                  video_channel: str=None,
                  slug_url: str=None,
                  logo: BinaryIO=None,
                  public: bool=True,
                  description: str="") -> Course:
    """Create a course together with its weeks and its description.

    Raises ``ValidationError`` when a course named ``name`` already exists.
    After the course is created, one ``Week`` per week of
    ``course.duration_in_weeks`` is bulk-created, numbered from 1. The first
    week starts ``weekday()`` days before the course start date and each week
    spans 7 days. A ``CourseDescription`` holding ``description`` is created
    last.
    """
    name_taken = Course.objects.filter(name=name).exists()
    if name_taken:
        raise ValidationError('Course already exists')

    course = Course.objects.create(
        name=name,
        start_date=start_date,
        end_date=end_date,
        repository=repository,
        facebook_group=facebook_group,
        video_channel=video_channel,
        slug_url=slug_url,
        logo=logo,
        public=public
    )

    total_weeks = course.duration_in_weeks
    first_day = course.start_date
    # Step back to the start of the calendar week containing the first day.
    week_start = first_day - timedelta(days=first_day.weekday())
    one_week = timedelta(days=7)

    week_instances = []
    for number in range(1, total_weeks + 1):
        week_end = week_start + one_week
        week_instances.append(Week(course=course,
                                   number=number,
                                   start_date=week_start,
                                   end_date=week_end))
        # The next week begins exactly where this one ends.
        week_start = week_end

    Week.objects.bulk_create(week_instances)
    CourseDescription.objects.create(course=course, verbose=description)

    return course
项目:pillar    作者:armadillica    | 项目源码 | 文件源码
def upload_and_process(local_file: typing.Union[io.BytesIO, typing.BinaryIO],
                       uploaded_file: werkzeug.datastructures.FileStorage,
                       project_id: str):
    """Store an uploaded file, record it in the database and process it.

    Steps, in order: determine and check the file size, create the file
    document, copy the file into the project's storage bucket, mark the
    document as ``queued_for_processing``, run ``process_file`` and close
    ``local_file``.

    :return: dict with ``status='ok'``, the file id as a string and the
        status code returned by ``create_file_doc_for_upload``.
    """
    # The size is passed to GCloud explicitly. Otherwise it always falls back
    # to os.fstat(file_obj.fileno()).st_size, which a BytesIO object does not
    # support (even though it does have a fileno attribute).
    file_size = (len(local_file.getvalue())
                 if isinstance(local_file, io.BytesIO)
                 else os.fstat(local_file.fileno()).st_size)

    # Re-check the limit now that the real size is known.
    assert_file_size_allowed(file_size)

    # Register the file document in MongoDB.
    file_id, internal_fname, status = create_file_doc_for_upload(project_id, uploaded_file)

    # Copy the payload into the storage backend.
    bucket = default_storage_backend(project_id)
    blob = bucket.blob(internal_fname)
    blob.create_from_file(local_file,
                          file_size=file_size,
                          content_type=uploaded_file.mimetype)

    log.debug('Marking uploaded file id=%s, fname=%s, size=%i as "queued_for_processing"',
              file_id, internal_fname, file_size)
    update_file_doc(file_id,
                    status='queued_for_processing',
                    file_path=internal_fname,
                    length=blob.size,
                    content_type=uploaded_file.mimetype)

    log.debug('Processing uploaded file id=%s, fname=%s, size=%i',
              file_id, internal_fname, blob.size)
    process_file(bucket, file_id, local_file)

    # Local processing is finished; closing the local file lets it be removed.
    if local_file is not None:
        local_file.close()

    log.debug('Handled uploaded file id=%s, fname=%s, size=%i, status=%i',
              file_id, internal_fname, blob.size, status)

    # Status is 200 if the file already existed, and 201 if it was newly
    # created.
    # TODO: add a link to a thumbnail in the response.
    return dict(status='ok', file_id=str(file_id), status_code=status)