我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用os.truncate()。
def test_does_not_fit(self): # The contents of a structure is too large for the image size. workdir = self._resources.enter_context(TemporaryDirectory()) # See LP: #1666580 main(('snap', '--workdir', workdir, '--thru', 'load_gadget_yaml', self.model_assertion)) # Make the gadget's mbr contents too big. path = os.path.join(workdir, 'unpack', 'gadget', 'pc-boot.img') os.truncate(path, 512) mock = self._resources.enter_context(patch( 'ubuntu_image.__main__._logger.error')) code = main(('snap', '--workdir', workdir, '--resume')) self.assertEqual(code, 1) self.assertEqual( mock.call_args_list[-1], call('Volume contents do not fit (72B over): ' 'volumes:<pc>:structure:<mbr> [#0]'))
def test_truncate_fixed(sub_open): fname = join(sub_open, 'new_file') f = open(fname, 'w') f.write('hello\n') f.flush() ff = open(fname, 'a') ff.truncate(1) ff.close() ff = open(fname, 'r') assert ff.read() == 'h' ff.close() f.close() os.truncate(fname, 0) f = open(fname, 'r') assert f.read() == '' f.close()
def test_ftruncate(self): if hasattr(os, "ftruncate"): self.check(os.truncate, 0) self.check(os.ftruncate, 0)
def create_internal_disk(self): """Create a internal-image*.wic in the resultdir that runqemu can use.""" copyfile(os.path.join(self.image_dir, 'refkit-installer-image-%s.qemuboot.conf' % self.image_arch), os.path.join(self.resultdir, 'internal-image-%s.qemuboot.conf' % self.image_arch)) for ovmf in glob('%s/ovmf*' % self.image_dir): os.symlink(ovmf, os.path.join(self.resultdir, os.path.basename(ovmf))) with open(os.path.join(self.resultdir, 'internal-image-%s.wic' % self.image_arch), 'w') as f: # empty, sparse file of 8GB os.truncate(f.fileno(), 8 * 1024 * 1024 * 1024)
def get_default_sector_size(): with NamedTemporaryFile() as fp: # Truncate to zero, so that extending the size in the next call # will cause all the bytes to read as zero. Stevens $4.13 os.truncate(fp.name, 0) os.truncate(fp.name, MiB(1)) return Device(fp.name).sectorSize
def __init__(self, path, size, schema=None): """Initialize an image file to a given size in bytes. :param path: Path to image file on the file system. :type path: str :param size: Size in bytes to set the image file to. :type size: int :param schema: The partitioning schema of the volume. :type schema: VolumeSchema Public attributes: * path - Path to the image file. """ self.path = path # Create an empty image file of a fixed size. Unlike # truncate(1) --size 0, os.truncate(path, 0) doesn't touch the # file; i.e. it must already exist. with open(path, 'wb'): pass # Truncate to zero, so that extending the size in the next call # will cause all the bytes to read as zero. Stevens $4.13 os.truncate(path, 0) os.truncate(path, size) # Prepare the device and disk objects for parted to be used for all # future partition() calls. Only do it if we actually care about the # partition table. if schema is None: self.sector_size = 512 self.device = None self.disk = None self.schema = None else: self.device = parted.Device(self.path) label = 'msdos' if schema is VolumeSchema.mbr else 'gpt' self.schema = schema self.disk = parted.freshDisk(self.device, label) self.sector_size = self.device.sectorSize
def test_sparse_copy(self): with ExitStack() as resources: tmpdir = resources.enter_context(TemporaryDirectory()) sparse_file = os.path.join(tmpdir, 'sparse.dat') fp = resources.enter_context(open(sparse_file, 'w')) os.truncate(fp.fileno(), 1000000) # This file is sparse. self.assertTrue(is_sparse(sparse_file)) copied_file = os.path.join(tmpdir, 'copied.dat') sparse_copy(sparse_file, copied_file) self.assertTrue(is_sparse(copied_file))
def test_copy_symlink(self): with ExitStack() as resources: tmpdir = resources.enter_context(TemporaryDirectory()) sparse_file = os.path.join(tmpdir, 'sparse.dat') fp = resources.enter_context(open(sparse_file, 'w')) os.truncate(fp.fileno(), 1000000) # This file is sparse. self.assertTrue(is_sparse(sparse_file)) # Create a symlink to the sparse file. linked_file = os.path.join(tmpdir, 'linked.dat') os.symlink(sparse_file, linked_file) self.assertTrue(os.path.islink(linked_file)) copied_link = os.path.join(tmpdir, 'copied.dat') sparse_copy(linked_file, copied_link, follow_symlinks=False) self.assertTrue(os.path.islink(copied_link))
def test_ftruncate(self): self.check(os.truncate, 0) self.check(os.ftruncate, 0)
def _open_log_file(): """Open a Porcupine log file. Usually this opens and overwrites log.txt. If another Porcupine process has it currently opened, this opens log1.txt instead, then log2.txt and so on. """ # create an iterator 'log.txt', 'log2.txt', 'log3.txt', ... filenames = itertools.chain( ['log.txt'], map('log{}.txt'.format, itertools.count(start=2)), ) for filename in filenames: path = os.path.join(dirs.cachedir, filename) # unfortunately there's not a mode that would open in write but # not truncate like 'w' or seek to end like 'a' fileno = os.open(path, os.O_WRONLY | os.O_CREAT, 0o644) if _lock(fileno): # now we can delete the old content, can't use os.truncate # here because it doesn't exist on windows file = open(fileno, 'w') file.truncate(0) return file else: os.close(fileno) # FileHandler doesn't take already opened files and StreamHandler # doesn't close the file :(
def truncate(self, length): raise FuseOSError(EPERM)
def truncate(self, length): self.data = self.get_data() if length == 0: self.data = bytes('', 'utf8') elif length <= len(self.data): self.data = self.data[:length] else: self.data = self.data + bytes( '\0' * (length - self.stat['st_size']), 'utf8' ) self.overwrite = True
def truncate(self, length): if self._handle is None: os.truncate(self.full_path, length) else: self._handle.seek(0) self._handle.truncate(length) self._handle.flush()
def truncate(self, length): old_data = self.data if length == 0: self.data = bytes('', 'utf8') elif length <= len(old_data): self.data = self.data[:length] else: self.data = old_data + bytes( '\0' * (length - len(old_data)), 'utf8' ) self.stat['st_atime'] = time() self.stat['st_mtime'] = time() self.dirty = True
def truncate(self, path, length, fh=None): with self._lock: if length < 0: # pragma: no cover raise FuseOSError(EINVAL) if fh is None: file = self.get_file(path, expect_type=SingleFile) else: # pragma: no cover file = self._open_files[fh] if self.fixed and not isinstance(file, (TempFile, SpecialFile)): raise FuseOSError(EPERM) file.truncate(length)
def _consistent(self, cached): try: all_fh = [ ] for vlist in cached.open_handle.values(): all_fh += vlist for v_fh in all_fh: assert (None, v_fh) in self._shelfcache, 'Inconsistent list members' except Exception as e: self.logger.error('Shadow cache is corrupt: %s' % str(e)) if self.verbose > 3: set_trace() raise TmfsOSError(errno.EBADFD) # Most uses send an open handle fh (integer) as key. truncate by name # is the exception. An update needs to be reflected for all keys.
def truncate(self, shelf, length, fh): if fh is not None: # This is an update, but there's no good way to flag that to # __setitem__. Do an idiot check here. assert (None, fh) in self, 'VFS thinks %s is open but LFS does not, shelf_id: %s' % ( shelf.name, shelf.id) self[(shelf.id, None)] = shelf return 0
def truncate(self, shelf, length, fd): # shadow_dir, yes an fd try: if fd: # It's an open shelf assert self[shelf.name]._fd == fd, 'fd mismatch on truncate' os.truncate( shelf._fd if shelf._fd >= 0 else self.shadowpath(shelf.name), length) shelf.size_bytes = length if shelf.open_handle is None: shelf = self[shelf.id] if shelf is not None: shelf.size_bytes = length return 0 except OSError as e: raise TmfsOSError(e.errno)