我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用typing.BinaryIO()。
def create_binary_problem(*, language: str='', test_type: int=GraderBinaryProblem.OUTPUT_CHECKING, file_type: int=GraderBinaryProblem.BINARY, solution: BinaryIO=None, test: BinaryIO=None, extra_options: Dict={}) -> GraderBinaryProblem: run_create_problem_service_validation(language=language, test_type=test_type, file_type=file_type) return GraderBinaryProblem.objects.create( language=language, test_type=test_type, file_type=file_type, solution=solution, test=test, extra_options=extra_options )
def create_competition_test(*, existing_test: Test=None, task: CompetitionTask, language: ProgrammingLanguage, extra_options: Dict={}, code: str=None, file: BinaryIO=None): new_test = CompetitionTest(task=task) if existing_test is None: existing_test = Test(language=language, extra_options=extra_options, code=code, file=file) existing_test.full_clean() existing_test.save() new_test.language = existing_test.language new_test.extra_options = existing_test.extra_options new_test.code = existing_test.code new_test.file = existing_test.file new_test.test = existing_test new_test.save() return new_test
def create_test_for_task(*, existing_test: Test=None, task: IncludedTask, language: ProgrammingLanguage, extra_options: Dict={}, code: str=None, file: BinaryIO=None): new_test = IncludedTest(task=task) if existing_test is None: existing_test = Test(language=language, extra_options=extra_options, code=code, file=file) existing_test.full_clean() existing_test.save() new_test.language = existing_test.language new_test.extra_options = existing_test.extra_options new_test.code = existing_test.code new_test.file = existing_test.file new_test.test = existing_test new_test.save() return new_test
def _iter_files(self, offset: int, data_length: int) -> Iterable[Tuple[BinaryIO, int, int]]: if offset < 0 or offset + data_length > self._download_info.total_size: raise IndexError('Data position out of range') # Find rightmost file which start offset less than or equal to `offset` index = bisect_right(self._offsets, offset) - 1 while data_length != 0: file_start_offset = self._offsets[index] file_end_offset = self._offsets[index + 1] file_pos = offset - file_start_offset bytes_to_operate = min(file_end_offset - offset, data_length) descriptor = self._descriptors[index] yield descriptor, file_pos, bytes_to_operate offset += bytes_to_operate data_length -= bytes_to_operate index += 1
def read_bytes_from_buffer(n: int, buffer: BinaryIO) -> bytes: """Reads n bytes from stdin, blocking until all bytes are received. Parameters ---------- n How many bytes to read. buffer Which buffer to read from. Returns ------- bytes Exactly n bytes. """ b = b'' while len(b) < n: b += buffer.read(n - len(b)) assert len(b) == n return b
def put(self, data: Union[bytes, str, BinaryIO], **kwargs) -> GridIn: """Put data in GridFS as a new file. Equivalent to doing:: try: f = new_file(**kwargs) f.write(data) finally: f.close() `data` can be either an instance of :class:`str` (:class:`bytes` in python 3) or a file-like object providing a :meth:`read` method. If an `encoding` keyword argument is passed, `data` can also be a :class:`unicode` (:class:`str` in python 3) instance, which will be encoded as `encoding` before being written. Any keyword arguments will be passed through to the created file - see :meth:`~gridfs.grid_file.GridIn` for possible arguments. Returns the ``"_id"`` of the created file. If the ``"_id"`` of the file is manually specified, it must not already exist in GridFS. Otherwise :class:`~gridfs.errors.FileExists` is raised. :Parameters: - `data`: data to be written as a file. - `**kwargs` (optional): keyword arguments for file creation """ grid_file = GridIn(self.__collection, **kwargs) try: await grid_file.write(data) finally: await grid_file.close() return grid_file._id
def __init__(self): self.logging_level = None # type: Optional[int] self.phishing_enabled = None # type: Optional[bool] self.capture_file = None # type: Optional[BinaryIO] self.essid = None # type: Optional[str] self.interface = None # type: Optional[WirelessInterface] self.parser = self.init_parser() # type: argparse.ArgumentParser
def __init__(self, interface: WirelessInterface, capture_file: Optional[BinaryIO] = None): """ :type capture_file: Optional[BinaryIO] :param capture_file: file for writing packet capture :type interface: WirelessInterface :param interface: wireless interface for capture """ self.state = self.State.STARTED self.flags = self.__initial_flags() self.stats = self.__initial_stats() self.interface = interface # type: WirelessInterface self.capture_file = capture_file # If `capture_file` was None, dumpcap will create capture file in /tmp. `self.tmp_capture_file_path` is set # during `self.update`. self.tmp_capture_file_path = None cmd = ['dumpcap', '-i', self.interface.name] stdout = None if self.capture_file: # If `capture_file` was provided, set dumpcap to write raw packet data to stdout... cmd.append('-w') cmd.append('-') # ... and redirect dumpcap's stdout to provided `capture_file`. stdout = self.capture_file super().__init__(cmd, stdout=stdout)
def get_seed(f: BinaryIO, program_id: int): """Get a seed in a seeddb.bin from an I/O stream.""" tid_bytes = program_id.to_bytes(0x8, 'little') f.seek(0) seed_count = util.readle(f.read(2)) f.seek(0x10) for _ in range(seed_count): entry = f.read(0x20) if entry[0:8] == tid_bytes: return entry[0x8:0x18] raise NCCHSeedException("missing seed for {:016X} from seeddb.bin".format(program_id))
def create_gradable_solution(*, task: CompetitionTask, participant: CompetitionParticipant, code: str=None, file: BinaryIO=None) -> Solution: if code is not None and file is not None: raise ValidationError("Provide either code or a file, not both!") if code is None and file is None: raise ValidationError("Provide either code or a file!") if not CompetitionTest.objects.filter(task=task).exists(): raise ValidationError("This task does not have tests yet") if code is not None: new_solution = Solution.objects.create( task=task, participant=participant, code=code, status=6 ) if file is not None: new_solution = Solution.objects.create( task=task, participant=participant, file=file, status=6 ) return new_solution
def write(self, data: Union[bytes, str, BinaryIO]) -> None: """Write data to the file. There is no return value. `data` can be either a string of bytes or a file-like object (implementing :meth:`read`). If the file has an :attr:`encoding` attribute, `data` can also be a :class:`unicode` (:class:`str` in python 3) instance, which will be encoded as :attr:`encoding` before being written. Due to buffering, the data may not actually be written to the database until the :meth:`close` method is called. Raises :class:`ValueError` if this file is already closed. Raises :class:`TypeError` if `data` is not an instance of :class:`str` (:class:`bytes` in python 3), a file-like object, or an instance of :class:`unicode` (:class:`str` in python 3). Unicode data is only allowed if the file has an :attr:`encoding` attribute. :Parameters: - `data`: string of bytes or file-like object to be written to the file """ if self._closed: raise ValueError('cannot write to a closed file') try: # file-like read = data.read except AttributeError: # string if not isinstance(data, (str, bytes)): raise TypeError('can only write strings or file-like objects') if isinstance(data, str): try: data = data.encode(self.encoding) except AttributeError: raise TypeError('must specify an encoding for file in ' 'order to write %s' % (str.__name__,)) read = BytesIO(data).read if self._buffer.tell() > 0: # Make sure to flush only when _buffer is complete space = self.chunk_size - self._buffer.tell() if space: try: to_write = read(space) except: self.abort() raise self._buffer.write(to_write) if len(to_write) < space: return # EOF or incomplete await self.__flush_buffer() to_write = read(self.chunk_size) while to_write and len(to_write) == self.chunk_size: await self.__flush_data(to_write) to_write = read(self.chunk_size) self._buffer.write(to_write)
def dispatch( self, path: str, method: str, data: dict=None, files: Mapping[str, BinaryIO]=None, json: dict=None, request=None, tenant=True, ) -> Response: if request: assert not json assert not data assert not files data = request.data files = request.files json = None if tenant is True: tenant = auth.get_current_tenant() if json: assert not data data = dumps(json) content_type = 'application/json' elif files: if not data: data = {} for key, value in request.form.items(): data[key] = value for key, value in files.items(): data[key] = value content_type = 'multipart/form-data' else: content_type = None with current_app.test_client() as client: response = client.open( path='/api/{}'.format(path.lstrip('/')), method=method, content_type=content_type, data=data, environ_overrides={ 'zeus.tenant': tenant, } ) if not (200 <= response.status_code < 300): raise ApiError( text=response.get_data(as_text=True), code=response.status_code, ) if response.headers['Content-Type'] != 'application/json': raise ApiError( text='Request returned invalid content type: {}'.format( response.headers['Content-Type']), code=response.status_code, ) return response
def create_course(*, name: str, start_date: datetime, end_date: datetime, repository: str, facebook_group: str=None, video_channel: str=None, slug_url: str=None, logo: BinaryIO=None, public: bool=True, description: str="") -> Course: if Course.objects.filter(name=name).exists(): raise ValidationError('Course already exists') course = Course.objects.create( name=name, start_date=start_date, end_date=end_date, repository=repository, facebook_group=facebook_group, video_channel=video_channel, slug_url=slug_url, logo=logo, public=public ) weeks = course.duration_in_weeks start_date = course.start_date start_date = start_date - timedelta(days=start_date.weekday()) week_instances = [] for i in range(1, weeks + 1): current = Week(course=course, number=i, start_date=start_date, end_date=start_date + timedelta(days=7)) start_date = current.end_date week_instances.append(current) Week.objects.bulk_create(week_instances) CourseDescription.objects.create(course=course, verbose=description) return course
def upload_and_process(local_file: typing.Union[io.BytesIO, typing.BinaryIO], uploaded_file: werkzeug.datastructures.FileStorage, project_id: str): # Figure out the file size, as we need to pass this in explicitly to GCloud. # Otherwise it always uses os.fstat(file_obj.fileno()).st_size, which isn't # supported by a BytesIO object (even though it does have a fileno # attribute). if isinstance(local_file, io.BytesIO): file_size = len(local_file.getvalue()) else: file_size = os.fstat(local_file.fileno()).st_size # Check the file size again, now that we know its size for sure. assert_file_size_allowed(file_size) # Create file document in MongoDB. file_id, internal_fname, status = create_file_doc_for_upload(project_id, uploaded_file) # Copy the file into storage. bucket = default_storage_backend(project_id) blob = bucket.blob(internal_fname) blob.create_from_file(local_file, file_size=file_size, content_type=uploaded_file.mimetype) log.debug('Marking uploaded file id=%s, fname=%s, ' 'size=%i as "queued_for_processing"', file_id, internal_fname, file_size) update_file_doc(file_id, status='queued_for_processing', file_path=internal_fname, length=blob.size, content_type=uploaded_file.mimetype) log.debug('Processing uploaded file id=%s, fname=%s, size=%i', file_id, internal_fname, blob.size) process_file(bucket, file_id, local_file) # Local processing is done, we can close the local file so it is removed. if local_file is not None: local_file.close() log.debug('Handled uploaded file id=%s, fname=%s, size=%i, status=%i', file_id, internal_fname, blob.size, status) # Status is 200 if the file already existed, and 201 if it was newly # created. # TODO: add a link to a thumbnail in the response. return dict(status='ok', file_id=str(file_id), status_code=status)