The following 48 code examples, extracted from open-source Python projects, illustrate how to use io.SEEK_END.
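Most of the examples below share one idiom: seek to the end of a stream to learn its size, then restore the previous position. A minimal sketch of that idiom (the helper name stream_size is ours, not from any of the projects):

import io

def stream_size(stream):
    """Return the total size of a seekable stream, preserving the caller's position."""
    saved = stream.tell()               # remember where the caller was
    size = stream.seek(0, io.SEEK_END)  # seek() returns the new absolute position, i.e. the size
    stream.seek(saved, io.SEEK_SET)     # put the position back
    return size

buf = io.BytesIO(b"hello world")
buf.seek(3)
assert stream_size(buf) == 11
assert buf.tell() == 3  # the caller's position is untouched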
def write(self, writer, base_address):
    pos = writer.tell()
    writer.seek(0, io.SEEK_END)
    end_position = writer.tell()
    if self.is_leaf:
        # Write the offset back at this node's address.
        writer.seek(pos)
        writer.write_uint32((end_position - base_address - 2) | 0x80000000)
        writer.seek(end_position + 4)
        # Write the triangle indices and terminate the list with 0xFFFF.
        writer.write_uint16s(self.indices)
        writer.write_uint16(0xFFFF)
    else:
        writer.seek(pos)
        writer.write_uint32(end_position - base_address)
        writer.seek(end_position + 4)
        base = writer.tell()
        writer.write_uint32s([0x00000000] * 8)
        writer.seek(base)
        for node in self.branches:
            node.write(writer, base)
    writer.seek(pos + 4)
def refresh_contents(self):
    log_file = open(self.log_path)
    log_file.seek(0, io.SEEK_END)
    end_pos = log_file.tell()
    if end_pos > 20000:
        log_file.seek(end_pos - 20000, io.SEEK_SET)
    else:
        log_file.seek(0, io.SEEK_SET)
    contents = log_file.read().split("\n", 1)[-1]
    if contents != self.contents:
        self.contents = contents
        self.textBrowser_LogContent.clear()
        self.textBrowser_LogContent.setPlainText(contents)
        # Scroll to the bottom.
        cursor = self.textBrowser_LogContent.textCursor()
        cursor.movePosition(QTextCursor.End)
        self.textBrowser_LogContent.setTextCursor(cursor)
        self.textBrowser_LogContent.ensureCursorVisible()
    log_file.close()
def _downstream_thread(self, stream, downstream_boundary):
    data_buff = io.BytesIO()
    multipart_parser = MultipartParser(data_buff, downstream_boundary)
    while not self._stop_threads_event.is_set():
        if stream.data:
            current_buffer_pos = data_buff.tell()
            data_buff.seek(0, io.SEEK_END)
            data_buff.write(b''.join(stream.data))
            data_buff.seek(current_buffer_pos)
            stream.data = []
        message_part = multipart_parser.get_next_part()
        if not message_part:
            time.sleep(0.5)
            continue
        self._process_message_parts([message_part])
def __init__(self, file_object, start, size):
    """
    Build an object that will act like a BufferedReader object, but
    constrained to a predetermined part of the file_object.

    :param file_object: A file opened for reading in binary mode.
    :param start: Start of the part within the file_object.
    :param size: Ideal size of the part. This will be set to the number of
        bytes remaining in the file if there aren't enough bytes remaining.
    """
    self.file = file_object
    self.bytes_read = 0
    self.start = start
    # Seek to the end of the file to see how many bytes remain
    # from the start of the part.
    self.file.seek(0, io.SEEK_END)
    self.size = min(self.file.tell() - self.start, size)
    # Reset the pointer to the start of the part.
    self.file.seek(start, io.SEEK_SET)
def seek(self, offset, whence=io.SEEK_SET):
    self.__raiseIfClosed()
    if not self.__seekable:
        raise OSError("Seek not enabled for this stream")
    if whence == io.SEEK_SET:
        if offset < 0:
            raise ValueError("Cannot have a negative absolute seek")
        newStreamPosition = offset
    elif whence == io.SEEK_CUR:
        # Note: the original added `whence` here instead of `offset`,
        # which is a bug; a relative seek must add the offset.
        newStreamPosition = self.__streamPosition + offset
    elif whence == io.SEEK_END:
        if not self.__buffers:
            newStreamPosition = 0
        else:
            newStreamPosition = self.__streamEnd + offset
    self.__streamPosition = newStreamPosition
def seek(self, position, whence=io.SEEK_SET):
    """Seek to a position in the file."""
    if whence == io.SEEK_SET:
        self.position = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        if position < 0:
            self.position = max(self.position + position, 0)
        else:
            self.position = min(self.position + position, self.size)
    elif whence == io.SEEK_END:
        self.position = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")
    return self.position
def get_media_size(media_io):
    if hasattr(media_io, "getvalue"):
        size = len(media_io.getvalue())
    elif hasattr(media_io, "seekable") and media_io.seekable():
        media_io.seek(0, SEEK_END)
        size = media_io.tell()
        media_io.seek(SEEK_SET)  # rewind (SEEK_SET == 0)
    else:
        raise DontwiMediaError
    return size
def _reset_frame(self):
    self._iobuf = io.BytesIO(self._iobuf.read())
    self._iobuf.seek(0, 2)  # io.SEEK_END == 2 (constant not present in 2.6)
    self._current_frame = None
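The literal 2 above works because the whence values are plain integers, exposed under the same names by both io and os (as the comment notes, the io constants were missing in Python 2.6):

import io, os

assert io.SEEK_SET == os.SEEK_SET == 0
assert io.SEEK_CUR == os.SEEK_CUR == 1
assert io.SEEK_END == os.SEEK_END == 2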
def get_cell_log_pos(self):
    """Returns the current position of the last byte in the Tor cell log."""
    return self.cell_log.seek(0, SEEK_END)
def dump():
    parser = argparse.ArgumentParser(description='Dump all the boxes from an MP4 file')
    parser.add_argument("input_file", type=argparse.FileType("rb"),
                        metavar="FILE", help="Path to the MP4 file to open")
    args = parser.parse_args()
    fd = args.input_file
    fd.seek(0, io.SEEK_END)
    eof = fd.tell()
    fd.seek(0)
    while fd.tell() < eof:
        box = Box.parse_stream(fd)
        print(box)
def sendall(self, data):
    self._buffer.seek(0, io.SEEK_END)
    self._buffer.write(data)
def seek(self, offset, whence):
    if whence == io.SEEK_SET:
        self._pos = offset
    elif whence == io.SEEK_CUR:
        self._pos += offset
    elif whence == io.SEEK_END:
        self._pos = self._file._filesize + offset
    return self._pos
def test_small_file_seek_and_read(self):
    with self.fs.open(self.KEY_LOGO_PNG, "rb") as f:
        self.assertEqual(64, f.seek(64, io.SEEK_CUR))
        self.assertEqual(128, f.seek(64, io.SEEK_CUR))
        self.assertEqual(256, f.seek(256, io.SEEK_SET))
        self.assertEqual(24610, f.seek(-256, io.SEEK_END))
        self.assertEqual(
            b'\x04$\x00_\x85$\xfb^\xf8\xe8]\x7f;}\xa8\xb7',
            f.read(16))
def testTruncate(self):
    f = _FileIO(TESTFN, 'w')
    f.write(bytes(bytearray(range(10))))
    self.assertEqual(f.tell(), 10)
    f.truncate(5)
    self.assertEqual(f.tell(), 10)
    self.assertEqual(f.seek(0, io.SEEK_END), 5)
    f.truncate(15)
    self.assertEqual(f.tell(), 5)
    self.assertEqual(f.seek(0, io.SEEK_END), 15)
    f.close()
def seek(self, offset, whence=io.SEEK_SET): """ Change the stream position to the given byte *offset*. *offset* is interpreted relative to the position indicated by *whence*. Values for *whence* are: * ``SEEK_SET`` or ``0`` – start of the stream (the default); *offset* should be zero or positive * ``SEEK_CUR`` or ``1`` – current stream position; *offset* may be negative * ``SEEK_END`` or ``2`` – end of the stream; *offset* is usually negative Return the new absolute position. """ with self.lock: if whence == io.SEEK_CUR: offset = self._pos + offset elif whence == io.SEEK_END: offset = self._length + offset if offset < 0: raise ValueError( 'New position is before the start of the stream') self._set_pos(offset) return self._pos
def test_seeking_from_end(self):
    with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
        self.assertEqual(fp.read(100), TEXT[:100])
        seeked_pos = fp.seek(-100, io.SEEK_END)
        self.assertEqual(seeked_pos, len(TEXT) - 100)
        self.assertEqual(fp.read(100), TEXT[-100:])
def test_seeking_from_end_beyond_beginning(self):
    with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
        # Go to the end to get the size.
        size = fp.seek(0, io.SEEK_END)
        # Go back to the beginning.
        self.assertNotRaises(fp.seek, -size, io.SEEK_END)
        # One byte before the beginning.
        self.assertRaises(IOError, fp.seek, -size - 1, io.SEEK_END)
def seek(self, offset, whence=io.SEEK_SET):
    # Recalculate offset as an absolute file position.
    if whence == io.SEEK_SET:
        pass
    elif whence == io.SEEK_CUR:
        offset += self._pos
    elif whence == io.SEEK_END:
        if self._size < 0:
            # Finish reading the file.
            while self.read(io.DEFAULT_BUFFER_SIZE):
                pass
        offset += self._size
    else:
        raise ValueError('Invalid value for whence: {}'.format(whence))
    if offset < 0:
        msg = '[Error {code}] {msg}'
        raise IOError(msg.format(code=errno.EINVAL,
                                 msg=os.strerror(errno.EINVAL)))
    # Make it so that offset is the number of bytes to skip forward.
    if offset < self._pos:
        self._rewind()
    else:
        offset -= self._pos
    # Read and discard data until we reach the desired position.
    while offset > 0:
        data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
        if not data:
            break
        offset -= len(data)
    return self._pos
def recognizes(cls, file):
    size = os.stat(file.path).st_size
    if size < CBFS_HEADER_SIZE or size > CBFS_MAXIMUM_FILE_SIZE:
        return False
    with open(file.path, 'rb') as f:
        # Peek at the last bytes, as they should contain the relative
        # offset of the header.
        f.seek(-4, io.SEEK_END)
        # <pgeorgi> given the hardware we support so far, it looks like
        #           that field is now bound to be little endian
        #           -- #coreboot, 2015-10-14
        rel_offset = struct.unpack('<i', f.read(4))[0]
        if rel_offset < 0 and -rel_offset > CBFS_HEADER_SIZE and -rel_offset < size:
            f.seek(rel_offset, io.SEEK_END)
            logger.debug('looking for header at offset: %x', f.tell())
            if is_header_valid(f.read(CBFS_HEADER_SIZE), size):
                return True
            elif not file.name.endswith('.rom'):
                return False
            else:
                logger.debug('CBFS relative offset seems wrong, scanning whole image')
                f.seek(0, io.SEEK_SET)
                offset = 0
                buf = f.read(CBFS_HEADER_SIZE)
                while len(buf) >= CBFS_HEADER_SIZE:
                    if is_header_valid(buf, size, offset):
                        return True
                    if len(buf) - offset <= CBFS_HEADER_SIZE:
                        buf = f.read(32768)
                        offset = 0
                    else:
                        offset += 1
    return False
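Seeking with a negative offset from io.SEEK_END, as above, is the standard way to read a fixed-size trailer at the end of a file. A distilled sketch of that pattern (the trailer layout here is illustrative, not the actual CBFS format):

import io
import struct

def read_trailing_int32(f):
    # Jump 4 bytes back from the end and read the little-endian int32 stored there.
    f.seek(-4, io.SEEK_END)
    return struct.unpack('<i', f.read(4))[0]

buf = io.BytesIO(b'payload' + struct.pack('<i', -64))
assert read_trailing_int32(buf) == -64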
def seek(self, offset, whence): """ Seek within the part. This is similar to the standard seek, except that io.SEEK_SET is the start of the part and io.SEEK_END is the end of the part. :param offset: Offset in bytes from location determined by the whence value :param whence: io.SEEK_END and io.SEEK_SET are supported. """ if whence == io.SEEK_END: self.file.seek(self.start + self.size + offset, io.SEEK_SET) elif whence == io.SEEK_SET: self.file.seek(self.start + offset, io.SEEK_SET) else: raise RuntimeError("Unhandled whence value: {}".format(whence))
def add_parts_from_file(self, file_path):
    """
    Splits a file into parts and adds all parts to an internal list of
    parts to upload. The parts will not be uploaded to the server until
    upload is called.

    :param string file_path: Path of the file to upload in parts.
    """
    with io.open(file_path, mode='rb') as file_object:
        file_object.seek(0, io.SEEK_END)
        end = file_object.tell()
        file_object.seek(0, io.SEEK_SET)
        offset = 0
        while file_object.tell() < end:
            self.add_part_from_file(file_path, offset=offset, size=self.part_size)
            offset += self.part_size
            file_object.seek(offset, io.SEEK_SET)
async def _reverse_read_lines(fp, buf_size=8192):  # pylint: disable=invalid-name
    """
    Async generator that returns the lines of a file in reverse order.

    ref: https://stackoverflow.com/a/23646049/8776239
    and: https://stackoverflow.com/questions/2301789/read-a-file-in-reverse-order-using-python
    """
    segment = None  # holds a possibly incomplete segment at the beginning of the buffer
    offset = 0
    await fp.seek(0, io.SEEK_END)
    file_size = remaining_size = await fp.tell()
    while remaining_size > 0:
        offset = min(file_size, offset + buf_size)
        await fp.seek(file_size - offset)
        buffer = await fp.read(min(remaining_size, buf_size))
        remaining_size -= buf_size
        lines = buffer.splitlines(True)
        # The first line of the buffer is probably not a complete line, so
        # save it and append it to the last line of the next buffer we read.
        if segment is not None:
            # If the previous chunk starts right at the beginning of a line,
            # do not concatenate the segment to the last line of the new
            # chunk; instead, yield the segment first.
            if buffer[-1] == '\n':
                # print 'buffer ends with newline'
                yield segment
            else:
                lines[-1] += segment
                # print 'enlarged last line to >{}<, len {}'.format(lines[-1], len(lines))
        segment = lines[0]
        for index in range(len(lines) - 1, 0, -1):
            l = lines[index]
            if l:
                yield l
    # Don't yield None if the file was empty.
    if segment is not None:
        yield segment
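The generator above expects an async file object whose seek, tell, and read are awaitable. A hypothetical usage sketch, assuming the aiofiles package (the original project may use a different async file wrapper):

import asyncio
import aiofiles

async def last_lines(path, n=10):
    # Collect the last n lines by consuming the reverse generator.
    async with aiofiles.open(path, mode='r') as fp:
        lines = []
        async for line in _reverse_read_lines(fp):
            lines.append(line)
            if len(lines) == n:
                break
    return list(reversed(lines))

# asyncio.run(last_lines('app.log'))  # 'app.log' is a placeholder path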
def available(self): """ This is duplicate functionality if we have a HighPerformanceStreamIO. But we also want to support those that aren't. TODO: Better solution? """ curPos = self._stream.tell() self._stream.seek(0, SEEK_END) endPos = self._stream.tell() self._stream.seek(curPos) return endPos-curPos
def _rawStreamSize(self):
    curPos = self._stream.tell()
    self._stream.seek(0, SEEK_END)
    endPos = self._stream.tell()
    self._stream.seek(curPos)
    return endPos - self._prefixStart
def _rawAvailable(self):
    "some underlying streams may not support 'available'"
    curPos = self._stream.tell()
    self._stream.seek(0, SEEK_END)
    endPos = self._stream.tell()
    self._stream.seek(curPos)
    return endPos - curPos
def available(self): """ Get the available bytes, cutting off the suffix if it's been acocunted for """ if not self._rawStreamSize() >= (self.PREFIX_SIZE + self.SUFFIX_SIZE): return 0 curPos = self._stream.tell() # even if the suffix hasn't been received yet, we calculate our offsets as if it had. # why? because if it hasn't been received yet, we don't want to finish! The whole packet # isn't framed (verified) until the final bytes are received. self._stream.seek(-self.SUFFIX_SIZE, SEEK_END) endPos = self._stream.tell() self._stream.seek(curPos) return endPos-curPos
def seek(self, offset, whence=SEEK_SET): "Adjust seek from prefix start and, if present, from prefix" if not self._rawStreamSize() >= (self.PREFIX_SIZE + self.SUFFIX_SIZE): return if whence == SEEK_SET: offset += self._prefixStart + self.PREFIX_SIZE return self._stream.seek(offset, whence) elif whence == SEEK_CUR: return self._stream.seek(offset, whence) elif whence == SEEK_END: # even if the suffix hasn't been received yet, we calculate our offsets as if it had. # why? because if it hasn't been received yet, we don't want to finish! The whole packet # isn't framed (verified) until the final bytes are received. offset = offset - self.SUFFIX_SIZE return self._stream.seek(offset, whence)
def update(self, newData):
    beforeWritePos = self.tell()
    self.seek(0, io.SEEK_END)
    self.write(newData)
    self.seek(beforeWritePos)
def available(self):
    cur = self.tell()
    end = self.seek(0, io.SEEK_END)
    self.seek(cur)
    return end - cur
def ended(self) -> bool:
    try:
        fileno = self._codeio.fileno()
        offset = os.fstat(fileno).st_size
    except io.UnsupportedOperation:
        old_offset = self.offset
        self._codeio.seek(0, io.SEEK_END)
        offset = self._codeio.tell()
        self._codeio.seek(old_offset)
    return self.offset == offset
def _play_run(self, f):
    err = None
    try:
        # Calculate how many records are in the file; we'll use this later
        # when updating the progress bar.
        rec_total = (f.seek(0, io.SEEK_END) - HEADER_REC.size) // DATA_REC.size
        f.seek(0)
        skipped = 0
        for rec, data in enumerate(self._play_source(f)):
            now = time()
            if data.timestamp < now:
                skipped += 1
                continue
            else:
                if self._play_event.wait(data.timestamp - now):
                    break
            self.props.application.pressure.set_values(data.pressure, data.ptemp)
            self.props.application.humidity.set_values(data.humidity, data.htemp)
            self.props.application.imu.set_imu_values(
                (data.ax, data.ay, data.az),
                (data.gx, data.gy, data.gz),
                (data.cx, data.cy, data.cz),
                (data.ox, data.oy, data.oz),
            )
            # Again, it would be better to use custom signals here, but
            # attempting to do so just results in seemingly random
            # segfaults during playback.
            with self._play_update_lock:
                if self._play_update_id == 0:
                    self._play_update_id = GLib.idle_add(
                        self._play_update_controls, rec / rec_total)
    except Exception as e:
        err = e
    finally:
        f.close()
        # Must ensure that controls are only re-enabled *after* all pending
        # control updates have run.
        with self._play_update_lock:
            if self._play_update_id:
                GLib.source_remove(self._play_update_id)
                self._play_update_id = 0
            # Get the main thread to re-enable the controls at the end of
            # playback.
            GLib.idle_add(self._play_controls_finish, err)