我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.O_RDONLY。
def main():
    '''
    Run code specified by data received over pipe
    '''
    assert is_forking(sys.argv)

    # The last command-line argument carries the OS handle of the pipe;
    # wrap it in a read-only binary file object.
    pipe_handle = int(sys.argv[-1])
    pipe_fd = msvcrt.open_osfhandle(pipe_handle, os.O_RDONLY)
    from_parent = os.fdopen(pipe_fd, 'rb')

    # The pipe delivers two objects in order: the preparation data, then
    # the object whose _bootstrap() is run.  The _inheriting flag is kept
    # set while both are being loaded.
    process.current_process()._inheriting = True
    prep_data = load(from_parent)
    prepare(prep_data)
    self = load(from_parent)
    process.current_process()._inheriting = False

    from_parent.close()

    # The bootstrap result becomes the exit status of this process.
    status = self._bootstrap()
    exit(status)
def get_terminal_size():
    """Return the terminal size as a ``(columns, lines)`` tuple of ints.

    The size is looked up in this order:

    1. a TIOCGWINSZ ioctl on file descriptors 0, 1 and 2;
    2. the same ioctl on the controlling terminal (``os.ctermid()``);
    3. the ``LINES`` / ``COLUMNS`` environment variables;
    4. the hard-coded default of 80 columns by 25 lines.
    """
    def ioctl_GWINSZ(fd):
        # Returns the (rows, columns) pair reported for *fd*, or None when
        # the ioctl is unavailable (missing module, not a tty, ...).
        try:
            import fcntl
            import termios
            import struct
            cr = struct.unpack('hh',
                               fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
        except Exception:
            return None
        return cr

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            try:
                cr = ioctl_GWINSZ(fd)
            finally:
                # Always release the descriptor, even if the probe fails.
                os.close(fd)
        except Exception:
            pass
    if not cr:
        # The environment mapping is ``os.environ``; ``os.env`` does not
        # exist, so this fallback used to be skipped silently.
        try:
            cr = (int(os.environ['LINES']), int(os.environ['COLUMNS']))
        except (KeyError, ValueError):
            cr = (25, 80)
    return int(cr[1]), int(cr[0])
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks:
        https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in backwardcompat due to differences on AIX and
    Jython, that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        try:
            file_uid = os.fstat(fd).st_uid
        finally:
            # Close the descriptor even when fstat() fails so it cannot leak.
            os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError("%s is a symlink; Will not return uid for symlinks" % path)
    return file_uid
def testOutputStreams(self):
    """Run the helper script with a push adapter attached to the pipe.

    The script's captured stdout/stderr and the chunks recorded in
    ``_req_chunks`` are then compared with the expected values.
    """
    http_put_spec = {
        'mode': 'http',
        'method': 'PUT',
        'url': 'http://localhost:%d' % _socket_port
    }

    # Open the read end without blocking and attach the push adapter to it.
    pipe_fd = os.open(_pipepath, os.O_RDONLY | os.O_NONBLOCK)
    adapters = {pipe_fd: make_stream_push_adapter(http_put_spec)}
    command = [sys.executable, _oscript, _pipepath]

    try:
        with captureOutput() as stdpipes:
            run_process(command, adapters)
    except Exception:
        # Show whatever was captured before propagating the failure.
        print('Stdout/stderr from exception: ')
        print(stdpipes)
        raise

    self.assertEqual(stdpipes, ['start\ndone\n', ''])
    self.assertEqual(len(_req_chunks), 1)
    self.assertEqual(_req_chunks[0], (9, 'a message'))
def h_file_win32(fname):
    """Return the raw MD5 digest of the contents of file *fname*.

    The file is opened with ``O_BINARY`` and ``O_NOINHERIT`` and read in
    200000-byte chunks.

    :raises IOError: when the file cannot be opened.
    """
    try:
        fd = os.open(fname, os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT)
    except OSError:
        raise IOError('Cannot read from %r' % fname)
    stream = os.fdopen(fd, 'rb')
    hasher = md5()
    try:
        chunk = stream.read(200000)
        while chunk:
            hasher.update(chunk)
            chunk = stream.read(200000)
    finally:
        stream.close()
    return hasher.digest()
# always save these
def follow_file(filename):
    """Generator that keeps a read-only descriptor open on *filename*.

    Yields ``None`` whenever the file cannot be opened.  Otherwise yields
    ``(first, timestamp, fd)`` tuples, where ``first`` is True only for the
    first tuple produced after each (re)open.  When the path stops being
    stat-able, or its inode no longer matches the open descriptor, the
    descriptor is closed and the file is opened again.
    """
    open_flags = os.O_RDONLY | os.O_NONBLOCK
    while True:
        try:
            fd = os.open(filename, open_flags)
        except OSError:
            yield None
            continue

        try:
            opened_inode = os.fstat(fd).st_ino
            is_first = True
            while True:
                # Sample the path *before* yielding; the comparison happens
                # once the consumer resumes the generator.
                try:
                    on_disk = os.stat(filename)
                except OSError:
                    on_disk = None
                yield is_first, time.time(), fd
                replaced = on_disk is None or on_disk.st_ino != opened_inode
                if replaced:
                    break
                is_first = False
        finally:
            os.close(fd)
def start(self):
    """Redirect ``self.targetfd`` and, if ``_oldsys`` is set, patch ``sys``.

    When the target is fd 0 and there is no tmpfile, fd 0 is pointed at
    ``devnullpath``; otherwise the target fd is pointed at the tmpfile.

    :raises ValueError: when the saved file descriptor is no longer valid.
    """
    try:
        os.fstat(self._savefd)
    except OSError:
        raise ValueError("saved filedescriptor not valid, "
                         "did you call start() twice?")

    use_devnull = self.targetfd == 0 and not self.tmpfile
    if use_devnull:
        null_fd = os.open(devnullpath, os.O_RDONLY)
        os.dup2(null_fd, 0)
        os.close(null_fd)
    else:
        os.dup2(self.tmpfile.fileno(), self.targetfd)

    # Only instances carrying ``_oldsys`` also replace the matching
    # attribute on the sys module.
    if not hasattr(self, '_oldsys'):
        return
    sys_attr = patchsysdict[self.targetfd]
    if use_devnull:
        setattr(sys, sys_attr, DontReadFromInput())
    else:
        setattr(sys, sys_attr, self.tmpfile)
def main():
    '''
    Run code specified by data received over pipe
    '''
    assert is_forking(sys.argv)

    # Step 1: turn the handle given as the final argument into a binary
    # read-only stream.
    from_parent = os.fdopen(
        msvcrt.open_osfhandle(int(sys.argv[-1]), os.O_RDONLY), 'rb')

    # Step 2: with the _inheriting flag raised, load and apply the
    # preparation data, then load the object to bootstrap.
    process.current_process()._inheriting = True
    preparation = load(from_parent)
    prepare(preparation)
    self = load(from_parent)
    process.current_process()._inheriting = False

    # Step 3: the stream is no longer needed once both objects are loaded.
    from_parent.close()

    # Step 4: run the bootstrap and exit with the code it returns.
    exit(self._bootstrap())
def _get_terminal_size():
    """Return the terminal size as the list ``[lines, columns]``.

    Tries a TIOCGWINSZ ioctl on fds 0, 1 and 2, then on the controlling
    terminal, and finally falls back to the ``LINES`` / ``COLUMNS``
    environment variables (defaults: 25 lines, 80 columns).
    """
    def ioctl_GWINSZ(fd):
        # Returns (rows, columns) for *fd*, or None when the probe fails.
        try:
            import fcntl
            import termios  # was misspelled ``termi_os``: the import always failed
            import struct
            cr = struct.unpack('hh',
                               fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
        except Exception:
            return None
        return cr

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = _os.open(_os.ctermid(), _os.O_RDONLY)
            try:
                cr = ioctl_GWINSZ(fd)
            finally:
                # was ``_os.cl_ose``: the AttributeError was swallowed and
                # the descriptor leaked.
                _os.close(fd)
        except Exception:
            pass
    if not cr:
        cr = (_os.environ.get('LINES', 25), _os.environ.get('COLUMNS', 80))
    return list(map(int, cr))
def _get_terminal_size_linux():
    """Return ``(columns, lines)`` for the terminal, or None if unknown.

    Sources, in order: TIOCGWINSZ on fds 0-2, TIOCGWINSZ on the controlling
    terminal, then the ``LINES`` / ``COLUMNS`` environment variables.
    """
    def ioctl_GWINSZ(fd):
        # Any failure at all means "no answer from this descriptor".
        try:
            import fcntl
            import termios
            packed = fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
            return struct.unpack('hh', packed)
        except:
            return None

    # First descriptor that answers wins.
    cr = None
    for std_fd in (0, 1, 2):
        cr = ioctl_GWINSZ(std_fd)
        if cr:
            break

    if not cr:
        try:
            tty_fd = os.open(os.ctermid(), os.O_RDONLY)
            cr = ioctl_GWINSZ(tty_fd)
            os.close(tty_fd)
        except:
            pass

    if not cr:
        try:
            rows = os.environ['LINES']
            cols = os.environ['COLUMNS']
        except:
            return None
        cr = (rows, cols)
    return int(cr[1]), int(cr[0])
def _convert_pflags(self, pflags):
    "convert SFTP-style open() flags to python's os.open() flags"
    # Access mode: read+write, write-only, or (default) read-only.
    wants_write = pflags & SFTP_FLAG_WRITE
    if wants_write and (pflags & SFTP_FLAG_READ):
        flags = os.O_RDWR
    elif wants_write:
        flags = os.O_WRONLY
    else:
        flags = os.O_RDONLY

    # Each remaining SFTP bit maps one-to-one onto an os.open() modifier.
    modifier_map = (
        (SFTP_FLAG_APPEND, os.O_APPEND),
        (SFTP_FLAG_CREATE, os.O_CREAT),
        (SFTP_FLAG_TRUNC, os.O_TRUNC),
        (SFTP_FLAG_EXCL, os.O_EXCL),
    )
    for sftp_bit, os_bit in modifier_map:
        if pflags & sftp_bit:
            flags |= os_bit
    return flags
def __init__(self, slave=None, family=0, extra=None):
    """Set up the bus object and work out which slave address to use.

    :param slave: slave address, either a bare id (the family is prefixed
        to it) or a ``prefix-id`` pair; when None, the first entry
        returned by ``self.deviceList()`` is used.
    :param family: family code used for formatting and for validating the
        hexadecimal prefix of *slave*.
    :param extra: passed to ``loadExtraModule``.
    :raises Exception: when the prefix does not match *family*, or when no
        device is found.
    """
    Bus.__init__(self, "ONEWIRE", "/sys/bus/w1/devices/w1_bus_master1/w1_master_slaves", os.O_RDONLY)
    # The descriptor opened by Bus.__init__ is released straight away.
    if self.fd > 0:
        os.close(self.fd)
        self.fd = 0

    self.family = family
    if slave is None:
        found = self.deviceList()
        if len(found) == 0:
            raise Exception("No device match family %02x" % family)
        self.slave = found[0]
    else:
        parts = slave.split("-")
        part_count = len(parts)
        if part_count == 1:
            # Bare id: prepend the family code.
            self.slave = "%02x-%s" % (family, slave)
        elif part_count == 2:
            # "prefix-id": the hex prefix must agree with a non-zero family.
            prefix = int(parts[0], 16)
            if family > 0 and family != prefix:
                raise Exception("1-Wire slave address %s does not match family %02x" % (slave, family))
            self.slave = slave

    loadExtraModule(extra)
def _getTerminalSize_linux():
    """Return ``(columns, lines)`` for the terminal, or None if unknown.

    Sources, in order: TIOCGWINSZ on fds 0-2, TIOCGWINSZ on the controlling
    terminal, then the ``LINES`` / ``COLUMNS`` environment variables.
    """
    import os

    def ioctl_GWINSZ(fd):
        # Returns (rows, columns) for *fd*, or None when the probe fails.
        try:
            import fcntl
            import termios
            import struct
            cr = struct.unpack('hh',
                               fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
        except Exception:
            return None
        return cr

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            try:
                cr = ioctl_GWINSZ(fd)
            finally:
                os.close(fd)
        except Exception:
            pass
    if not cr:
        # Read the environment through os.environ: the bare name ``env``
        # is not defined in this function, so the lookup used to fail
        # with a swallowed NameError and the fallback never applied.
        try:
            cr = (os.environ['LINES'], os.environ['COLUMNS'])
        except KeyError:
            return None
    return int(cr[1]), int(cr[0])
def getTerminalColumns(self):
    """Return the terminal size as a ``(rows, columns)`` pair of ints.

    Despite the name, both dimensions are returned, rows first.  Sources,
    in order: TIOCGWINSZ on stdout, TIOCGWINSZ on the controlling
    terminal, the ``LINES`` / ``COLUMNS`` environment variables, and
    finally the default ``(25, 80)``.
    """
    def ioctl_GWINSZ(fd):
        # Returns (rows, columns) for *fd*, or None when the probe fails.
        try:
            cr = struct.unpack('hh', fcntl.ioctl(fd, self.termios.TIOCGWINSZ, '1234'))
        except Exception:
            return None
        return cr

    # sys.stdout may be replaced by an object without a usable fileno().
    try:
        stdout_fd = sys.stdout.fileno()
    except (AttributeError, ValueError, OSError):
        stdout_fd = None
    cr = ioctl_GWINSZ(stdout_fd) if stdout_fd is not None else None

    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            try:
                cr = ioctl_GWINSZ(fd)
            finally:
                os.close(fd)
        except Exception:
            pass
    if not cr:
        # Use os.environ (the bare name ``env`` is not defined in this
        # block) and convert to int so every path returns the same types.
        try:
            cr = (int(os.environ['LINES']), int(os.environ['COLUMNS']))
        except (KeyError, ValueError):
            cr = (25, 80)
    return cr
def _getTerminalSize_linux():
    """Return ``(columns, lines)`` for the terminal, or None if unknown.

    Asks the tty driver first (fds 0, 1, 2, then the controlling terminal)
    and only then consults the ``LINES`` / ``COLUMNS`` environment
    variables.
    """
    def ioctl_GWINSZ(fd):
        # Deliberately broad: whatever goes wrong, this descriptor simply
        # has no answer.
        try:
            import fcntl, termios, struct, os
            raw = fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
            size = struct.unpack('hh', raw)
        except:
            return None
        return size

    size = ioctl_GWINSZ(0)
    if not size:
        size = ioctl_GWINSZ(1)
    if not size:
        size = ioctl_GWINSZ(2)

    if not size:
        try:
            tty = os.open(os.ctermid(), os.O_RDONLY)
            size = ioctl_GWINSZ(tty)
            os.close(tty)
        except:
            pass

    if not size:
        try:
            size = (os.environ['LINES'], os.environ['COLUMNS'])
        except:
            return None
    rows, cols = size[0], size[1]
    return int(cols), int(rows)
def _get_random_reader():
    """Return the module-wide random-bytes reader, initialising it once.

    On the first call the reader is bound to the ``read`` method of
    ``/dev/urandom`` (or, failing that, of a non-blocking ``/dev/random``)
    and the ``random`` module is seeded from pid, uid and the current
    time.  Every call then invokes ``_randint`` between 0 and 31 times,
    the count being derived from the current time.
    """
    global _random_reader, _random_init

    if not _random_init:
        try:
            import random
            try:
                _random_reader = open("/dev/urandom", "rb").read
            except:
                fallback_fd = os.open("/dev/random", os.O_RDONLY | os.O_NONBLOCK)
                _random_reader = os.fdopen(fallback_fd, "rb").read
            seconds, micros = _gettimeofday()
            random.seed((os.getpid() << 16) ^ os.getuid() ^ seconds ^ micros)
        except:
            # Initialisation is best-effort; it is marked done regardless.
            pass
        _random_init = True

    seconds, micros = _gettimeofday()
    # The low five bits of the time give the number of _randint calls.
    spins = (seconds ^ micros) & 0x1F
    for _ in range(spins):
        _randint(0, _maxint)
    return _random_reader
def __init__(self, name, mode):
    """Open *name* with ``os.open`` and keep the descriptor in ``self.fd``.

    :param name: path of the file to open.
    :param mode: ``"r"`` (read-only) or ``"w"`` (write-only, create,
        truncate); any other value raises ``KeyError``.
    """
    flag_table = {
        "r": os.O_RDONLY,
        "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
    }
    open_flags = flag_table[mode]
    # O_BINARY exists only on some platforms; contributes nothing elsewhere.
    open_flags |= getattr(os, "O_BINARY", 0)
    self.fd = os.open(name, open_flags, 0o666)
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks:
        https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in compat due to differences on AIX and
    Jython, that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    if not hasattr(os, 'O_NOFOLLOW'):
        # AIX and Jython
        # WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
        if os.path.islink(path):
            # raise OSError for parity with os.O_NOFOLLOW below
            raise OSError(
                "%s is a symlink; Will not return uid for symlinks" % path
            )
        # older versions of Jython don't have `os.fstat`
        return os.stat(path).st_uid

    fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
    try:
        return os.fstat(fd).st_uid
    finally:
        # Runs on success and on failure, so the descriptor never leaks.
        os.close(fd)
def get_terminal_size():
    """Returns a tuple (x, y) representing the width(x) and the height(y)
    in characters of the terminal window."""
    def ioctl_GWINSZ(fd):
        # Returns (rows, columns), or None when the query fails or the
        # driver reports a 0x0 window.
        try:
            import fcntl
            import termios
            import struct
            raw = fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
            dims = struct.unpack('hh', raw)
        except:
            return None
        return None if dims == (0, 0) else dims

    # Standard streams first; the first descriptor that answers wins.
    dims = None
    for std_fd in (0, 1, 2):
        dims = ioctl_GWINSZ(std_fd)
        if dims:
            break

    # Next, the controlling terminal.
    if not dims:
        try:
            tty_fd = os.open(os.ctermid(), os.O_RDONLY)
            dims = ioctl_GWINSZ(tty_fd)
            os.close(tty_fd)
        except:
            pass

    # Last resort: the environment, defaulting to 25 lines by 80 columns.
    if not dims:
        dims = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
    rows, cols = dims[0], dims[1]
    return int(cols), int(rows)
def _stat(fn):
    """Open *fn* read-only and close it again.

    Returns None; an ``OSError`` from ``_os.open`` propagates to the caller.
    """
    _os.close(_os.open(fn, _os.O_RDONLY))
def open(self, flags):
    """Open ``self.url.path`` and store the descriptor in ``self._fd``.

    :param flags: ``'r'`` (read-only), ``'w'`` (write-only) or ``'w+'``
        (write-only, creating the file if needed).
    :raises ValueError: when *flags* is not one of the values above.
    """
    os_flags = {
        'r': os.O_RDONLY,
        'w': os.O_WRONLY,
        'w+': os.O_WRONLY | os.O_CREAT,
    }
    # Keep the try body to the lookup alone: only an unknown flag string
    # should become a ValueError.
    try:
        mode = os_flags[flags]
    except KeyError:
        raise ValueError('Unknown flags: \'%s\'' % flags)
    self._fd = os.open(self.url.path, mode)
def open(self, flags):
    """Create the FIFO, start the fetch thread, and open the read end.

    A daemon thread runs ``self._client.get(self.url.path, self._pipe)``;
    the FIFO at ``self._pipe`` is then opened read-only and its descriptor
    stored in ``self._fd``.  *flags* is accepted but not used.
    """
    fifo_path = self._pipe
    os.mkfifo(fifo_path)

    fetcher = threading.Thread(
        target=self._client.get,
        args=(self.url.path, fifo_path),
        daemon=True,
    )
    self._thread = fetcher
    fetcher.start()

    # NOTE(review): a blocking read-only open of a FIFO normally waits for
    # a writer - presumably the thread started above; confirm.
    self._fd = os.open(fifo_path, os.O_RDONLY)
def __init__(self, name, mode):
    """Open *name* with ``os.open`` and keep the descriptor in ``self.fd``.

    :param name: path of the file to open.
    :param mode: ``"r"`` for read-only, or ``"w"`` for write-only with
        create and truncate.
    :raises KeyError: when *mode* is neither ``"r"`` nor ``"w"``.
    """
    mode = {
        "r": os.O_RDONLY,
        "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
    }[mode]
    if hasattr(os, "O_BINARY"):
        mode |= os.O_BINARY
    # ``0o666`` is accepted by Python 2.6+ and Python 3; the old ``0666``
    # spelling is a SyntaxError on Python 3.
    self.fd = os.open(name, mode, 0o666)
def get_terminal_size():
    """Returns a tuple (x, y) representing the width(x) and the height(y)
    in characters of the terminal window."""
    def ioctl_GWINSZ(fd):
        # Returns (rows, columns), or None when the query fails or the
        # driver reports a 0x0 window.
        try:
            import fcntl
            import termios
            import struct
            cr = struct.unpack('hh',
                               fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
        except Exception:
            return None
        # Checked once; the second identical test was unreachable.
        if cr == (0, 0):
            return None
        return cr

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            try:
                cr = ioctl_GWINSZ(fd)
            finally:
                os.close(fd)
        except Exception:
            pass
    if not cr:
        cr = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
    return int(cr[1]), int(cr[0])
def open(self):
    """Open ``self._pipe`` for non-blocking, read-only access."""
    read_flags = os.O_RDONLY | os.O_NONBLOCK
    self._pipe.open(read_flags)
def getTerminalSize():
    """
    returns the terminal width and height

    :return: the terminal width and height
    """
    import os

    def ioctl_GWINSZ(fdi):
        # Returns (rows, columns) for the descriptor, or None on any error.
        # noinspection PyBroadException
        try:
            import fcntl
            import termios
            import struct
            packed = fcntl.ioctl(fdi, termios.TIOCGWINSZ, '1234')
            return struct.unpack('hh', packed)
        except:
            return None

    size = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not size:
        # Fall back to the controlling terminal.
        # noinspection PyBroadException
        try:
            tty_fd = os.open(os.ctermid(), os.O_RDONLY)
            size = ioctl_GWINSZ(tty_fd)
            os.close(tty_fd)
        except:
            pass
    if not size:
        # Environment variables, defaulting to 25 lines by 80 columns.
        environment = os.environ
        size = (environment.get('LINES', 25), environment.get('COLUMNS', 80))
    return int(size[1]), int(size[0])
def readf_win32(f, m='r', encoding='ISO8859-1'):
    """Read and return the whole content of file *f*.

    :param f: path of the file to read.
    :param m: open mode; ``'b'`` adds ``O_BINARY`` and ``'+'`` adds
        ``O_RDWR`` to the ``os.open`` flags.
    :param encoding: on Python 3, when *m* has no ``'b'``, the bytes read
        are decoded with this encoding (or with the default one if falsy).
    :raises IOError: when the file cannot be opened.
    """
    flags = os.O_NOINHERIT | os.O_RDONLY
    if 'b' in m:
        flags |= os.O_BINARY
    if '+' in m:
        flags |= os.O_RDWR
    try:
        fd = os.open(f, flags)
    except OSError:
        raise IOError('Cannot read from %r' % f)

    # On Python 3 a text-mode request is served by reading bytes and
    # decoding them here.
    decode_after_read = sys.hexversion > 0x3000000 and 'b' not in m
    if decode_after_read:
        m += 'b'

    stream = os.fdopen(fd, m)
    try:
        txt = stream.read()
    finally:
        stream.close()

    if decode_after_read:
        txt = txt.decode(encoding) if encoding else txt.decode()
    return txt
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks:
        https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in compat due to differences on AIX and
    Jython, that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        descriptor = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        try:
            owner_uid = os.fstat(descriptor).st_uid
        finally:
            # Release the descriptor whether or not fstat() succeeded.
            os.close(descriptor)
        return owner_uid

    # AIX and Jython
    # WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
    if os.path.islink(path):
        # raise OSError for parity with os.O_NOFOLLOW above
        raise OSError(
            "%s is a symlink; Will not return uid for symlinks" % path
        )
    # older versions of Jython don't have `os.fstat`
    return os.stat(path).st_uid
def get_terminal_size():
    """Returns the current size of the terminal as tuple in the form
    ``(width, height)`` in columns and rows.
    """
    # Preferred source: shutil.get_terminal_size() (Python 3.3 and later).
    if sys.version_info >= (3, 3):
        import shutil
        native_query = getattr(shutil, 'get_terminal_size', None)
        if native_query:
            reported = native_query()
            return reported.columns, reported.lines

    # Next: the Windows-specific helper, when one is available.
    if get_winterm_size is not None:
        return get_winterm_size()

    def ioctl_gwinsz(fd):
        # Returns (rows, columns) for *fd*, or None when the ioctl fails.
        try:
            import fcntl
            import termios
            packed = fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
            return struct.unpack('hh', packed)
        except Exception:
            return None

    dims = ioctl_gwinsz(0) or ioctl_gwinsz(1) or ioctl_gwinsz(2)
    if not dims:
        try:
            tty_fd = os.open(os.ctermid(), os.O_RDONLY)
            try:
                dims = ioctl_gwinsz(tty_fd)
            finally:
                os.close(tty_fd)
        except Exception:
            pass

    # A missing answer or a zero dimension sends us to the environment.
    if not (dims and dims[0] and dims[1]):
        dims = (os.environ.get('LINES', 25),
                os.environ.get('COLUMNS', DEFAULT_COLUMNS))
    return int(dims[1]), int(dims[0])