我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用io.SEEK_SET。
def refresh_contents(self): log_file = open(self.log_path) log_file.seek(0, io.SEEK_END) end_pos = log_file.tell() if end_pos > 20000: log_file.seek(end_pos - 20000, io.SEEK_SET) else: log_file.seek(0, io.SEEK_SET) contents = log_file.read().split("\n", 1)[-1] if contents != self.contents: self.contents = contents self.textBrowser_LogContent.clear() self.textBrowser_LogContent.setPlainText(contents) # Scrolling to bottom cursor = self.textBrowser_LogContent.textCursor() cursor.movePosition(QTextCursor.End) self.textBrowser_LogContent.setTextCursor(cursor) self.textBrowser_LogContent.ensureCursorVisible() log_file.close()
def seek(self, offset, whence=io.SEEK_SET): """Change the file position. The new position is specified by offset, relative to the position indicated by whence. Values for whence are: 0: start of stream (default); offset must not be negative 1: current stream position 2: end of stream; offset must not be positive Returns the new file position. Note that seeking is emulated, so depending on the parameters, this operation may be extremely slow. """ with self._lock: self._check_can_seek() return self._buffer.seek(offset, whence)
def __init__(self, file_object, start, size): """ Build an object that will act like a BufferedReader object, but constrained to a predetermined part of the file_object. :param file_object: A file opened for reading in binary mode. :param start: Start of the part within the file_object :param size: Ideal size of the part. This will be set to the number of bytes remaining in the file if there aren't enough bytes remaining. """ self.file = file_object self.bytes_read = 0 self.start = start # Seek to the end of the file to see how many bytes remain # from the start of the part. self.file.seek(0, io.SEEK_END) self.size = min(self.file.tell() - self.start, size) # Reset the pointer to the start of the part. self.file.seek(start, io.SEEK_SET)
def seek(self, offset, whence=io.SEEK_SET): self.__raiseIfClosed() if not self.__seekable: raise OSError("Seek not enabled for this stream") if whence == io.SEEK_SET: if offset < 0: raise ValueError("Cannot have a negative absolute seek") newStreamPosition = offset elif whence == io.SEEK_CUR: newStreamPosition = self.__streamPosition + whence elif whence == io.SEEK_END: if not self.__buffers: newStreamPosition = 0 else: newStreamPosition = self.__streamEnd + offset self.__streamPosition = newStreamPosition
def seek(self, position, whence=io.SEEK_SET): """Seek to a position in the file. """ if whence == io.SEEK_SET: self.position = min(max(position, 0), self.size) elif whence == io.SEEK_CUR: if position < 0: self.position = max(self.position + position, 0) else: self.position = min(self.position + position, self.size) elif whence == io.SEEK_END: self.position = max(min(self.size + position, self.size), 0) else: raise ValueError("Invalid argument") return self.position
def get_mime(self, media_io): if hasattr(media_io, "getvalue"): buffer = media_io.getvalue() elif hasattr(media_io, "seekable") and media_io.seekable(): buffer = media_io.read(1024) media_io.seek(SEEK_SET) else: raise DontwiMediaError return self.magic.from_buffer(buffer)
def get_media_size(media_io): if hasattr(media_io, "getvalue"): size = len(media_io.getvalue()) elif hasattr(media_io, "seekable") and media_io.seekable(): media_io.seek(0, SEEK_END) size = media_io.tell() media_io.seek(SEEK_SET) else: raise DontwiMediaError return size
def reset(self): """Move the cursor the beginning of the 1st frame descriptor. """ self._fin.seek(self._offset, io.SEEK_SET)
def seek(self, offset, whence=io.SEEK_SET): self.reader.seek(offset, whence)
def seek(self, offset, whence=io.SEEK_SET): self.writer.seek(offset, whence)
def get_full_trace(self, start_idx, end_idx): """Returns the Tor DATA cells transmitted over a circuit during a specified time period.""" # Sanity check assert start_idx >= 0 and end_idx > 0, ("Invalid (negative) logfile " "position") assert end_idx > start_idx, ("logfile section end_idx must come " "after start_idx") self.cell_log.seek(start_idx, SEEK_SET) return self.cell_log.read(end_idx - start_idx)
def _generate_image(): from PIL import Image thumb = Image.new('RGB', (100, 100), 'blue') thumb_io = io.BytesIO() thumb.save(thumb_io, format='PNG') thumb_io.seek(0, io.SEEK_SET) return SimpleUploadedFile("image.png", thumb_io.getbuffer(), content_type="image/png")
def __getitem__(self, name): # Retrieve value of an input pin. pin = self.pins[name] if pin["type"] != "in": raise KeyError(f"Not an input: {name}") pin["file"].seek(0, io.SEEK_SET) return pin["file"].read(1) == "1"
def web(self, path, args={}, encoding=None, allow_return_none=False): self.logger.debug('????????') if allow_return_none: if path in self.web_cache and self.web_cache[path] == args: self.logger.debug('????? {} ????'.format(path)) self.logger.debug('???{}'.format(args)) return self.web_cache[path] = dict(args) url = urllib.parse.urljoin(self.web_url, urllib.parse.quote(path)) if len(args) > 0: url += '?' + urllib.parse.urlencode(args) self.logger.debug('HTTP ?????{}'.format(url)) data = io.BytesIO() self.curl.setopt(pycurl.URL, url) self.curl.setopt(pycurl.COOKIE, self.web_cookie) self.curl.setopt(pycurl.NOBODY, False) self.curl.setopt(pycurl.NOPROGRESS, True) self.curl.setopt(pycurl.WRITEDATA, data) self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None) self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None) self.curl.perform() status = self.curl.getinfo(pycurl.RESPONSE_CODE) if status != 200: raise ServerError(status) data.seek(io.SEEK_SET) return etree.parse(data, etree.HTMLParser( encoding=encoding, remove_comments=True))
def recv(self, n): self._buffer.seek(self._read_pos, io.SEEK_SET) bs = self._buffer.read(n) self._read_pos = self._buffer.tell() return bs
def seek(self, offset, whence): if (whence == io.SEEK_SET): self._pos = offset elif (whence == io.SEEK_CUR): self._pos += offset elif (whence == io.SEEK_END): self._pos = self._file._filesize + offset return self._pos
def test_small_file_seek_and_read(self): with self.fs.open(self.KEY_LOGO_PNG, "rb") as f: self.assertEqual(64, f.seek(64, io.SEEK_CUR)) self.assertEqual(128, f.seek(64, io.SEEK_CUR)) self.assertEqual(256, f.seek(256, io.SEEK_SET)) self.assertEqual(24610, f.seek(-256, io.SEEK_END)) self.assertEqual( b'\x04$\x00_\x85$\xfb^\xf8\xe8]\x7f;}\xa8\xb7', f.read(16))
def seek(self, offset, whence=SEEK_SET): return self._f.seek(offset, whence)
def seek(self, offset, whence=io.SEEK_SET): """ Change the stream position to the given byte *offset*. *offset* is interpreted relative to the position indicated by *whence*. Values for *whence* are: * ``SEEK_SET`` or ``0`` – start of the stream (the default); *offset* should be zero or positive * ``SEEK_CUR`` or ``1`` – current stream position; *offset* may be negative * ``SEEK_END`` or ``2`` – end of the stream; *offset* is usually negative Return the new absolute position. """ with self.lock: if whence == io.SEEK_CUR: offset = self._pos + offset elif whence == io.SEEK_END: offset = self._length + offset if offset < 0: raise ValueError( 'New position is before the start of the stream') self._set_pos(offset) return self._pos
def seek(self, offset, whence=io.SEEK_SET): # Recalculate offset as an absolute file position. if whence == io.SEEK_SET: pass elif whence == io.SEEK_CUR: offset += self._pos elif whence == io.SEEK_END: if self._size < 0: # Finish reading the file while self.read(io.DEFAULT_BUFFER_SIZE): pass offset += self._size else: raise ValueError('Invalid value for whence: {}'.format(whence)) if offset < 0: msg = '[Error {code}] {msg}' raise IOError(msg.format(code=errno.EINVAL, msg=os.strerror(errno.EINVAL))) # Make it so that offset is the number of bytes to skip forward. if offset < self._pos: self._rewind() else: offset -= self._pos # Read and discard data until we reach the desired position while offset > 0: data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset)) if not data: break offset -= len(data) return self._pos
def seek(self, offset, whence=io.SEEK_SET): if self.in_buffer: return self.in_buffer.seek(offset, whence)
def recognizes(cls, file): size = os.stat(file.path).st_size if size < CBFS_HEADER_SIZE or size > CBFS_MAXIMUM_FILE_SIZE: return False with open(file.path, 'rb') as f: # pick at the latest byte as it should contain the relative offset of the header f.seek(-4, io.SEEK_END) # <pgeorgi> given the hardware we support so far, it looks like # that field is now bound to be little endian # -- #coreboot, 2015-10-14 rel_offset = struct.unpack('<i', f.read(4))[0] if rel_offset < 0 and -rel_offset > CBFS_HEADER_SIZE and -rel_offset < size: f.seek(rel_offset, io.SEEK_END) logger.debug('looking for header at offset: %x', f.tell()) if is_header_valid(f.read(CBFS_HEADER_SIZE), size): return True elif not file.name.endswith('.rom'): return False else: logger.debug('CBFS relative offset seems wrong, scanning whole image') f.seek(0, io.SEEK_SET) offset = 0 buf = f.read(CBFS_HEADER_SIZE) while len(buf) >= CBFS_HEADER_SIZE: if is_header_valid(buf, size, offset): return True if len(buf) - offset <= CBFS_HEADER_SIZE: buf = f.read(32768) offset = 0 else: offset += 1 return False
def add_sample(self, sample, end_callback=None): assert sample.samplewidth == self.samplewidth assert sample.samplerate == self.samplerate assert sample.nchannels == self.nchannels stream = io.BytesIO() sample.write_wav(stream) stream.seek(0, io.SEEK_SET) self.add_stream(stream, end_callback=end_callback)
def load(self, value_type: type): """ Reads and returns an instance of the value_type from the following offset or returns None if the read offset is 0. :param value_type: The type of the instance to read. :return: The instance or None. """ offset = self.read_offset() if not offset: return None with self.temporary_seek(offset, io.SEEK_SET): return self._read_res_data(value_type)
def load_custom(self, callback, offset: int = None): """ Reads and returns an instance of arbitrary type from the following offset with the given callback or returns None if the read offset is 0. :param callback: The callback to read the instance data with. :param offset: The optional offset to use instead of reading a following one. :return: The data instance or None. """ offset = offset or self.read_offset() if not offset: return None with self.temporary_seek(offset, io.SEEK_SET): return callback(self)
def load_dict(self, value_type: type): """ Reads and returns a ResDict instance with elements of the given value_type from the following offset or returns an empty instance if the read offset is 0. :param value_type: The type of the elements. :return: The ResDict instance. """ offset = self.read_offset() resdict = bfres.ResDict(value_type) if not offset: return resdict with self.temporary_seek(offset, io.SEEK_SET): resdict.load(self) return resdict
def load_list(self, value_type: type, count: int, offset: int = None): items = [] offset = offset or self.read_offset() if not offset or not count: return items # Seek to the list start and read it. with self.temporary_seek(offset, io.SEEK_SET): for i in range(count): items.append(self._read_res_data(value_type)) return items
def load_string(self, encoding: str = None): """ Reads and returns a str instance from the following offset or None if the read offset is 0. :return: The read text. """ offset = self.read_offset() if not offset: return None with self.temporary_seek(offset, io.SEEK_SET): return self.read_string_0(encoding or self.encoding)
def seek(self, offset, whence): """ Seek within the part. This is similar to the standard seek, except that io.SEEK_SET is the start of the part and io.SEEK_END is the end of the part. :param offset: Offset in bytes from location determined by the whence value :param whence: io.SEEK_END and io.SEEK_SET are supported. """ if whence == io.SEEK_END: self.file.seek(self.start + self.size + offset, io.SEEK_SET) elif whence == io.SEEK_SET: self.file.seek(self.start + offset, io.SEEK_SET) else: raise RuntimeError("Unhandled whence value: {}".format(whence))
def add_parts_from_file(self, file_path): """ Splits a file into parts and adds all parts to an internal list of parts to upload. The parts will not be uploaded to the server until upload is called. :param string file_path: Path of file to upload in parts """ with io.open(file_path, mode='rb') as file_object: file_object.seek(0, io.SEEK_END) end = file_object.tell() file_object.seek(0, io.SEEK_SET) offset = 0 while file_object.tell() < end: self.add_part_from_file(file_path, offset=offset, size=self.part_size) offset += self.part_size file_object.seek(offset, io.SEEK_SET)
def seek(self, offset, whence=SEEK_SET): "Adjust seek from prefix start and, if present, from prefix" if not self._rawStreamSize() >= (self.PREFIX_SIZE + self.SUFFIX_SIZE): return if whence == SEEK_SET: offset += self._prefixStart + self.PREFIX_SIZE return self._stream.seek(offset, whence) elif whence == SEEK_CUR: return self._stream.seek(offset, whence) elif whence == SEEK_END: # even if the suffix hasn't been received yet, we calculate our offsets as if it had. # why? because if it hasn't been received yet, we don't want to finish! The whole packet # isn't framed (verified) until the final bytes are received. offset = offset - self.SUFFIX_SIZE return self._stream.seek(offset, whence)
def importMesh(self, stream, filename=None): assert(issubclass(type(stream),IOBase)); #Check mesh name validity if (filename is None): if (hasattr(stream,'name')): filename = stream.name; elif (hasattr(stream, 'filename')): filename = stream.filename; else: raise ValueError("Cannot determine the filename of the stream please add filename parameter") filename = os.path.basename(filename); mesh_name = os.path.splitext(filename)[0]; if mesh_name in bpy.data.meshes.keys(): raise ValueError("Mesh with name " + mesh_name + " already exists in blender"); #Check header and select impl self._determineEndianness(stream); headerID = self._readUShorts(stream,1)[0]; if (headerID != OgreMeshSerializer.HEADER_CHUNK_ID): raise ValueError("File header not found"); ver = OgreSerializer.readString(stream); stream.seek(0,SEEK_SET); impl = None; for i in self._versionData: if (i.versionString == ver): impl = i.impl; break; if (impl is None): print(ver); raise ValueError("Cannot find serializer implementation for " "mesh version " + ver); #Create the blender mesh and import the mesh mesh = OgreMesh(mesh_name); impl.importMesh(stream,mesh,self.listener); #Check if newer version if (ver != self._versionData[0].versionString): print("Warning: " " older format (" + ver + "); you should upgrade it as soon as possible" + " using the OgreMeshUpgrade tool."); #Probably useless if (self.listener is not None): listener.processMeshCompleted(mesh);