我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.O_WRONLY。
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID ("PID") of the current process
        and write it to the named file as a line of text.

        :param pidfile_path: Filesystem path of the PID file to create.
        :raises OSError: If the file already exists (it is opened with
            ``O_CREAT | O_EXCL``) or cannot be created.

        """
    # O_EXCL makes creation fail when the file is already present, so an
    # existing PID file is never overwritten.
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)

    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        # According to the FHS 2.3 section on PID files in /var/run:
        #
        #   The file must consist of the process identifier in
        #   ASCII-encoded decimal, followed by a newline character. For
        #   example, if crond was process number 25, /var/run/crond.pid
        #   would contain three characters: two, five, and newline.
        pid = os.getpid()
        pidfile.write("%s\n" % pid)
def __setitem__(self, key, message):
    """Replace the keyed message; raise KeyError if it doesn't exist.

    The message file is ``<self._path>/<key>``. It must already exist:
    a missing file is reported as ``KeyError``; any other error from
    opening it is re-raised unchanged.
    """
    path = os.path.join(self._path, str(key))
    try:
        # 'rb+' never creates the file, so ENOENT means "no such key".
        f = open(path, 'rb+')
    except IOError as e:
        if e.errno == errno.ENOENT:
            raise KeyError('No message with key: %s' % key)
        else:
            raise
    try:
        if self._locked:
            _lock_file(f)
        try:
            # Truncate through a second, short-lived descriptor while `f`
            # stays open (and locked, when locking is enabled).
            os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
            self._dump_message(message, f)
            if isinstance(message, MHMessage):
                self._dump_sequences(message, key)
        finally:
            if self._locked:
                _unlock_file(f)
    finally:
        # NOTE(review): _sync_close is defined elsewhere; presumably it
        # flushes/syncs before closing -- confirm.
        _sync_close(f)
def writeKeyToFile(key, filename):
    """Write **key** to **filename**, with ``0400`` permissions.

    If **filename** doesn't exist, it will be created. If it does exist
    already, and is writable by the owner of the current process, then it will
    be truncated to zero-length and overwritten.

    :param bytes key: A key (or some other private data) to write to
        **filename**.
    :param str filename: The path of the file to write to.
    :raises: Any exceptions which may occur.
    """
    logging.info("Writing key to file: %r", filename)
    # O_BINARY only exists on Windows; elsewhere it contributes no bits.
    flags = os.O_WRONLY | os.O_TRUNC | os.O_CREAT | getattr(os, "O_BINARY", 0)
    # The mode only applies when the file is created by this call.
    fd = os.open(filename, flags, 0o400)
    try:
        os.write(fd, key)
        os.fsync(fd)
    finally:
        # Release the descriptor even when the write or sync fails.
        os.close(fd)
def _open_ipipes(wds, fifos, input_pipes):
    """
    Try to open every named pipe in ``fifos`` for non-blocking writing.

    An open only succeeds once the other side has the pipe open for
    reading; a pipe that is not ready yet (``ENXIO``) is left in ``fifos``
    for a later attempt. Each pipe that opens is removed from ``fifos``,
    its descriptor is appended to ``wds``, and ``input_pipes`` maps the
    descriptor to the value ``fifos`` held for that pipe.

    All three containers are modified in place and returned as a tuple
    ``(wds, fifos, input_pipes)``. Any ``OSError`` other than ``ENXIO``
    is re-raised.
    """
    # Iterate over a snapshot: entries are popped from ``fifos`` as we go.
    for pipe_name in tuple(fifos):
        try:
            descriptor = os.open(pipe_name, os.O_WRONLY | os.O_NONBLOCK)
        except OSError as err:
            if err.errno == errno.ENXIO:
                continue
            raise err
        input_pipes[descriptor] = fifos.pop(pipe_name)
        wds.append(descriptor)
    return wds, fifos, input_pipes
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
    """
    Write ``data`` to the file at path ``f``, creating or truncating it.

    The file is opened through ``os.open`` with ``O_NOINHERIT``. On
    Python 3, when ``m`` has no ``'b'``, ``data`` is encoded with
    ``encoding`` and the mode is switched to binary.

    :param f: path of the file to write
    :param data: content to write
    :param m: mode string handed to ``os.fdopen``
    :param encoding: encoding applied to text data on Python 3
    :raises IOError: if the file cannot be opened
    """
    needs_encoding = sys.hexversion > 0x3000000 and 'b' not in m
    if needs_encoding:
        data = data.encode(encoding)
        m += 'b'

    open_flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
    if 'b' in m:
        open_flags |= os.O_BINARY
    if '+' in m:
        open_flags |= os.O_RDWR

    try:
        descriptor = os.open(f, open_flags)
    except OSError:
        raise IOError('Cannot write to %r' % f)

    stream = os.fdopen(descriptor, m)
    try:
        stream.write(data)
    finally:
        stream.close()
def setup_environment():
    """Prepare the process environment for running git over ssh.

    Reads ``LAMBDA_TASK_ROOT`` and ``SSH_IDENTITY`` (base64-encoded) from
    the environment, then:

    * appends ``<root>/bin`` to ``PATH`` and sets ``GIT_EXEC_PATH`` to it;
    * writes the decoded identity to ``identity`` (created with mode 0600)
      inside a fresh temporary directory;
    * writes an ssh ``config`` file next to it that references the identity
      and ``<root>/known_hosts``;
    * sets ``GIT_SSH_COMMAND`` to ``ssh -F <config>``.
    """
    root = os.getenv('LAMBDA_TASK_ROOT')
    bin_dir = os.path.join(root, 'bin')
    os.environ['PATH'] += ':' + bin_dir
    os.environ['GIT_EXEC_PATH'] = bin_dir

    ssh_dir = tempfile.mkdtemp()

    ssh_identity = os.path.join(ssh_dir, 'identity')
    # b64decode returns bytes, so the file must be opened in binary mode;
    # a text-mode file rejects bytes on Python 3.
    with os.fdopen(os.open(ssh_identity, os.O_WRONLY | os.O_CREAT, 0o600), 'wb') as f:
        f.write(base64.b64decode(os.getenv('SSH_IDENTITY')))

    ssh_config = os.path.join(ssh_dir, 'config')
    with open(ssh_config, 'w') as f:
        f.write('CheckHostIP no\n'
                'StrictHostKeyChecking yes\n'
                'IdentityFile %s\n'
                'UserKnownHostsFile %s\n' %
                (ssh_identity, os.path.join(root, 'known_hosts')))

    os.environ['GIT_SSH_COMMAND'] = 'ssh -F %s' % ssh_config
def acquire(self, blocking=False):
    """
    Take an exclusive ``flock`` on ``self.filename``.

    The file is opened (created if missing) and its descriptor is stored
    in ``self.fd``. Unless ``blocking`` is true the lock is requested with
    ``LOCK_NB``.

    :return: True when the lock was obtained (``self.locked`` is then set
        to True); False when the attempt failed with EAGAIN or EACCES, in
        which case the descriptor is closed and ``self.fd`` reset to None.
    :raises IOError: for any other locking error.
    """
    import fcntl  # @UnresolvedImport
    self.fd = os.open(self.filename, os.O_CREAT | os.O_WRONLY)
    if blocking:
        operation = fcntl.LOCK_EX
    else:
        operation = fcntl.LOCK_EX | fcntl.LOCK_NB
    try:
        fcntl.flock(self.fd, operation)
    except IOError:
        failure = sys.exc_info()[1]
        if failure.errno not in (errno.EAGAIN, errno.EACCES):
            raise
        os.close(self.fd)
        self.fd = None
        return False
    self.locked = True
    return True
def acquire(self, blocking=False):
    """
    Lock ``self.filename`` through ``msvcrt.locking``.

    The file is opened (created if missing), its descriptor is stored in
    ``self.fd``, and ``msvcrt.locking`` is called on it with a byte count
    of 1, using ``LK_LOCK`` when ``blocking`` is true and ``LK_NBLCK``
    otherwise.

    :return: True when the lock was obtained; False when the attempt
        failed with EAGAIN, EACCES or EDEADLK, in which case the
        descriptor is closed and ``self.fd`` reset to None.
    :raises IOError: for any other locking error.
    """
    import msvcrt  # @UnresolvedImport
    lock_mode = msvcrt.LK_LOCK if blocking else msvcrt.LK_NBLCK
    self.fd = os.open(self.filename, os.O_CREAT | os.O_WRONLY)
    try:
        msvcrt.locking(self.fd, lock_mode, 1)
    except IOError:
        failure = sys.exc_info()[1]
        if failure.errno not in (errno.EAGAIN, errno.EACCES, errno.EDEADLK):
            raise
        os.close(self.fd)
        self.fd = None
        return False
    return True
def _convert_pflags(self, pflags):
    """
    Convert SFTP-style open() flags to python's os.open() flags.

    READ and WRITE together select ``O_RDWR``, WRITE alone ``O_WRONLY``,
    anything else ``O_RDONLY``. APPEND, CREATE, TRUNC and EXCL each add
    the matching ``os.O_*`` bit.

    :param pflags: bitmask of ``SFTP_FLAG_*`` values
    :return: bitmask suitable for ``os.open``
    """
    readable = pflags & SFTP_FLAG_READ
    writable = pflags & SFTP_FLAG_WRITE
    if writable:
        flags = os.O_RDWR if readable else os.O_WRONLY
    else:
        flags = os.O_RDONLY

    modifiers = (
        (SFTP_FLAG_APPEND, os.O_APPEND),
        (SFTP_FLAG_CREATE, os.O_CREAT),
        (SFTP_FLAG_TRUNC, os.O_TRUNC),
        (SFTP_FLAG_EXCL, os.O_EXCL),
    )
    for sftp_bit, os_bit in modifiers:
        if pflags & sftp_bit:
            flags |= os_bit
    return flags
def acquire(self):
    """Acquire the lock file.

    Repeats open / lock / verify until ``self._fd`` holds a descriptor:
    each round opens ``self._path`` (created with mode 0600 if missing),
    calls ``self._try_lock`` on it and keeps the descriptor only when
    ``self._lock_success`` accepts it.

    :raises errors.LockError: if lock is already held
    :raises OSError: if unable to open or stat the lock file

    """
    open_flags = os.O_CREAT | os.O_WRONLY
    while self._fd is None:
        candidate = os.open(self._path, open_flags, 0o600)
        try:
            self._try_lock(candidate)
            if self._lock_success(candidate):
                self._fd = candidate
        finally:
            # A descriptor that was not adopted (lock rejected, or an
            # exception on the way) must not stay open.
            if self._fd is None:
                os.close(candidate)
def _init_dirs(self):
    """
    Build a temporary directory tree holding one config file per directory.

    A fresh temporary root is created and made the current working
    directory. Inside it the directories ``a a``, ``b``, ``c\\c`` and ``D_``
    are created (mode 0750), each containing ``improbable.toml`` (mode 0640)
    with a single ``key = "value is <dir>"`` line.

    :return: tuple ``(root, config, cleanup)``: the root path, the config
        base name (``'improbable'``) and a no-argument callable that
        restores the previous working directory and removes the tree.
    """
    import shutil

    # Escaped backslash: same directory name as before, without relying on
    # the invalid ``\c`` escape sequence.
    test_dirs = ['a a', 'b', 'c\\c', 'D_']
    config = 'improbable'

    try:
        previous_cwd = os.getcwd()
    except OSError:
        # The current directory may already have been removed.
        previous_cwd = None

    root = tempfile.mkdtemp()

    def cleanup():
        # Leave the tree before deleting it so the process is not left in
        # a directory that no longer exists.
        if previous_cwd is not None:
            try:
                os.chdir(previous_cwd)
            except OSError:
                pass
        # os.removedirs only removes empty directories, so it could never
        # delete this populated tree; rmtree can. Best effort, as before.
        shutil.rmtree(root, ignore_errors=True)

    os.chdir(root)

    for dir_ in test_dirs:
        os.mkdir(dir_, 0o0750)

        f = '{0}.toml'.format(config)
        flags = os.O_WRONLY | os.O_CREAT
        rel_path = '{0}/{1}'.format(dir_, f)
        abs_file_path = os.path.join(root, rel_path)
        with os.fdopen(os.open(abs_file_path, flags, 0o0640), 'w') as fp:
            fp.write("key = \"value is {0}\"\n".format(dir_))

    return root, config, cleanup
def touch(self, mode=0o666, exist_ok=True):
    """
    Create this file with the given access mode, if it doesn't exist.

    With ``exist_ok`` true, the modification time is bumped first and the
    file is only created when that fails with ``OSError``. With
    ``exist_ok`` false, creation uses ``O_EXCL``.
    """
    if self._closed:
        self._raise_closed()

    open_flags = os.O_CREAT | os.O_WRONLY
    if exist_ok:
        # First try to bump modification time
        # Implementation note: GNU touch uses the UTIME_NOW option of
        # the utimensat() / futimens() functions.
        try:
            self._accessor.utime(self, None)
        except OSError:
            # Avoid exception chaining
            pass
        else:
            return
    else:
        open_flags |= os.O_EXCL

    os.close(self._raw_open(open_flags, mode))
def _obtain_lock_or_raise(self):
    """Create a lock file as flag for other instances, mark our instance as lock-holder

    Does nothing when this instance already holds the lock.

    :raise IOError: if a lock was already present or a lock file could not be written"""
    if self._has_lock():
        return

    lock_path = self._lock_file_path()
    if osp.isfile(lock_path):
        raise IOError("Lock for file %r did already exist, delete %r in case the lock is illegal" %
                      (self._file_path, lock_path))

    # O_EXCL makes creation fail if the file appeared since the check above.
    open_flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
    if is_win:
        open_flags |= os.O_SHORT_LIVED
    try:
        os.close(os.open(lock_path, open_flags, 0))
    except OSError as err:
        raise IOError(str(err))

    self._owns_lock = True
def _save_tracker_uri_to_file(self):
    """
    Saves URI to tracker file if one was passed to constructor.

    Does nothing when ``self.tracker_file_name`` is empty. Otherwise the
    file is created with mode 0600 if missing, and its content is replaced
    by ``self.tracker_uri``.

    :raises ResumableUploadException: (disposition ABORT) if the tracker
        file cannot be opened or written.
    """
    if not self.tracker_file_name:
        return
    # O_TRUNC: without it, overwriting a longer existing file with a
    # shorter URI would leave the old tail behind.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    try:
        with os.fdopen(os.open(self.tracker_file_name, flags, 0o600), 'w') as f:
            f.write(self.tracker_uri)
    except (IOError, OSError) as e:
        # os.open reports failures as OSError, which is only an alias of
        # IOError on Python 3; catch both so the error is always wrapped.
        raise ResumableUploadException(
            'Couldn\'t write URI tracker file (%s): %s.\nThis can happen '
            'if you\'re using an incorrectly configured upload tool\n'
            '(e.g., gsutil configured to save tracker files to an '
            'unwritable directory)' %
            (self.tracker_file_name, e.strerror),
            ResumableTransferDisposition.ABORT)
def _create_home(self):
    """
    Make sure the configuration directory and its two files exist.

    The directory ``<_HOME>/<_CONFIG_DIR>`` is created when missing. The
    config file and the credentials file inside it are then opened with
    ``O_WRONLY | O_CREAT`` and mode 0600 and closed again without writing,
    so they are created if absent and existing content is not modified.
    """
    config_dir = self._HOME + '/' + self._CONFIG_DIR
    if not os.path.isdir(config_dir):
        os.makedirs(config_dir)

    def ensure_private_file(path):
        # Open and immediately close; nothing is written.
        with os.fdopen(os.open(path, os.O_WRONLY | os.O_CREAT, 0o600), 'w'):
            pass

    ensure_private_file(config_dir + '/' + self._CONFIG_FILE_NAME)
    ensure_private_file(config_dir + '/' + self._CREDENTIALS_FILE_NAME)
def __init__(self, name, mode):
    """
    Open ``name`` with ``os.open`` and keep the descriptor in ``self.fd``.

    :param name: path of the file to open
    :param mode: ``"r"`` for read-only, or ``"w"`` for write-only with
        create and truncate; any other value raises ``KeyError``

    ``O_BINARY`` is added on platforms that define it. New files are
    requested with mode 0666.
    """
    flags_by_mode = {
        "r": os.O_RDONLY,
        "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
    }
    open_flags = flags_by_mode[mode] | getattr(os, "O_BINARY", 0)
    self.fd = os.open(name, open_flags, 0o666)