我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.fdopen()。
def empty_db(self):
    """Drop every table of the configured database.

    Runs ``mysqldump --no-data`` against the database described by
    ``self.db_config`` (keys ``user``, ``host`` and ``name``), keeps only the
    ``DROP`` statements of the dump, writes them to a temporary SQL script
    and sources that script through ``self._run_mysql_cmd``.

    The temporary script is removed even when the dump or the mysql command
    fails.
    """
    cmd = [
        "mysqldump",
        "-u%(user)s" % self.db_config,
        "-h%(host)s" % self.db_config,
        "--add_drop-table",
        "--no-data",
        "%(name)s" % self.db_config,
    ]
    tmphandle, tmppath = tempfile.mkstemp(text=True)
    try:
        with os.fdopen(tmphandle, "w") as tmpfile:
            # universal_newlines=True makes check_output return text on both
            # Python 2 and Python 3, so splitting on '\n' is always valid.
            sql_data = subprocess.check_output(
                cmd, stderr=None, universal_newlines=True).split('\n')
            # Foreign keys would otherwise block dropping referenced tables.
            tmpfile.write("SET FOREIGN_KEY_CHECKS = 0;\n")
            tmpfile.write("use %(name)s;\n" % self.db_config)
            for line in sql_data:
                if line.startswith("DROP"):
                    tmpfile.write(line + '\n')
        self._run_mysql_cmd("source %s" % tmppath)
    finally:
        os.remove(tmppath)
def main():
    '''
    Run code specified by data received over pipe

    The last command-line argument is an OS handle; it is converted to a
    read-only file descriptor with ``msvcrt.open_osfhandle`` and two objects
    are loaded from it: the preparation data (handed to ``prepare``) and the
    object whose ``_bootstrap()`` result becomes the exit code.
    '''
    assert is_forking(sys.argv)

    pipe_fd = msvcrt.open_osfhandle(int(sys.argv[-1]), os.O_RDONLY)
    from_parent = os.fdopen(pipe_fd, 'rb')

    # Flag the current process as "inheriting" while the payload is loaded.
    process.current_process()._inheriting = True
    prepare(load(from_parent))
    target = load(from_parent)
    process.current_process()._inheriting = False

    from_parent.close()

    exit(target._bootstrap())
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID ("PID") of the current process
        and write it to the named file as a line of text.

        The file is created exclusively (``O_CREAT | O_EXCL``) with mode
        0o644, so an ``OSError`` is raised if it already exists.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)

    # The context manager closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        # According to the FHS 2.3 section on PID files in /var/run:
        #
        #   The file must consist of the process identifier in
        #   ASCII-encoded decimal, followed by a newline character. For
        #   example, if crond was process number 25, /var/run/crond.pid
        #   would contain three characters: two, five, and newline.
        pid = os.getpid()
        pidfile.write("%s\n" % pid)
def __init__(self, cmd, bufsize=-1):
    """Fork a child running ``cmd`` and connect to it through two pipes.

    After construction ``self.tochild`` is a writable file feeding the
    child's stdin and ``self.fromchild`` is a readable file receiving the
    child's stdout and stderr. ``bufsize`` is forwarded to ``os.fdopen``.
    """
    _cleanup()
    self.cmd = cmd
    child_in, parent_out = os.pipe()
    parent_in, child_out = os.pipe()
    self.pid = os.fork()
    if self.pid == 0:
        # Child: wire the pipe ends onto stdin (0), stdout (1) and stderr (2).
        for std_fd, pipe_end in ((0, child_in), (1, child_out), (2, child_out)):
            os.dup2(pipe_end, std_fd)
        self._run_child(cmd)
    os.close(child_in)
    self.tochild = os.fdopen(parent_out, 'w', bufsize)
    os.close(child_out)
    self.fromchild = os.fdopen(parent_in, 'r', bufsize)
def set(self, key, value, timeout=None):
    """Store ``value`` under ``key`` in the file-system cache.

    The expiry timestamp and the value are pickled into a temporary file in
    ``self._path`` which is then renamed over the final cache file.

    :param timeout: seconds until expiry; ``None`` uses
                    ``self.default_timeout`` and ``0`` is stored as-is.
    :return: ``True`` on success, ``False`` if an ``IOError``/``OSError``
             occurred.
    """
    if timeout is None:
        timeout = int(time() + self.default_timeout)
    elif timeout != 0:
        timeout = int(time() + timeout)
    filename = self._get_filename(key)
    self._prune()
    tmp = None
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(timeout, f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        tmp = None  # the temporary file now lives on as ``filename``
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True
    finally:
        # Do not leave a half-written transaction file behind on failure.
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                pass
def Set(self, key, data):
    """Write ``data`` to the cache file that belongs to ``key``.

    The target path comes from ``self._GetPath(key)`` and must start with
    ``self._root_directory``. The data is written to a temporary file first
    and then renamed over the target, replacing any existing file.

    Raises ``_FileCacheError`` when the path is outside the root directory or
    when the parent of the target exists but is not a directory.
    """
    path = self._GetPath(key)
    # Validate before touching the file system so a rejected key leaves
    # neither directories nor temporary files behind.
    if not path.startswith(self._root_directory):
        raise _FileCacheError('%s does not appear to live under %s' %
                              (path, self._root_directory))
    directory = os.path.dirname(path)
    if not os.path.exists(directory):
        os.makedirs(directory)
    if not os.path.isdir(directory):
        raise _FileCacheError('%s exists but is not a directory' % directory)
    temp_fd, temp_path = tempfile.mkstemp()
    try:
        with os.fdopen(temp_fd, 'w') as temp_fp:
            temp_fp.write(data)
        if os.path.exists(path):
            os.remove(path)
        os.rename(temp_path, path)
    except BaseException:
        # Remove the orphaned temporary file, then re-raise the real error.
        try:
            os.remove(temp_path)
        except OSError:
            pass
        raise
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
    """Write ``data`` to the file ``f`` through a non-inheritable descriptor.

    On Python 3, text modes are converted to binary: ``data`` is encoded with
    ``encoding`` and ``'b'`` is appended to the mode ``m``. The file is
    created or truncated. Raises ``IOError`` if it cannot be opened.
    """
    if sys.hexversion > 0x3000000 and 'b' not in m:
        data = data.encode(encoding)
        m += 'b'

    flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
    flags |= os.O_BINARY if 'b' in m else 0
    flags |= os.O_RDWR if '+' in m else 0

    try:
        fd = os.open(f, flags)
    except OSError:
        raise IOError('Cannot write to %r' % f)

    # The context manager closes the file whether or not the write succeeds.
    with os.fdopen(fd, m) as stream:
        stream.write(data)
def h_file_win32(fname):
    """Return the raw MD5 digest of the contents of the file ``fname``.

    The file is opened in binary mode through a non-inheritable descriptor
    and read in chunks of 200000 bytes. Raises ``IOError`` if it cannot be
    opened.
    """
    try:
        fd = os.open(fname, os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT)
    except OSError:
        raise IOError('Cannot read from %r' % fname)
    stream = os.fdopen(fd, 'rb')
    hasher = md5()
    try:
        while True:
            chunk = stream.read(200000)
            hasher.update(chunk)
            if not chunk:
                break
    finally:
        stream.close()
    return hasher.digest()

# always save these
def setup_environment():
    """Prepare PATH, git and ssh settings from environment variables.

    Reads ``LAMBDA_TASK_ROOT`` and ``SSH_IDENTITY`` (base64-encoded) from the
    environment, then:

    * appends ``<root>/bin`` to ``PATH`` and sets ``GIT_EXEC_PATH`` to it;
    * writes the decoded identity to a file with mode 0o600 inside a fresh
      temporary directory;
    * writes an ssh config file next to it and points ``GIT_SSH_COMMAND``
      at that config.
    """
    root = os.getenv('LAMBDA_TASK_ROOT')
    bin_dir = os.path.join(root, 'bin')
    os.environ['PATH'] += ':' + bin_dir
    os.environ['GIT_EXEC_PATH'] = bin_dir

    ssh_dir = tempfile.mkdtemp()

    ssh_identity = os.path.join(ssh_dir, 'identity')
    identity_fd = os.open(ssh_identity, os.O_WRONLY | os.O_CREAT, 0o600)
    # b64decode returns bytes, so the file must be opened in binary mode;
    # a text-mode handle raises TypeError on Python 3.
    with os.fdopen(identity_fd, 'wb') as f:
        f.write(base64.b64decode(os.getenv('SSH_IDENTITY')))

    ssh_config = os.path.join(ssh_dir, 'config')
    with open(ssh_config, 'w') as f:
        f.write('CheckHostIP no\n'
                'StrictHostKeyChecking yes\n'
                'IdentityFile %s\n'
                'UserKnownHostsFile %s\n' %
                (ssh_identity, os.path.join(root, 'known_hosts')))

    os.environ['GIT_SSH_COMMAND'] = 'ssh -F %s' % ssh_config
def convert(cls, report, data):
    "converts the report data to another mimetype if necessary"
    source_ext = report.template_extension
    wanted = report.extension or report.template_extension

    # Formats listed in MIMETYPES are returned untouched.
    if wanted in MIMETYPES:
        return wanted, data

    fd, path = tempfile.mkstemp(
        suffix=(os.extsep + source_ext), prefix='trytond_')
    oext = FORMAT2EXT.get(wanted, wanted)
    with os.fdopen(fd, 'wb+') as fp:
        fp.write(data)

    connection_arg = '--connection=%s' % config.get('report', 'unoconv')
    cmd = ['unoconv', connection_arg, '-f', oext, '--stdout', path]
    try:
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        # Only stdout is piped, so the second element of the result is None.
        stdoutdata, stderrdata = proc.communicate()
        if proc.wait() != 0:
            raise Exception(stderrdata)
        return oext, stdoutdata
    finally:
        os.remove(path)
def test_basic_config():
    """Check the sshuttle command written to the generated supervisor config.

    Dumps ``testcfg`` as YAML into a temporary file, loads it through
    ``ny.get_config``, lets ``ny.write_supervisor_conf`` write the supervisor
    configuration and inspects the ``program:testtunnel`` section.
    """
    handle, cfg_path = tempfile.mkstemp()
    cfg_file = os.fdopen(handle, 'w')
    cfg_file.write(yaml.dump(testcfg))
    cfg_file.flush()

    cfg = ny.get_config(cfg_path)
    ny.write_supervisor_conf()

    parser = ConfigParser.ConfigParser()
    parser.readfp(open(cfg['supervisor.conf']))

    command = parser.get('program:testtunnel', 'command')
    print(command)
    assert 'sshuttle -r 1.1.1.1 2.2.2.2 -x 3.3.3.3' in command
def stdout_redirected(to):
    """Lifted from: https://stackoverflow.com/questions/4675728/redirect-stdout-to-a-file-in-python

    This is the only way I've found to redirect stdout with curses. This way the
    output from questionnaire can be piped to another program, without piping
    what's written to the terminal by the prompters.

    Generator that yields ``sys.stdout`` while its file descriptor points at
    ``to`` (a file-like object or a filename) and restores the original
    descriptor afterwards.
    NOTE(review): presumably wrapped by ``contextlib.contextmanager`` outside
    this block -- confirm.
    """
    original = sys.stdout
    original_fd = fileno(original)

    # Keep a duplicate of the real stdout descriptor before it is overwritten.
    with os.fdopen(os.dup(original_fd), 'wb') as saved:
        original.flush()  # flush library buffers that dup2 knows nothing about
        try:
            os.dup2(fileno(to), original_fd)  # $ exec >&to
        except ValueError:  # ``to`` is a filename
            with open(to, 'wb') as target:
                os.dup2(target.fileno(), original_fd)  # $ exec > to
        try:
            yield original  # allow code to be run with the redirected stdout
        finally:
            # Point the stdout descriptor back at the saved duplicate.
            original.flush()
            os.dup2(saved.fileno(), original_fd)
def main():
    '''
    Run code specified by data received over pipe

    The last command-line argument is an OS handle; it is converted to a
    read-only file descriptor with ``msvcrt.open_osfhandle`` and two objects
    are loaded from it: the preparation data (handed to ``prepare``) and the
    object whose ``_bootstrap()`` result becomes the exit code.
    '''
    assert is_forking(sys.argv)

    pipe_fd = msvcrt.open_osfhandle(int(sys.argv[-1]), os.O_RDONLY)
    from_parent = os.fdopen(pipe_fd, 'rb')

    # Flag the current process as "inheriting" while the payload is loaded.
    process.current_process()._inheriting = True
    prepare(load(from_parent))
    target = load(from_parent)
    process.current_process()._inheriting = False

    from_parent.close()

    exit(target._bootstrap())
def create_file(self, name, excl=False, mode="wb", **kwargs):
    """Creates a file with the given name in this storage.

    :param name: the name for the new file.
    :param excl: if True, try to open the file in "exclusive" mode.
    :param mode: the mode flags with which to open the file. The default is
        ``"wb"``.
    :return: a :class:`whoosh.filedb.structfile.StructFile` instance.
    """

    if self.readonly:
        raise ReadOnlyError

    path = self._fpath(name)
    if not excl:
        fileobj = open(path, mode)
    else:
        # O_BINARY is only defined on some platforms; add it when available.
        flags = (os.O_CREAT | os.O_EXCL | os.O_RDWR
                 | getattr(os, "O_BINARY", 0))
        fileobj = os.fdopen(os.open(path, flags), mode)

    return StructFile(fileobj, name=name, **kwargs)
def test_FILE_stored_explicitly():
    """A Python file object can be stored in a C ``FILE *`` global.

    Writes ``b"X"`` through the Python file, then lets the C function
    ``myprintf11`` print through the same stream, and checks what arrives at
    the read end of the pipe.
    """
    ffi = FFI()
    ffi.cdef("int myprintf11(const char *, int); FILE *myfile;")
    lib = ffi.verify("""
        #include <stdio.h>
        FILE *myfile;
        int myprintf11(const char *out, int value) {
            return fprintf(myfile, out, value);
        }
    """)
    import os
    read_fd, write_fd = os.pipe()
    writer = os.fdopen(write_fd, 'wb', 256)
    lib.myfile = ffi.cast("FILE *", writer)
    #
    writer.write(b"X")
    printed = lib.myprintf11(b"hello, %d!\n", ffi.cast("int", 42))
    writer.close()
    assert printed == len("hello, 42!\n")
    #
    result = os.read(read_fd, 256)
    os.close(read_fd)
    # the 'X' might remain in the user-level buffer of 'writer' and
    # end up showing up after the 'hello, 42!\n'
    assert result in (b"Xhello, 42!\n", b"hello, 42!\nX")
def test_ffi_buffer_with_file(self):
    """``ffi.buffer`` objects can be written to and filled from a real file.

    Writes the first 1000 ints of a 1005-element C array to a temporary
    file, compares the file content with the equivalent ``array.array`` and
    reads it back into a second C array. The temporary file is closed and
    removed even when the test is skipped or fails.
    """
    import tempfile, os, array
    fd, filename = tempfile.mkstemp()
    f = os.fdopen(fd, 'r+b')
    try:
        a = ffi.new("int[]", list(range(1005)))
        try:
            ffi.buffer(a, 512)
        except NotImplementedError as e:
            py.test.skip(str(e))
        f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
        f.seek(0)
        reference = array.array('i', range(1000))
        # array.tostring() was removed in Python 3.9; tobytes() is the
        # replacement and is preferred whenever it exists.
        if hasattr(reference, 'tobytes'):
            expected = reference.tobytes()
        else:
            expected = reference.tostring()
        assert f.read() == expected
        f.seek(0)
        b = ffi.new("int[]", 1005)
        f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
        # Only the first 1000 items were transferred; the rest stay zero.
        assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
    finally:
        f.close()
        os.unlink(filename)
def test_ffi_buffer_with_file(self):
    """``ffi.buffer`` objects can be written to and filled from a real file.

    Uses an ``FFI`` built on ``self.Backend()``. Writes the first 1000 ints
    of a 1005-element C array to a temporary file, compares the file content
    with the equivalent ``array.array`` and reads it back into a second C
    array. The temporary file is closed and removed even when the test is
    skipped or fails.
    """
    ffi = FFI(backend=self.Backend())
    import tempfile, os, array
    fd, filename = tempfile.mkstemp()
    f = os.fdopen(fd, 'r+b')
    try:
        a = ffi.new("int[]", list(range(1005)))
        try:
            ffi.buffer(a, 512)
        except NotImplementedError as e:
            py.test.skip(str(e))
        f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
        f.seek(0)
        reference = array.array('i', range(1000))
        # array.tostring() was removed in Python 3.9; tobytes() is the
        # replacement and is preferred whenever it exists.
        if hasattr(reference, 'tobytes'):
            expected = reference.tobytes()
        else:
            expected = reference.tostring()
        assert f.read() == expected
        f.seek(0)
        b = ffi.new("int[]", 1005)
        f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
        # Only the first 1000 items were transferred; the rest stay zero.
        assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
    finally:
        f.close()
        os.unlink(filename)
def _decompress_bz2(filename, blocksize=900*1024): """ Decompress .tar.bz2 to .tar on disk (for faster access) Use TemporaryFile to guarentee write access. """ if not filename.endswith('.tar.bz2'): return filename fd, path = mkstemp() with os.fdopen(fd, 'wb') as fo: with open(filename, 'rb') as fi: z = bz2.BZ2Decompressor() for block in iter(lambda: fi.read(blocksize), b''): fo.write(z.decompress(block)) return path
def spawn(argv, stdout=False):
    """Asynchronously run a program. argv[0] is the executable name, which
    must be fully qualified or in the path. If stdout is True, return a
    file object corresponding to the child's standard output; otherwise,
    return the child's process ID.

    argv must be strictly str objects to avoid encoding confusion.

    Raises ``ValueError`` for an empty argv and ``TypeError`` when any item
    is not exactly of type ``str``.
    """
    if not argv:
        raise ValueError("argv must contain at least the executable name")
    # Check each item directly: comparing type objects with min()/max() fails
    # on Python 3, and a map() iterator can only be consumed once.
    if not all(type(arg) is str for arg in argv):
        raise TypeError("executables and arguments must be str objects")
    logger.debug("Running %r" % " ".join(argv))
    args = GLib.spawn_async(argv=argv, flags=GLib.SpawnFlags.SEARCH_PATH,
                            standard_output=stdout)

    if stdout:
        return os.fdopen(args[2])
    else:
        return args[0]
def main():
    """Run ``update.sh`` (located next to this script) with the script's
    arguments and return its exit status; return 0 immediately on win32 and
    cygwin.
    """
    if sys.platform in ['win32', 'cygwin']:
        return 0

    # This script is called by gclient. gclient opens its hooks subprocesses with
    # (stdout=subprocess.PIPE, stderr=subprocess.STDOUT) and then does custom
    # output processing that breaks printing '\r' characters for single-line
    # updating status messages as printed by curl and wget.
    # Work around this by setting stderr of the update.sh process to stdin (!):
    # gclient doesn't redirect stdin, and while stdin itself is read-only, a
    # dup()ed sys.stdin is writable, try
    #   fd2 = os.dup(sys.stdin.fileno()); os.write(fd2, 'hi')
    # TODO: Fix gclient instead, http://crbug.com/95350
    script = os.path.join(os.path.dirname(__file__), 'update.sh')
    stdin_copy = os.fdopen(os.dup(sys.stdin.fileno()))
    return subprocess.call([script] + sys.argv[1:], stderr=stdin_copy)
def close(self):
    """ Implements GVSvgDoc.close()

    Writes the accumulated dot source to a temporary ``.gv`` file and runs
    ``dot`` on it to produce ``self._filename``.
    """
    GVDocBase.close(self)

    # Make sure the extension is correct
    extension = ".svg"
    if self._filename[-len(extension):] != extension:
        self._filename += extension

    # Create a temporary dot file
    fd, dot_path = tempfile.mkstemp(".gv")
    dot_stream = os.fdopen(fd, "wb")
    dot_stream.write(self._dot.getvalue())
    dot_stream.close()

    # Generate the SVG file.
    command = 'dot -Tsvg:cairo -o"%s" "%s"' % (self._filename, dot_path)
    os.system(command)

    # Delete the temporary dot file
    os.remove(dot_path)

#-------------------------------------------------------------------------------
#
# GVSvgzDoc
#
#-------------------------------------------------------------------------------
def close(self):
    """ Implements GVSvgzDoc.close()

    Writes the accumulated dot source to a temporary ``.gv`` file and runs
    ``dot`` on it to produce ``self._filename``.
    """
    GVDocBase.close(self)

    # Make sure the extension is correct
    extension = ".svgz"
    if self._filename[-len(extension):] != extension:
        self._filename += extension

    # Create a temporary dot file
    fd, dot_path = tempfile.mkstemp(".gv")
    dot_stream = os.fdopen(fd, "wb")
    dot_stream.write(self._dot.getvalue())
    dot_stream.close()

    # Generate the SVGZ file.
    command = 'dot -Tsvgz -o"%s" "%s"' % (self._filename, dot_path)
    os.system(command)

    # Delete the temporary dot file
    os.remove(dot_path)

#-------------------------------------------------------------------------------
#
# GVPngDoc
#
#-------------------------------------------------------------------------------
def close(self):
    """ Implements GVPngDoc.close()

    Writes the accumulated dot source to a temporary ``.gv`` file and runs
    ``dot`` on it to produce ``self._filename``.
    """
    GVDocBase.close(self)

    # Make sure the extension is correct
    extension = ".png"
    if self._filename[-len(extension):] != extension:
        self._filename += extension

    # Create a temporary dot file
    fd, dot_path = tempfile.mkstemp(".gv")
    dot_stream = os.fdopen(fd, "wb")
    dot_stream.write(self._dot.getvalue())
    dot_stream.close()

    # Generate the PNG file.
    command = 'dot -Tpng -o"%s" "%s"' % (self._filename, dot_path)
    os.system(command)

    # Delete the temporary dot file
    os.remove(dot_path)

#-------------------------------------------------------------------------------
#
# GVJpegDoc
#
#-------------------------------------------------------------------------------
def close(self):
    """ Implements GVGifDoc.close()

    Writes the accumulated dot source to a temporary ``.gv`` file and runs
    ``dot`` on it to produce ``self._filename``.
    """
    GVDocBase.close(self)

    # Make sure the extension is correct
    extension = ".gif"
    if self._filename[-len(extension):] != extension:
        self._filename += extension

    # Create a temporary dot file
    fd, dot_path = tempfile.mkstemp(".gv")
    dot_stream = os.fdopen(fd, "wb")
    dot_stream.write(self._dot.getvalue())
    dot_stream.close()

    # Generate the GIF file.
    command = 'dot -Tgif -o"%s" "%s"' % (self._filename, dot_path)
    os.system(command)

    # Delete the temporary dot file
    os.remove(dot_path)

#-------------------------------------------------------------------------------
#
# GVPdfGvDoc
#
#-------------------------------------------------------------------------------
def set(self, key, value, timeout=None):
    """Store ``value`` under ``key`` in the file-system cache.

    The expiry timestamp (now + ``timeout``) and the value are pickled into
    a temporary file in ``self._path`` which is then renamed over the final
    cache file. ``IOError``/``OSError`` are deliberately swallowed: writing
    is best-effort and the method always returns ``None``.

    :param timeout: seconds until expiry; ``None`` uses
                    ``self.default_timeout``.
    """
    if timeout is None:
        timeout = self.default_timeout
    filename = self._get_filename(key)
    self._prune()
    tmp = None
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(int(time() + timeout), f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        tmp = None  # the temporary file now lives on as ``filename``
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        pass
    finally:
        # Do not leave a half-written transaction file behind on failure.
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                pass
def safe_open(path, mode="w", chmod=None, buffering=None):
    """Safely open a file.

    The file is created exclusively (``O_CREAT | O_EXCL | O_RDWR``), so the
    call fails with ``OSError`` when ``path`` already exists.

    :param str path: Path to a file.
    :param str mode: Same as `mode` for `open`.
    :param int chmod: Same as `mode` for `os.open`, uses Python defaults
        if ``None``.
    :param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
        defaults if ``None``.

    """
    flags = os.O_CREAT | os.O_EXCL | os.O_RDWR

    # Only pass the optional arguments that were actually supplied, so the
    # library defaults apply otherwise.
    if chmod is None:
        fd = os.open(path, flags)
    else:
        fd = os.open(path, flags, chmod)

    if buffering is None:
        return os.fdopen(fd, mode)
    return os.fdopen(fd, mode, buffering)
def _init_dirs(self): test_dirs = ['a a', 'b', 'c\c', 'D_'] config = 'improbable' root = tempfile.mkdtemp() def cleanup(): try: os.removedirs(root) except (FileNotFoundError, OSError): pass os.chdir(root) for dir_ in test_dirs: os.mkdir(dir_, 0o0750) f = '{0}.toml'.format(config) flags = os.O_WRONLY | os.O_CREAT rel_path = '{0}/{1}'.format(dir_, f) abs_file_path = os.path.join(root, rel_path) with os.fdopen(os.open(abs_file_path, flags, 0o0640), 'w') as fp: fp.write("key = \"value is {0}\"\n".format(dir_)) return root, config, cleanup
def set(self, key, value, timeout=None):
    """Store ``value`` under ``key`` in the file-system cache.

    The expiry timestamp (now + ``timeout``) and the value are pickled into
    a temporary file in ``self._path`` which is then renamed over the final
    cache file.

    :param timeout: seconds until expiry; ``None`` uses
                    ``self.default_timeout``.
    :return: ``True`` on success, ``False`` if an ``IOError``/``OSError``
             occurred.
    """
    if timeout is None:
        timeout = self.default_timeout
    filename = self._get_filename(key)
    self._prune()
    tmp = None
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(int(time() + timeout), f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        tmp = None  # the temporary file now lives on as ``filename``
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True
    finally:
        # Do not leave a half-written transaction file behind on failure.
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                pass