我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 os.ftruncate()。
def wipe(self):
    """Zero the two filter bitmap files under /dev/shm.

    ``/dev/shm/kafl_filter0`` is resized to
    ``self.config.config_values['BITMAP_SHM_SIZE']`` bytes and
    ``/dev/shm/kafl_tfilter`` to 0x1000000 bytes. Each file is created if
    missing, mapped shared and overwritten with zero bytes.
    """
    def zero_region(path, length):
        # Resize, map and clear one file; always release the mapping and fd.
        fd = os.open(path, os.O_RDWR | os.O_SYNC | os.O_CREAT)
        try:
            os.ftruncate(fd, length)
            region = mmap.mmap(fd, length, mmap.MAP_SHARED,
                               mmap.PROT_WRITE | mmap.PROT_READ)
            try:
                # One bulk bytes assignment instead of a per-byte loop; a
                # bytes literal is accepted by mmap on Python 2 and 3 alike.
                region[:] = b'\x00' * length
            finally:
                region.close()
        finally:
            os.close(fd)

    zero_region("/dev/shm/kafl_filter0",
                self.config.config_values['BITMAP_SHM_SIZE'])
    zero_region("/dev/shm/kafl_tfilter", 0x1000000)
def __set_binary(self, filename, binaryfile, max_size):
    """Copy ``binaryfile`` into a zero-padded file of ``max_size`` bytes.

    ``filename`` is created if missing, resized to ``max_size``, mapped
    shared, filled with zero bytes and then overwritten from offset 0 with
    the contents of ``binaryfile``.

    :param filename: path of the file to map and fill
    :param binaryfile: path of the file whose bytes are copied in
    :param max_size: size in bytes of the mapped file
    :raises ValueError: from ``mmap.write`` if ``binaryfile`` is larger
        than ``max_size``
    """
    shm_fd = os.open(filename, os.O_RDWR | os.O_SYNC | os.O_CREAT)
    try:
        os.ftruncate(shm_fd, max_size)
        shm = mmap.mmap(shm_fd, max_size, mmap.MAP_SHARED,
                        mmap.PROT_WRITE | mmap.PROT_READ)
        try:
            # Clear stale content first: ftruncate keeps existing bytes.
            shm.seek(0x0)
            shm.write(b'\x00' * max_size)
            shm.seek(0x0)
            with open(binaryfile, "rb") as f:
                # The sentinel must be bytes: on Python 3 b"" != "" is
                # always true, which made the old loop spin forever.
                for chunk in iter(lambda: f.read(1024), b""):
                    shm.write(chunk)
        finally:
            shm.close()
    finally:
        os.close(shm_fd)
def init(self):
    """Connect the control socket and map the bitmap and payload files.

    Retries ``connect`` on ``self.control_filename`` until it succeeds,
    then opens (creating if needed), resizes and shared-maps
    ``self.bitmap_filename`` (``self.bitmap_size`` bytes) and
    ``self.payload_filename`` (128 KiB).

    :return: True
    """
    shm_flags = os.O_RDWR | os.O_SYNC | os.O_CREAT
    map_prot = mmap.PROT_WRITE | mmap.PROT_READ
    payload_size = 128 << 10

    self.control = socket.socket(socket.AF_UNIX)
    connected = False
    while not connected:
        try:
            self.control.connect(self.control_filename)
        except socket_error:
            # Retried immediately; there is no delay between attempts.
            continue
        connected = True

    self.kafl_shm_f = os.open(self.bitmap_filename, shm_flags)
    self.fs_shm_f = os.open(self.payload_filename, shm_flags)

    os.ftruncate(self.kafl_shm_f, self.bitmap_size)
    os.ftruncate(self.fs_shm_f, payload_size)

    self.kafl_shm = mmap.mmap(self.kafl_shm_f, self.bitmap_size,
                              mmap.MAP_SHARED, map_prot)
    self.fs_shm = mmap.mmap(self.fs_shm_f, payload_size,
                            mmap.MAP_SHARED, map_prot)
    return True
def prepare_rootfs_btrfs(self, rootfs, oe_builddir, rootfs_dir):
    """
    Create the image file ``rootfs`` and populate it from ``rootfs_dir``.

    The directory size reported by ``du -ks`` is passed through
    ``self.get_rootfs_size``; the result (in KiB) sizes a sparse file, on
    which ``mkfs.<self.fstype>`` is run with ``-r rootfs_dir``.
    ``oe_builddir`` is not used by this method.
    """
    du_output = exec_cmd("sudo du -ks %s" % rootfs_dir)
    actual_rootfs_size = int(du_output.split()[0])
    rootfs_size = self.get_rootfs_size(actual_rootfs_size)

    # Allocate the image as a sparse file of the final size.
    with open(rootfs, 'w') as sparse:
        os.ftruncate(sparse.fileno(), rootfs_size * 1024)

    label_str = "-L %s" % self.label if self.label else ""

    exec_cmd("sudo mkfs.%s -b %d -r %s %s %s" %
             (self.fstype, rootfs_size * 1024, rootfs_dir, label_str,
              rootfs))
def prepare_empty_partition_ext(self, rootfs, oe_builddir):
    """
    Create an empty filesystem image at ``rootfs``.

    A sparse file of ``self.disk_size`` KiB is created and
    ``mkfs.<self.fstype> -F -i 8192`` is run on it, with ``-L`` added when
    ``self.label`` is set. ``oe_builddir`` is not used by this method.
    """
    image_bytes = self.disk_size * 1024
    with open(rootfs, 'w') as sparse:
        os.ftruncate(sparse.fileno(), image_bytes)

    extra_imagecmd = "-i 8192"
    label_str = "-L %s" % self.label if self.label else ""

    exec_cmd("sudo mkfs.%s -F %s %s %s" %
             (self.fstype, extra_imagecmd, label_str, rootfs))
def _copy_stream(self, fd, url, offset):
    """Copies remote file to local.

    The local file is truncated to ``offset`` bytes and the remote data is
    appended from there.

    :param fd: the file`s descriptor
    :param url: the remote file`s url
    :param offset: the number of bytes from the beginning,
                   that will be skipped
    :return: the count of actually copied bytes
    """
    source = self.open_stream(url, offset)
    try:
        os.ftruncate(fd, offset)
        os.lseek(fd, offset, os.SEEK_SET)
        chunk_size = 16 * 1024
        size = 0
        while True:
            chunk = source.read(chunk_size)
            if not chunk:
                break
            # os.write may accept fewer bytes than requested; keep writing
            # the remainder so the returned count matches what is on disk.
            pending = chunk
            while pending:
                pending = pending[os.write(fd, pending):]
            size += len(chunk)
        return size
    finally:
        # Release the stream even when reading or writing fails.
        close = getattr(source, "close", None)
        if close is not None:
            close()
def __init__(self, transfer_size):
    """Back three ctypes fields with a 20-byte shared file mapping.

    A temporary file (path kept in ``self.filename``) is resized to 20
    bytes and mapped; the descriptor is closed once the mapping exists.
    Field layout in ``self.buf``: ``total_bytes`` (uint64) at offset 0,
    ``average_time`` (double) at offset 8, ``transfer_size`` (uint32) at
    offset 16.

    :param transfer_size: initial value stored in ``self.transfer_size``
    """
    region_size = 20
    fd, self.filename = tempfile.mkstemp()
    os.ftruncate(fd, region_size)
    self.buf = mmap.mmap(fd, region_size, mmap.MAP_SHARED, mmap.PROT_WRITE)
    os.close(fd)

    self.total_bytes = ctypes.c_uint64.from_buffer(self.buf)
    self.average_time = ctypes.c_double.from_buffer(self.buf, 8)
    self.transfer_size = ctypes.c_uint32.from_buffer(self.buf, 16)

    self.total_bytes.value = 0
    self.average_time.value = 0.0
    self.transfer_size.value = transfer_size
def create_shm(self):
    """Create and size one file per (file prefix, process index) pair.

    For every prefix in ``self.files`` and every index below
    ``self.num_processes``, the file ``prefix + str(index)`` is created if
    missing and resized to ``self.sizes[j] * self.tasks_per_requests``
    bytes, where ``j`` is the prefix's position in ``self.files``.
    """
    create_flags = os.O_CREAT | os.O_RDWR | os.O_SYNC
    for file_idx, prefix in enumerate(self.files):
        for proc_idx in range(self.num_processes):
            fd = os.open(prefix + str(proc_idx), create_flags)
            os.ftruncate(fd, self.sizes[file_idx] * self.tasks_per_requests)
            os.close(fd)
def open_global_bitmap(self):
    """Open, size and shared-map ``<work_dir>/bitmap``.

    The descriptor is stored in ``self.global_bitmap_fd`` and the mapping
    of ``self.bitmap_size`` bytes in ``self.global_bitmap``. The work
    directory is read from ``self.config.argument_values['work_dir']``.
    """
    bitmap_path = self.config.argument_values['work_dir'] + "/bitmap"
    fd = os.open(bitmap_path, os.O_RDWR | os.O_SYNC | os.O_CREAT)
    self.global_bitmap_fd = fd
    os.ftruncate(fd, self.bitmap_size)
    self.global_bitmap = mmap.mmap(fd, self.bitmap_size, mmap.MAP_SHARED,
                                   mmap.PROT_WRITE | mmap.PROT_READ)
def write_pid_file(pid_file, pid):
    """Lock ``pid_file`` exclusively and record ``pid`` in it.

    :param pid_file: path of the pid file (created with mode 0600)
    :param pid: value written to the file via ``str(pid)``
    :return: 0 when the lock was taken and the pid written; -1 when the
        file cannot be opened or the lock is already held
    """
    import fcntl
    import stat

    owner_rw = stat.S_IRUSR | stat.S_IWUSR
    try:
        fd = os.open(pid_file, os.O_RDWR | os.O_CREAT, owner_rw)
    except OSError as e:
        shell.print_exception(e)
        return -1

    # Mark the descriptor close-on-exec.
    fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    assert fd_flags != -1
    r = fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert r != -1

    # There is no platform independent way to implement
    # fcntl(fd, F_SETLK, &fl) via fcntl.fcntl. So use lockf instead
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        holder = os.read(fd, 32)
        if holder:
            logging.error('already started at pid %s' % common.to_str(holder))
        else:
            logging.error('already started')
        os.close(fd)
        return -1

    # The descriptor is left open on success: closing it would release
    # the lockf lock.
    os.ftruncate(fd, 0)
    os.write(fd, common.to_bytes(str(pid)))
    return 0
def mmap(self, data):
    """Write ``data`` into a 4096-byte mapping of ``self.temp``.

    :param data: bytes written at the start of the mapping
    :return: an ``mmap://`` URL naming ``self.temp.name`` with offset 0
        and length ``len(data)``
    """
    map_size = 4096
    url = "mmap://{0}?offset=0&length={1}".format(self.temp.name, len(data))

    os.ftruncate(self.temp.fileno(), map_size)
    mapping = mmap(self.temp.fileno(), map_size)
    mapping.write(data)
    mapping.close()
    return url
def write_pid_file(pid_file, pid):
    """Lock ``pid_file`` exclusively and record ``pid`` in it.

    :param pid_file: path of the pid file (created with mode 0600)
    :param pid: value written to the file via ``str(pid)``
    :return: 0 when the lock was taken and the pid written; -1 when the
        file cannot be opened or the lock is already held
    """
    owner_rw = stat.S_IRUSR | stat.S_IWUSR
    try:
        fd = os.open(pid_file, os.O_RDWR | os.O_CREAT, owner_rw)
    except OSError as e:
        LOG.exception(e)
        return -1

    # Mark the descriptor close-on-exec.
    fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    assert fd_flags != -1
    r = fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert r != -1

    # There is no platform independent way to implement
    # fcntl(fd, F_SETLK, &fl) via fcntl.fcntl. So use lockf instead
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        holder = os.read(fd, 32)
        if holder:
            logging.error('already started at pid %s' % utils.to_str(holder))
        else:
            logging.error('already started')
        os.close(fd)
        return -1

    # The descriptor is left open on success: closing it would release
    # the lockf lock.
    os.ftruncate(fd, 0)
    os.write(fd, utils.to_bytes(str(pid)))
    return 0
def _probe_seek_hole(self):
    """
    Check whether the system implements 'SEEK_HOLE' and 'SEEK_DATA'.
    Unfortunately, there seems to be no clean way for detecting this,
    because often the system just fakes them by just assuming that all
    files are fully mapped, so 'SEEK_HOLE' always returns EOF and
    'SEEK_DATA' always returns the requested offset.

    I could not invent a better way of detecting the fake 'SEEK_HOLE'
    implementation than just to create a temporary file in the same
    directory where the image file resides. It would be nice to change this
    to something better.

    Raises 'ErrorNotSupp' when the temporary file cannot be created or
    truncated, or when 'lseek(0, SEEK_HOLE)' on it does not return 0.
    """

    directory = os.path.dirname(self._image_path)

    try:
        tmp_obj = tempfile.TemporaryFile("w+", dir=directory)
    except (IOError, OSError) as err:
        raise ErrorNotSupp("cannot create a temporary in \"%s\": %s"
                           % (directory, err))

    # Close the temporary file on every path, including the error ones.
    try:
        try:
            os.ftruncate(tmp_obj.fileno(), self.block_size)
        except OSError as err:
            raise ErrorNotSupp("cannot truncate temporary file in \"%s\": %s"
                               % (directory, err))

        # The truncated file holds no data, so a real implementation
        # reports a hole at offset 0.
        offs = _lseek(tmp_obj, 0, _SEEK_HOLE)
        if offs != 0:
            # We are dealing with the stub 'SEEK_HOLE' implementation which
            # always returns EOF.
            self._log.debug("lseek(0, SEEK_HOLE) returned %d" % offs)
            raise ErrorNotSupp("the file-system does not support "
                               "\"SEEK_HOLE\" and \"SEEK_DATA\" but only "
                               "provides a stub implementation")
    finally:
        tmp_obj.close()
def prepare_rootfs_ext(self, rootfs, oe_builddir, rootfs_dir):
    """
    Create the image file ``rootfs`` and populate it from ``rootfs_dir``.

    The directory size reported by ``du -ks`` is passed through
    ``self.get_rootfs_size``; the result (in KiB) sizes a sparse file, on
    which ``mkfs.<self.fstype> -F -i 8192 ... -d rootfs_dir`` and then
    ``fsck.<self.fstype> -pvfD`` are run. A ``WicExecError`` from fsck
    with return code 1 is ignored; any other is re-raised.
    ``oe_builddir`` is not used by this method.
    """
    du_output = exec_cmd("sudo du -ks %s" % rootfs_dir)
    actual_rootfs_size = int(du_output.split()[0])
    rootfs_size = self.get_rootfs_size(actual_rootfs_size)

    # Allocate the image as a sparse file of the final size.
    with open(rootfs, 'w') as sparse:
        os.ftruncate(sparse.fileno(), rootfs_size * 1024)

    extra_imagecmd = "-i 8192"
    label_str = "-L %s" % self.label if self.label else ""

    exec_cmd("sudo mkfs.%s -F %s %s %s -d %s" %
             (self.fstype, extra_imagecmd, rootfs, label_str, rootfs_dir))

    fsck_cmd = "fsck.%s -pvfD %s" % (self.fstype, rootfs)
    try:
        exec_cmd(fsck_cmd)
    except WicExecError as e:
        if e.returncode != 1:
            raise e
def prepare_empty_partition_btrfs(self, rootfs, oe_builddir):
    """
    Prepare an empty btrfs partition.

    A sparse file of ``self.disk_size`` KiB is created at ``rootfs`` and
    ``mkfs.<self.fstype> -b <bytes>`` is run on it, with ``-L`` added when
    ``self.label`` is set. ``oe_builddir`` is not used by this method.
    """
    size = self.disk_size
    with open(rootfs, 'w') as sparse:
        os.ftruncate(sparse.fileno(), size * 1024)

    label_str = ""
    if self.label:
        label_str = "-L %s" % self.label

    # Pass mkfs the same byte count the image file was sized with; the
    # previous code used self.size here, which need not match the file.
    mkfs_cmd = "sudo mkfs.%s -b %d %s %s" % \
               (self.fstype, size * 1024, label_str, rootfs)
    exec_cmd(mkfs_cmd)
def test_ftruncate(self):
    """Pass ``os.ftruncate`` and 0 to ``self.check`` when it is available."""
    if not hasattr(os, "ftruncate"):
        return
    self.check(os.ftruncate, 0)
def test_ftruncate(self):
    """Pass ``os.ftruncate`` and 0 to ``self.check``."""
    truncate_by_fd = os.ftruncate
    self.check(truncate_by_fd, 0)
def __init__(self, size):
    """Create a temporary file of ``size`` bytes under XDG_RUNTIME_DIR.

    The open descriptor is stored in ``self._fd``.

    :param size: length in bytes the file is resized to
    :raises NoXDGRuntimeDir: if XDG_RUNTIME_DIR is unset or empty
    """
    runtime_dir = os.environ.get('XDG_RUNTIME_DIR')
    if not runtime_dir:
        raise NoXDGRuntimeDir()

    self._fd, _path = tempfile.mkstemp(dir=runtime_dir)
    os.ftruncate(self._fd, size)
def test_ftruncate(self):
    """Check ``os.truncate`` and ``os.ftruncate`` when ftruncate exists.

    Both functions are passed to ``self.check`` with 0; nothing runs when
    ``os`` has no ``ftruncate`` attribute.
    """
    if not hasattr(os, "ftruncate"):
        return
    self.check(os.truncate, 0)
    self.check(os.ftruncate, 0)
def truncate(self, size=-1):
    """Resize the underlying file and move the position to the new end.

    :param size: new length in bytes; ``-1`` (the default) or ``None``
        means the current position as reported by ``self.tell()``
    :return: the result of ``os.ftruncate``
    :raises IOError: built from the ``OSError`` arguments if
        ``os.ftruncate`` fails
    """
    self.flush()
    # Accept None as well as -1: None is how the io file protocol spells
    # "use the current position", and os.ftruncate rejects it outright.
    if size is None or size == -1:
        size = self.tell()
    try:
        rv = os.ftruncate(self.fileno(), size)
    except OSError as e:
        raise IOError(*e.args)
    else:
        self.seek(size)  # move position&clear buffer
        return rv
def truncate(self, size=-1):
    """Resize the underlying file and move the position to the new end.

    :param size: new length in bytes; ``-1`` (the default) or ``None``
        means the current position as reported by ``self.tell()``
    :return: the result of ``_original_os.ftruncate``
    :raises IOError: built from the ``OSError`` arguments if the
        truncation fails
    """
    # Accept None as well as -1: None is how the io file protocol spells
    # "use the current position", and ftruncate rejects it outright.
    if size is None or size == -1:
        size = self.tell()
    try:
        rv = _original_os.ftruncate(self._fileno, size)
    except OSError as e:
        raise IOError(*e.args)
    else:
        self.seek(size)  # move position&clear buffer
        return rv
def write_pid_file(pid_file, pid):
    """
    Use the pid file to govern that the daemon is only running one instance.

    Open the pid file and set the close-on-exec flag firstly.
    Then try to acquire the exclusive lock of the pid file:
        If success, return 0 to start the daemon process.
        else, there already is a daemon process running, return -1.
    -1 is also returned when the pid file cannot be opened.
    """
    import fcntl
    import stat

    # https://github.com/xuelangZF/AnnotatedShadowSocks/issues/23
    owner_rw = stat.S_IRUSR | stat.S_IWUSR
    try:
        fd = os.open(pid_file, os.O_RDWR | os.O_CREAT, owner_rw)
    except OSError as e:
        logging.error(e)
        return -1

    # https://github.com/xuelangZF/AnnotatedShadowSocks/issues/25
    fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    assert fd_flags != -1
    r = fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert r != -1

    # https://github.com/xuelangZF/AnnotatedShadowSocks/issues/26
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        holder = os.read(fd, 32)
        if holder:
            logging.error('already started at pid %s' % common.to_str(holder))
        else:
            logging.error('already started')
        os.close(fd)
        return -1

    # The descriptor is left open on success: closing it would release
    # the lockf lock.
    os.ftruncate(fd, 0)
    os.write(fd, common.to_bytes(str(pid)))
    return 0
def _probe_seek_hole(self):
    """
    Check whether the system implements 'SEEK_HOLE' and 'SEEK_DATA'.
    Unfortunately, there seems to be no clean way for detecting this,
    because often the system just fakes them by just assuming that all
    files are fully mapped, so 'SEEK_HOLE' always returns EOF and
    'SEEK_DATA' always returns the requested offset.

    I could not invent a better way of detecting the fake 'SEEK_HOLE'
    implementation than just to create a temporary file in the same
    directory where the image file resides. It would be nice to change this
    to something better.

    Raises 'ErrorNotSupp' when the temporary file cannot be created or
    truncated, or when 'lseek(0, SEEK_HOLE)' on it does not return 0.
    """

    directory = os.path.dirname(self._image_path)

    try:
        tmp_obj = tempfile.TemporaryFile("w+", dir=directory)
    except (IOError, OSError) as err:
        raise ErrorNotSupp("cannot create a temporary in \"%s\": %s"
                           % (directory, err))

    # Close the temporary file on every path, including the error ones.
    try:
        try:
            os.ftruncate(tmp_obj.fileno(), self.block_size)
        except OSError as err:
            raise ErrorNotSupp("cannot truncate temporary file in \"%s\": %s"
                               % (directory, err))

        # The truncated file holds no data, so a real implementation
        # reports a hole at offset 0.
        offs = _lseek(tmp_obj, 0, _SEEK_HOLE)
        if offs != 0:
            # We are dealing with the stub 'SEEK_HOLE' implementation which
            # always returns EOF.
            _log.debug("lseek(0, SEEK_HOLE) returned %d" % offs)
            raise ErrorNotSupp("the file-system does not support "
                               "\"SEEK_HOLE\" and \"SEEK_DATA\" but only "
                               "provides a stub implementation")
    finally:
        tmp_obj.close()
def test_ftruncate(self):
    """Pass ``os.truncate`` and then ``os.ftruncate`` to ``self.check``."""
    check = self.check
    check(os.truncate, 0)
    check(os.ftruncate, 0)
def __init__(self, args, lfs_globals):
    """Open (or create) the shadow file and record it as the aperture.

    :param args: namespace providing ``shadow_file``; ``aperture_base``,
        ``aperture_size`` and ``addr_mode`` are set on it here
    :param lfs_globals: mapping providing ``'nvm_bytes_total'``
    :raises RuntimeError: if the shadow file's directory is not writeable
    :raises AssertionError: if the directory is missing, the file is not
        a user read/write regular file, or it is smaller than
        ``nvm_bytes_total``
    """
    # NOTE(review): the superclass initializer runs here and again at the
    # end, after the aperture fields are set; confirm both are needed.
    super().__init__(args, lfs_globals)

    head, _tail = os.path.split(args.shadow_file)
    assert os.path.isdir(head), 'No such directory %s' % head

    # Probe writeability by creating a throwaway file in the directory.
    try:
        probe = tempfile.TemporaryFile(dir=head)
        probe.close()
    except OSError as e:
        raise RuntimeError('%s is not writeable' % head) from e

    # NOTE(review): the file is sized only when newly created; an existing
    # file is left as is and checked below. Confirm against the original
    # layout, which was not visible here.
    if os.path.isfile(args.shadow_file):
        fd = os.open(args.shadow_file, os.O_RDWR)
    else:
        fd = os.open(args.shadow_file, os.O_RDWR | os.O_CREAT)
        size = lfs_globals['nvm_bytes_total']
        os.ftruncate(fd, size)
    self._shadow_fd = fd

    # Compare node requirements to actual file size
    statinfo = os.stat(args.shadow_file)
    # The message used to be the unformatted literal '%s is not RW'.
    assert self._S_IFREG_URW == self._S_IFREG_URW & statinfo.st_mode, \
        '%s is not RW' % args.shadow_file
    assert statinfo.st_size >= lfs_globals['nvm_bytes_total']

    args.aperture_base = 0
    args.aperture_size = statinfo.st_size
    args.addr_mode = self._MODE_FALLBACK
    super().__init__(args, lfs_globals)

# open(), create(), release() only do caching as handled by superclass
def _create_files(self, filename):
    """Open the ``.part`` file for ``filename`` and lock the segments file.

    :param filename: destination path; its directory is created and
        ``<filename>.part`` is opened write-only with mode 0600
    :return: True when the segments file lock was acquired; False when the
        lock attempt failed with EAGAIN, in which case the part file is
        closed and ``self._watch_for_completion(filename)`` is called
    :raises IOError: any other locking error, after the part file has
        been closed
    """
    # XXX this is racy
    assert filename
    logger.debug('Opening files for ‘%s’', filename)
    mkdir_p(os.path.dirname(filename))

    self._part_filename = '%s.part' % (filename, )
    open_flags = os.O_CREAT | os.O_WRONLY | os.O_NONBLOCK
    self._part_fd = os.open(self._part_filename, open_flags, 0o600)

    try:
        self._segments_file.lock()
    except IOError as e:
        # Clean up.
        os.close(self._part_fd)
        self._part_fd = -1
        self._part_filename = None
        if e.errno != errno.EAGAIN:
            raise
        # Cannot acquire lock: some other process (or part of this
        # process) is already downloading it. Clean up and watch that
        # file for completion.
        logger.debug('File ‘%s.sgt’ is locked: waiting on completion.',
                     filename)
        self._watch_for_completion(filename)
        return False

    # XXX hack
    return True

    # NOTE: everything below is unreachable because of the return above;
    # it is kept as found.
    # Reserve space for the full file and truncate any existing content to
    # the start of the final chunk (because it might be smaller than the
    # chunk size).
    size = self.chunk_size * (self._num_segments - 1)
    try:
        fallocate.fallocate(self._part_fd, 0, size)
    except IOError as e:
        # if it fails, we might get surprises later, but it's ok.
        logger.debug('Error calling fallocate(%u, 0, %u): %s' %
                     (self._part_fd, self._size, e.message))
    try:
        os.ftruncate(self._part_fd, size)
    except IOError as e:
        logger.debug('Error calling ftruncate(%u, %u): %s' %
                     (self._part_fd, size, e.message))
    return True