我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 io.IOBase。
def __is_or_has_file(self, data):
    ''' Figure out if we have been given a file-like object as one of
    the inputs to the function that called this.

    Is a bit clunky because 'file' doesn't exist as a bare-word type
    check in Python 3 and built in file objects are not instances of
    io.<anything> in Python 2

    https://stackoverflow.com/questions/1661262/check-if-object-is-file-like-in-python

    Returns:
        Boolean - True if we have a file-like object
    '''
    # Some wrappers (e.g. upload objects) expose the real stream as .file.
    if hasattr(data, 'file'):
        data = data.file
    try:
        # Python 2: the built-in 'file' type exists.
        return isinstance(data, file)
    except NameError:
        # Python 3: 'file' is gone; file-like objects derive from IOBase.
        from io import IOBase
        return isinstance(data, IOBase)
def cmd_stmt_execute(self, statement_id, data=(), parameters=(), flags=0):
    """Execute a prepared MySQL statement"""
    parameters = list(parameters)
    # Maps parameter position -> (binary_flag,) for parameters that were
    # shipped via STMT_SEND_LONG_DATA instead of inline in the packet.
    long_data_used = {}

    if data:
        # NOTE(review): indexes data by the parameter position -- assumes
        # len(data) >= len(parameters); confirm callers guarantee this.
        for param_id, _ in enumerate(parameters):
            if isinstance(data[param_id], IOBase):
                binary = True
                try:
                    # A text-mode stream ('b' absent from .mode) is sent
                    # as non-binary long data.
                    binary = 'b' not in data[param_id].mode
                except AttributeError:
                    # Stream without .mode (e.g. io.BytesIO): keep binary.
                    pass
                self.cmd_stmt_send_long_data(statement_id, param_id,
                                             data[param_id])
                long_data_used[param_id] = (binary,)

    execute_packet = self._protocol.make_stmt_execute(
        statement_id, data, tuple(parameters), flags,
        long_data_used, self.charset)
    packet = self._send_cmd(ServerCmd.STMT_EXECUTE, packet=execute_packet)
    result = self._handle_binary_result(packet)
    return result
def outputpapertemplate(self, dest, listchar, output=None):
    """Render *listchar* onto template pages and emit the combined PDF.

    Args:
        dest: Destination -- a filesystem path (str), a writable binary
            stream (io.IOBase), or None to return the writer instead.
        listchar: Characters to render; consumed page by page by
            self.outputtemplateonepage().
        output: Optional existing PyPDF2.PdfFileWriter to append to.

    Returns:
        The PdfFileWriter when dest is None, otherwise None.
    """
    if output is None:  # fixed: compare to None with 'is', not '=='
        output = PyPDF2.PdfFileWriter()
    # Each iteration consumes one page's worth of characters.
    while listchar:
        iopage = self.outputtemplateonepage(listchar)
        page = PyPDF2.PdfFileReader(iopage)
        output.addPage(page.getPage(0))
    if dest is not None:  # fixed: 'is not None' instead of '!= None'
        if isinstance(dest, str):
            # dest is a file path: make sure its directory exists first.
            destdir = os.path.dirname(dest)
            if destdir != '' and not os.path.isdir(destdir):
                os.makedirs(destdir)
            with open(dest, "wb") as w:
                output.write(w)
        else:
            # dest is an already-open binary stream (io.IOBase).
            output.write(dest)
    else:
        return output
def __init__(self, record_type=None, record_name=None, data=None):
    """Initialize a record either from explicit type/name (plus optional
    payload) or by parsing serialized *data* from a buffer or stream.

    Raises:
        TypeError: if *data* is given alone but is not bytes-like or a
            readable stream.
    """
    self._message_begin = self._message_end = False
    self._type = self._name = self._data = ''
    if not (record_type is None and record_name is None):
        # Explicit construction path: set fields directly.
        self.type = record_type if record_type is not None else 'unknown'
        if record_name is not None:
            self.name = record_name
        if data is not None:
            self.data = data
    elif data is not None:
        # Parsing path: wrap raw buffers in a stream, then read.
        if isinstance(data, (bytearray, str)):
            # NOTE(review): io.BytesIO(str) raises TypeError on Python 3;
            # this branch looks Python 2 era -- confirm str inputs are
            # really supported, or encode before wrapping.
            data = io.BytesIO(data)
        if isinstance(data, io.IOBase):
            self._read(data)
        else:
            raise TypeError("invalid data argument type")
def __init__(self, f, summary=True):
    """Open *f* for reading and record the stream's total size.

    Args:
     + f: Either a file name or a seekable binary stream.
     + summary: If True, call self.read_summary().
    """
    super().__init__(0)
    # Accept an already-open stream as-is; otherwise treat f as a path.
    self.stream = f if isinstance(f, IOBase) else open(f, 'rb')
    # Measure the stream length, then rewind to the beginning.
    self.stream.seek(0, SEEK_END)
    self.stream_size = self.stream.tell()
    self.stream.seek(0, SEEK_SET)
    if summary:
        self.read_summary()
def _find_options_in_meta(self, content):
    """Reads 'content' and extracts options encoded in HTML meta tags

    :param content: str or file-like object - contains HTML to parse

    returns: dict: {config option: value}
    """
    # Accept open streams too (codecs' StreamReaderWriter is not an
    # io.IOBase subclass, hence the class-name check).
    if isinstance(content, io.IOBase) or \
            content.__class__.__name__ == 'StreamReaderWriter':
        content = content.read()
    prefix = self.config.meta_tag_prefix
    found = {}
    for tag in re.findall('<meta [^>]*>', content):
        if re.search('name=["\']%s' % prefix, tag):
            name = re.findall('name=["\']%s([^"\']*)' % prefix, tag)[0]
            found[name] = re.findall('content=["\']([^"\']*)', tag)[0]
    return found
def test_wrap_non_iobase():
    """wrap_file accepts any duck-typed file (close + read/write methods)
    even when it is not an io.IOBase, and rejects it once the read/write
    method is removed."""
    class FakeFile:
        def close(self):  # pragma: no cover
            pass

        def write(self):  # pragma: no cover
            pass

    fake = FakeFile()
    # Sanity check: the fake is not an io stream, yet wrapping must work.
    assert not isinstance(fake, io.IOBase)
    assert isinstance(trio.wrap_file(fake), AsyncIOWrapper)

    # Without a read/write method the object is no longer file-like.
    del FakeFile.write
    with pytest.raises(TypeError):
        trio.wrap_file(FakeFile())
def wrap_file_object(fileobj):
    """Handle differences in Python 2 and 3 around writing bytes.

    Returns *fileobj* unchanged when it can already take text; otherwise
    wraps the bytes-only stream in an io.TextIOWrapper.
    """
    # Python 2 file objects are not IOBase instances and accept text freely.
    if not isinstance(fileobj, io.IOBase):
        return fileobj
    # A text-capable Python 3 stream needs no wrapping.
    if isinstance(fileobj, io.TextIOBase):
        return fileobj
    # Bytes-only stream: let TextIOWrapper handle the text -> bytes
    # conversion for us.
    return io.TextIOWrapper(fileobj)
def get_gramet_image_url(url_or_fp):
    """Return the absolute URL of the gramet image found in a page.

    *url_or_fp* is either an already-open stream whose HTML is read
    directly (base URL taken from OGIMET_URL) or a URL string fetched
    with requests. Returns '' when no matching <img> is found.
    """
    img_src = ''
    if isinstance(url_or_fp, io.IOBase):
        # noinspection PyUnresolvedReferences
        data = url_or_fp.read()
        u = urlsplit(OGIMET_URL)
    else:
        u = urlsplit(url_or_fp)
        import requests
        r = requests.get(url_or_fp)
        data = r.text
    if data:
        m = re.search(r'<img src="([^"]+/gramet_[^"]+)"', data)
        if m:
            img_src = "{url.scheme}://{url.netloc}{path}".format(
                url=u, path=m.group(1))
    return img_src
def copy_file_data(src_file, dst_file, chunk_size=None):
    """Copy data from one file object to another.

    Arguments:
        src_file (io.IOBase): File open for reading.
        dst_file (io.IOBase): File open for writing.
        chunk_size (int, optional): Number of bytes to copy at a
            time (or `None` to use sensible default).
    """
    size = chunk_size or io.DEFAULT_BUFFER_SIZE
    while True:
        # 'or None' normalizes EOF for both text ('') and binary (b'')
        # streams so the loop terminates either way.
        chunk = src_file.read(size) or None
        if chunk is None:
            break
        dst_file.write(chunk)
def _find_options_in_meta(self, content):
    """Reads 'content' and extracts options encoded in HTML meta tags

    :param content: str or file-like object - contains HTML to parse

    returns: dict: {config option: value}
    """
    # File-like input (including codecs' StreamReaderWriter, which is
    # not an io.IOBase subclass) is read into a string first.
    if (isinstance(content, io.IOBase)
            or content.__class__.__name__ == 'StreamReaderWriter'):
        content = content.read()

    prefix = self.configuration.meta_tag_prefix
    found = {}
    for meta in re.findall('<meta [^>]*>', content):
        if not re.search('name=["\']%s' % prefix, meta):
            continue
        name = re.findall('name=["\']%s([^"\']*)' % prefix, meta)[0]
        found[name] = re.findall('content=["\']([^"\']*)', meta)[0]
    return found
def _upload_media_py3(self, media_type, media_file, extension=''):
    """Upload *media_file* to the WeChat media endpoint.

    *media_file* is either an open disk file (its extension and name are
    taken from .name) or an io.BytesIO buffer, in which case the caller
    must supply *extension* explicitly.

    Raises:
        ValueError: for disallowed extensions or unsupported file types.
    """
    if isinstance(media_file, io.IOBase) and hasattr(media_file, 'name'):
        # Real file on disk: derive the extension from its filename.
        extension = media_file.name.split('.')[-1].lower()
        if not is_allowed_extension(extension):
            raise ValueError('Invalid file type.')
        filename = media_file.name
    elif isinstance(media_file, io.BytesIO):
        # In-memory buffer has no name; rely on the caller's extension.
        extension = extension.lower()
        if not is_allowed_extension(extension):
            raise ValueError('Please provide \'extension\' parameters when the type of \'media_file\' is \'io.BytesIO\'.')
        filename = 'temp.' + extension
    else:
        raise ValueError('Parameter media_file must be io.BufferedIOBase(open a file with \'rb\') or io.BytesIO object.')
    return self.request.post(
        url='https://api.weixin.qq.com/cgi-bin/media/upload',
        params={
            'type': media_type,
        },
        files={
            'media': (filename, media_file, convert_ext_to_mime(extension))
        }
    )
def truncate(self, size=None):
    """
    Resize the stream to the given size in bytes (or the current position
    if size is not specified). This resizing can extend or reduce the
    current file size.  The new file size is returned.

    In prior versions of picamera, truncation also changed the position
    of the stream (because prior versions of these stream classes were
    non-seekable). This functionality is now deprecated; scripts should
    use :meth:`~io.IOBase.seek` and :meth:`truncate` as one would with
    regular :class:`~io.BytesIO` instances.
    """
    if size is not None:
        # Warn callers relying on the legacy "truncate moves the cursor"
        # behavior, which is emulated below for compatibility.
        warnings.warn(
            PiCameraDeprecated(
                'This method changes the position of the stream to the '
                'truncated length; this is deprecated functionality and '
                'you should not rely on it (seek before or after truncate '
                'to ensure position is consistent)'))
    super(PiArrayOutput, self).truncate(size)
    if size is not None:
        # Deprecated compatibility behavior: move the cursor to the new end.
        self.seek(size)
def test_good_response(self, resolwe_mock, requests_mock, os_mock, open_mock):
    """Download succeeds: each file in file_list is fetched chunk by chunk
    and written through the (mocked) open() handle."""
    resolwe_mock.configure_mock(**self.config)
    os_mock.path.isfile.return_value = True
    # When mocking open one wants it to return a "file-like" mock: (spec=io.IOBase)
    # NOTE(review): 'mock_open' here is presumably a module-level patch
    # object distinct from the 'open_mock' parameter -- confirm.
    mock_open.return_value = MagicMock(spec=io.IOBase)
    # Each download yields 3 chunks via iter_content.
    requests_mock.get.return_value = MagicMock(ok=True,
                                               **{'iter_content.return_value': range(3)})

    Resolwe._download_files(resolwe_mock, self.file_list)
    self.assertEqual(resolwe_mock.logger.info.call_count, 3)

    # This asserts may seem weird. To check what is happening behind the scenes:
    # print(open_mock.mock_calls)
    self.assertEqual(open_mock.return_value.__enter__.return_value.write.call_count, 6)
    # Why 6? 2 files in self.file_list, each downloads 3 chunks (defined in response mock)
def add_fields(self, *fields):
    """Add multiple fields at once, flattening multidicts and accepting
    bare file objects or (name, file) pairs."""
    pending = list(fields)
    while pending:
        rec = pending.pop(0)

        if isinstance(rec, io.IOBase):
            # Bare file object: derive a field name from the file itself.
            self.add_field(guess_filename(rec, 'unknown'), rec)

        elif isinstance(rec, (MultiDictProxy, MultiDict)):
            # Expand multidicts into individual (name, value) pairs.
            pending.extend(rec.items())

        elif isinstance(rec, (list, tuple)) and len(rec) == 2:
            name, fp = rec
            self.add_field(name, fp)

        else:
            raise TypeError('Only io.IOBase, multidict and (name, file) '
                            'pairs allowed, use .add_field() for passing '
                            'more complex parameters, got {!r}'
                            .format(rec))
def __init__(self, obj, headers=None, *, chunk_size=8192):
    """Wrap *obj* with case-insensitive *headers* for later serialization.

    Builds the serializer dispatch table keyed by payload type (or by
    (maintype, subtype) for content-type driven serialization).
    """
    if headers is None:
        headers = CIMultiDict()
    elif not isinstance(headers, CIMultiDict):
        # Normalize any plain mapping into case-insensitive form.
        headers = CIMultiDict(headers)
    self.obj = obj
    self.headers = headers
    self._chunk_size = chunk_size
    self._fill_headers_with_defaults()

    # Dispatch table: payload type or (maintype, subtype) -> serializer.
    self._serialize_map = {
        bytes: self._serialize_bytes,
        str: self._serialize_str,
        io.IOBase: self._serialize_io,
        MultipartWriter: self._serialize_multipart,
        ('application', 'json'): self._serialize_json,
        ('application', 'x-www-form-urlencoded'): self._serialize_form
    }
def _guess_content_length(self, obj):
    """Best-effort Content-Length for *obj*; None when undeterminable."""
    if isinstance(obj, bytes):
        return len(obj)
    if isinstance(obj, str):
        # Length depends on the charset declared in the Content-Type.
        *_, params = parse_mimetype(self.headers.get(CONTENT_TYPE))
        charset = params.get('charset', 'us-ascii')
        return len(obj.encode(charset))
    if isinstance(obj, io.StringIO):
        # Only the not-yet-read tail counts, encoded in the charset.
        *_, params = parse_mimetype(self.headers.get(CONTENT_TYPE))
        charset = params.get('charset', 'us-ascii')
        return len(obj.getvalue().encode(charset)) - obj.tell()
    if isinstance(obj, io.BytesIO):
        return len(obj.getvalue()) - obj.tell()
    if isinstance(obj, io.IOBase):
        try:
            # Real file: remaining bytes = size - current position.
            return os.fstat(obj.fileno()).st_size - obj.tell()
        except (AttributeError, OSError):
            return None
    return None
def upload(self, file, name: str=None, path: str=None) -> StackFile:
    """
    Upload a file to Stack
    :param file: IO pointer or string containing a path to a file
    :param name: Custom name which will be used on the remote server, defaults to file.name
    :param path: Path to upload it to, defaults to current working directory
    :return: Instance of a stack file
    """
    path = path or self.__cwd  # default to the current working directory
    if isinstance(file, IOBase):
        # Already-open stream: hand it straight to the uploader.
        return self.__upload(file=file, path=path, name=name)
    if isinstance(file, str):
        # Path on disk: open, upload, and close via the context manager.
        with open(file, "rb") as fd:
            return self.__upload(file=fd, path=path, name=name)
    raise StackException("File should either be a path to a file on disk or an IO type, got: {}".format(type(file)))
def analyse_file(self, filename, token=None, language=None):
    """Send an audio file to the API for voice analysis.

    :param filename: path to an audio file, or an already-open binary
        stream (not closed in that case -- the caller owns it).
    :param token: API token; falls back to self.token.
    :param language: optional language hint; falls back to self.language.
    :returns: Response built from the API's JSON 'results'.
    :raises RecastError: when no token is available or the call fails.
    """
    token = token or self.token
    if token is None:
        raise RecastError("Token is missing")
    language = language or self.language
    # Open the file ourselves only when given a path, and remember
    # ownership so we can close it afterwards (the original version
    # leaked this file handle).
    opened_here = not isinstance(filename, io.IOBase)
    fileobj = open(filename, 'rb') if opened_here else filename
    try:
        body = {'voice': fileobj}
        if language is not None:
            body['language'] = language
        response = requests.post(
            Utils.REQUEST_ENDPOINT,
            files=body,
            headers={'Authorization': "Token {}".format(token)}
        )
    finally:
        if opened_here:
            fileobj.close()
    if response.status_code != requests.codes.ok:
        raise RecastError(response.json().get('message', ''))
    return Response(response.json()['results'])
def close(self):
    """Delete the IO object and close the input file and the output file"""
    if self.__closed:
        # Idempotent: a second close is a no-op.
        return
    # On POSIX a file can be unlinked while still open: the name vanishes
    # for everyone else while our descriptor stays valid, which avoids a
    # race on the temp files.  On NT the unlink raises OSError while the
    # files are open, so we retry after closing them below.
    deleted = False
    try:
        self.__del_files()
        deleted = True
    except OSError:
        pass
    for stream in (self.input_file, self.output_file):
        if isinstance(stream, IOBase):
            stream.close()
    if not deleted:
        self.__del_files()
    self.__closed = True
def __new__(cls, **kwargs):
    """Patch for abstractmethod-like enforcement in io.IOBase grandchildren.

    Concrete subclasses must provide callable ``_read_bytes`` and
    ``_prep_message`` methods plus a ``_config_class`` attribute;
    otherwise instantiation fails like an abstract class would.
    """
    if (
        not (hasattr(cls, '_read_bytes') and callable(cls._read_bytes))
        # Fixed: the original tested callable(cls._read_bytes) twice,
        # letting a non-callable _prep_message slip through.
        or not (hasattr(cls, '_prep_message') and callable(cls._prep_message))
        or not hasattr(cls, '_config_class')
    ):
        raise TypeError("Can't instantiate abstract class {}".format(cls.__name__))

    instance = super(_EncryptionStream, cls).__new__(cls)

    # Build the config from kwargs unless a ready-made one was passed.
    config = kwargs.pop('config', None)
    if not isinstance(config, instance._config_class):  # pylint: disable=protected-access
        config = instance._config_class(**kwargs)  # pylint: disable=protected-access
    instance.config = config

    instance.bytes_read = 0
    instance.output_buffer = b''
    instance._message_prepped = False  # pylint: disable=protected-access
    instance.source_stream = instance.config.source
    instance._stream_length = instance.config.source_length  # pylint: disable=protected-access
    return instance
def deserialize(data):
    """Deserialize *data* (stream, bytes, or str) into Python objects.

    JSON is tried first (key order preserved); on a JSON parse failure
    the text is handed to the YAML parser instead.
    """
    if isinstance(data, IOBase):
        try:
            data.seek(0)
        except UnsupportedOperation:
            # Non-seekable stream (e.g. a pipe): read from where we are.
            pass
        data = data.readall() if hasattr(data, 'readall') else data.read()
    if isinstance(data, bytes):
        data = str(data, encoding='utf-8')
    if isinstance(data, str):
        try:
            data = json.loads(data, object_hook=collections.OrderedDict)
        except json.JSONDecodeError:
            # SECURITY: yaml.load without an explicit Loader can construct
            # arbitrary objects -- do not feed untrusted input here.
            data = yaml.load(data)
    return data
def is_file(ob):
    """Return True if *ob* is a file-like object (an io.IOBase instance)."""
    return isinstance(ob, io.IOBase)
def _is_filelike_object(f):
    """True when *f* is file-like under either Python 2 or Python 3."""
    try:
        # Python 2: also accept the built-in 'file' type.
        return isinstance(f, (file, io.IOBase))
    except NameError:
        # 'file' is not a class in python3; io.IOBase covers everything.
        return isinstance(f, io.IOBase)
def _is_file(f):
    """Return True if *f* is an io.IOBase stream."""
    return isinstance(f, io.IOBase)
async def put(self, data: Union[bytes, IOBase], *, format: str, graph: Optional[IRI] = None):
    """Replace graph content with *data* via an HTTP PUT; raise on an
    error status."""
    async with self._crud_request("PUT", graph=graph, data=data,
                                  content_type=format) as resp:
        resp.raise_for_status()
async def post(self, data: Union[bytes, IOBase], *, format: str, graph: Optional[IRI] = None):
    """Append *data* to graph content via an HTTP POST; raise on an
    error status."""
    async with self._crud_request("POST", graph=graph, data=data,
                                  content_type=format) as resp:
        resp.raise_for_status()
def __iter__(self):  # type: ignore
    # Until https://github.com/python/typing/issues/11
    # there's no good way to tell mypy about custom
    # iterators that subclass io.IOBase.
    """Let this class act as an iterator.

    Returns self, so the stream serves as its own (single-pass) iterator.
    """
    return self
def sanitize_for_serialization(obj):
    """
    Sanitize an object for Request.

    If obj is None, return None.
    If obj is str, int, float, bool, file, or tuple, return it directly.
    If obj is datetime.datetime or datetime.date, convert to ISO-8601 string.
    If obj is list, sanitize each element in the list.
    If obj is dict, sanitize each value of the dict.
    If obj is a swagger model, return its sanitized properties dict.
    """
    if obj is None:  # fixed: 'isinstance(obj, type(None))' -> identity test
        return None
    elif isinstance(obj, (str, int, float, bool, io.IOBase, tuple)):
        return obj
    elif isinstance(obj, list):
        return [NbClientManager.sanitize_for_serialization(sub_obj)
                for sub_obj in obj]
    elif isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    else:
        if isinstance(obj, dict):
            obj_dict = obj
        else:
            # Convert a swagger model to a plain dict: skip the metadata
            # attributes ('swaggerTypes', 'attributeMap') and any attribute
            # whose value is None; map attribute names to their JSON keys
            # via attributeMap.
            obj_dict = {obj.attributeMap[key]: val
                        for key, val in obj.__dict__.items()
                        if key != 'swaggerTypes'
                        and key != 'attributeMap'
                        and val is not None}
        return {key: NbClientManager.sanitize_for_serialization(val)
                for (key, val) in obj_dict.items()}
def read(obj):
    """Context manager for reading data from multiple sources as a file object

    Args:
        obj (string|Path|file object): Data to read / read from
            If obj is a file object, this is just a pass through
            If obj is a Path object, this is similar to obj.open()
            If obj is a string, this creates a StringIO so the data can
            be read like a file object

    Returns:
        file object: File handle containing data
    """
    try:
        # Python 2 compatibility: 'unicode' only exists there.
        is_py2_unicode = isinstance(obj, unicode)
    except NameError:
        is_py2_unicode = False

    opened_here = False
    if isinstance(obj, Path):
        handle = obj.open()
        opened_here = True
    elif isinstance(obj, str) or is_py2_unicode:
        handle = StringIO(obj)
        handle.name = '<string>'
    elif isinstance(obj, IOBase):
        handle = obj
    else:
        raise Exception("Unknown input type {}".format(type(obj).__name__))
    try:
        yield handle
    finally:
        # Close only what we opened; caller-owned handles stay open.
        if opened_here:
            handle.close()
def is_file_like_obj(obj):
    """
    Helper function to check that the given object implements all public
    methods of the :class:`~io.IOBase` abstract class.
    """
    public_names = (name for name in dir(io.IOBase)
                    if not name.startswith('_'))
    return all(hasattr(obj, name) for name in public_names)
def test_scratch_file_supports_file_obj_interface(active_scratch_dir, method_name):
    """
    Assert that methods of :class:`~scratchdir.ScratchDir` that are
    expected to return file-like objects do so, and that these objects
    implement at least the :class:`~io.IOBase` interface.
    """
    factory = getattr(active_scratch_dir, method_name)
    assert is_file_like_obj(factory())
def __init__(self, file, start):
    """View of *file* beginning at byte offset *start*.

    Records the underlying file, the slice start and remaining length,
    then positions the stream at the slice origin.
    """
    # NOTE: asserts are stripped under -O; they only guard developer errors.
    assert hasattr(file, 'read') and hasattr(file, 'tell') and hasattr(file, 'seek')
    assert isinstance(start, int)
    self.file, self.start = file, start
    self.total_len = total_len(file)
    self.len = self.total_len - start
    io.IOBase.__init__(self)
    try:
        self.seek(0)
    except Exception:  # fixed: bare 'except' also swallowed KeyboardInterrupt
        # Best-effort initial seek; non-seekable sources are tolerated.
        pass
def store(self, data, blob):
    """Store *data* into *blob* by handing over its backing filename."""
    if not isinstance(data, io.IOBase):
        raise NotStorable('Could not store data (not of "file").')
    filename = getattr(data, 'name', None)
    if filename is None:
        # In-memory stream with no backing file: nothing to consume.
        return
    blob.consumeFile(filename)
def isFile(self, path=None):
    """True when *path* (or, without a path, this object) is a file.

    The class-name check is a dirty hack for files opened with the
    codecs module, whose StreamReaderWriter is not an io.IOBase.
    """
    if not path:
        return 'file' in self.type
    return isinstance(path, io.IOBase) or \
        path.__class__.__name__ == 'StreamReaderWriter'
async def aclose(self):
    """Like :meth:`io.IOBase.close`, but async.

    This is also shielded from cancellation; if a cancellation scope is
    cancelled, the wrapped file object will still be safely closed.
    """
    # Shield so the close always completes even mid-cancellation.
    with _core.open_cancel_scope(shield=True):
        await trio.run_sync_in_worker_thread(self._wrapped.close)
    # Surface any pending cancellation only after the file is closed.
    await _core.checkpoint_if_cancelled()
def test_feed_download_output(mock_open, connection, feed, start_time,
                              end_time, feed_download_url, feed_report_csv):
    """Verifies feed download writing to a file."""
    # open() returns a file-like mock so the download can "write" to it.
    mock_open.return_value = mock.MagicMock(spec=io.IOBase)
    # Stub the prepare endpoint.
    httpretty.register_uri(
        httpretty.POST,
        '{}/feed/{}/prepare'.format(
            matchlight.MATCHLIGHT_API_URL_V2, feed.name),
        body=json.dumps({'feed_response_id': 1}),
        content_type='application/json',
        status=200)
    # Stub the link endpoint: first poll is 'pending', second is 'ready'.
    httpretty.register_uri(
        httpretty.POST,
        '{}/feed/{}/link'.format(
            matchlight.MATCHLIGHT_API_URL_V2, feed.name),
        responses=[
            httpretty.Response(
                body=json.dumps({'status': 'pending'}),
                content_type='application/json',
                status=200),
            httpretty.Response(
                body=json.dumps({
                    'status': 'ready',
                    'url': feed_download_url,
                }),
                content_type='application/json',
                status=200),
        ],
    )
    # Stub the actual CSV payload at the download URL.
    body = '\n'.join(feed_report_csv).encode('utf-8')
    httpretty.register_uri(
        httpretty.GET, feed_download_url, content_type='text/csv', body=body)
    connection.feeds.download(
        feed, start_time, end_time, save_path='/tmp/output')
    # The entire CSV body must be written to the opened file in one call.
    file_handle = mock_open.return_value.__enter__.return_value
    file_handle.write.assert_called_once_with(body)
def isIOBase(obj):
    """Return True if *obj* is an io.IOBase stream."""
    return isinstance(obj, IOBase)
def _get_file_object(csvfile, encoding=None):
    """Return a text-mode file object for *csvfile* (a path or a stream).

    A path is opened with the required *encoding*; an already-open
    object must be a text stream (mode 'rt' / io.TextIOBase).
    """
    if isinstance(csvfile, str):
        assert encoding, 'encoding required for file path'
        return open(csvfile, 'rt', encoding=encoding, newline='')  # <- EXIT!

    if hasattr(csvfile, 'mode'):
        assert 'b' not in csvfile.mode, "File must be open in text mode ('rt')."
    elif issubclass(csvfile.__class__, io.IOBase):
        assert issubclass(csvfile.__class__, io.TextIOBase), \
            ("Stream object must inherit "
             "from io.TextIOBase.")
    return csvfile
def _py2_get_file_object(csvfile, encoding):
    """Return a binary-mode file object for *csvfile* (Python 2 path).

    A path is opened in 'rb'; an already-open object must be a byte
    stream, since Python 2's csv module requires binary mode.
    """
    if isinstance(csvfile, str):
        return open(csvfile, 'rb')  # <- EXIT!

    if hasattr(csvfile, 'mode'):
        assert 'b' in csvfile.mode, ("When using Python 2, file must "
                                     "be open in binary mode ('rb').")
    elif issubclass(csvfile.__class__, io.IOBase):
        assert not issubclass(csvfile.__class__, io.TextIOBase), ("When using Python 2, "
                                                                  "must use byte stream "
                                                                  "(not text stream).")
    return csvfile
def from_csv(cls, file, encoding=None, **fmtparams):
    """Create a DataSource from a CSV *file* (a path or file-like object)::

        source = datatest.DataSource.from_csv('mydata.csv')

    If *file* is an iterable of files, data will be loaded and aligned by
    column name::

        files = ['mydata1.csv', 'mydata2.csv']
        source = datatest.DataSource.from_csv(files)
    """
    # Normalize a single path/stream into a one-element list.
    if isinstance(file, (string_types, IOBase)):
        file = [file]

    new_cls = cls.__new__(cls)
    temptable = _from_csv(file, encoding, **fmtparams)
    new_cls._connection = temptable.connection
    new_cls._table = temptable.name

    # Build a repr that reconstructs the original call.
    first_arg = repr(file[0]) if len(file) == 1 else repr(file)
    enc_part = ', {0!r}'.format(encoding) if encoding else ''
    fmt_part = ', **{0!r}'.format(fmtparams) if fmtparams else ''
    new_cls._repr_string = '{0}.from_csv({1}{2}{3})'.format(
        new_cls.__class__.__name__, first_arg, enc_part, fmt_part)
    return new_cls
def file_output(file_object):
    """
    Coroutine that writes received strings to a file, making sure each
    line ends with exactly one newline.

    :param:

     - `file_object`: opened, writable file or name of file to open
    """
    from io import IOBase
    # Accept a filename as well as an open file.
    if not isinstance(file_object, IOBase):
        file_object = open(file_object, WRITEABLE)
    while True:
        line = (yield)
        # Strip any trailing newline(s) then append exactly one.
        file_object.write(line.rstrip(NEWLINE) + NEWLINE)
def assemble_file(self, real_file):
    """
    File is about to be closed, last chance to append data.

    @param real_file: Actual output stream.
    @type real_file: C{io.IOBase}
    """
    # Flush everything buffered in our in-memory file to the real stream.
    real_file.write(self.file.getvalue())