我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.O_CREAT。
def write_pid_to_pidfile(pidfile_path):
    """Write the PID of the current process to the named PID file.

    The file is opened with ``O_CREAT | O_EXCL | O_WRONLY`` and mode
    ``0o644``, so the call fails if the file already exists.

    :param pidfile_path: Path of the PID file to create.
    :raises OSError: If the file exists already or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)

    # The context manager closes the file even when the write fails, so
    # the descriptor is never leaked.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        # According to the FHS 2.3 section on PID files in /var/run:
        #
        #   The file must consist of the process identifier in
        #   ASCII-encoded decimal, followed by a newline character. For
        #   example, if crond was process number 25, /var/run/crond.pid
        #   would contain three characters: two, five, and newline.
        pid = os.getpid()
        pidfile.write("%s\n" % pid)
def writeKeyToFile(key, filename):
    """Write **key** to **filename**, with ``0400`` permissions.

    If **filename** doesn't exist, it will be created. If it does exist
    already, and is writable by the owner of the current process, then it
    will be truncated to zero-length and overwritten.

    :param bytes key: A key (or some other private data) to write to
        **filename**.
    :param str filename: The path of the file to write to.
    :raises: Any exceptions which may occur.
    """
    logging.info("Writing key to file: %r", filename)
    # ``O_BINARY`` only exists on Windows; elsewhere it contributes nothing.
    flags = os.O_WRONLY | os.O_TRUNC | os.O_CREAT | getattr(os, "O_BINARY", 0)
    # ``0o400`` is the octal spelling accepted by both Python 2.6+ and 3.
    fd = os.open(filename, flags, 0o400)
    try:
        os.write(fd, key)
        os.fsync(fd)
    finally:
        # Always release the descriptor, even if writing or syncing fails.
        os.close(fd)
def do_magic(self):
    """Try to take the lock at ``LOCK_PATH`` and flag a running instance.

    On Windows (``OS_WIN`` true) any existing lock file is unlinked and
    then re-created exclusively; errno 13 from either step sets
    ``self.is_running``. Otherwise the file is opened for writing and a
    non-blocking exclusive ``fcntl.lockf`` is attempted; a failure while
    ``self.fh`` is not ``None`` sets ``self.is_running``.
    """
    if not OS_WIN:
        try:
            self.fh = open(LOCK_PATH, 'w')
            fcntl.lockf(self.fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except EnvironmentError:
            # With no file handle at all the failure is re-raised.
            if self.fh is None:
                raise
            self.is_running = True
        return

    try:
        if os.path.exists(LOCK_PATH):
            os.unlink(LOCK_PATH)
        self.fh = os.open(LOCK_PATH, os.O_CREAT | os.O_EXCL | os.O_RDWR)
    except EnvironmentError as err:
        # errno 13 is treated as "lock held"; anything else propagates.
        if err.errno != 13:
            raise
        self.is_running = True
def wipe(self):
    """Zero-fill the two filter bitmap files under ``/dev/shm``.

    ``/dev/shm/kafl_filter0`` is resized to the configured
    ``BITMAP_SHM_SIZE`` and ``/dev/shm/kafl_tfilter`` to ``0x1000000``
    bytes; both are created if missing and overwritten with NUL bytes.
    """
    regions = (
        ("/dev/shm/kafl_filter0", self.config.config_values['BITMAP_SHM_SIZE']),
        ("/dev/shm/kafl_tfilter", 0x1000000),
    )
    for path, size in regions:
        fd = os.open(path, os.O_RDWR | os.O_SYNC | os.O_CREAT)
        try:
            os.ftruncate(fd, size)
            shm = mmap.mmap(fd, size, mmap.MAP_SHARED,
                            mmap.PROT_WRITE | mmap.PROT_READ)
            try:
                # One bulk slice assignment with a bytes object works on
                # Python 2 and 3 alike and avoids a per-byte Python loop.
                shm[:] = b'\x00' * size
            finally:
                shm.close()
        finally:
            os.close(fd)
def __set_binary(self, filename, binaryfile, max_size):
    """Copy ``binaryfile`` into a zero-filled mapping of ``filename``.

    ``filename`` is created if needed, resized to ``max_size`` bytes,
    overwritten with NUL bytes, and then the contents of ``binaryfile``
    are written from offset 0.

    :param filename: Path of the file to map and fill.
    :param binaryfile: Path of the file whose bytes are copied in.
    :param max_size: Size in bytes of the mapping.
    """
    shm_fd = os.open(filename, os.O_RDWR | os.O_SYNC | os.O_CREAT)
    try:
        os.ftruncate(shm_fd, max_size)
        shm = mmap.mmap(shm_fd, max_size, mmap.MAP_SHARED,
                        mmap.PROT_WRITE | mmap.PROT_READ)
        try:
            shm.seek(0x0)
            # mmap.write needs a bytes-like object on Python 3.
            shm.write(b'\x00' * max_size)
            shm.seek(0x0)
            with open(binaryfile, "rb") as f:
                while True:
                    chunk = f.read(1024)
                    # An empty read means EOF; testing truthiness works for
                    # both ``str`` (Python 2) and ``bytes`` (Python 3).
                    if not chunk:
                        break
                    shm.write(chunk)
        finally:
            shm.close()
    finally:
        os.close(shm_fd)
def init(self):
    """Connect the control socket and map the bitmap and payload files.

    The UNIX socket connection to ``self.control_filename`` is retried in
    a tight loop until it succeeds. The bitmap file is sized to
    ``self.bitmap_size`` and the payload file to 128 KiB before both are
    memory-mapped read/write.

    :return: ``True`` once everything is set up.
    """
    self.control = socket.socket(socket.AF_UNIX)
    connected = False
    while not connected:
        try:
            self.control.connect(self.control_filename)
        except socket_error:
            # Peer not ready yet; retry immediately.
            continue
        connected = True

    shm_flags = os.O_RDWR | os.O_SYNC | os.O_CREAT
    map_prot = mmap.PROT_WRITE | mmap.PROT_READ
    payload_size = 128 << 10

    self.kafl_shm_f = os.open(self.bitmap_filename, shm_flags)
    self.fs_shm_f = os.open(self.payload_filename, shm_flags)

    os.ftruncate(self.kafl_shm_f, self.bitmap_size)
    os.ftruncate(self.fs_shm_f, payload_size)

    self.kafl_shm = mmap.mmap(self.kafl_shm_f, self.bitmap_size,
                              mmap.MAP_SHARED, map_prot)
    self.fs_shm = mmap.mmap(self.fs_shm_f, payload_size,
                            mmap.MAP_SHARED, map_prot)
    return True
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
    """Write ``data`` to the file ``f`` through a non-inheritable handle.

    :param f: Path of the file to write.
    :param data: Content to write; on Python 3 it is encoded with
        ``encoding`` when ``m`` is not a binary mode.
    :param m: Open mode; ``'b'`` and ``'+'`` add the matching OS flags.
    :param encoding: Encoding applied to text data on Python 3.
    :raises IOError: If the file cannot be opened.
    """
    text_mode = 'b' not in m
    if sys.hexversion > 0x3000000 and text_mode:
        data = data.encode(encoding)
        m += 'b'

    flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
    if 'b' in m:
        flags |= os.O_BINARY
    if '+' in m:
        flags |= os.O_RDWR

    try:
        fd = os.open(f, flags)
    except OSError:
        raise IOError('Cannot write to %r' % f)

    with os.fdopen(fd, m) as out:
        out.write(data)
def setup_environment():
    """Prepare ``PATH``, git and SSH settings from environment variables.

    Reads ``LAMBDA_TASK_ROOT`` and ``SSH_IDENTITY`` (base64-encoded) from
    the environment, writes the decoded identity and an SSH config file
    into a fresh temporary directory, and sets ``PATH``,
    ``GIT_EXEC_PATH`` and ``GIT_SSH_COMMAND`` accordingly.
    """
    root = os.getenv('LAMBDA_TASK_ROOT')
    bin_dir = os.path.join(root, 'bin')
    os.environ['PATH'] += ':' + bin_dir
    os.environ['GIT_EXEC_PATH'] = bin_dir

    ssh_dir = tempfile.mkdtemp()

    ssh_identity = os.path.join(ssh_dir, 'identity')
    identity_fd = os.open(ssh_identity, os.O_WRONLY | os.O_CREAT, 0o600)
    # b64decode returns bytes, so the file must be opened in binary mode;
    # a text-mode handle rejects bytes on Python 3.
    with os.fdopen(identity_fd, 'wb') as f:
        f.write(base64.b64decode(os.getenv('SSH_IDENTITY')))

    ssh_config = os.path.join(ssh_dir, 'config')
    with open(ssh_config, 'w') as f:
        f.write('CheckHostIP no\n'
                'StrictHostKeyChecking yes\n'
                'IdentityFile %s\n'
                'UserKnownHostsFile %s\n' %
                (ssh_identity, os.path.join(root, 'known_hosts')))

    os.environ['GIT_SSH_COMMAND'] = 'ssh -F %s' % ssh_config
def acquire(self, blocking=False):
    """Try to take an exclusive ``flock`` on ``self.filename``.

    :param blocking: When false (the default) the attempt is
        non-blocking; when true the call waits for the lock.
    :return: ``True`` if the lock was obtained (``self.fd`` stays open
        and ``self.locked`` is set), ``False`` if it is held elsewhere.
    :raises IOError: For failures other than ``EAGAIN``/``EACCES``.
    """
    import fcntl  # @UnresolvedImport

    flags = os.O_CREAT | os.O_WRONLY
    self.fd = os.open(self.filename, flags)

    mode = fcntl.LOCK_EX
    if not blocking:
        mode |= fcntl.LOCK_NB

    try:
        fcntl.flock(self.fd, mode)
    except IOError:
        e = sys.exc_info()[1]
        # Release the descriptor on every failure path, including the
        # unexpected errors that are re-raised below.
        os.close(self.fd)
        self.fd = None
        if e.errno not in (errno.EAGAIN, errno.EACCES):
            raise
        return False

    self.locked = True
    return True
def acquire(self, blocking=False):
    """Try to lock the first byte of ``self.filename`` via ``msvcrt``.

    :param blocking: When false (the default) ``LK_NBLCK`` is used;
        when true ``LK_LOCK`` is used.
    :return: ``True`` if the lock was obtained (``self.fd`` stays open),
        ``False`` if locking failed with ``EAGAIN``, ``EACCES`` or
        ``EDEADLK``.
    :raises IOError: For any other locking failure.
    """
    import msvcrt  # @UnresolvedImport

    flags = os.O_CREAT | os.O_WRONLY
    mode = msvcrt.LK_LOCK if blocking else msvcrt.LK_NBLCK

    self.fd = os.open(self.filename, flags)
    try:
        msvcrt.locking(self.fd, mode, 1)
    except IOError:
        e = sys.exc_info()[1]
        # Release the descriptor on every failure path, including the
        # unexpected errors that are re-raised below.
        os.close(self.fd)
        self.fd = None
        if e.errno not in (errno.EAGAIN, errno.EACCES, errno.EDEADLK):
            raise
        return False
    return True
def create_file(self, name, excl=False, mode="wb", **kwargs):
    """Create a file with the given name in this storage.

    :param name: the name for the new file.
    :param excl: if True, open the file with ``O_CREAT | O_EXCL`` so
        creation fails when it already exists.
    :param mode: the mode flags with which to open the file. The default
        is ``"wb"``.
    :return: a :class:`whoosh.filedb.structfile.StructFile` instance.
    :raises ReadOnlyError: if the storage is read-only.
    """
    if self.readonly:
        raise ReadOnlyError

    path = self._fpath(name)
    if not excl:
        fileobj = open(path, mode)
    else:
        open_flags = os.O_CREAT | os.O_EXCL | os.O_RDWR
        # O_BINARY is only defined on some platforms; add it when present.
        open_flags |= getattr(os, "O_BINARY", 0)
        fileobj = os.fdopen(os.open(path, open_flags), mode)

    return StructFile(fileobj, name=name, **kwargs)
def _convert_pflags(self, pflags):
    """Convert SFTP-style open() flags to Python's ``os.open()`` flags.

    :param pflags: Bitmask built from the ``SFTP_FLAG_*`` constants.
    :return: The equivalent bitmask of ``os.O_*`` flags.
    """
    # Access mode: write alone is O_WRONLY, write plus read is O_RDWR,
    # and anything without the write bit is O_RDONLY.
    if pflags & SFTP_FLAG_WRITE:
        flags = os.O_RDWR if pflags & SFTP_FLAG_READ else os.O_WRONLY
    else:
        flags = os.O_RDONLY

    modifiers = (
        (SFTP_FLAG_APPEND, os.O_APPEND),
        (SFTP_FLAG_CREATE, os.O_CREAT),
        (SFTP_FLAG_TRUNC, os.O_TRUNC),
        (SFTP_FLAG_EXCL, os.O_EXCL),
    )
    for sftp_bit, os_bit in modifiers:
        if pflags & sftp_bit:
            flags |= os_bit
    return flags
def acquire(self):
    """Acquire the lock, waiting for it if necessary.

    The lock file is created exclusively. While it already exists the
    attempt is repeated every ``self.delay`` seconds until it succeeds
    or ``self.timeout`` seconds have elapsed, in which case
    ``FileLockTimeoutException`` is raised. Any other ``OSError`` is
    propagated unchanged.
    """
    began = time.time()
    acquired = False
    while not acquired:
        try:
            self.fd = os.open(self.lockfile,
                              os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
            elapsed = time.time() - began
            if elapsed >= self.timeout:
                raise FileLockTimeoutException("%d seconds passed." % self.timeout)
            time.sleep(self.delay)
        else:
            acquired = True
    self.is_locked = True
def acquire(self):
    """Acquire the lock file.

    The file at ``self._path`` is opened (created with mode ``0o600``
    if missing) and locked; the attempt is repeated until
    ``self._lock_success`` accepts the opened descriptor, which is then
    stored in ``self._fd``.

    :raises errors.LockError: if lock is already held
    :raises OSError: if unable to open or stat the lock file
    """
    while self._fd is None:
        candidate_fd = os.open(self._path, os.O_CREAT | os.O_WRONLY, 0o600)
        try:
            self._try_lock(candidate_fd)
            if self._lock_success(candidate_fd):
                self._fd = candidate_fd
        finally:
            # A descriptor that was not adopted (lock failed, or an
            # exception is propagating) must not stay open.
            if self._fd is None:
                os.close(candidate_fd)
def safe_open(path, mode="w", chmod=None, buffering=None):
    """Safely open a file.

    The file is created with ``O_CREAT | O_EXCL``, so opening fails if
    ``path`` already exists.

    :param str path: Path to a file.
    :param str mode: Same as `mode` for `open`.
    :param int chmod: Same as `mode` for `os.open`, uses Python defaults
        if ``None``.
    :param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
        defaults if ``None``.
    :return: The file object wrapping the newly created file.
    """
    create_flags = os.O_CREAT | os.O_EXCL | os.O_RDWR
    if chmod is None:
        fd = os.open(path, create_flags)
    else:
        fd = os.open(path, create_flags, chmod)

    if buffering is None:
        return os.fdopen(fd, mode)
    return os.fdopen(fd, mode, buffering)
def _init_dirs(self):
    """Create a temporary tree of directories that each hold a TOML file.

    A fresh temporary root is created and made the current working
    directory. Each test directory beneath it receives a file named
    ``improbable.toml`` containing a single ``key`` entry.

    :return: ``(root, config, cleanup)`` where ``root`` is the temporary
        directory, ``config`` is the config base name and ``cleanup`` is
        a callable that removes the tree.
    """
    import shutil  # local import keeps this block self-contained

    # The backslash is escaped explicitly; the directory name is still
    # the three characters ``c``, ``\`` and ``c``.
    test_dirs = ['a a', 'b', 'c\\c', 'D_']
    config = 'improbable'
    root = tempfile.mkdtemp()

    def cleanup():
        # The tree is populated, so it has to be removed recursively;
        # failures are ignored because cleanup is best-effort.
        shutil.rmtree(root, ignore_errors=True)

    os.chdir(root)

    for dir_ in test_dirs:
        os.mkdir(dir_, 0o0750)

        f = '{0}.toml'.format(config)
        flags = os.O_WRONLY | os.O_CREAT
        rel_path = '{0}/{1}'.format(dir_, f)
        abs_file_path = os.path.join(root, rel_path)
        with os.fdopen(os.open(abs_file_path, flags, 0o0640), 'w') as fp:
            fp.write("key = \"value is {0}\"\n".format(dir_))

    return root, config, cleanup
def touch(self, mode=0o666, exist_ok=True):
    """Create this file with the given access mode, if it doesn't exist.

    :param mode: Access mode passed on when the file is created.
    :param exist_ok: When true, an existing file only has its
        modification time bumped; when false the file is opened with
        ``O_EXCL`` so creation fails if it already exists.
    """
    if self._closed:
        self._raise_closed()

    flags = os.O_CREAT | os.O_WRONLY
    if exist_ok:
        # First try to bump modification time
        # Implementation note: GNU touch uses the UTIME_NOW option of
        # the utimensat() / futimens() functions.
        try:
            self._accessor.utime(self, None)
            return
        except OSError:
            # Fall through and create the file instead.
            pass
    else:
        flags |= os.O_EXCL

    os.close(self._raw_open(flags, mode))
def _obtain_lock_or_raise(self):
    """Create a lock file as a flag for other instances and mark this
    instance as the lock holder.

    Nothing happens when this instance already has the lock.

    :raise IOError: if a lock was already present or a lock file could
        not be written
    """
    if self._has_lock():
        return

    lock_file = self._lock_file_path()
    if osp.isfile(lock_file):
        raise IOError("Lock for file %r did already exist, delete %r in case the lock is illegal" %
                      (self._file_path, lock_file))

    flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
    if is_win:
        flags |= os.O_SHORT_LIVED

    try:
        # Only the file's existence matters, so it is closed right away.
        os.close(os.open(lock_file, flags, 0))
    except OSError as e:
        raise IOError(str(e))

    self._owns_lock = True
def lock(lockfile):
    """Return a decorator that guards a callable with a lock file.

    The wrapped callable only runs if ``lockfile`` can be created
    exclusively; the file is removed again once the callable returns or
    raises.

    :param lockfile: Path of the lock file.
    :raises BackupError: (from the wrapper) if the lock file cannot be
        created, e.g. because it already exists.
    """
    def decorator(clbl):
        def wrapper(*args, **kwargs):
            try:
                # Create or fail
                fd = os.open(lockfile, os.O_CREAT | os.O_EXCL)
            except OSError:
                raise BackupError(
                    "Another backup/restore process already running."
                    " If it is not, try to remove `{0}` and "
                    "try again.".format(lockfile))
            try:
                # Only the file's existence acts as the lock; close the
                # descriptor so it is not leaked on every call.
                os.close(fd)
                result = clbl(*args, **kwargs)
            finally:
                os.unlink(lockfile)
            return result
        return wrapper
    return decorator
def lock(lockfile):
    """Return a decorator that guards a callable with a lock file.

    The wrapped callable only runs if ``lockfile`` can be created
    exclusively; the file is removed again once the callable returns or
    raises.

    :param lockfile: Path of the lock file.
    :raises BackupError: (from the wrapper) if the lock file cannot be
        created, e.g. because it already exists.
    """
    def decorator(clbl):
        def wrapper(*args, **kwargs):
            try:
                # Create or fail
                fd = os.open(lockfile, os.O_CREAT | os.O_EXCL)
            except OSError:
                raise BackupError("Another backup process already running."
                                  " If it is not, try to remove `{0}` and "
                                  "try again.".format(lockfile))
            try:
                # Only the file's existence acts as the lock; close the
                # descriptor so it is not leaked on every call.
                os.close(fd)
                result = clbl(*args, **kwargs)
            finally:
                os.unlink(lockfile)
            return result
        return wrapper
    return decorator
def with_multi_lock(tag, n, unlock_after_with=True):
    """Yield whether one of ``n`` lock files for ``tag`` could be locked.

    The lock files live in ``/tmp/`` and are named ``<tag>.lock``,
    ``<tag>.lock1``, ... Each is tried in order with ``trylock``; the
    generator yields ``True`` for the first one that succeeds, or
    ``False`` if none does.

    NOTE(review): the single yield suggests this is meant to be wrapped
    by ``contextlib.contextmanager``; no decorator is visible here.

    :param tag: Base name of the lock files.
    :param n: Number of lock files to try.
    :param unlock_after_with: When true, the descriptor of the locked
        file is closed after the generator resumes; when false it is
        left open (presumably to keep holding the lock -- confirm
        against ``trylock``).
    """
    for i in range(n):
        # The first slot has no numeric suffix; later ones append ``i``.
        lock_file_path = os.path.join(
            '/tmp/', tag + '.lock' + (str(i) if i else ''))
        # ``0o660`` is the octal spelling accepted by Python 2.6+ and 3.
        fd = os.open(lock_file_path, os.O_CREAT | os.O_RDWR, 0o660)
        acquired = False
        try:
            acquired = trylock(fd)
            if acquired:
                yield True
                break
        finally:
            # Descriptors that did not obtain the lock are always closed;
            # a successful one is closed only when requested.
            if unlock_after_with or not acquired:
                os.close(fd)
    else:
        yield False
def _create_home(self):
    """Ensure the config directory and its two files exist.

    The directory ``<_HOME>/<_CONFIG_DIR>`` is created when missing,
    then the config file and the credentials file are created inside it
    with mode ``0o600`` if they do not exist yet. Existing file contents
    are left untouched.
    """
    config_dir = self._HOME + '/' + self._CONFIG_DIR
    if not os.path.isdir(config_dir):
        os.makedirs(self._HOME + '/' + self._CONFIG_DIR)

    create_flags = os.O_WRONLY | os.O_CREAT

    config_path = self._HOME + '/' + self._CONFIG_DIR + '/' + self._CONFIG_FILE_NAME
    os.fdopen(os.open(config_path, create_flags, 0o600), 'w').close()

    credentials_path = self._HOME + '/' + self._CONFIG_DIR + '/' + self._CREDENTIALS_FILE_NAME
    os.fdopen(os.open(credentials_path, create_flags, 0o600), 'w').close()
def acquire(self, blocking=True):
    """Acquire the lock if possible.

    The lock file is created exclusively and the current PID is written
    into it. If the lock is in use and ``blocking`` is ``False``, return
    ``False``. Otherwise, retry every ``self.delay`` seconds until the
    lock is acquired, or raise ``AcquisitionError`` once a non-zero
    ``self.timeout`` has been exceeded.

    :param blocking: Whether to keep retrying while the lock is held.
    :return: ``True`` when the lock was acquired, ``False`` when it is
        held and ``blocking`` is false.
    """
    began = time.time()
    while True:
        self._validate_lockfile()
        try:
            raw_fd = os.open(self.lockfile,
                             os.O_CREAT | os.O_EXCL | os.O_RDWR)
            with os.fdopen(raw_fd, 'w') as handle:
                handle.write('{0}'.format(os.getpid()))
            break
        except OSError as err:
            if err.errno != errno.EEXIST:  # pragma: no cover
                raise

            timed_out = self.timeout and (time.time() - began) >= self.timeout
            if timed_out:
                raise AcquisitionError('Lock acquisition timed out.')
            if not blocking:
                return False
            time.sleep(self.delay)

    self._locked = True
    return True
def __init__(self, name, mode):
    """Open ``name`` with ``os.open`` and keep the descriptor in ``self.fd``.

    :param name: Path of the file to open.
    :param mode: ``"r"`` for read-only, or ``"w"`` for write-only with
        create and truncate.
    :raises KeyError: If ``mode`` is neither ``"r"`` nor ``"w"``.
    """
    flags_by_mode = {
        "r": os.O_RDONLY,
        "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
    }
    open_flags = flags_by_mode[mode]
    # O_BINARY is only defined on some platforms; add it when present.
    open_flags |= getattr(os, "O_BINARY", 0)
    self.fd = os.open(name, open_flags, 0o666)