The following 50 code examples, extracted from open-source Python projects, illustrate how to use os.open().
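Before the project excerpts, here is a minimal sketch of the usual os.open() pattern: open with explicit flags and a permission mode, then wrap the raw file descriptor with os.fdopen(). The file name and content below are illustrative assumptions only, not taken from any of the projects.

import os

# Minimal sketch (hypothetical file name and content): create a new file
# exclusively with 0o600 permissions. os.O_EXCL makes os.open() fail if the
# file already exists, which is the basis of several lockfile examples below.
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
fd = os.open("example.pid", flags, 0o600)

# Wrap the raw descriptor in a regular file object so it is closed for us.
with os.fdopen(fd, "w") as fp:
    fp.write("%d\n" % os.getpid())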
def _validate_lockfile(self):
    """Check existence and validity of lockfile.

    If the lockfile exists, but contains an invalid PID or the PID of a
    non-existent process, it is removed.

    """
    try:
        with open(self.lockfile) as fp:
            s = fp.read()
    except Exception:
        return

    try:
        pid = int(s)
    except ValueError:
        return self.release()

    from background import _process_exists
    if not _process_exists(pid):
        self.release()
@contextmanager  # requires ``from contextlib import contextmanager``
def atomic_writer(file_path, mode):
    """Atomic file writer.

    :param file_path: path of file to write to.
    :type file_path: ``unicode``
    :param mode: same as for :func:`open`
    :type mode: string

    .. versionadded:: 1.12

    Context manager that ensures the file is only written if the write
    succeeds. The data is first written to a temporary file.

    """
    temp_suffix = '.aw.temp'
    temp_file_path = file_path + temp_suffix
    with open(temp_file_path, mode) as file_obj:
        try:
            yield file_obj
            os.rename(temp_file_path, file_path)
        finally:
            try:
                os.remove(temp_file_path)
            except (OSError, IOError):
                pass
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID ("PID") of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
def read_text_file(filename):
    """Return the contents of *filename*.

    Try to decode the file contents with utf-8, the preferred system
    encoding (e.g., cp1252 on some Windows machines), and latin1, in
    that order. Decoding a byte string with latin1 will never raise an
    error. In the worst case, the returned string will contain some
    garbage characters.

    """
    with open(filename, 'rb') as fp:
        data = fp.read()

    encodings = ['utf-8', locale.getpreferredencoding(False), 'latin1']
    for enc in encodings:
        try:
            data = data.decode(enc)
        except UnicodeDecodeError:
            continue
        break

    assert type(data) != bytes  # Latin1 should have worked.
    return data
def run_script(self, script_name, namespace):
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        source = open(script_filename).read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        from linecache import cache
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open bzip2 compressed tar archive name for reading or writing.
       Appending is not allowed.
    """
    if len(mode) > 1 or mode not in "rw":
        raise ValueError("mode must be 'r' or 'w'.")

    try:
        import bz2
    except ImportError:
        raise CompressionError("bz2 module is not available")

    if fileobj is not None:
        fileobj = _BZ2Proxy(fileobj, mode)
    else:
        fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

    try:
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except (IOError, EOFError):
        raise ReadError("not a bzip2 file")
    t._extfileobj = False
    return t

# All *open() methods are registered here.
def _mkstemp_inner(dir, pre, suf, flags):
    """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile."""

    names = _get_candidate_names()

    for seq in xrange(TMP_MAX):
        name = names.next()
        file = _os.path.join(dir, pre + name + suf)
        try:
            fd = _os.open(file, flags, 0600)
            _set_cloexec(fd)
            return (fd, _os.path.abspath(file))
        except OSError, e:
            if e.errno == _errno.EEXIST:
                continue  # try again
            raise

    raise IOError, (_errno.EEXIST, "No usable temporary file name found")

# User visible interfaces.
def get_message(self, key):
    """Return a Message representation or raise a KeyError."""
    subpath = self._lookup(key)
    f = open(os.path.join(self._path, subpath), 'r')
    try:
        if self._factory:
            msg = self._factory(f)
        else:
            msg = MaildirMessage(f)
    finally:
        f.close()
    subdir, name = os.path.split(subpath)
    msg.set_subdir(subdir)
    if self.colon in name:
        msg.set_info(name.split(self.colon)[-1])
    msg.set_date(os.path.getmtime(os.path.join(self._path, subpath)))
    return msg
def __setitem__(self, key, message):
    """Replace the keyed message; raise KeyError if it doesn't exist."""
    path = os.path.join(self._path, str(key))
    try:
        f = open(path, 'rb+')
    except IOError, e:
        if e.errno == errno.ENOENT:
            raise KeyError('No message with key: %s' % key)
        else:
            raise
    try:
        if self._locked:
            _lock_file(f)
        try:
            os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
            self._dump_message(message, f)
            if isinstance(message, MHMessage):
                self._dump_sequences(message, key)
        finally:
            if self._locked:
                _unlock_file(f)
    finally:
        _sync_close(f)
def get_terminal_size():
    def ioctl_GWINSZ(fd):
        try:
            import fcntl
            import termios
            import struct
            cr = struct.unpack('hh',
                               fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
        except:
            return None
        return cr

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            cr = ioctl_GWINSZ(fd)
            os.close(fd)
        except:
            pass
    if not cr:
        try:
            # fall back to the environment (os.environ, not os.env)
            cr = (os.environ['LINES'], os.environ['COLUMNS'])
        except:
            cr = (25, 80)
    return int(cr[1]), int(cr[0])
def __save(self):
    if self.__asynchronous == 0:
        state = {
            "version": _BobState.CUR_VERSION,
            "byNameDirs": self.__byNameDirs,
            "results": self.__results,
            "inputs": self.__inputs,
            "jenkins": self.__jenkins,
            "dirStates": self.__dirStates,
            "buildState": self.__buildState,
        }
        tmpFile = self.__path + ".new"
        try:
            with open(tmpFile, "wb") as f:
                pickle.dump(state, f)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmpFile, self.__path)
        except OSError as e:
            raise ParseError("Error saving workspace state: " + str(e))
        self.__dirty = False
    else:
        self.__dirty = True
def dump(cls, obj, file_obj):
    """Serialize object ``obj`` to open pickle file.

    .. versionadded:: 1.8

    :param obj: Python object to serialize
    :type obj: Python object
    :param file_obj: file handle
    :type file_obj: ``file`` object

    """
    return pickle.dump(obj, file_obj, protocol=-1)

# Set up default manager and register built-in serializers
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks:
        https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in backwardcompat due to differences on AIX
    and Jython, that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        file_uid = os.fstat(fd).st_uid
        os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerability, but best we can do w/o
        # NOFOLLOW
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError(
                "%s is a symlink; Will not return uid for symlinks" % path
            )
    return file_uid
def writeKeyToFile(key, filename):
    """Write **key** to **filename**, with ``0400`` permissions.

    If **filename** doesn't exist, it will be created. If it does exist
    already, and is writable by the owner of the current process, then it
    will be truncated to zero-length and overwritten.

    :param bytes key: A key (or some other private data) to write to
        **filename**.
    :param str filename: The path of the file to write to.
    :raises: Any exceptions which may occur.
    """
    logging.info("Writing key to file: %r", filename)
    flags = os.O_WRONLY | os.O_TRUNC | os.O_CREAT | getattr(os, "O_BIN", 0)
    fd = os.open(filename, flags, 0400)
    os.write(fd, key)
    os.fsync(fd)
    os.close(fd)
def do_magic(self):
    if OS_WIN:
        try:
            if os.path.exists(LOCK_PATH):
                os.unlink(LOCK_PATH)
            self.fh = os.open(LOCK_PATH, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except EnvironmentError as err:
            if err.errno == 13:
                self.is_running = True
            else:
                raise
    else:
        try:
            self.fh = open(LOCK_PATH, 'w')
            fcntl.lockf(self.fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except EnvironmentError as err:
            if self.fh is not None:
                self.is_running = True
            else:
                raise
def wipe(self):
    filter_bitmap_fd = os.open("/dev/shm/kafl_filter0",
                               os.O_RDWR | os.O_SYNC | os.O_CREAT)
    os.ftruncate(filter_bitmap_fd,
                 self.config.config_values['BITMAP_SHM_SIZE'])
    filter_bitmap = mmap.mmap(filter_bitmap_fd,
                              self.config.config_values['BITMAP_SHM_SIZE'],
                              mmap.MAP_SHARED,
                              mmap.PROT_WRITE | mmap.PROT_READ)
    for i in range(self.config.config_values['BITMAP_SHM_SIZE']):
        filter_bitmap[i] = '\x00'
    filter_bitmap.close()
    os.close(filter_bitmap_fd)

    filter_bitmap_fd = os.open("/dev/shm/kafl_tfilter",
                               os.O_RDWR | os.O_SYNC | os.O_CREAT)
    os.ftruncate(filter_bitmap_fd, 0x1000000)
    filter_bitmap = mmap.mmap(filter_bitmap_fd, 0x1000000,
                              mmap.MAP_SHARED,
                              mmap.PROT_WRITE | mmap.PROT_READ)
    for i in range(0x1000000):
        filter_bitmap[i] = '\x00'
    filter_bitmap.close()
    os.close(filter_bitmap_fd)
def load(cls, file_obj):
    """Load serialized object from open JSON file.

    .. versionadded:: 1.8

    :param file_obj: file handle
    :type file_obj: ``file`` object
    :returns: object loaded from JSON file
    :rtype: object

    """
    return json.load(file_obj)
def dump(cls, obj, file_obj):
    """Serialize object ``obj`` to open JSON file.

    .. versionadded:: 1.8

    :param obj: Python object to serialize
    :type obj: JSON-serializable data structure
    :param file_obj: file handle
    :type file_obj: ``file`` object

    """
    return json.dump(obj, file_obj, indent=2, encoding='utf-8')
def load(cls, file_obj):
    """Load serialized object from open pickle file.

    .. versionadded:: 1.8

    :param file_obj: file handle
    :type file_obj: ``file`` object
    :returns: object loaded from pickle file
    :rtype: object

    """
    return cPickle.load(file_obj)
def dump(cls, obj, file_obj):
    """Serialize object ``obj`` to open pickle file.

    .. versionadded:: 1.8

    :param obj: Python object to serialize
    :type obj: Python object
    :param file_obj: file handle
    :type file_obj: ``file`` object

    """
    return cPickle.dump(obj, file_obj, protocol=-1)
def load(cls, file_obj):
    """Load serialized object from open pickle file.

    .. versionadded:: 1.8

    :param file_obj: file handle
    :type file_obj: ``file`` object
    :returns: object loaded from pickle file
    :rtype: object

    """
    return pickle.load(file_obj)
def acquire(self, blocking=True):
    """Acquire the lock if possible.

    If the lock is in use and ``blocking`` is ``False``, return
    ``False``.

    Otherwise, check every `self.delay` seconds until it acquires
    lock or exceeds `self.timeout` and raises an `~AcquisitionError`.
    """
    start = time.time()
    while True:
        self._validate_lockfile()
        try:
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            with os.fdopen(fd, 'w') as fd:
                fd.write('{0}'.format(os.getpid()))
            break
        except OSError as err:
            if err.errno != errno.EEXIST:  # pragma: no cover
                raise

            if self.timeout and (time.time() - start) >= self.timeout:
                raise AcquisitionError('Lock acquisition timed out.')
            if not blocking:
                return False
            time.sleep(self.delay)

    self._locked = True
    return True
def _load(self):
    """Load cached settings from JSON file `self._filepath`."""
    self._nosave = True
    d = {}
    with open(self._filepath, 'rb') as file_obj:
        for key, value in json.load(file_obj, encoding='utf-8').items():
            d[key] = value
    self.update(d)
    self._original = deepcopy(d)
    self._nosave = False
def debugging(self):
    """Whether Alfred's debugger is open.

    :returns: ``True`` if Alfred's debugger is open.
    :rtype: ``bool``

    """
    if self._debugging is None:
        if self.alfred_env.get('debug') == 1:
            self._debugging = True
        else:
            self._debugging = False
    return self._debugging
def cached_data(self, name, data_func=None, max_age=60):
    """Return cached data if younger than ``max_age`` seconds.

    Retrieve data from cache or re-generate and re-cache data if
    stale/non-existent. If ``max_age`` is 0, return cached data no
    matter how old.

    :param name: name of datastore
    :param data_func: function to (re-)generate data.
    :type data_func: ``callable``
    :param max_age: maximum age of cached data in seconds
    :type max_age: ``int``
    :returns: cached data, return value of ``data_func`` or ``None``
        if ``data_func`` is not set

    """
    serializer = manager.serializer(self.cache_serializer)

    cache_path = self.cachefile('%s.%s' % (name, self.cache_serializer))
    age = self.cached_data_age(name)

    if (age < max_age or max_age == 0) and os.path.exists(cache_path):
        with open(cache_path, 'rb') as file_obj:
            self.logger.debug('Loading cached data from : %s', cache_path)
            return serializer.load(file_obj)

    if not data_func:
        return None

    data = data_func()
    self.cache_data(name, data)

    return data
def open_log(self):
    """Open :attr:`logfile` in default app (usually Console.app)."""
    subprocess.call(['open', self.logfile])
def open_cachedir(self):
    """Open the workflow's :attr:`cachedir` in Finder."""
    subprocess.call(['open', self.cachedir])
def open_workflowdir(self):
    """Open the workflow's :attr:`workflowdir` in Finder."""
    subprocess.call(['open', self.workflowdir])
def open_terminal(self):
    """Open a Terminal window at workflow's :attr:`workflowdir`."""
    subprocess.call(['open', '-a', 'Terminal', self.workflowdir])
def open_help(self):
    """Open :attr:`help_url` in default browser."""
    subprocess.call(['open', self.help_url])
    return 'Opening workflow help URL in browser'

####################################################################
# Helper methods
####################################################################
def read_pid_from_pidfile(pidfile_path):
    """ Read the PID recorded in the named PID file.

        Read and return the numeric PID recorded as text in the named
        PID file. If the PID file cannot be read, or if the content is
        not a valid PID, return ``None``.

        """
    pid = None
    try:
        pidfile = open(pidfile_path, 'r')
    except IOError:
        pass
    else:
        # According to the FHS 2.3 section on PID files in /var/run:
        #
        #   The file must consist of the process identifier in
        #   ASCII-encoded decimal, followed by a newline character.
        #
        #   Programs that read PID files should be somewhat flexible
        #   in what they accept; i.e., they should ignore extra
        #   whitespace, leading zeroes, absence of the trailing
        #   newline, or additional lines in the PID file.
        line = pidfile.readline().strip()
        try:
            pid = int(line)
        except ValueError:
            pass
        pidfile.close()

    return pid
def __init__(self, name, mode):
    mode = {
        "r": os.O_RDONLY,
        "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
    }[mode]
    if hasattr(os, "O_BINARY"):
        mode |= os.O_BINARY
    self.fd = os.open(name, mode, 0o666)
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open bzip2 compressed tar archive name for reading or writing.
       Appending is not allowed.
    """
    if len(mode) > 1 or mode not in "rw":
        raise ValueError("mode must be 'r' or 'w'.")

    try:
        import bz2
    except ImportError:
        raise CompressionError("bz2 module is not available")

    if fileobj is not None:
        fileobj = _BZ2Proxy(fileobj, mode)
    else:
        fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

    try:
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except (IOError, EOFError):
        fileobj.close()
        raise ReadError("not a bzip2 file")
    t._extfileobj = False
    return t

# All *open() methods are registered here.
def _check(self, mode=None):
    """Check if TarFile is still open, and if the operation's mode
       corresponds to TarFile's mode.
    """
    if self.closed:
        raise IOError("%s is closed" % self.__class__.__name__)
    if mode is not None and self.mode not in mode:
        raise IOError("bad operation for mode %r" % self.mode)
def get_resource_stream(self, manager, resource_name):
    return open(self._fn(self.module_path, resource_name), 'rb')
def _get(self, path):
    with open(path, 'rb') as stream:
        return stream.read()
def _is_current(self, file_path, zip_path):
    """
    Return True if the file_path is current for this zip_path
    """
    timestamp, size = self._get_date_and_size(self.zipinfo[zip_path])
    if not os.path.isfile(file_path):
        return False
    stat = os.stat(file_path)
    if stat.st_size != size or stat.st_mtime != timestamp:
        return False
    # check that the contents match
    zip_contents = self.loader.get_data(zip_path)
    with open(file_path, 'rb') as f:
        file_contents = f.read()
    return zip_contents == file_contents
def get_metadata(self, name):
    if name != 'PKG-INFO':
        raise KeyError("No metadata except PKG-INFO is available")

    with io.open(self.path, encoding='utf-8', errors="replace") as f:
        metadata = f.read()
    self._warn_on_replacement(metadata)
    return metadata
def _mkstemp(*args, **kw):
    old_open = os.open
    try:
        # temporarily bypass sandboxing
        os.open = os_open
        return tempfile.mkstemp(*args, **kw)
    finally:
        # and then put it back
        os.open = old_open


# Silence the PEP440Warning by default, so that end users don't get hit by it
# randomly just because they use pkg_resources. We want to append the rule
# because we want earlier uses of filterwarnings to take precedence over this
# one.
def _secure_open_write(filename, fmode):
    # We only want to write to this file, so open it in write only mode
    flags = os.O_WRONLY

    # os.O_CREAT | os.O_EXCL will fail if the file already exists, so we only
    # will open *new* files.
    # We specify this because we want to ensure that the mode we pass is the
    # mode of the file.
    flags |= os.O_CREAT | os.O_EXCL

    # Do not follow symlinks to prevent someone from making a symlink that
    # we follow and insecurely open a cache file.
    if hasattr(os, "O_NOFOLLOW"):
        flags |= os.O_NOFOLLOW

    # On Windows we'll mark this file as binary
    if hasattr(os, "O_BINARY"):
        flags |= os.O_BINARY

    # Before we open our file, we want to delete any existing file that is
    # there
    try:
        os.remove(filename)
    except (IOError, OSError):
        # The file must not exist already, so we can just skip ahead to
        # opening
        pass

    # Open our file, the use of os.O_CREAT | os.O_EXCL will ensure that if a
    # race condition happens between the os.remove and this line, that an
    # error will be raised. Because we utilize a lockfile this should only
    # happen if someone is attempting to attack us.
    fd = os.open(filename, flags, fmode)
    try:
        return os.fdopen(fd, "wb")
    except:
        # An error occurred wrapping our FD in a file object
        os.close(fd)
        raise
def get(self, key):
    name = self._fn(key)
    if not os.path.exists(name):
        return None

    with open(name, 'rb') as fh:
        return fh.read()
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks:
        https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in compat due to differences on AIX and
    Jython, that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        file_uid = os.fstat(fd).st_uid
        os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerability, but best we can do w/o
        # NOFOLLOW
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError(
                "%s is a symlink; Will not return uid for symlinks" % path
            )
    return file_uid