我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.O_TRUNC。
def __setitem__(self, key, message): """Replace the keyed message; raise KeyError if it doesn't exist.""" path = os.path.join(self._path, str(key)) try: f = open(path, 'rb+') except IOError, e: if e.errno == errno.ENOENT: raise KeyError('No message with key: %s' % key) else: raise try: if self._locked: _lock_file(f) try: os.close(os.open(path, os.O_WRONLY | os.O_TRUNC)) self._dump_message(message, f) if isinstance(message, MHMessage): self._dump_sequences(message, key) finally: if self._locked: _unlock_file(f) finally: _sync_close(f)
def writeKeyToFile(key, filename): """Write **key** to **filename**, with ``0400`` permissions. If **filename** doesn't exist, it will be created. If it does exist already, and is writable by the owner of the current process, then it will be truncated to zero-length and overwritten. :param bytes key: A key (or some other private data) to write to **filename**. :param str filename: The path of the file to write to. :raises: Any exceptions which may occur. """ logging.info("Writing key to file: %r", filename) flags = os.O_WRONLY | os.O_TRUNC | os.O_CREAT | getattr(os, "O_BIN", 0) fd = os.open(filename, flags, 0400) os.write(fd, key) os.fsync(fd) os.close(fd)
def writef_win32(f, data, m='w', encoding='ISO8859-1'): if sys.hexversion > 0x3000000 and not 'b' in m: data = data.encode(encoding) m += 'b' flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT if 'b' in m: flags |= os.O_BINARY if '+' in m: flags |= os.O_RDWR try: fd = os.open(f, flags) except OSError: raise IOError('Cannot write to %r' % f) f = os.fdopen(fd, m) try: f.write(data) finally: f.close()
def _convert_pflags(self, pflags): "convert SFTP-style open() flags to python's os.open() flags" if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE): flags = os.O_RDWR elif pflags & SFTP_FLAG_WRITE: flags = os.O_WRONLY else: flags = os.O_RDONLY if pflags & SFTP_FLAG_APPEND: flags |= os.O_APPEND if pflags & SFTP_FLAG_CREATE: flags |= os.O_CREAT if pflags & SFTP_FLAG_TRUNC: flags |= os.O_TRUNC if pflags & SFTP_FLAG_EXCL: flags |= os.O_EXCL return flags
def __setitem__(self, key, message): """Replace the keyed message; raise KeyError if it doesn't exist.""" path = os.path.join(self._path, str(key)) try: f = open(path, 'rb+') except IOError as e: if e.errno == errno.ENOENT: raise KeyError('No message with key: %s' % key) else: raise try: if self._locked: _lock_file(f) try: os.close(os.open(path, os.O_WRONLY | os.O_TRUNC)) self._dump_message(message, f) if isinstance(message, MHMessage): self._dump_sequences(message, key) finally: if self._locked: _unlock_file(f) finally: _sync_close(f)
def _convert_pflags(self, pflags): """convert SFTP-style open() flags to Python's os.open() flags""" if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE): flags = os.O_RDWR elif pflags & SFTP_FLAG_WRITE: flags = os.O_WRONLY else: flags = os.O_RDONLY if pflags & SFTP_FLAG_APPEND: flags |= os.O_APPEND if pflags & SFTP_FLAG_CREATE: flags |= os.O_CREAT if pflags & SFTP_FLAG_TRUNC: flags |= os.O_TRUNC if pflags & SFTP_FLAG_EXCL: flags |= os.O_EXCL return flags
def strip(self, stripped_filename): with mmapper(self.filename) as mapped: fd, file = mapped ehdr = self.elf.Ehdr.read(file) stripped_ehdr = ehdr._replace(e_shoff=0, e_shnum=0, e_shstrndx=0) stripped_size = max(phdr.p_offset + phdr.p_filesz for phdr in gen_phdrs(file, self.elf, ehdr) if phdr.p_type == PT_LOAD) assert ehdr.e_phoff + (ehdr.e_phnum * ehdr.e_phentsize) <= stripped_size # Create the new file with the same mode as the original. with os.fdopen(os.open(stripped_filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, os.fstat(fd).st_mode & 0777), 'wb') as stripped_file: stripped_file.write(self.elf.Ehdr.pack(stripped_ehdr)) stripped_file.write(file[self.elf.Ehdr.size:stripped_size])
def __init__(self, stream, filename): self._stream = stream self._fd = stream.fileno() self._filename = filename # Keep original stream for later self._uncaptured_fd = os.dup(self._fd) # Open file to save stream to cap_fd = os.open(self._filename, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, 0600) # Send stream to this file self._stream.flush() os.dup2(cap_fd, self._fd) os.close(cap_fd)
def flags(self): ''' Adapted from http://hg.python.org/cpython/file/84cf25da86e8/Lib/_pyio.py#l154 See also open(2) which explains the modes os.O_BINARY and os.O_TEXT are only available on Windows. ''' return ( ((self.reading and not self.updating) and os.O_RDONLY or 0) | ((self.writing and not self.updating) and os.O_WRONLY or 0) | ((self.creating_exclusively and not self.updating) and os.O_EXCL or 0) | (self.updating and os.O_RDWR or 0) | (self.appending and os.O_APPEND or 0) | ((self.writing or self.creating_exclusively) and os.O_CREAT or 0) | (self.writing and os.O_TRUNC or 0) | ((self.binary and hasattr(os, 'O_BINARY')) and os.O_BINARY or 0) | ((self.text and hasattr(os, 'O_TEXT')) and os.O_TEXT or 0) )
def _write_file(path, data, mode, owner='root'): dirpath = os.path.dirname(os.path.abspath(path)) log.info('Opening {} for locking'.format(dirpath)) with utils.Directory(dirpath) as d: log.info('Taking exclusive lock on {}'.format(dirpath)) with d.lock(): umask_original = os.umask(0) try: flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC log.info('Writing {} with mode {:o}'.format(path, mode)) tmppath = path + '.tmp' with os.fdopen(os.open(tmppath, flags, mode), 'wb') as f: f.write(data) os.rename(tmppath, path) user = pwd.getpwnam(owner) os.chown(path, user.pw_uid, user.pw_gid) finally: os.umask(umask_original)
def store_acs_service_principal(subscription_id, client_secret, service_principal, config_path=os.path.join(get_config_dir(), 'acsServicePrincipal.json')): obj = {} if client_secret: obj['client_secret'] = client_secret if service_principal: obj['service_principal'] = service_principal fullConfig = load_acs_service_principals(config_path=config_path) if not fullConfig: fullConfig = {} fullConfig[subscription_id] = obj with os.fdopen(os.open(config_path, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o600), 'w+') as spFile: json.dump(fullConfig, spFile)
def open(filename, flag='r', mode=0666): "open a DBM database" if not isinstance(filename, str): raise TypeError("expected string") openflag = 0 try: openflag = { 'r': os.O_RDONLY, 'rw': os.O_RDWR, 'w': os.O_RDWR | os.O_CREAT, 'c': os.O_RDWR | os.O_CREAT, 'n': os.O_RDWR | os.O_CREAT | os.O_TRUNC, }[flag] except KeyError: raise error("arg 2 to open should be 'r', 'w', 'c', or 'n'") a_db = getattr(lib, funcs['open'])(filename, openflag, mode) if a_db == 0: raise error("Could not open file %s.db" % filename) return dbm(a_db)
def __setitem__(self, key, message): """Replace the keyed message; raise KeyError if it doesn't exist.""" path = os.path.join(self._path, str(key)) try: f = open(path, 'rb+') except OSError as e: if e.errno == errno.ENOENT: raise KeyError('No message with key: %s' % key) else: raise try: if self._locked: _lock_file(f) try: os.close(os.open(path, os.O_WRONLY | os.O_TRUNC)) self._dump_message(message, f) if isinstance(message, MHMessage): self._dump_sequences(message, key) finally: if self._locked: _unlock_file(f) finally: _sync_close(f)
def __init__(self, name, mode): mode = { "r": os.O_RDONLY, "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC, }[mode] if hasattr(os, "O_BINARY"): mode |= os.O_BINARY self.fd = os.open(name, mode, 0o666)
def __init__(self, name, mode): mode = { "r": os.O_RDONLY, "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC, }[mode] if hasattr(os, "O_BINARY"): mode |= os.O_BINARY self.fd = os.open(name, mode, 0666)
def set_sequences(self, sequences): """Set sequences using the given name-to-key-list dictionary.""" f = open(os.path.join(self._path, '.mh_sequences'), 'r+') try: os.close(os.open(f.name, os.O_WRONLY | os.O_TRUNC)) for name, keys in sequences.iteritems(): if len(keys) == 0: continue f.write('%s:' % name) prev = None completing = False for key in sorted(set(keys)): if key - 1 == prev: if not completing: completing = True f.write('-') elif completing: completing = False f.write('%s %s' % (prev, key)) else: f.write(' %s' % key) prev = key if completing: f.write(str(prev) + '\n') else: f.write('\n') finally: _sync_close(f)
def lock (self): ''' Creates and holds on to the lock file with exclusive access. Returns True if lock successful, False if it is not, and raises an exception upon operating system errors encountered creating the lock file. ''' try: # # Create or else open and trucate lock file, in read-write mode. # # A crashed app might not delete the lock file, so the # os.O_CREAT | os.O_EXCL combination that guarantees # atomic create isn't useful here. That is, we don't want to # fail locking just because the file exists. # # Could use os.O_EXLOCK, but that doesn't exist yet in my Python # self.lockfd = os.open (self.lockfile, os.O_TRUNC | os.O_CREAT | os.O_RDWR) # Acquire exclusive lock on the file, but don't block waiting for it fcntl.flock (self.lockfd, fcntl.LOCK_EX | fcntl.LOCK_NB) # Writing to file is pointless, nobody can see it os.write (self.lockfd, "My Lockfile") return True except (OSError, IOError), e: # Lock cannot be acquired is okay, everything else reraise exception if e.errno in (errno.EACCES, errno.EAGAIN): return False else: raise
def init(ctx): global LOGFILE filename = os.path.abspath(LOGFILE) try: os.makedirs(os.path.dirname(os.path.abspath(filename))) except OSError: pass if hasattr(os, 'O_NOINHERIT'): fd = os.open(LOGFILE, os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT) fileobj = os.fdopen(fd, 'w') else: fileobj = open(LOGFILE, 'w') old_stderr = sys.stderr # sys.stdout has already been replaced, so __stdout__ will be faster #sys.stdout = log_to_file(sys.stdout, fileobj, filename) #sys.stderr = log_to_file(sys.stderr, fileobj, filename) def wrap(stream): if stream.isatty(): return ansiterm.AnsiTerm(stream) return stream sys.stdout = log_to_file(wrap(sys.__stdout__), fileobj, filename) sys.stderr = log_to_file(wrap(sys.__stderr__), fileobj, filename) # now mess with the logging module... for x in Logs.log.handlers: try: stream = x.stream except AttributeError: pass else: if id(stream) == id(old_stderr): x.stream = sys.stderr
def init(ctx): global LOGFILE filename = os.path.abspath(LOGFILE) try: os.makedirs(os.path.dirname(os.path.abspath(filename))) except OSError: pass if hasattr(os, 'O_NOINHERIT'): fd = os.open(LOGFILE, os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT) fileobj = os.fdopen(fd, 'w') else: fileobj = open(LOGFILE, 'w') old_stderr = sys.stderr # sys.stdout has already been replaced, so __stdout__ will be faster #sys.stdout = log_to_file(sys.stdout, fileobj, filename) #sys.stderr = log_to_file(sys.stderr, fileobj, filename) sys.stdout = log_to_file(sys.__stdout__, fileobj, filename) sys.stderr = log_to_file(sys.__stderr__, fileobj, filename) # now mess with the logging module... for x in Logs.log.handlers: try: stream = x.stream except AttributeError: pass else: if id(stream) == id(old_stderr): x.stream = sys.stderr
def save(self): config_file = RelativePathResolver().absolute("config.json") try: # Check if file exists (supports also hidden files) if os.path.isfile(config_file): # Use write mode that also works with hidden files with open(os.open(config_file, os.O_WRONLY | os.O_TRUNC), 'w') as file: json.dump(self.serialize(), file) else: # Simply create a new file with open(config_file, "w") as file: json.dump(self.serialize(), file) except IOError: pass
def __init__(self, server, filename, flags, attrs): self.server = server openFlags = 0 if flags & FXF_READ == FXF_READ and flags & FXF_WRITE == 0: openFlags = os.O_RDONLY if flags & FXF_WRITE == FXF_WRITE and flags & FXF_READ == 0: openFlags = os.O_WRONLY if flags & FXF_WRITE == FXF_WRITE and flags & FXF_READ == FXF_READ: openFlags = os.O_RDWR if flags & FXF_APPEND == FXF_APPEND: openFlags |= os.O_APPEND if flags & FXF_CREAT == FXF_CREAT: openFlags |= os.O_CREAT if flags & FXF_TRUNC == FXF_TRUNC: openFlags |= os.O_TRUNC if flags & FXF_EXCL == FXF_EXCL: openFlags |= os.O_EXCL if attrs.has_key("permissions"): mode = attrs["permissions"] del attrs["permissions"] else: mode = 0777 fd = server.avatar._runAsUser(os.open, filename, openFlags, mode) if attrs: server.avatar._runAsUser(server._setAttrs, filename, attrs) self.fd = fd