我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用os.O_APPEND。
def _convert_pflags(self, pflags):
    "Translate an SFTP open() flag bitmask into the matching os.open() flags."
    wants_read = pflags & SFTP_FLAG_READ
    wants_write = pflags & SFTP_FLAG_WRITE

    # Access mode: write wins over the read-only default; read+write is RDWR.
    if wants_write:
        flags = os.O_RDWR if wants_read else os.O_WRONLY
    else:
        flags = os.O_RDONLY

    # Each remaining SFTP modifier bit maps one-to-one onto an os flag.
    modifier_table = (
        (SFTP_FLAG_APPEND, os.O_APPEND),
        (SFTP_FLAG_CREATE, os.O_CREAT),
        (SFTP_FLAG_TRUNC, os.O_TRUNC),
        (SFTP_FLAG_EXCL, os.O_EXCL),
    )
    for sftp_bit, os_bit in modifier_table:
        if pflags & sftp_bit:
            flags |= os_bit
    return flags
def _create_base_aws_cli_config_files_if_needed(adfs_config):
    """Make sure the AWS credentials and config files exist, creating them if needed.

    For each of ``adfs_config.aws_credentials_location`` and
    ``adfs_config.aws_config_location`` the parent directory is created
    (mode 0o700) when missing, and then the file itself is created empty
    (mode 0o600) when missing. Existing files and directories are left alone.

    Args:
        adfs_config: object exposing ``aws_credentials_location`` and
            ``aws_config_location`` path attributes.
    """

    def touch(fname, mode=0o600):
        # O_CREAT | O_APPEND creates the file without truncating existing data.
        flags = os.O_CREAT | os.O_APPEND
        with os.fdopen(os.open(fname, flags, mode)):
            os.utime(fname, None)

    def ensure_parent_dir(location):
        parent = os.path.dirname(location)
        # A bare file name has no parent component to create.
        if parent and not os.path.exists(parent):
            os.mkdir(parent, 0o700)

    # The parent directory is ensured *before* touching each file, so the
    # credentials file can live in a different directory than the config file.
    for location in (
        adfs_config.aws_credentials_location,
        adfs_config.aws_config_location,
    ):
        ensure_parent_dir(location)
        if not os.path.exists(location):
            touch(location)
def _convert_pflags(self, pflags):
    """Map SFTP-style open() flags onto the equivalent os.open() flags."""
    readable = bool(pflags & SFTP_FLAG_READ)
    writable = bool(pflags & SFTP_FLAG_WRITE)

    # Pick the access mode first; read-only is the fallback.
    if readable and writable:
        result = os.O_RDWR
    elif writable:
        result = os.O_WRONLY
    else:
        result = os.O_RDONLY

    # Then OR in every modifier whose SFTP bit is set.
    for sftp_flag, os_flag in (
        (SFTP_FLAG_APPEND, os.O_APPEND),
        (SFTP_FLAG_CREATE, os.O_CREAT),
        (SFTP_FLAG_TRUNC, os.O_TRUNC),
        (SFTP_FLAG_EXCL, os.O_EXCL),
    ):
        if pflags & sftp_flag:
            result |= os_flag
    return result
def touch(self, path, mode=0o644, dir_fd=None, **kwargs):
    """Create an empty file with selected permissions and update its timestamps.

    Args:
        path (str): File path.
        mode (:obj:`int`, optional): File access mode used if the file is
            created. Defaults to 0o644.
        dir_fd (optional): If set, it should be a file descriptor open to a
            directory and path should then be relative to that directory.
            Defaults to None.
        **kwargs: Arbitrary keyword arguments, forwarded to ``os.utime``.
    """
    flags = os.O_CREAT | os.O_APPEND
    # os.supports_fd is a set, so it must be tested for membership; testing
    # its truthiness would always discard dir_fd.
    utime_accepts_fd = os.utime in os.supports_fd
    with os.fdopen(
        os.open(path, flags=flags, mode=mode, dir_fd=dir_fd)
    ) as f:
        os.utime(
            f.fileno() if utime_accepts_fd else path,
            dir_fd=None if utime_accepts_fd else dir_fd,
            **kwargs
        )
def flags(self):
    '''
    Build the os.open() flag bitmask described by this object's mode attributes.

    Adapted from http://hg.python.org/cpython/file/84cf25da86e8/Lib/_pyio.py#l154

    See also open(2) which explains the modes

    os.O_BINARY and os.O_TEXT are only available on Windows.
    '''
    bits = 0

    # Access mode: the single-direction flags only apply when not updating.
    if self.reading and not self.updating:
        bits |= os.O_RDONLY
    if self.writing and not self.updating:
        bits |= os.O_WRONLY
    if self.creating_exclusively and not self.updating:
        bits |= os.O_EXCL
    if self.updating:
        bits |= os.O_RDWR

    # Modifiers.
    if self.appending:
        bits |= os.O_APPEND
    if self.writing or self.creating_exclusively:
        bits |= os.O_CREAT
    if self.writing:
        bits |= os.O_TRUNC

    # Platform-specific flags are only touched when the os module has them.
    if self.binary and hasattr(os, 'O_BINARY'):
        bits |= os.O_BINARY
    if self.text and hasattr(os, 'O_TEXT'):
        bits |= os.O_TEXT
    return bits
def _create_base_aws_cli_config_files_if_needed(google_config):
    """Make sure the AWS credentials and config files exist, creating them if needed.

    For each of ``google_config.aws_credentials_location`` and
    ``google_config.aws_config_location`` the parent directory is created
    (mode 0o700) when missing, and then the file itself is created empty
    (mode 0o600) when missing. Existing files and directories are left alone.

    Args:
        google_config: object exposing ``aws_credentials_location`` and
            ``aws_config_location`` path attributes.
    """

    def touch(fname, mode=0o600):
        # O_CREAT | O_APPEND creates the file without truncating existing data.
        flags = os.O_CREAT | os.O_APPEND
        with os.fdopen(os.open(fname, flags, mode)):
            os.utime(fname, None)

    def ensure_parent_dir(location):
        parent = os.path.dirname(location)
        # A bare file name has no parent component to create.
        if parent and not os.path.exists(parent):
            os.mkdir(parent, 0o700)

    # The parent directory is ensured *before* touching each file, so the
    # credentials file can live in a different directory than the config file.
    for location in (
        google_config.aws_credentials_location,
        google_config.aws_config_location,
    ):
        ensure_parent_dir(location)
        if not os.path.exists(location):
            touch(location)
def mode_to_flags(mode: str):
    """Convert a short mode string into an ``os.open`` flag bitmask.

    Only the characters ``a``, ``w``, ``r``, ``b`` and ``+`` are accepted, and
    at most one of ``a``/``w``/``r`` may appear; otherwise ``ValueError`` is
    raised. ``os.O_NONBLOCK`` is always set, ``+`` adds ``os.O_CREAT``, ``a``
    selects ``O_RDWR | O_APPEND``, ``w`` selects ``O_RDWR`` and ``r`` selects
    ``O_RDONLY``.
    """
    requested = set(mode)

    # Any character outside the accepted alphabet is an error.
    if requested - set("awrb+"):
        raise ValueError(f"Invalid mode {mode!r}")

    # The access letters are mutually exclusive.
    if len(requested.intersection("awr")) > 1:
        raise ValueError('must have exactly one of create/read/write/append mode')

    flags = os.O_NONBLOCK
    if '+' in mode:
        flags |= os.O_CREAT

    if "a" in mode:
        flags |= os.O_RDWR | os.O_APPEND
    elif "w" in mode:
        flags |= os.O_RDWR
    elif "r" in mode:
        flags |= os.O_RDONLY
    return flags
def flags(self, *which):
    """Set and/or query descriptor flags on ``self._file_`` via ``fcntl``.

    At most one string argument is accepted (``TypeError`` otherwise); with no
    argument it defaults to ``'?'``. Characters recognised in the string:

    - ``n``: ``os.O_NDELAY``; ``a``: ``os.O_APPEND``; ``s``: ``os.O_SYNC``
    - ``=``: use the selected flags as-is instead of combining them with the
      current ones
    - ``!``: clear the selected flags instead of setting them
    - ``c``: set the close-on-exec descriptor flag (cleared when ``!`` is given)
    - ``?``: return the current flags as a string made of ``a``, ``c``, ``n``
      and ``s``

    Returns the flag string when ``?`` was requested, otherwise ``None``.
    """
    import fcntl
    import os

    if which:
        if len(which) > 1:
            # Call form of raise: valid on both Python 2 and Python 3.
            raise TypeError('Too many arguments')
        which = which[0]
    else:
        which = '?'

    l_flags = 0
    if 'n' in which:
        l_flags = l_flags | os.O_NDELAY
    if 'a' in which:
        l_flags = l_flags | os.O_APPEND
    if 's' in which:
        l_flags = l_flags | os.O_SYNC

    file = self._file_

    if '=' not in which:
        # Combine with the flags currently set on the descriptor.
        cur_fl = fcntl.fcntl(file.fileno(), fcntl.F_GETFL, 0)
        if '!' in which:
            l_flags = cur_fl & ~ l_flags
        else:
            l_flags = cur_fl | l_flags

    l_flags = fcntl.fcntl(file.fileno(), fcntl.F_SETFL, l_flags)

    if 'c' in which:
        arg = ('!' not in which)    # 0 is don't, 1 is do close on exec
        l_flags = fcntl.fcntl(file.fileno(), fcntl.F_SETFD, arg)

    if '?' in which:
        which = ''                  # Return current flags
        l_flags = fcntl.fcntl(file.fileno(), fcntl.F_GETFL, 0)
        if os.O_APPEND & l_flags:
            which = which + 'a'
        if fcntl.fcntl(file.fileno(), fcntl.F_GETFD, 0) & 1:
            which = which + 'c'
        if os.O_NDELAY & l_flags:
            which = which + 'n'
        if os.O_SYNC & l_flags:
            which = which + 's'
        return which
def get_event_log_handle(sec, flags=os.O_APPEND | os.O_CREAT | os.O_WRONLY):
    """Return the per-thread file descriptor of the event log for the day of ``sec``.

    The log file lives in ``config.LOG_DIR`` and is named ``YYYY-MM-DD.log``
    after the local date of ``sec``. The descriptor is cached on
    ``_thread_data`` and only reopened when the computed path changes; on such
    a rollover the previously cached descriptor is closed so it does not leak.

    :param sec: timestamp (seconds) selecting the log file date
    :param flags: flags passed to ``os.open`` when (re)opening the log
    :return: an OS-level file descriptor
    """
    localtime = time.localtime(sec)
    log_path = os.path.join(
        config.LOG_DIR,
        "%d-%02d-%02d.log" % (localtime.tm_year, localtime.tm_mon, localtime.tm_mday),
    )

    if log_path != getattr(_thread_data, "event_log_path", None):
        if not os.path.exists(log_path):
            # Create the file first so its permissions can be set explicitly.
            open(log_path, "w+").close()
            os.chmod(log_path, DEFAULT_EVENT_LOG_PERMISSIONS)

        # Open the new log before touching the cached state, so a failed open
        # leaves the cache describing the previous (still valid) handle.
        new_handle = os.open(log_path, flags)

        stale_handle = getattr(_thread_data, "event_log_handle", None)
        if stale_handle is not None:
            try:
                os.close(stale_handle)
            except OSError:
                # Already closed elsewhere; nothing left to release.
                pass

        _thread_data.event_log_path = log_path
        _thread_data.event_log_handle = new_handle

    return _thread_data.event_log_handle
def get_error_log_handle(flags=os.O_APPEND | os.O_CREAT | os.O_WRONLY):
    """Return the per-thread file descriptor of ``error.log`` in ``config.LOG_DIR``.

    The descriptor is opened on first use and cached on ``_thread_data``;
    when the file does not exist yet it is created and given
    ``DEFAULT_ERROR_LOG_PERMISSIONS``.

    :param flags: flags passed to ``os.open`` when opening the log
    :return: an OS-level file descriptor
    """
    # Fast path: this thread already has an open handle.
    if hasattr(_thread_data, "error_log_handle"):
        return _thread_data.error_log_handle

    error_log = os.path.join(config.LOG_DIR, "error.log")
    log_missing = not os.path.exists(error_log)
    if log_missing:
        open(error_log, "w+").close()
        os.chmod(error_log, DEFAULT_ERROR_LOG_PERMISSIONS)

    _thread_data.error_log_path = error_log
    _thread_data.error_log_handle = os.open(_thread_data.error_log_path, flags)
    return _thread_data.error_log_handle
def __init__(self, server, filename, flags, attrs):
    """Open ``filename`` according to SFTP ``flags`` and keep the descriptor.

    The ``FXF_*`` bits in ``flags`` are translated into ``os.O_*`` flags. A
    ``"permissions"`` entry in ``attrs`` is removed from ``attrs`` and used as
    the creation mode (0o777 otherwise). The file is opened through
    ``server.avatar._runAsUser``; any remaining ``attrs`` are then applied with
    ``server._setAttrs``. The resulting descriptor is stored in ``self.fd``.
    """
    self.server = server

    openFlags = 0
    if flags & FXF_READ == FXF_READ and flags & FXF_WRITE == 0:
        openFlags = os.O_RDONLY
    if flags & FXF_WRITE == FXF_WRITE and flags & FXF_READ == 0:
        openFlags = os.O_WRONLY
    if flags & FXF_WRITE == FXF_WRITE and flags & FXF_READ == FXF_READ:
        openFlags = os.O_RDWR
    if flags & FXF_APPEND == FXF_APPEND:
        openFlags |= os.O_APPEND
    if flags & FXF_CREAT == FXF_CREAT:
        openFlags |= os.O_CREAT
    if flags & FXF_TRUNC == FXF_TRUNC:
        openFlags |= os.O_TRUNC
    if flags & FXF_EXCL == FXF_EXCL:
        openFlags |= os.O_EXCL

    # `in` and the 0o prefix work on Python 2.6+ and Python 3, unlike
    # dict.has_key() and the bare 0777 literal.
    if "permissions" in attrs:
        mode = attrs["permissions"]
        del attrs["permissions"]
    else:
        mode = 0o777

    fd = server.avatar._runAsUser(os.open, filename, openFlags, mode)
    if attrs:
        server.avatar._runAsUser(server._setAttrs, filename, attrs)
    self.fd = fd
def file_flags_to_mode(flags):
    """Translate ``os.open`` flags into a Python file mode string.

    The access bits select the base mode and ``os.O_APPEND`` switches the
    writable modes to their append variant. Possible results are ``r``, ``w``,
    ``a``, ``r+`` and ``a+``. An access-bit combination that is not one of
    ``O_RDONLY``/``O_WRONLY``/``O_RDWR`` raises ``KeyError``.
    """
    # access value -> (mode without O_APPEND, mode with O_APPEND)
    mode_table = {
        os.O_RDONLY: ('r', 'r'),
        os.O_WRONLY: ('w', 'a'),
        os.O_RDWR: ('r+', 'a+'),
    }
    access = flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)
    plain_mode, append_mode = mode_table[access]
    if flags & os.O_APPEND:
        return append_mode
    return plain_mode
def touch(fname, mode=0o666, dir_fd=None, **kwargs):
    """
    Implementation of coreutils touch
    http://stackoverflow.com/a/1160227

    Creates ``fname`` (with permission bits ``mode``) if it does not exist,
    without truncating it if it does, and then updates its timestamps with
    ``os.utime``. ``dir_fd`` makes ``fname`` relative to an open directory
    descriptor; extra keyword arguments are forwarded to ``os.utime``.
    """
    flags = os.O_CREAT | os.O_APPEND
    # os.supports_fd is a set, so it must be tested for membership; testing
    # its truthiness would always discard dir_fd.
    utime_accepts_fd = os.utime in os.supports_fd
    with os.fdopen(os.open(fname, flags=flags, mode=mode, dir_fd=dir_fd)) as f:
        os.utime(f.fileno() if utime_accepts_fd else fname,
                 dir_fd=None if utime_accepts_fd else dir_fd,
                 **kwargs)
def write(self, offset, data):
    """
    Write C{data} into this file at position C{offset}. Extending the
    file past its original end is expected. Unlike python's normal
    C{write()} methods, this method cannot do a partial write: it must
    write all of C{data} or else return an error.

    The default implementation checks for an attribute on C{self} named
    C{writefile}, and if present, performs the write operation on the
    python file-like object found there. The attribute is named
    differently from C{readfile} to make it easy to implement read-only
    (or write-only) files, but if both attributes are present, they should
    refer to the same file.

    @param offset: position in the file to start writing to.
    @type offset: int or long
    @param data: data to write into the file.
    @type data: str
    @return: an SFTP error code like L{SFTP_OK}; L{SFTP_OP_UNSUPPORTED}
        when there is no C{writefile}, or the converted errno when the
        write raises C{IOError}.
    """
    writefile = getattr(self, 'writefile', None)
    if writefile is None:
        return SFTP_OP_UNSUPPORTED
    try:
        # in append mode, don't care about seeking
        if (self.__flags & os.O_APPEND) == 0:
            if self.__tell is None:
                self.__tell = writefile.tell()
            if offset != self.__tell:
                writefile.seek(offset)
                self.__tell = offset
        writefile.write(data)
        writefile.flush()
    except IOError as e:
        # The cached position can no longer be trusted after a failure.
        self.__tell = None
        return SFTPServer.convert_errno(e.errno)
    if self.__tell is not None:
        self.__tell += len(data)
    return SFTP_OK
def open(self, path, flags, attr):
    """
    Open a file on the server and create a handle for future operations
    on that file. On success, a new object subclassed from L{SFTPHandle}
    should be returned. This handle will be used for future operations
    on the file (read, write, etc). On failure, an error code such as
    L{SFTP_PERMISSION_DENIED} should be returned.

    This default implementation opens nothing and always returns
    L{SFTP_OP_UNSUPPORTED}.

    C{flags} contains the requested mode for opening (read-only,
    write-append, etc) as a bitset of flags from the C{os} module:
        - C{os.O_RDONLY}
        - C{os.O_WRONLY}
        - C{os.O_RDWR}
        - C{os.O_APPEND}
        - C{os.O_CREAT}
        - C{os.O_TRUNC}
        - C{os.O_EXCL}
    (One of C{os.O_RDONLY}, C{os.O_WRONLY}, or C{os.O_RDWR} will always
    be set.)

    The C{attr} object contains requested attributes of the file if it
    has to be created. Some or all attribute fields may be missing if
    the client didn't specify them.

    @note: The SFTP protocol defines all files to be in "binary" mode.
        There is no equivalent to python's "text" mode.

    @param path: the requested path (relative or absolute) of the file
        to be opened.
    @type path: str
    @param flags: flags or'd together from the C{os} module indicating the
        requested mode for opening the file.
    @type flags: int
    @param attr: requested attributes of the file if it is newly created.
    @type attr: L{SFTPAttributes}
    @return: a new L{SFTPHandle} I{or error code}.
    @rtype: L{SFTPHandle}
    """
    return SFTP_OP_UNSUPPORTED
def touch(fname, mode=0o666, dir_fd=None, **kwargs):
    """Create ``fname`` if needed and update its timestamps, like ``touch``.

    On interpreters older than 3.3 the file is opened in append mode and its
    times are set to now; ``mode``, ``dir_fd`` and ``kwargs`` are not used
    there. On newer interpreters the file is created with permission bits
    ``mode`` (optionally relative to ``dir_fd``) and extra keyword arguments
    are forwarded to ``os.utime``.
    """
    ## After http://stackoverflow.com/questions/1158076/implement-touch-using-python
    if sys.version_info < (3, 3, 0):
        with open(fname, 'a'):
            os.utime(fname, None)  # set to now
    else:
        flags = os.O_CREAT | os.O_APPEND
        # os.supports_fd is a set, so it must be tested for membership;
        # testing its truthiness would always discard dir_fd.
        utime_accepts_fd = os.utime in os.supports_fd
        with os.fdopen(os.open(fname, flags=flags, mode=mode, dir_fd=dir_fd)) as f:
            os.utime(f.fileno() if utime_accepts_fd else fname,
                     dir_fd=None if utime_accepts_fd else dir_fd,
                     **kwargs)
def open(self, path, flags, attr):
    """Open ``path`` through ``self.client`` and wrap it in an SFTP handle.

    When ``O_CREAT`` is requested and ``attr`` is given, the permission flag
    is cleared on ``attr`` and the attributes are applied first. The os-level
    ``flags`` are then mapped to a binary file mode string, the file is
    opened via ``self.client.open`` and attached to a new
    ``paramiko.SFTPHandle`` as both ``readfile`` and ``writefile``.

    Returns the handle, or the converted errno if an ``IOError`` is raised.
    """
    try:
        creating = flags & os.O_CREAT
        if creating and (attr is not None):
            attr._flags &= ~attr.FLAG_PERMISSIONS
            paramiko.SFTPServer.set_file_attr(self._parsePath(path), attr)

        appending = flags & os.O_APPEND
        if flags & os.O_WRONLY:
            fstr = 'ab' if appending else 'wb'
        elif flags & os.O_RDWR:
            fstr = 'a+b' if appending else 'r+b'
        else:
            # O_RDONLY (== 0)
            fstr = 'rb'

        remote_file = self.client.open(self._parsePath(path), fstr)

        handle = paramiko.SFTPHandle(flags)
        handle.filename = self._parsePath(path)
        handle.readfile = remote_file
        handle.writefile = remote_file
        handle.client = self.client
        return handle
    # TODO: verify (socket.error when stopping file upload/download)
    except IOError as e:
        return paramiko.SFTPServer.convert_errno(e.errno)
def open(self, inode, flags, ctx):
    """Open ``inode`` for reading only.

    Any request carrying ``O_WRONLY``, ``O_RDWR`` or ``O_APPEND`` is rejected
    with ``llfuse.FUSEError(errno.EROFS)``; otherwise the handle computed by
    ``self._yarp_cell_relative_offset_to_handle(inode)`` is returned.
    """
    write_bits = os.O_WRONLY | os.O_RDWR | os.O_APPEND
    wants_write = (flags & write_bits) > 0
    if not wants_write:
        return self._yarp_cell_relative_offset_to_handle(inode)
    raise llfuse.FUSEError(errno.EROFS)
def test_send(self):
    """Both write() and send() on a file_wrapper must append to the file.

    The original descriptor is closed right after wrapping it, before any
    data is written through the wrapper.
    """
    first_chunk = b"Come again?"
    second_chunk = b"I want to buy some cheese."

    raw_fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
    wrapper = asyncore.file_wrapper(raw_fd)
    os.close(raw_fd)

    wrapper.write(first_chunk)
    wrapper.send(second_chunk)
    wrapper.close()

    with open(TESTFN, 'rb') as file:
        contents = file.read()
    self.assertEqual(contents, self.d + first_chunk + second_chunk)
def write(self, offset, data):
    """
    Store ``data`` in this file starting at ``offset``. Writing beyond the
    current end of the file is expected to extend it. In contrast to
    Python's usual ``write()`` methods, partial writes are not allowed:
    either all of ``data`` is written or an error is returned.

    This default implementation looks for a ``writefile`` attribute on
    ``self`` and, when it exists, performs the write on that Python
    file-like object. The attribute is deliberately distinct from
    ``readfile`` so read-only (or write-only) files are easy to implement;
    when both attributes exist they should refer to the same file.

    :param offset: position in the file at which to start writing.
    :type offset: int or long
    :param str data: data to write into the file.
    :return: an SFTP error code like `.SFTP_OK`.
    """
    sink = getattr(self, 'writefile', None)
    if sink is None:
        return SFTP_OP_UNSUPPORTED

    try:
        # Append mode ignores the offset, so positioning is skipped entirely.
        if not (self.__flags & os.O_APPEND):
            if self.__tell is None:
                self.__tell = sink.tell()
            if offset != self.__tell:
                sink.seek(offset)
                self.__tell = offset
        sink.write(data)
        sink.flush()
    except IOError as e:
        # Forget the cached position; it is unreliable after a failure.
        self.__tell = None
        return SFTPServer.convert_errno(e.errno)

    if self.__tell is not None:
        self.__tell += len(data)
    return SFTP_OK
def open(self, path, flags, attr):
    """
    Open a file on the server and create a handle for future operations
    on that file. On success, a new object subclassed from `.SFTPHandle`
    should be returned. This handle will be used for future operations
    on the file (read, write, etc). On failure, an error code such as
    `.SFTP_PERMISSION_DENIED` should be returned.

    This default implementation opens nothing and always returns
    ``SFTP_OP_UNSUPPORTED``.

    ``flags`` contains the requested mode for opening (read-only,
    write-append, etc) as a bitset of flags from the ``os`` module:

        - ``os.O_RDONLY``
        - ``os.O_WRONLY``
        - ``os.O_RDWR``
        - ``os.O_APPEND``
        - ``os.O_CREAT``
        - ``os.O_TRUNC``
        - ``os.O_EXCL``

    (One of ``os.O_RDONLY``, ``os.O_WRONLY``, or ``os.O_RDWR`` will always
    be set.)

    The ``attr`` object contains requested attributes of the file if it
    has to be created. Some or all attribute fields may be missing if
    the client didn't specify them.

    .. note:: The SFTP protocol defines all files to be in "binary" mode.
        There is no equivalent to Python's "text" mode.

    :param str path:
        the requested path (relative or absolute) of the file to be opened.
    :param int flags:
        flags or'd together from the ``os`` module indicating the requested
        mode for opening the file.
    :param .SFTPAttributes attr:
        requested attributes of the file if it is newly created.
    :return: a new `.SFTPHandle` or error code.
    """
    return SFTP_OP_UNSUPPORTED
def test_send(self):
    """Both write() and send() on a file_wrapper must append to the file.

    The original descriptor is closed right after wrapping it, before any
    data is written through the wrapper.
    """
    d1 = "Come again?"
    d2 = "I want to buy some cheese."
    fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
    w = asyncore.file_wrapper(fd)
    os.close(fd)

    w.write(d1)
    w.send(d2)
    w.close()
    # open() in a with-block replaces the Python-2-only file() builtin and
    # closes the handle deterministically.
    with open(TESTFN) as written:
        contents = written.read()
    self.assertEqual(contents, self.d + d1 + d2)
def touch(fname, time=None):
    """Create ``fname`` if missing and set its access/modification times.

    Not a full equivalent of ``touch``, but close enough. Both timestamps are
    set to ``time``, or to the current time when ``time`` is None.

    Returns ``fname``.
    """
    if time is None:
        time = datetime.now().timestamp()

    # O_CREAT | O_APPEND creates the file without truncating existing data.
    descriptor = os.open(fname, flags=os.O_CREAT | os.O_APPEND, mode=0o666)
    with os.fdopen(descriptor) as handle:
        if os.utime in os.supports_fd:
            target = handle.fileno()
        else:
            target = fname
        os.utime(target, times=(time, time))
    return fname
def RestartCapture(self):
    """Resume capturing output to a file (after calling StopCapture).

    Opens ``self._filename`` for appending (creating it with mode 0o600 if
    needed), flushes ``self._stream`` and redirects ``self._fd`` to that file
    with ``os.dup2``.
    """
    # Original stream fd
    assert self._uncaptured_fd
    # Append stream to file (0o600 parses on Python 2.6+ and Python 3)
    cap_fd = os.open(self._filename,
                     os.O_CREAT | os.O_APPEND | os.O_WRONLY,
                     0o600)
    try:
        # Send stream to this file
        self._stream.flush()
        os.dup2(cap_fd, self._fd)
    finally:
        # The duplicate keeps the file open; the temporary fd must not leak,
        # even if flushing or dup2 fails.
        os.close(cap_fd)
def getFd(self, pio):
    """Get the file descriptor for the given PInOut object.

    The descriptor is opened on first use and cached in ``self.fd``:
    read-only when ``pio`` is a ``PIn``; otherwise write-only with creation
    (mode 0o666), appending when ``self.append`` is set and truncating when
    it is not. Later calls return the cached descriptor.
    """
    if self.fd is None:
        if isinstance(pio, PIn):
            self.fd = os.open(self.path, os.O_RDONLY)
        else:
            # 0o666 parses on Python 2.6+ and Python 3, unlike the bare 0666.
            disposition = os.O_APPEND if self.append else os.O_TRUNC
            self.fd = os.open(
                self.path, os.O_WRONLY | os.O_CREAT | disposition, 0o666
            )
    return self.fd
def open(self, path, flags, attr):
    """Open ``path`` with os-level ``flags`` and wrap it in an ``SFTPHandle``.

    The descriptor is opened with ``os.open`` and then wrapped in a binary
    Python file object whose mode matches the requested access:

    - ``O_WRONLY``: ``ab`` with ``O_APPEND``, else ``wb``
    - ``O_RDWR``: ``a+b`` with ``O_APPEND``, else ``r+b``
    - otherwise (``O_RDONLY``): ``rb``

    ``attr`` is accepted for interface compatibility and is not used here.
    """
    fd = os.open(path, flags)
    self.log.debug("open(%s): fd: %d", path, fd)

    appending = flags & os.O_APPEND
    # Read/write opens need a "+" mode: with plain "w" the resulting file
    # object cannot be read from.
    if flags & os.O_WRONLY:
        mode = "a" if appending else "w"
    elif flags & os.O_RDWR:
        mode = "a+" if appending else "r+"
    else:
        mode = "r"
    mode += "b"
    self.log.debug("open(%s): Mode: %s", path, mode)
    return SFTPHandle(os.fdopen(fd, mode), flags)
def open(self, path, flags, attr):
    """Open the file at ``path`` and return a ``StubSFTPHandle`` for it.

    ``path`` is resolved with ``self._realpath``. The file is opened with
    ``os.open`` using ``attr.st_mode`` as the creation mode when available
    (0o666 otherwise). When ``O_CREAT`` was requested and ``attr`` is given,
    the permission flag is cleared on ``attr`` and the remaining attributes
    are applied. The descriptor is then wrapped in a binary file object that
    serves as both ``readfile`` and ``writefile`` of the handle.

    Returns the handle, or the converted errno when opening or wrapping the
    descriptor raises ``OSError``.
    """
    path = self._realpath(path)
    try:
        binary_flag = getattr(os, 'O_BINARY', 0)
        flags |= binary_flag
        mode = getattr(attr, 'st_mode', None)
        if mode is not None:
            fd = os.open(path, flags, mode)
        else:
            # os.open() defaults to 0777 which is
            # an odd default mode for files
            # (literal used: the bare name `o666` is not defined in the
            # code visible here)
            fd = os.open(path, flags, 0o666)
    except OSError as e:
        return SFTPServer.convert_errno(e.errno)

    if (flags & os.O_CREAT) and (attr is not None):
        attr._flags &= ~attr.FLAG_PERMISSIONS
        SFTPServer.set_file_attr(path, attr)

    if flags & os.O_WRONLY:
        if flags & os.O_APPEND:
            fstr = 'ab'
        else:
            fstr = 'wb'
    elif flags & os.O_RDWR:
        if flags & os.O_APPEND:
            fstr = 'a+b'
        else:
            fstr = 'r+b'
    else:
        # O_RDONLY (== 0)
        fstr = 'rb'

    try:
        f = os.fdopen(fd, fstr)
    except OSError as e:
        # Do not leak the raw descriptor when it cannot be wrapped.
        try:
            os.close(fd)
        except OSError:
            # Already closed by the failed wrap attempt.
            pass
        return SFTPServer.convert_errno(e.errno)

    fobj = StubSFTPHandle(flags)
    fobj.filename = path
    fobj.readfile = f
    fobj.writefile = f
    return fobj
def touch(fname, mode=0o666, dir_fd=None, **kwargs):
    """Create ``fname`` if needed and update its timestamps, like ``touch``.

    The file is created with permission bits ``mode`` when missing and is
    never truncated. ``dir_fd`` makes ``fname`` relative to an open directory
    descriptor; extra keyword arguments are forwarded to ``os.utime``.
    """
    flags = os.O_CREAT | os.O_APPEND
    # os.supports_fd is a set, so it must be tested for membership; testing
    # its truthiness would always discard dir_fd.
    utime_accepts_fd = os.utime in os.supports_fd
    with os.fdopen(os.open(fname, flags=flags, mode=mode, dir_fd=dir_fd)) as f:
        os.utime(f.fileno() if utime_accepts_fd else fname,
                 dir_fd=None if utime_accepts_fd else dir_fd,
                 **kwargs)