我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 os.lseek()。
def _lseek(file_obj, offset, whence):
    """Invoke 'os.lseek' on the descriptor of 'file_obj'.

    The 'whence' argument is supposed to be either '_SEEK_DATA' or
    '_SEEK_HOLE'.

    Returns the data or hole position reported by 'os.lseek', or '-1' when
    there is no more data or hole starting from 'offset'.

    Raises 'ErrorNotSupp' when 'os.lseek' fails with EINVAL; any other
    'OSError' is re-raised unchanged.
    """
    # 'os.errno' was an undocumented alias that was removed in Python 3.7;
    # the 'errno' module is the supported way to reach these constants.
    import errno

    try:
        return os.lseek(file_obj.fileno(), offset, whence)
    except OSError as err:
        # The 'lseek' system call returns the ENXIO if there is no data or
        # hole starting from the specified offset.
        if err.errno == errno.ENXIO:
            return -1
        elif err.errno == errno.EINVAL:
            raise ErrorNotSupp("the kernel or file-system does not support "
                               "\"SEEK_HOLE\" and \"SEEK_DATA\"")
        else:
            raise
def test_fs_holes(self):
    """Check the SEEK_DATA / SEEK_HOLE behaviour on a small hole-free file.

    Even if the filesystem doesn't report holes, if the OS supports it the
    SEEK_* constants will be defined and will have a consistent behaviour:
    os.SEEK_DATA = current position, os.SEEK_HOLE = end of file position.
    """
    with open(support.TESTFN, 'r+b') as stream:
        stream.write(b"hello")
        stream.flush()
        end = stream.tell()
        fd = stream.fileno()
        try:
            for pos in range(end):
                data_at = os.lseek(fd, pos, os.SEEK_DATA)
                self.assertEqual(pos, data_at)
                hole_at = os.lseek(fd, pos, os.SEEK_HOLE)
                self.assertLessEqual(end, hole_at)
            # Seeking from the end of file must fail for both modes.
            self.assertRaises(OSError, os.lseek, fd, end, os.SEEK_DATA)
            self.assertRaises(OSError, os.lseek, fd, end, os.SEEK_HOLE)
        except OSError:
            # Some OSs claim to support SEEK_HOLE/SEEK_DATA
            # but it is not true.
            # For instance:
            # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
            raise unittest.SkipTest("OSError raised!")
def is_sparse(path):
    """Return True when SEEK_DATA finds no data at all in the file at `path`.

    Any OSError other than ENXIO raised by the seek is propagated.
    """
    # LP: #1656371 - Looking at stat().st_blocks won't work on ZFS file
    # systems, since that seems to return 1, whereas on EXT4 it returns 0.
    # Rather than hard code the value based on file system type (which could
    # be different even on other file systems), a more reliable way seems to
    # be to use SEEK_DATA with an offset of 0 to find the first block of data
    # after position 0.  If there is no data, an ENXIO will get raised, at
    # least on any modern Linux kernels we care about.  See lseek(2) for
    # details.
    with open(path, 'r') as stream:
        try:
            os.lseek(stream.fileno(), 0, os.SEEK_DATA)
        except OSError as error:
            # There is no OSError subclass for ENXIO.
            if error.errno == errno.ENXIO:
                # No data anywhere in the file, so it is entirely sparse.
                return True
            raise
        # The seek succeeded, so there is data in the file.
        return False
def test_writev(self):
    """Write three buffers with writev and read the joined bytes back."""
    descriptor = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
    try:
        buffers = (b'test1', b'tt2', b't3')
        written = os.writev(descriptor, buffers)
        self.assertEqual(written, 10)

        os.lseek(descriptor, 0, os.SEEK_SET)
        content = posix.read(descriptor, 10)
        self.assertEqual(b'test1tt2t3', content)

        # Issue #20113: empty list of buffers should not crash
        try:
            size = posix.writev(descriptor, [])
        except OSError:
            # writev(fd, []) raises OSError(22, "Invalid argument")
            # on OpenIndiana
            pass
        else:
            self.assertEqual(size, 0)
    finally:
        os.close(descriptor)
def test_readv(self):
    """Read ten bytes back into three bytearrays with readv."""
    descriptor = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
    try:
        os.write(descriptor, b'test1tt2t3')
        os.lseek(descriptor, 0, os.SEEK_SET)

        buffers = [bytearray(length) for length in [5, 3, 2]]
        self.assertEqual(posix.readv(descriptor, buffers), 10)
        filled = [bytes(chunk) for chunk in buffers]
        self.assertEqual([b'test1', b'tt2', b't3'], filled)

        # Issue #20113: empty list of buffers should not crash
        try:
            size = posix.readv(descriptor, [])
        except OSError:
            # readv(fd, []) raises OSError(22, "Invalid argument")
            # on OpenIndiana
            pass
        else:
            self.assertEqual(size, 0)
    finally:
        os.close(descriptor)
def write_data(self, fd, offset=0, size=None, pattern=None):
    """Write pattern data to an open file descriptor, chunk by chunk.

       fd:
           File descriptor
       offset:
           File offset where data will be written to [default: 0]
       size:
           Total number of bytes to write [default: self.filesize]
       pattern:
           Value handed through to self.data_pattern() [default: None]
    """
    remaining = self.filesize if size is None else size
    position = offset
    while remaining > 0:
        # Never ask for more than self.wsize bytes in a single write call
        chunk_len = min(self.wsize, remaining)
        os.lseek(fd, position, 0)
        payload = self.data_pattern(position, chunk_len, pattern)
        written = os.write(fd, payload)
        # Advance by what was actually written, not by what was requested
        remaining -= written
        position += written
def _copy_stream(self, fd, url, offset):
    """Copy a remote file into a local file descriptor.

    The local file is truncated to `offset` bytes and the stream returned by
    self.open_stream(url, offset) is appended after that point.

    :param fd: the file`s descriptor
    :param url: the remote file`s url
    :param offset: the number of bytes from the beginning, that will be skipped
    :return: the count of actually copied bytes
    """
    chunk_size = 16 * 1024
    source = self.open_stream(url, offset)
    os.ftruncate(fd, offset)
    os.lseek(fd, offset, os.SEEK_SET)

    copied = 0
    while True:
        block = source.read(chunk_size)
        if not block:
            return copied
        os.write(fd, block)
        copied += len(block)
def read(self, path, length, offset, fh):
    """Return `length` bytes starting at `offset` from the blob behind `path`.

    `path` is split on '/'; component 1 is used as the container name and
    component 2 as the blob name.  The byte range offset..offset+length-1 is
    requested through `BaseBlobService.get_blob_to_bytes` and the `content`
    attribute of the result is returned.  `fh` is only printed, never read.

    NOTE(review): this is Python 2 code (print statements).  The nesting of
    the statements below was reconstructed from a whitespace-collapsed
    listing; confirm which prints really belong to the `if debug:` branch.
    """
    if debug:
        print "read: " + path
        print "offset: "
        print offset
        print "length: "
        print length
        print fh
    full_path = self._full_path(path)
    print full_path
    #os.lseek(fh, offset, os.SEEK_SET)
    #if os.path.isfile(full_path) == False:
    # Credentials are taken from the `config` module on every call.
    import config as config
    account_name = config.STORAGE_ACCOUNT_NAME
    account_key = config.STORAGE_ACCOUNT_KEY
    containername = path.split('/')[1]
    filename = path.split('/')[2]
    service = baseblobservice.BaseBlobService(account_name, account_key)
    # The end of the requested range is passed as offset+length-1.
    blob = service.get_blob_to_bytes(containername, filename, None, offset, offset+length-1)
    #blob = blob[offset:(offset+length)]
    bytes = blob.content
    return bytes
    # The string literal below follows the return statement, so it is never
    # executed; it holds an older download-to-local-file implementation.
    """try: if os.path.isdir(path.split('/')[1]) == False: os.mkdir(full_path.split('/')[0]+'/'+containername) if os.path.isfile(full_path) == False: print "read block blob" block_blob_service.get_blob_to_path(containername, filename, full_path) else: os.remove(full_path) block_blob_service.get_blob_to_path(containername, filename, full_path) except: pass fhn = os.open(full_path, 32768) os.lseek(fhn, offset, os.SEEK_SET) #print "os.read(fh, length)" #print os.read(fh, length) return os.read(fhn, length)"""
def write(self, path, buf, offset, fh):
    """Write `buf` to descriptor `fh` at absolute position `offset`.

    `path` is only used for the debug trace.  Returns the number of bytes
    written, as reported by os.write().
    """
    if debug:
        # Parenthesised single-argument form prints the same text on
        # Python 2 and is valid syntax on Python 3.
        print("write: " + path)
    os.lseek(fh, offset, os.SEEK_SET)
    return os.write(fh, buf)
def try_seek(fd, offset):
    """Reposition `fd`, silently ignoring descriptors that cannot seek.

    offset is None -> seek to the end of the file
    offset >= 0    -> seek to that absolute position
    offset < 0     -> seek relative to the end of the file

    An OSError whose first argument is ESPIPE is swallowed; any other
    OSError propagates.
    """
    if offset is None:
        position, whence = 0, os.SEEK_END
    elif offset >= 0:
        position, whence = offset, os.SEEK_SET
    else:
        position, whence = offset, os.SEEK_END

    try:
        os.lseek(fd, position, whence)
    except OSError as ose:
        if ose.args[0] != errno.ESPIPE:
            raise
def readChunk(self, offset, length):
    """Hand a seek-then-read call sequence to the avatar's _runAsUser.

    Each step is a (callable, argument-tuple) pair; the value returned by
    self.server.avatar._runAsUser() is returned unchanged.
    """
    seek_step = (os.lseek, (self.fd, offset, 0))
    read_step = (os.read, (self.fd, length))
    return self.server.avatar._runAsUser([seek_step, read_step])
def writeChunk(self, offset, data):
    """Hand a seek-then-write call sequence to the avatar's _runAsUser.

    Each step is a (callable, argument-tuple) pair; the value returned by
    self.server.avatar._runAsUser() is returned unchanged.
    """
    seek_step = (os.lseek, (self.fd, offset, 0))
    write_step = (os.write, (self.fd, data))
    return self.server.avatar._runAsUser([seek_step, write_step])
def get_volume(self, id):
    """Return volume information for an id or for a filesystem path.

    When `id` names an existing path, a dict with that path and the file
    size (found by seeking to the end) is returned; otherwise the result of
    self.volume.get(id) is returned.
    """
    if not exists(id):
        return self.volume.get(id)

    # The id is actually a path
    with open(id) as handle:
        size = os.lseek(handle.fileno(), 0, os.SEEK_END)
    return {'path': id, 'size': size}
def read(self, path, length, offset, fh):
    """Read up to `length` bytes from descriptor `fh` starting at `offset`.

    `path` is accepted but not used.
    """
    os.lseek(fh, offset, os.SEEK_SET)
    data = os.read(fh, length)
    return data
def write(self, path, buf, offset, fh):
    """Write `buf` to descriptor `fh` at `offset`; return the bytes written.

    `path` is accepted but not used.
    """
    os.lseek(fh, offset, os.SEEK_SET)
    written = os.write(fh, buf)
    return written
def smbComWrite(connId, smbServer, SMBCommand, recvPacket):
    """Handle an SMB_COM_WRITE request for connection `connId`.

    The Fid from the request parameters is looked up in the connection's
    'OpenedFiles'.  For a regular descriptor the data is written at the
    requested offset; for PIPE_FILE_DESCRIPTOR it is sent over the socket
    stored for that Fid.

    Returns ([response command], None, errorCode).  errorCode is
    STATUS_INVALID_HANDLE for an unknown Fid and STATUS_ACCESS_DENIED when
    any exception is raised while writing; for any errorCode > 0 the
    response parameters and data are emptied.
    """
    connData = smbServer.getConnectionData(connId)

    respSMBCommand = smb.SMBCommand(smb.SMB.SMB_COM_WRITE)
    respParameters = smb.SMBWriteResponse_Parameters()
    respData = ''

    comWriteParameters = smb.SMBWrite_Parameters(SMBCommand['Parameters'])
    comWriteData = smb.SMBWrite_Data(SMBCommand['Data'])

    # 'in' replaces dict.has_key(), which no longer exists on Python 3.
    if comWriteParameters['Fid'] in connData['OpenedFiles']:
        fileHandle = connData['OpenedFiles'][comWriteParameters['Fid']]['FileHandle']
        errorCode = STATUS_SUCCESS
        try:
            if fileHandle != PIPE_FILE_DESCRIPTOR:
                # TODO: Handle big size files
                # If we're trying to write past the file end we just skip the write call (Vista does this)
                if os.lseek(fileHandle, 0, os.SEEK_END) >= comWriteParameters['Offset']:
                    os.lseek(fileHandle, comWriteParameters['Offset'], os.SEEK_SET)
                    os.write(fileHandle, comWriteData['Data'])
            else:
                sock = connData['OpenedFiles'][comWriteParameters['Fid']]['Socket']
                sock.send(comWriteData['Data'])
            # The requested count is echoed back, even when the write above
            # was skipped.
            respParameters['Count'] = comWriteParameters['Count']
        except Exception as e:
            smbServer.log('smbComWrite: %s' % e, logging.ERROR)
            errorCode = STATUS_ACCESS_DENIED
    else:
        errorCode = STATUS_INVALID_HANDLE

    if errorCode > 0:
        respParameters = ''
        respData = ''

    respSMBCommand['Parameters'] = respParameters
    respSMBCommand['Data'] = respData
    smbServer.setConnectionData(connId, connData)

    return [respSMBCommand], None, errorCode
def smbComRead(connId, smbServer, SMBCommand, recvPacket):
    """Handle an SMB_COM_READ request for connection `connId`.

    The Fid from the request parameters is looked up in the connection's
    'OpenedFiles'.  For a regular descriptor 'Count' bytes are read from
    'Offset'; for PIPE_FILE_DESCRIPTOR they are received from the socket
    stored for that Fid.

    Returns ([response command], None, errorCode).  errorCode is
    STATUS_INVALID_HANDLE for an unknown Fid and STATUS_ACCESS_DENIED when
    any exception is raised while reading; for any errorCode > 0 the
    response parameters and data are emptied.
    """
    connData = smbServer.getConnectionData(connId)

    respSMBCommand = smb.SMBCommand(smb.SMB.SMB_COM_READ)
    respParameters = smb.SMBReadResponse_Parameters()
    respData = smb.SMBReadResponse_Data()

    comReadParameters = smb.SMBRead_Parameters(SMBCommand['Parameters'])

    # 'in' replaces dict.has_key(), which no longer exists on Python 3.
    if comReadParameters['Fid'] in connData['OpenedFiles']:
        fileHandle = connData['OpenedFiles'][comReadParameters['Fid']]['FileHandle']
        errorCode = STATUS_SUCCESS
        try:
            if fileHandle != PIPE_FILE_DESCRIPTOR:
                # TODO: Handle big size files
                os.lseek(fileHandle, comReadParameters['Offset'], os.SEEK_SET)
                content = os.read(fileHandle, comReadParameters['Count'])
            else:
                sock = connData['OpenedFiles'][comReadParameters['Fid']]['Socket']
                content = sock.recv(comReadParameters['Count'])
            respParameters['Count'] = len(content)
            respData['DataLength'] = len(content)
            respData['Data'] = content
        except Exception as e:
            smbServer.log('smbComRead: %s ' % e, logging.ERROR)
            errorCode = STATUS_ACCESS_DENIED
    else:
        errorCode = STATUS_INVALID_HANDLE

    if errorCode > 0:
        respParameters = ''
        respData = ''

    respSMBCommand['Parameters'] = respParameters
    respSMBCommand['Data'] = respData
    smbServer.setConnectionData(connId, connData)

    return [respSMBCommand], None, errorCode
def smb2Read(connId, smbServer, recvPacket):
    """Handle an SMB2 READ request for connection `connId`.

    When the request's FileID is sixteen 0xff bytes, the FileID recorded
    under connData['LastRequest']['SMB2_CREATE'] is used instead, if one is
    present.  For a regular descriptor 'Length' bytes are read from
    'Offset'; for PIPE_FILE_DESCRIPTOR they are received from the socket
    stored for that FileID.

    Returns ([response], None, errorCode).  errorCode is
    STATUS_INVALID_HANDLE for an unknown FileID and STATUS_ACCESS_DENIED when
    any exception is raised while reading.
    """
    connData = smbServer.getConnectionData(connId)

    respSMBCommand = smb2.SMB2Read_Response()
    readRequest = smb2.SMB2Read(recvPacket['Data'])

    respSMBCommand['Buffer'] = '\x00'

    if str(readRequest['FileID']) == '\xff'*16:
        # Let's take the data from the lastRequest
        # ('in' replaces dict.has_key(), which no longer exists on Python 3.)
        if 'SMB2_CREATE' in connData['LastRequest']:
            fileID = connData['LastRequest']['SMB2_CREATE']['FileID']
        else:
            fileID = str(readRequest['FileID'])
    else:
        fileID = str(readRequest['FileID'])

    if fileID in connData['OpenedFiles']:
        fileHandle = connData['OpenedFiles'][fileID]['FileHandle']
        errorCode = 0
        try:
            if fileHandle != PIPE_FILE_DESCRIPTOR:
                offset = readRequest['Offset']
                os.lseek(fileHandle, offset, os.SEEK_SET)
                content = os.read(fileHandle, readRequest['Length'])
            else:
                sock = connData['OpenedFiles'][fileID]['Socket']
                content = sock.recv(readRequest['Length'])

            respSMBCommand['DataOffset'] = 0x50
            respSMBCommand['DataLength'] = len(content)
            respSMBCommand['DataRemaining'] = 0
            respSMBCommand['Buffer'] = content
        except Exception as e:
            smbServer.log('SMB2_READ: %s ' % e, logging.ERROR)
            errorCode = STATUS_ACCESS_DENIED
    else:
        errorCode = STATUS_INVALID_HANDLE

    smbServer.setConnectionData(connId, connData)
    return [respSMBCommand], None, errorCode
def seekable(self):
    """Report whether self._fileno accepts lseek, caching the answer.

    The probe runs only while self._seekable is None; an OSError from the
    probe means the descriptor is not seekable.
    """
    if self._seekable is not None:
        return self._seekable

    try:
        os.lseek(self._fileno, 0, os.SEEK_CUR)
    except OSError:
        self._seekable = False
    else:
        self._seekable = True
    return self._seekable
def seek(self, offset, whence=0):
    """Reposition self._fileno with os.lseek and return the new offset."""
    new_position = os.lseek(self._fileno, offset, whence)
    return new_position
def _probe_seek_hole(self):
    """
    Check whether the system implements 'SEEK_HOLE' and 'SEEK_DATA'.
    Unfortunately, there seems to be no clean way for detecting this,
    because often the system just fakes them by just assuming that all
    files are fully mapped, so 'SEEK_HOLE' always returns EOF and
    'SEEK_DATA' always returns the requested offset.

    I could not invent a better way of detecting the fake 'SEEK_HOLE'
    implementation than just to create a temporary file in the same
    directory where the image file resides. It would be nice to change this
    to something better.

    Raises 'ErrorNotSupp' when the temporary file cannot be created or
    truncated, or when a 'SEEK_HOLE' seek from offset 0 of the truncated
    file does not return 0.
    """
    directory = os.path.dirname(self._image_path)

    try:
        tmp_obj = tempfile.TemporaryFile("w+", dir=directory)
    except IOError as err:
        raise ErrorNotSupp("cannot create a temporary in \"%s\": %s"
                           % (directory, err))

    # The 'with' block closes the temporary file on every exit path; the
    # error paths below used to leave it open.
    with tmp_obj:
        try:
            os.ftruncate(tmp_obj.fileno(), self.block_size)
        except OSError as err:
            raise ErrorNotSupp("cannot truncate temporary file in \"%s\": %s"
                               % (directory, err))

        offs = _lseek(tmp_obj, 0, _SEEK_HOLE)
        if offs != 0:
            # We are dealing with the stub 'SEEK_HOLE' implementation which
            # always returns EOF.
            self._log.debug("lseek(0, SEEK_HOLE) returned %d" % offs)
            raise ErrorNotSupp("the file-system does not support "
                               "\"SEEK_HOLE\" and \"SEEK_DATA\" but only "
                               "provides a stub implementation")
def test_read(self):
    """os.read returns the bytes just written, as a bytes object."""
    with open(support.TESTFN, "w+b") as stream:
        stream.write(b"spam")
        stream.flush()
        descriptor = stream.fileno()
        os.lseek(descriptor, 0, 0)
        data = os.read(descriptor, 4)
        self.assertEqual(type(data), bytes)
        self.assertEqual(data, b"spam")
def test_lseek(self):
    """Run self.check on os.lseek when this platform provides it."""
    if not hasattr(os, "lseek"):
        return
    self.check(os.lseek, 0, 0)
def test_stdin_filedes(self):
    """stdin may be given as an already-open file descriptor."""
    tmp = tempfile.TemporaryFile()
    self.addCleanup(tmp.close)
    descriptor = tmp.fileno()
    os.write(descriptor, b"pear")
    os.lseek(descriptor, 0, 0)
    # The child exits with status True (1) when it reads "pear" from stdin.
    child_code = 'import sys; sys.exit(sys.stdin.read() == "pear")'
    proc = subprocess.Popen([sys.executable, "-c", child_code],
                            stdin=descriptor)
    proc.wait()
    self.assertEqual(proc.returncode, 1)
def test_stdout_filedes(self):
    """stdout may be given as an already-open file descriptor."""
    tmp = tempfile.TemporaryFile()
    self.addCleanup(tmp.close)
    descriptor = tmp.fileno()
    child_code = 'import sys; sys.stdout.write("orange")'
    proc = subprocess.Popen([sys.executable, "-c", child_code],
                            stdout=descriptor)
    proc.wait()
    # Rewind before reading back what the child wrote.
    os.lseek(descriptor, 0, 0)
    captured = os.read(descriptor, 1024)
    self.assertEqual(captured, b"orange")
def test_stderr_filedes(self):
    """stderr may be given as an already-open file descriptor."""
    tmp = tempfile.TemporaryFile()
    self.addCleanup(tmp.close)
    descriptor = tmp.fileno()
    child_code = 'import sys; sys.stderr.write("strawberry")'
    proc = subprocess.Popen([sys.executable, "-c", child_code],
                            stderr=descriptor)
    proc.wait()
    # Rewind before reading back what the child wrote.
    os.lseek(descriptor, 0, 0)
    captured = os.read(descriptor, 1024)
    self.assertStderrEqual(captured, b"strawberry")
def test_textmode(self):
    """_mkstemp_inner can create files in text mode."""
    if not has_textmode:
        return    # ugh, can't use SkipTest.

    # A text file is truncated at the first Ctrl+Z byte
    handle = self.do_create(bin=0)
    handle.write(b"blat\x1a")
    handle.write(b"extra\n")
    os.lseek(handle.fd, 0, os.SEEK_SET)
    content = os.read(handle.fd, 20)
    self.assertEqual(content, b"blat")
def test_lseek(self):
    """Exercise os.lseek with every whence mode on the built large file."""
    if verbose:
        print('play around with os.lseek() with the built largefile')
    with self.open(TESTFN, 'rb') as f:
        fd = f.fileno()
        # whence 0: absolute positions
        self.assertEqual(os.lseek(fd, 0, 0), 0)
        self.assertEqual(os.lseek(fd, 42, 0), 42)
        # whence 1: relative to the current position
        self.assertEqual(os.lseek(fd, 42, 1), 84)
        self.assertEqual(os.lseek(fd, 0, 1), 84)
        # whence 2: relative to the end of the file
        self.assertEqual(os.lseek(fd, 0, 2), size+1+0)
        self.assertEqual(os.lseek(fd, -10, 2), size+1-10)
        self.assertEqual(os.lseek(fd, -size-1, 2), 0)
        self.assertEqual(os.lseek(fd, size, 0), size)
        # the 'a' that was written at the end of file above
        self.assertEqual(f.read(1), b'a')
def _write(self, piece):
    """Store piece.data in self.fd at the position implied by piece.index.

    The position is piece.index * self.torrent.info.piece_length.
    """
    piece_length = self.torrent.info.piece_length
    os.lseek(self.fd, piece.index * piece_length, os.SEEK_SET)
    os.write(self.fd, piece.data)
def read(self, begin, index, length):
    """Read `length` bytes of piece `index`, starting `begin` bytes into it.

    The absolute file position is
    index * self.torrent.info.piece_length + begin.  Previously `begin` was
    ignored, so every request with a non-zero in-piece offset returned the
    bytes from the start of the piece instead.

    Returns the bytes read from self.fd (possibly fewer than `length` at the
    end of the file).
    """
    pos = index * self.torrent.info.piece_length + begin
    os.lseek(self.fd, pos, os.SEEK_SET)
    return os.read(self.fd, length)
def test_lseek(self):
    """Run self.check on os.lseek with the extra arguments (0, 0)."""
    extra_args = (0, 0)
    self.check(os.lseek, *extra_args)
def test_stdin_filedes(self):
    """stdin may be given as an already-open file descriptor."""
    # stdin is set to open file descriptor
    tf = tempfile.TemporaryFile()
    # Close the temporary file deterministically instead of relying on
    # garbage collection.
    self.addCleanup(tf.close)
    d = tf.fileno()
    # os.write() needs bytes on Python 3; b"..." is still a plain str on
    # Python 2, so the literal works on both.
    os.write(d, b"pear")
    os.lseek(d, 0, 0)
    # The child exits with status True (1) when it reads "pear" from stdin.
    p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
                         stdin=d)
    p.wait()
    self.assertEqual(p.returncode, 1)
def test_stdout_filedes(self):
    """stdout may be given as an already-open file descriptor."""
    # stdout is set to open file descriptor
    tf = tempfile.TemporaryFile()
    # Close the temporary file deterministically instead of relying on
    # garbage collection.
    self.addCleanup(tf.close)
    d = tf.fileno()
    p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdout.write("orange")'],
                         stdout=d)
    p.wait()
    os.lseek(d, 0, 0)
    # os.read() returns bytes on Python 3, so compare against a bytes
    # literal (which is a plain str on Python 2).
    self.assertEqual(os.read(d, 1024), b"orange")
def test_stderr_filedes(self):
    """stderr may be given as an already-open file descriptor."""
    # stderr is set to open file descriptor
    tf = tempfile.TemporaryFile()
    # Close the temporary file deterministically instead of relying on
    # garbage collection.
    self.addCleanup(tf.close)
    d = tf.fileno()
    p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=d)
    p.wait()
    os.lseek(d, 0, 0)
    # os.read() returns bytes on Python 3, so compare against a bytes
    # literal (which is a plain str on Python 2).
    self.assertStderrEqual(os.read(d, 1024), b"strawberry")
def __init__(self, buf = None, pos = 0, filename = None, fp = None):
    """Set up the buffer, position and line/column counters.

    When `fp` is given it replaces `buf` and `pos`: the file is memory-mapped
    read-only and `pos` is taken from the descriptor's current offset.  If
    the descriptor, the file size or the mmap module cannot be obtained, the
    whole file is read into memory instead and `pos` is 0.  A zero-length
    file yields an empty buffer, because an empty file cannot be mapped.

    When `filename` is None it is taken from `fp.name`, if that exists.
    """
    if fp is not None:
        try:
            fileno = fp.fileno()
            length = os.path.getsize(fp.name)
            import mmap
        except Exception:
            # Narrowed from a bare 'except:' so that KeyboardInterrupt and
            # SystemExit are no longer swallowed here.
            # read whole file into memory
            buf = fp.read()
            pos = 0
        else:
            # map the whole file into memory
            if length:
                # length must not be zero
                buf = mmap.mmap(fileno, length, access = mmap.ACCESS_READ)
                pos = os.lseek(fileno, 0, 1)
            else:
                buf = ''
                pos = 0

        if filename is None:
            try:
                filename = fp.name
            except AttributeError:
                filename = None

    self.buf = buf
    self.pos = pos
    self.line = 1
    self.col = 1
    self.filename = filename
def _save_chunk(self, n, data):
    """Write chunk number `n` to the part file and record the segments.

    The part file is opened through self._open_files() when self._part_fd is
    negative; False is returned if that fails.  Otherwise `data` is written
    at offset self.chunk_size * n, self._segments is written to
    self._segments_file, and True is returned.
    """
    if self._part_fd < 0 and not self._open_files():
        return False
    assert self._part_fd >= 0

    position = self.chunk_size * n
    os.lseek(self._part_fd, position, os.SEEK_SET)
    os.write(self._part_fd, data)
    self._segments_file.write(self._segments)
    return True