我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.SEEK_CUR。
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position and return the new absolute position.

    The resulting position is always kept inside ``[0, self.size]``;
    seeking before the start or past the end is clamped, not rejected.

    Args:
        pos: Offset, interpreted according to ``whence``.
        whence: ``os.SEEK_SET`` (absolute), ``os.SEEK_CUR`` (relative to
            the current position) or ``os.SEEK_END`` (relative to
            ``self.size``).

    Returns:
        The new absolute position, as the ``io`` seek protocol does.
        Earlier revisions returned ``None``.

    Raises:
        ValueError: If the file is closed or ``whence`` is not one of the
            three supported modes.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_SET:
        self.position = min(max(pos, 0), self.size)
    elif whence == os.SEEK_CUR:
        if pos < 0:
            self.position = max(self.position + pos, 0)
        else:
            self.position = min(self.position + pos, self.size)
    elif whence == os.SEEK_END:
        self.position = max(min(self.size + pos, self.size), 0)
    else:
        raise ValueError("Invalid argument")

    # Reset the byte buffer and move the wrapped file object to match.
    self.buffer = b""
    self.fileobj.seek(self.position)
    return self.position
def seek(self, pos, whence=os.SEEK_SET):
    """Reposition the file, clamping the result to ``[0, self.size]``.

    ``whence`` selects how ``pos`` is interpreted: ``os.SEEK_SET`` is
    absolute, ``os.SEEK_CUR`` is relative to the current position and
    ``os.SEEK_END`` is relative to ``self.size``.

    Raises ValueError if the file is closed or ``whence`` is not one of
    those three modes.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_SET:
        new_position = min(max(pos, 0), self.size)
    elif whence == os.SEEK_CUR:
        moved = self.position + pos
        # A backward move can only underflow, a forward move only overflow.
        new_position = max(moved, 0) if pos < 0 else min(moved, self.size)
    elif whence == os.SEEK_END:
        new_position = max(min(self.size + pos, self.size), 0)
    else:
        raise ValueError("Invalid argument")

    self.position = new_position
    # Reset the text buffer and move the wrapped file object to match.
    self.buffer = ""
    self.fileobj.seek(new_position)
def recvfrom(self, bufsize, flags=0):
    """Receive a datagram and return ``(data, (host, port))``.

    Non-datagram sockets are handed straight to the parent class.
    For datagram sockets the packet is read from the parent socket, a
    3-byte prefix is consumed (2 skipped bytes plus a 1-byte fragment
    field) and the sender address is parsed with
    ``self._read_SOCKS5_address``.

    Raises:
        NotImplementedError: If the fragment byte is non-zero.
        socket.error: With ``EAGAIN`` when ``self.proxy_peername`` is set
            and the sender does not match it.
    """
    parent = super(socksocket, self)
    if self.type != socket.SOCK_DGRAM:
        return parent.recvfrom(bufsize, flags)

    if not self._proxyconn:
        self.bind(("", 0))

    # Ask for 1024 bytes more than the caller wants; presumably this
    # leaves room for the header that precedes the payload.
    packet = BytesIO(parent.recv(bufsize + 1024, flags))
    packet.seek(2, SEEK_CUR)
    fragment = packet.read(1)
    if ord(fragment):
        raise NotImplementedError("Received UDP packet fragment")
    fromhost, fromport = self._read_SOCKS5_address(packet)

    if self.proxy_peername:
        peerhost, peerport = self.proxy_peername
        # A peer port of 0 accepts any source port.
        if not (fromhost == peerhost and peerport in (0, fromport)):
            raise socket.error(EAGAIN, "Packet filtered")

    payload = packet.read(bufsize)
    return (payload, (fromhost, fromport))
def recvfrom(self, bufsize, flags=0):
    """Receive a datagram and return ``(data, (host, port))``.

    Non-datagram sockets are handed straight to ``_BaseSocket``.
    For datagram sockets the packet is read from the base socket, a
    3-byte prefix is consumed (2 skipped bytes plus a 1-byte fragment
    field) and the sender address is parsed with
    ``_read_socks5_address``.

    The base socket is asked for ``bufsize + 1024`` bytes, and at most
    ``bufsize`` bytes of payload are returned. Reading only ``bufsize``
    bytes would count the header against the caller's budget, so the
    tail of a full-size payload would be silently dropped.

    Args:
        bufsize: Maximum number of payload bytes to return.
        flags: Passed through to the underlying ``recv``/``recvfrom``.

    Raises:
        NotImplementedError: If the fragment byte is non-zero.
        socket.error: With ``EAGAIN`` when ``self.proxy_peername`` is set
            and the sender does not match it.
    """
    if self.type != socket.SOCK_DGRAM:
        return _BaseSocket.recvfrom(self, bufsize, flags)
    if not self._proxyconn:
        self.bind(("", 0))

    # Extra room for the header that precedes the payload.
    buf = BytesIO(_BaseSocket.recv(self, bufsize + 1024, flags))
    buf.seek(2, SEEK_CUR)
    frag = buf.read(1)
    if ord(frag):
        raise NotImplementedError("Received UDP packet fragment")
    fromhost, fromport = _read_socks5_address(buf)

    if self.proxy_peername:
        peerhost, peerport = self.proxy_peername
        # A peer port of 0 accepts any source port.
        if fromhost != peerhost or peerport not in (0, fromport):
            raise socket.error(EAGAIN, "Packet filtered")

    return buf.read(bufsize), (fromhost, fromport)
def recvfrom(self, bufsize, flags=0):
    """Receive a datagram and return ``(data, (host, port))``.

    Non-datagram sockets are handed straight to ``_BaseSocket``.
    For datagram sockets the packet is read from the base socket, a
    3-byte prefix is consumed (2 skipped bytes plus a 1-byte fragment
    field) and the sender address is parsed with
    ``self._read_SOCKS5_address``.

    The base socket is asked for ``bufsize + 1024`` bytes, and at most
    ``bufsize`` bytes of payload are returned. Reading only ``bufsize``
    bytes would count the header against the caller's budget, so the
    tail of a full-size payload would be silently dropped.

    Args:
        bufsize: Maximum number of payload bytes to return.
        flags: Passed through to the underlying ``recv``/``recvfrom``.

    Raises:
        NotImplementedError: If the fragment byte is non-zero.
        socket.error: With ``EAGAIN`` when ``self.proxy_peername`` is set
            and the sender does not match it.
    """
    if self.type != socket.SOCK_DGRAM:
        return _BaseSocket.recvfrom(self, bufsize, flags)
    if not self._proxyconn:
        self.bind(("", 0))

    # Extra room for the header that precedes the payload.
    buf = BytesIO(_BaseSocket.recv(self, bufsize + 1024, flags))
    buf.seek(2, SEEK_CUR)
    frag = buf.read(1)
    if ord(frag):
        raise NotImplementedError("Received UDP packet fragment")
    fromhost, fromport = self._read_SOCKS5_address(buf)

    if self.proxy_peername:
        peerhost, peerport = self.proxy_peername
        # A peer port of 0 accepts any source port.
        if fromhost != peerhost or peerport not in (0, fromport):
            raise socket.error(EAGAIN, "Packet filtered")

    return (buf.read(bufsize), (fromhost, fromport))
def fixOffsets(self, count, isShort=False, isLong=False):
    """Shift ``count`` consecutive stored offsets by ``self.offsetOfNewPage``.

    Each offset is read with ``readShort`` or ``readLong``, increased by
    ``self.offsetOfNewPage`` and written back over the value just read.

    Args:
        count: Number of offsets to process.
        isShort: Offsets are stored as shorts.
        isLong: Offsets are stored as longs (used when ``isShort`` is
            false).

    Raises:
        RuntimeError: If neither ``isShort`` nor ``isLong`` is set, or if
            a short offset overflows 16 bits while ``count`` is not 1.
    """
    if not (isShort or isLong):
        raise RuntimeError("offset is neither short nor long")

    for _ in range(count):
        shifted = (self.readShort() if isShort else self.readLong()) + self.offsetOfNewPage

        if not isShort:
            self.rewriteLastLong(shifted)
            continue

        if shifted < 65536:
            self.rewriteLastShort(shifted)
            continue

        # offset is now too large - we must convert shorts to longs
        if count != 1:
            raise RuntimeError("not implemented")  # XXX TODO
        # simple case - the offset is just one and therefore it is
        # local (not referenced with another offset)
        self.rewriteLastShortToLong(shifted)
        self.f.seek(-10, os.SEEK_CUR)
        self.writeShort(4)  # rewrite the type to LONG
        self.f.seek(8, os.SEEK_CUR)
def rar_v4(f):
    """Walk the block headers of a RAR v4 stream, one block at a time.

    For every block the 7-byte header is unpacked as little-endian
    ``(crc, type, flags, size)``. When flag ``0x8000`` is set a further
    4-byte value is read and added to the size. The stream is then moved
    to ``block start + size`` via ``f.try_seek`` and ``f.update_pos()``
    is called.

    Returns:
        ``True`` once a block of type ``0x7b`` has been processed, or
        ``Ellipsis`` when a block of type ``0x73`` has flag ``0x80`` set
        (encrypted, assume bad faith).

    Raises:
        FileCorrupted: If a block type lies outside ``0x72..0x7b``.
    """
    # based on http://www.forensicswiki.org/wiki/RAR
    # and http://acritum.com/winrar/rar-format
    block_type = None
    while block_type != 0x7b:
        block_start = f.tell()
        _crc, block_type, block_flags, block_size = struct.unpack('<HBHH', f.read(7))
        if block_flags & 0x8000:
            (extra_size,) = struct.unpack('<L', f.read(4))
            block_size += extra_size
        if block_type < 0x72 or block_type > 0x7b:
            raise FileCorrupted
        # Absolute seek from the recorded block start.
        f.try_seek(block_start + block_size, os.SEEK_SET)
        f.update_pos()
        if block_type == 0x73 and block_flags & 0x80:
            return Ellipsis  # encrypted, assume bad faith
    return True
def seek(self, offset, whence=os.SEEK_SET):
    """Set the file's current position.

    Args:
      offset: seek offset as number.
      whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
        os.SEEK_CUR (seek relative to the current position), and
        os.SEEK_END (seek relative to the end, offset should be negative).

    Raises:
      InvalidArgumentError: if whence is not one of the supported modes.
    """
    self._verify_read_mode()
    if whence == os.SEEK_SET:
        self._offset = offset
    elif whence == os.SEEK_CUR:
        self._offset += offset
    elif whence == os.SEEK_END:
        file_stat = self.stat()
        self._offset = file_stat.st_size + offset
    else:
        # Format the message here: exceptions, unlike logging calls, do
        # not interpolate extra positional arguments. %r also keeps a
        # non-integer whence from raising TypeError.
        raise InvalidArgumentError(
            'Whence mode %r is not supported' % (whence,))
def seek(self, offset, whence=os.SEEK_SET):
    """Set the file's current position.

    The read buffer, the buffer position and the end-of-file flag are
    reset after every successful seek.

    Args:
      offset: seek offset as number.
      whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
        os.SEEK_CUR (seek relative to the current position), and
        os.SEEK_END (seek relative to the end, offset should be negative).

    Raises:
      InvalidArgumentError: if whence is not one of the supported modes.
    """
    if whence == os.SEEK_SET:
        self._position = offset
    elif whence == os.SEEK_CUR:
        self._position += offset
    elif whence == os.SEEK_END:
        file_stat = stat(self._filename)
        self._position = file_stat.st_size + offset
    else:
        # Format the message here: exceptions, unlike logging calls, do
        # not interpolate extra positional arguments. %r also keeps a
        # non-integer whence from raising TypeError.
        raise InvalidArgumentError(
            'Whence mode %r is not supported' % (whence,))
    self._buffer = ''
    self._buffer_pos = 0
    self._eof = False
def read_data(self, start=None, end=None):
    """Read frames from the file, store them on ``self.data`` and return them.

    Args:
        start: First frame to read. ``None`` starts at the first frame; a
            negative value counts back from the frame total reported by
            ``self._find_nframe_from_file()``.
        end: Frame at which reading stops. ``None`` reads through the
            last frame; a negative value counts back from the frame total.

    Returns:
        The array read with ``np.fromfile`` and reshaped to
        ``(nframe, self.nifs, self.nchans)``. When ``self.nbits < 8`` it
        is additionally passed through ``unpack``.
    """
    total_frames = self._find_nframe_from_file()
    seek_to_data(self.file_object)

    def frame_to_sample(frame):
        # Number of samples that precede the given frame.
        return frame * self.nifs * self.nchans

    def resolve(bound, fallback):
        # Convert a possibly negative frame bound into a sample index.
        if bound is None:
            return fallback
        if bound < 0:
            return frame_to_sample(total_frames + bound)
        elif bound >= 0:
            return frame_to_sample(bound)
        return fallback

    first_sample = resolve(start, 0)
    last_sample = resolve(end, frame_to_sample(total_frames))

    # NOTE(review): the seek offset is a sample count but seek() moves in
    # bytes; the two agree only if each stored sample is one byte. Confirm
    # against self.dtype / self.nbits.
    self.file_object.seek(first_sample, os.SEEK_CUR)
    samples = np.fromfile(
        self.file_object,
        count=last_sample - first_sample,
        dtype=self.dtype,
    )

    # Use the frames actually read, which may be fewer than requested.
    frames_read = samples.size // self.nifs // self.nchans
    samples = samples.reshape((frames_read, self.nifs, self.nchans))
    if self.nbits < 8:
        samples = unpack(samples, self.nbits)

    self.data = samples
    return self.data
def _oops_dump_state(self, ignore_remaining_data=False):
    """
    Log the parser state after a deserialization error

    The stream is moved back 16 bytes from its current position and read
    to the end; the known references and, optionally, a hex dump of the
    bytes read are written through ``log_error``.

    :param ignore_remaining_data: If True, don't log an error when
                                  unused trailing bytes are remaining
    """
    banner_width = 30
    title = "==Oops state dump"
    log_error(title + "=" * (banner_width - len(title)))
    log_error("References: {0}".format(self.references))
    log_error("Stream seeking back at -16 byte "
              "(2nd line is an actual position!):")

    # Do not use a keyword argument
    stream = self.object_stream
    stream.seek(-16, os.SEEK_CUR)
    position = stream.tell()
    remaining = stream.read()

    if remaining and not ignore_remaining_data:
        byte_count = len(remaining)
        log_error("Warning!!!!: Stream still has {0} bytes left."
                  .format(byte_count))
        log_error(self._create_hexdump(remaining, position))

    log_error("=" * banner_width)

# ------------------------------------------------------------------------------
def recvfrom(self, bufsize, flags=0):
    """Receive a datagram and return ``(data, (host, port))``.

    The call goes straight to the parent class when the socket is not a
    datagram socket, when the configured proxy type is ``HTTP``, or when
    ``self._is_client`` is false. Otherwise the packet is read from the
    parent socket, a 3-byte prefix is consumed (2 skipped bytes plus a
    1-byte fragment field) and the sender address is parsed with
    ``self._read_SOCKS5_address``.

    Raises:
        NotImplementedError: If the fragment byte is non-zero.
        socket.error: With ``EAGAIN`` when ``self.proxy_peername`` is set
            and the sender does not match it.
    """
    parent = super(socksocket, self)
    passthrough = (
        self.type != socket.SOCK_DGRAM
        or self.proxy[0] == HTTP
        or not self._is_client
    )
    if passthrough:
        return parent.recvfrom(bufsize, flags)

    if not self._proxyconn:
        self.bind(("", 0))

    # Ask for 1024 bytes more than the caller wants; presumably this
    # leaves room for the header that precedes the payload.
    packet = BytesIO(parent.recv(bufsize + 1024, flags))
    packet.seek(2, SEEK_CUR)
    fragment = packet.read(1)
    if ord(fragment):
        raise NotImplementedError("Received UDP packet fragment")
    fromhost, fromport = self._read_SOCKS5_address(packet)

    if self.proxy_peername:
        peerhost, peerport = self.proxy_peername
        # A peer port of 0 accepts any source port.
        if not (fromhost == peerhost and peerport in (0, fromport)):
            raise socket.error(EAGAIN, "Packet filtered")

    payload = packet.read(bufsize)
    return (payload, (fromhost, fromport))