我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.mkfifo()。
def __init__(self, name, end=0, mode=0666): """Open a pair of pipes, name.in and name.out for communication with another process. One process should pass 1 for end, and the other 0. Data is marshalled with pickle.""" self.in_name, self.out_name = name +'.in', name +'.out', try: os.mkfifo(self.in_name,mode) except OSError: pass try: os.mkfifo(self.out_name,mode) except OSError: pass # NOTE: The order the ends are opened in is important - both ends # of pipe 1 must be opened before the second pipe can be opened. if end: self.inp = open(self.out_name,'r') self.out = open(self.in_name,'w') else: self.out = open(self.out_name,'w') self.inp = open(self.in_name,'r') self._open = True
def test_copytree_named_pipe(self): os.mkdir(TESTFN) try: subdir = os.path.join(TESTFN, "subdir") os.mkdir(subdir) pipe = os.path.join(subdir, "mypipe") os.mkfifo(pipe) try: shutil.copytree(TESTFN, TESTFN2) except shutil.Error as e: errors = e.args[0] self.assertEqual(len(errors), 1) src, dst, error_msg = errors[0] self.assertEqual("`%s` is a named pipe" % pipe, error_msg) else: self.fail("shutil.Error should have been raised") finally: shutil.rmtree(TESTFN, ignore_errors=True) shutil.rmtree(TESTFN2, ignore_errors=True)
def __init__(self, scene, game_server_guid, player_n): """ It doesn't know env_id yet, it creates pipes and waits for client to send his env_id. """ self.scene = scene assert isinstance(game_server_guid, str) assert isinstance(player_n, int) self.player_n = player_n self.prefix = MULTIPLAYER_FILES_DIR + "/multiplayer_%s_player%02i" % (game_server_guid, player_n) self.sh_pipe_actready_filename = self.prefix + "_actready" self.sh_pipe_obsready_filename = self.prefix + "_obsready" try: os.unlink(self.sh_pipe_actready_filename) except: pass os.mkfifo(self.sh_pipe_actready_filename) try: os.unlink(self.sh_pipe_obsready_filename) except: pass os.mkfifo(self.sh_pipe_obsready_filename) print("Waiting %s" % self.prefix) self.need_reset = True self.need_response_tuple = False
def open_fifo(self): try: os.mkfifo(self.copytool.event_fifo) except OSError as e: if e.errno != errno.EEXIST: raise e pids = lsof(file=self.copytool.event_fifo) readers = set() writers = set() for pid, files in pids.items(): for file, info in files.items(): if 'r' in info['mode']: readers.add(pid) if 'w' in info['mode']: writers.add(pid) if readers: raise FifoReaderConflict(readers) self.reader_fd = os.open(self.copytool.event_fifo, os.O_RDONLY | os.O_NONBLOCK) copytool_log.info("Opened %s for reading" % self.copytool.event_fifo)
def setup_reply_channel(self_or_session): r = Remote._resolve(self_or_session) r_pre = r.pre r.pre = lambda f: r_pre(f) + ''' __reply_fifo_dir=$(mktemp -d) __reply_fifo="${__reply_fifo_dir}/fifo" mkfifo ${__reply_fifo} ''' r.post = ''' \ncat ${__reply_fifo} rm ${__reply_fifo} rmdir ${__reply_fifo_dir} ''' + r.post r.arg_config['reply_fifo'] = ('__reply_fifo', Args.string) r.required_names.add('reply_fifo') return r
def __init__(self, fifo_dir_path=None, is_master=True): self.is_master = is_master self._fifo_dir_path = fifo_dir_path if fifo_dir_path is None: self._fifo_dir_path = tempfile.mkdtemp() atexit.register(shutil.rmtree, self._fifo_dir_path) if is_master: self._fifo_read_path = os.path.join(self._fifo_dir_path, 'cpy2py_s2c.ipc') self._fifo_write_path = os.path.join(self._fifo_dir_path, 'cpy2py_c2s.ipc') os.mkfifo(self._fifo_read_path) os.mkfifo(self._fifo_write_path) else: self._fifo_read_path = os.path.join(self._fifo_dir_path, 'cpy2py_c2s.ipc') self._fifo_write_path = os.path.join(self._fifo_dir_path, 'cpy2py_s2c.ipc') self._fifo_read = None self._fifo_write = None
def write(self, msg): # create the pipe if necessary try: os.mkfifo(self.filename) except OSError as oe: if oe.errno != errno.EEXIST: raise # open it up for reading if necessary if not self.FIFO: self.FIFO = open(self.filename, 'w') print(msg, file=self.FIFO) ############################################### # Functions # ###############################################
def _create_pipes(self): # Creates named pipe for inter-process communication self.pipe_name = seeding.hash_seed(None) % 2 ** 32 self.launch_vars['pipe_name'] = self.pipe_name if not self.disable_out_pipe: self.path_pipe_out = '%s-out.%d' % (self.path_pipe_prefix, self.pipe_name) os.mkfifo(self.path_pipe_out) # Launching a thread that will listen to incoming pipe # Thread exits if self.is_exiting = 1 or pipe_in is closed if not self.disable_in_pipe: thread_incoming = Thread(target=self._listen_to_incoming_pipe, kwargs={'pipe_name': self.pipe_name}) thread_incoming.start() # Cannot open output pipe now, otherwise it will block until # a reader tries to open the file in read mode - Must launch fceux first
def Start(): if(config.DEBUG or not config.LIVE_PREVIEW_ENABLE): return # Start recording live preview pygameEngine.DrawCenterMessage(config.LIVE_PREVIEW_MSG, True) print "Start recording live preview" if os.path.exists(config.LIVE_MOVIE_FILE): os.remove(config.LIVE_MOVIE_FILE) os.mkfifo(config.LIVE_MOVIE_FILE) # To avoid problem, wait while not os.path.exists(config.LIVE_MOVIE_FILE): time.sleep(.1) camera.RecordPreview(config.LIVE_MOVIE_FILE, config.PREVIEW_DURATION) # Playing live preview pygameEngine.DrawCenterMessage("") #Clean screen before preview print "Playing live preview" os.popen(config.CMD_LIVE_PREVIEW_PLAY.format(filename = config.LIVE_MOVIE_FILE))
def makefifo(self, tarinfo, targetpath): """Make a fifo called targetpath. """ if hasattr(os, "mkfifo"): os.mkfifo(targetpath) else: raise ExtractError("fifo not supported by system")
def open(self, flags): os.mkfifo(self._pipe) self._thread = threading.Thread(target=self._client.get, daemon=True, args=(self.url.path, self._pipe)) self._thread.start() self._fd = os.open(self._pipe, os.O_RDONLY)
def setUp(self): super(TestStream, self).setUp() if os.path.exists(_pipepath): os.unlink(_pipepath) os.mkfifo(_pipepath)
def __init__(self, path): self.path = path self._fd = None os.mkfifo(self.path)
def _ensure_create_pipe(self, pipe_name): if not os.path.exists(pipe_name): os.mkfifo(pipe_name) ## ------------ end pipes --------------
def makefifo(self, cpioinfo, cpiogetpath): """Make a fifo called cpiogetpath. """ if hasattr(os, "mkfifo"): os.mkfifo(cpiogetpath) else: raise ExtractError("fifo not supported by system")
def __init__(self, name=None): if name==None: # make one name = tempfile.mktemp() os.mkfifo(name, 0o600) self.name=name