我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用io.SEEK_CUR。
def recvfrom(self, bufsize, flags=0): if self.type != socket.SOCK_DGRAM: return _BaseSocket.recvfrom(self, bufsize, flags) if not self._proxyconn: self.bind(("", 0)) buf = BytesIO(_BaseSocket.recv(self, bufsize, flags)) buf.seek(+2, SEEK_CUR) frag = buf.read(1) if ord(frag): raise NotImplementedError("Received UDP packet fragment") fromhost, fromport = self._read_SOCKS5_address(buf) if self.proxy_peername: peerhost, peerport = self.proxy_peername if fromhost != peerhost or peerport not in (0, fromport): raise socket.error(EAGAIN, "Packet filtered") return (buf.read(), (fromhost, fromport))
def load(self, loader: bfres.core.ResFileLoader): self.value_type = ShaderParamType(loader.read_byte()) if loader.res_file.version >= 0x03030000: siz_data = loader.read_byte() self.data_offset = loader.read_uint16() offset = loader.read_int32() # Uniform variable offset. callback_pointer = loader.read_uint32() self.depended_index = loader.read_uint16() self.depend_index = loader.read_uint16() self.name = loader.load_string() else: # GUESS loader.seek(1, io.SEEK_CUR) self.data_offset = loader.read_uint16() offset = loader.read_int32() # Uniform variable offset. self.name = loader.load_string()
def seek(self, offset, whence=io.SEEK_SET): self.__raiseIfClosed() if not self.__seekable: raise OSError("Seek not enabled for this stream") if whence == io.SEEK_SET: if offset < 0: raise ValueError("Cannot have a negative absolute seek") newStreamPosition = offset elif whence == io.SEEK_CUR: newStreamPosition = self.__streamPosition + whence elif whence == io.SEEK_END: if not self.__buffers: newStreamPosition = 0 else: newStreamPosition = self.__streamEnd + offset self.__streamPosition = newStreamPosition
def seek(self, position, whence=io.SEEK_SET): """Seek to a position in the file. """ if whence == io.SEEK_SET: self.position = min(max(position, 0), self.size) elif whence == io.SEEK_CUR: if position < 0: self.position = max(self.position + position, 0) else: self.position = min(self.position + position, self.size) elif whence == io.SEEK_END: self.position = max(min(self.size + position, self.size), 0) else: raise ValueError("Invalid argument") return self.position
def seek(self, offset, whence=io.SEEK_CUR): return self.buffer.seek(offset, whence)
def seek(self, offset): """ We need to override the second param to move from the current position. :param offset: offset to move away. :type offset: int """ return await self.buffer.seek(offset, io.SEEK_CUR)
def align(self, alignment): self.reader.seek(-self.reader.tell() % alignment, io.SEEK_CUR)
def align(self, alignment): self.writer.seek(-self.writer.tell() % alignment, io.SEEK_CUR)
def peek_header(self): bytes_read = self.f.readinto(self.header) if bytes_read < ctypes.sizeof(IMCHeader): raise RuntimeError('LSF file ended abruptly.') # Return file position to before header self.f.seek(-ctypes.sizeof(IMCHeader), io.SEEK_CUR)
def write_index(self): """ Run through the lsf-file and generate an index file :return: """ self.f.seek(0) # Check for file end while self.f.peek(1): self.peek_header() # Timestamp of first message is used to avoid index/lsf mismatch on load if self.f.tell() == 0: self.idx['timestamp'] = self.header.timestamp # Store position for this message try: self.idx[self.header.mgid].append(self.f.tell()) except (KeyError, AttributeError) as e: self.idx[self.header.mgid] = [self.f.tell()] # Go to next message self.f.seek(ctypes.sizeof(IMCHeader) + self.header.size + ctypes.sizeof(IMCFooter), io.SEEK_CUR) self.f.seek(0) # Store index fbase, ext = os.path.splitext(self.fpath) with open(fbase + '.pyimc_idx', mode='wb') as f: pickle.dump(self.idx, f)
def read_message(self): """ Returns a generator that yields the messages in the currently open LSF file. This requires the LSFReader object to be opened using the "with" statement. See read(), where this is done automatically. :return: """ if self.idx: # Read using index for pos in self.sorted_idx_iter(self.msg_types): self.f.seek(pos) self.peek_header() self.parser.reset() b = self.f.read(self.header.size + ctypes.sizeof(IMCHeader) + ctypes.sizeof(IMCFooter)) msg = self.parser.parse(b) yield msg else: # Read file without index # Check for file end while self.f.peek(1): self.peek_header() if not self.msg_types or self.header.mgid in self.msg_types: self.parser.reset() b = self.f.read(self.header.size + ctypes.sizeof(IMCHeader) + ctypes.sizeof(IMCFooter)) msg = self.parser.parse(b) yield msg else: self.f.seek(ctypes.sizeof(IMCHeader) + self.header.size + ctypes.sizeof(IMCFooter), io.SEEK_CUR)
def invoke(self, arg, from_tty): contents_recv = receive_from_pince() hex_byte_list = [] address = contents_recv[0] offset = contents_recv[1] with open(ScriptUtils.mem_file, "rb") as FILE: FILE.seek(address) for item in range(offset): try: current_item = " ".join(format(n, '02x') for n in FILE.read(1)) except IOError: current_item = "??" FILE.seek(1, io.SEEK_CUR) # Necessary since read() failed to execute hex_byte_list.append(current_item) send_to_pince(hex_byte_list)
def seek(self, offset, whence): if (whence == io.SEEK_SET): self._pos = offset elif (whence == io.SEEK_CUR): self._pos += offset elif (whence == io.SEEK_END): self._pos = self._file._filesize + offset return self._pos
def test_small_file_seek_and_read(self): with self.fs.open(self.KEY_LOGO_PNG, "rb") as f: self.assertEqual(64, f.seek(64, io.SEEK_CUR)) self.assertEqual(128, f.seek(64, io.SEEK_CUR)) self.assertEqual(256, f.seek(256, io.SEEK_SET)) self.assertEqual(24610, f.seek(-256, io.SEEK_END)) self.assertEqual( b'\x04$\x00_\x85$\xfb^\xf8\xe8]\x7f;}\xa8\xb7', f.read(16))
def seekable(self): try: return self._f.seekable() except AttributeError: try: self.seek(0, SEEK_CUR) except IOError: return False else: return True
def seek(self, offset, whence=io.SEEK_SET): """ Change the stream position to the given byte *offset*. *offset* is interpreted relative to the position indicated by *whence*. Values for *whence* are: * ``SEEK_SET`` or ``0`` – start of the stream (the default); *offset* should be zero or positive * ``SEEK_CUR`` or ``1`` – current stream position; *offset* may be negative * ``SEEK_END`` or ``2`` – end of the stream; *offset* is usually negative Return the new absolute position. """ with self.lock: if whence == io.SEEK_CUR: offset = self._pos + offset elif whence == io.SEEK_END: offset = self._length + offset if offset < 0: raise ValueError( 'New position is before the start of the stream') self._set_pos(offset) return self._pos
def test_seeking_forward_from_current(self): contents = TEXT with EncodedFile(io.BytesIO(COMPRESSED)) as fp: self.assertEqual(fp.read(100), contents[:100]) fp.seek(50, io.SEEK_CUR) # Move 50 forwards self.assertEqual(fp.read(100), contents[150:250])
def test_seeking_backwards_from_current(self): with EncodedFile(io.BytesIO(COMPRESSED)) as fp: contents = fp.read() fp.seek(-100, io.SEEK_CUR) self.assertEqual(fp.read(), contents[-100:])
def test_seeking_beyond_beginning_from_current(self): with EncodedFile(io.BytesIO(COMPRESSED)) as fp: self.assertRaises(IOError, fp.seek, -100, io.SEEK_CUR)
def seek(self, offset, whence=io.SEEK_SET): # Recalculate offset as an absolute file position. if whence == io.SEEK_SET: pass elif whence == io.SEEK_CUR: offset += self._pos elif whence == io.SEEK_END: if self._size < 0: # Finish reading the file while self.read(io.DEFAULT_BUFFER_SIZE): pass offset += self._size else: raise ValueError('Invalid value for whence: {}'.format(whence)) if offset < 0: msg = '[Error {code}] {msg}' raise IOError(msg.format(code=errno.EINVAL, msg=os.strerror(errno.EINVAL))) # Make it so that offset is the number of bytes to skip forward. if offset < self._pos: self._rewind() else: offset -= self._pos # Read and discard data until we reach the desired position while offset > 0: data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset)) if not data: break offset -= len(data) return self._pos
def run(self): try: while not self._should_stop: readable, _, _ = select.select(self._inputs, [], [], 1) if self._hci_socket in readable: source = 'socket' packet = self._hci_socket.recv(4096) self._logger.debug('SOCKET: %s', RawCopy(HciPacket).parse(packet)) self.handle_packet(packet, source) if self._pty_fd in readable: data = os.read(self._pty_master, 4096) self._logger.debug('Raw PTY data: %s', repr(data)) self._pty_buffer.write(data) self._pty_buffer.seek(-len(data), SEEK_CUR) source = 'pty' while True: if self._pty_buffer.pos == self._pty_buffer.len: break parsed_packet = RawCopy(HciPacket).parse_stream(self._pty_buffer) if not parsed_packet: break self._logger.debug('PTY: %s', parsed_packet) packet = parsed_packet.data self.handle_packet(packet, source) except KeyboardInterrupt: log.info("Received SIGTERM, exiting")
def load(self, loader: bfres.core.ResFileLoader): loader.check_signature(self._SIGNATURE) self._flags = loader.read_uint32() num_bone = loader.read_uint16() num_smooth_matrix = loader.read_uint16() num_rigid_matrix = loader.read_uint16() loader.seek(2, io.SEEK_CUR) self.bones = loader.load_dict(bfres.Bone) ofs_bone_list = loader.read_offset() # Only load dict self.matrix_to_bone_list = loader.load_custom(lambda l: l.read_uint16s(num_smooth_matrix + num_rigid_matrix)) self.inverse_model_matrices = loader.load_custom(lambda l: l.read_matrix3x4s(num_smooth_matrix)) user_pointer = loader.read_uint32()
def load(self, loader: bfres.core.ResFileLoader): loader.check_signature("FVTX") num_vertex_attrib = loader.read_byte() num_buffer = loader.read_byte() idx = loader.read_uint16() vertex_count = loader.read_uint32() self.vertex_skin_count = loader.read_byte() loader.seek(3, io.SEEK_CUR) ofs_vertex_attrib_list = loader.read_offset() # Only load dict. self.attributes = loader.load_dict(bfres.VertexAttrib) self.buffers = loader.load_list(bfres.Buffer, num_buffer) user_pointer = loader.read_uint32()
def load(self, loader: bfres.core.ResFileLoader): self.primitive_type = bfres.gx2.GX2PrimitiveType(loader.read_uint32()) self.index_format = bfres.gx2.GX2IndexFormat(loader.read_uint32()) index_count = loader.read_uint32() num_sub_mesh = loader.read_uint16() loader.seek(2, io.SEEK_CUR) self.sub_meshes = loader.load_list(bfres.SubMesh, num_sub_mesh) self.index_buffer = loader.load(bfres.Buffer) self.first_vertex = loader.read_uint32()
def load(self, loader: bfres.core.ResFileLoader): self.name = loader.load_string() self.buffer_index = loader.read_byte() loader.seek(1, io.SEEK_CUR) self.offset = loader.read_uint16() self.format = bfres.gx2.GX2AttribFormat(loader.read_uint32())
def load(self, loader: bfres.core.ResFileLoader): count = loader.read_uint16() self.value_type = RenderInfoType(loader.read_byte()) loader.seek(1, io.SEEK_CUR) self.name = loader.load_string() if self.value_type == RenderInfoType.INT32: self.value = loader.read_int32s(count) elif self.value_type == RenderInfoType.SINGLE: self.value = loader.read_singles(count) elif self.value_type == RenderInfoType.STRING: self.value = loader.load_strings(count)
def load(self, loader: bfres.core.ResFileLoader): loader.check_signature("FTEX") self.dim = bfres.gx2.GX2SurfaceDim(loader.read_uint32()) self.width = loader.read_uint32() self.height = loader.read_uint32() self.depth = loader.read_uint32() self.mip_count = loader.read_uint32() self.format = bfres.gx2.GX2SurfaceFormat(loader.read_uint32()) self.aa_mode = bfres.gx2.GX2AAMode(loader.read_uint32()) self.use = bfres.gx2.GX2SurfaceUse(loader.read_uint32()) siz_data = loader.read_uint32() image_pointer = loader.read_uint32() siz_map_data = loader.read_uint32() mip_pointer = loader.read_uint32() self.tile_mode = bfres.gx2.GX2TileMode(loader.read_uint32()) self.swizzle = loader.read_uint32() self.alignment = loader.read_uint32() self.pitch = loader.read_uint32() self.mip_offsets = loader.read_uint32s(13) self.view_mip_first = loader.read_uint32() self.view_mip_count = loader.read_uint32() self.view_slice_first = loader.read_uint32() self.view_slice_count = loader.read_uint32() self.comp_sel_r = bfres.gx2.GX2CompSel(loader.read_byte()) self.comp_sel_g = bfres.gx2.GX2CompSel(loader.read_byte()) self.comp_sel_b = bfres.gx2.GX2CompSel(loader.read_byte()) self.comp_sel_a = bfres.gx2.GX2CompSel(loader.read_byte()) self.regs = loader.read_uint32s(5) handle = loader.read_uint32() self.array_length = loader.read_uint32() # Possibly just a byte. self.name = loader.load_string() self.path = loader.load_string() self.data = loader.load_custom(lambda l: l.read_bytes(siz_data)) self.mip_data = loader.load_custom(lambda l: l.read_bytes(siz_map_data)) self.user_data = loader.load_dict(bfres.UserData) num_user_data = loader.read_uint16() loader.seek(2, io.SEEK_CUR)
def load(self, loader: bfres.core.ResFileLoader): self.name = loader.load_string() count = loader.read_uint16() self.value_type = UserDataType(loader.read_byte()) loader.seek(1, io.SEEK_CUR) if self.value_type == UserDataType.INT32: self.value = loader.read_int32s(count) elif self.value_type == UserDataType.SINGLE: self.value = loader.read_singles(count) elif self.value_type == UserDataType.STRING: self.value = loader.load_strings(count, "ascii") elif self.value_type == UserDataType.WSTRING: self.value = loader.load_strings(count, "utf-8") elif self.value_type == UserDataType.BYTE: self.value = loader.read_bytes(count)
def seek(self, offset, whence=SEEK_SET): "Adjust seek from prefix start and, if present, from prefix" if not self._rawStreamSize() >= (self.PREFIX_SIZE + self.SUFFIX_SIZE): return if whence == SEEK_SET: offset += self._prefixStart + self.PREFIX_SIZE return self._stream.seek(offset, whence) elif whence == SEEK_CUR: return self._stream.seek(offset, whence) elif whence == SEEK_END: # even if the suffix hasn't been received yet, we calculate our offsets as if it had. # why? because if it hasn't been received yet, we don't want to finish! The whole packet # isn't framed (verified) until the final bytes are received. offset = offset - self.SUFFIX_SIZE return self._stream.seek(offset, whence)
def from_file(cls, fp, size=None): """Construct a Frame Descriptor object from binary data. Parameters ---------- fp : file-like File-like object to read the Frame Descriptor from. The cursor must be at the position where the Frame Descriptor starts. size : int, optional the number of bytes to be read. If not given, the value inspected from the fp itself will be used. Returns ------- FrameDescriptor .. note:: A number of bytes equal to `size` will be consumed from the fp. .. warning: If the number of requested bytes is smaller than the size of the Frame Descriptor, some fields might be left unfilled. """ out = FrameDescriptor() if size is None: size = np.fromfile(fp, dtype=np.uint32, count=1) if size.size == 0: raise ValueError('Unexpected EOF') fp.seek(-size.nbytes, io.SEEK_CUR) size = size[0] if size > ct.sizeof(FrameDescriptor): raise ValueError('Number of requested bytes (%d) do not fit ' 'in the data structure (%d)' % (size, ct.sizeof(out))) buf = fp.read(size) ct.memmove(ct.addressof(out), buf, size) return out