我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用os.mknod()。
def set_log(level, filename='jumpserver.log'):
    """Return the 'jumpserver' logger with a file handler on LOG_DIR/filename.

    The log file is created (and made mode 0o777) if it does not exist yet.
    The logger itself always runs at DEBUG; ``level`` only selects the
    threshold of the file handler.

    :param level: one of 'debug', 'info', 'warning', 'error', 'critical';
        any other value falls back to DEBUG.
    :param filename: log file name, joined onto ``LOG_DIR``.
    :return: the configured ``logging.Logger`` instance.
    """
    log_file = os.path.join(LOG_DIR, filename)
    if not os.path.isfile(log_file):
        # Append mode creates the file without os.mknod (which is not usable
        # on every platform) and does not fail if another process created the
        # file between the check above and this call.
        open(log_file, 'a').close()
        os.chmod(log_file, 0o777)

    log_level_total = {'debug': logging.DEBUG,
                       'info': logging.INFO,
                       'warning': logging.WARN,
                       'error': logging.ERROR,
                       'critical': logging.CRITICAL}
    handler_level = log_level_total.get(level, logging.DEBUG)

    logger_f = logging.getLogger('jumpserver')
    logger_f.setLevel(logging.DEBUG)

    # logging.getLogger returns the same object on every call, so adding a
    # new handler each time would write every record several times.  Reuse a
    # handler that already targets this file and just refresh its level.
    target = os.path.abspath(log_file)
    for handler in logger_f.handlers:
        if isinstance(handler, logging.FileHandler) and handler.baseFilename == target:
            handler.setLevel(handler_level)
            return logger_f

    fh = logging.FileHandler(log_file)
    fh.setLevel(handler_level)
    fh.setFormatter(logging.Formatter(
        '%(asctime)s - %(filename)s - %(levelname)s - %(message)s'))
    logger_f.addHandler(fh)
    return logger_f
def execute_config_strategy():
    """Apply the config copy policy named by KOLLA_CONFIG_STRATEGY.

    COPY_ALWAYS copies the loaded config every time.  COPY_ONCE copies it
    only when the '/configured' marker is absent, then creates the marker;
    if the marker is present the process exits with status 0.  Any other
    value is logged as an error and the process exits with status 1.
    """
    strategy = os.environ.get("KOLLA_CONFIG_STRATEGY")
    LOG.info("Kolla config strategy set to: %s", strategy)
    # The config is loaded before the strategy is validated.
    config = load_config()

    if strategy == "COPY_ALWAYS":
        copy_config(config)
    elif strategy != "COPY_ONCE":
        LOG.error('KOLLA_CONFIG_STRATEGY is not set properly')
        sys.exit(1)
    elif os.path.exists('/configured'):
        LOG.info("The config strategy prevents copying new configs")
        sys.exit(0)
    else:
        copy_config(config)
        # Marker file checked by later COPY_ONCE runs.
        os.mknod('/configured')
def set_log(level, filename='spider.log'):
    """Return the 'spider' logger plus the file descriptor it writes to.

    ``LOG_DIR`` and the log file are created when missing (the file is made
    mode 0o777).  The logger itself always runs at DEBUG; ``level`` only
    selects the threshold of the file handler.

    :param level: one of 'debug', 'info', 'warning', 'error', 'critical';
        any other value falls back to DEBUG.
    :param filename: log file name, joined onto ``LOG_DIR``.
    :return: ``(logger, keep_fds)`` where ``keep_fds`` is a one-element list
        holding the descriptor of the handler's open stream.
    """
    try:
        os.makedirs(LOG_DIR)
    except OSError:
        # Already existing is fine (also covers a concurrent creator);
        # anything else is a real failure.
        if not os.path.isdir(LOG_DIR):
            raise

    log_file = os.path.join(LOG_DIR, filename)
    if not os.path.isfile(log_file):
        # Append mode creates the file without os.mknod and does not fail
        # if another process created it after the check above.
        open(log_file, 'a').close()
        os.chmod(log_file, 0o777)

    log_level_total = {'debug': logging.DEBUG,
                       'info': logging.INFO,
                       'warning': logging.WARN,
                       'error': logging.ERROR,
                       'critical': logging.CRITICAL}
    handler_level = log_level_total.get(level, logging.DEBUG)

    logger_f = logging.getLogger('spider')
    logger_f.setLevel(logging.DEBUG)

    # logging.getLogger returns the same object on every call; reuse an open
    # handler for this file instead of stacking duplicates, which would
    # write every record several times.
    target = os.path.abspath(log_file)
    for handler in logger_f.handlers:
        if (isinstance(handler, logging.FileHandler)
                and handler.baseFilename == target
                and handler.stream is not None):
            handler.setLevel(handler_level)
            return logger_f, [handler.stream.fileno()]

    fh = logging.FileHandler(log_file, 'a')
    fh.setLevel(handler_level)
    fh.setFormatter(logging.Formatter(
        '%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s'))
    logger_f.addHandler(fh)
    keep_fds = [fh.stream.fileno()]
    return logger_f, keep_fds
def _mkfifo(self, fifo_path): try: if os.path.exists(fifo_path): # self._run_shell_cmd("rm %s " % fifo_path, wait=True) os.remove(fifo_path) os.mknod(fifo_path, 0o666 | stat.S_IFIFO) except OSError as err: if err.errno == errno.EEXIST: # if already exists, skip this step pass # print "Fifo file already exists, skipping..." elif err.errno == errno.EPERM: # not permitted, try shell command # print "Not permitted to create fifo file, try to switch to # root..." retcode = self._run_shell_cmd( "mknod %s p" % fifo_path, wait=True) if retcode != 0: raise RuntimeError("mknod returns %s" % str(retcode)) else: raise err
def test_check_file_exists(mock_err_exit, tmpdir, test_file_name, include_dirname):
    """Check when check_file_exists() triggers the mocked error exit.

    With ``include_dirname`` the path points inside ``tmpdir`` and the file
    is created only for the name 'exist_file'; otherwise the bare file name
    is passed as-is.
    """
    file_path = test_file_name
    if include_dirname:
        file_path = os.path.join(tmpdir.dirname, tmpdir.basename, test_file_name)
        if test_file_name == 'exist_file':
            os.mknod(file_path)

    check_file_exists(file_path)

    if include_dirname is True:
        if test_file_name == 'exist_file':
            mock_err_exit.assert_not_called()
        else:
            mock_err_exit.assert_called_once()
    elif include_dirname is False:
        mock_err_exit.assert_called_once()

    tmpdir.remove()
def test_make_file_executable(mock_err_exit, mock_getstatusoutput, tmpdir,
                              test_file_name, status, include_dir):
    """Check make_file_executable() against three kinds of file name.

    'no_exe_file' is created with the default mode, 'exe_file' is created
    with mode 0o700, and 'no_exist_file' is not created at all.  The mocked
    ``getstatusoutput`` returns ``(status, 'test_out')``.
    """
    if include_dir:
        file_path = os.path.join(tmpdir.dirname, tmpdir.basename, test_file_name)
    else:
        file_path = test_file_name

    if test_file_name == 'no_exe_file':
        os.mknod(file_path)
    if test_file_name == 'exe_file':
        # 0o700 is the octal spelling accepted by both Python 2.6+ and 3;
        # the bare 0700 form is a syntax error on Python 3.
        os.mknod(file_path, 0o700)

    output = 'test_out'
    mock_getstatusoutput.return_value = (status, output)

    make_file_executable(file_path)

    if test_file_name == 'exe_file':
        mock_err_exit.assert_not_called()
        assert os.access(file_path, os.X_OK)
    if test_file_name == 'no_exe_file' and status is False:
        mock_err_exit.assert_not_called()
    if test_file_name == 'no_exe_file' and status is True:
        mock_err_exit.assert_called()
    if test_file_name == 'no_exist_file':
        mock_err_exit.assert_called()

    tmpdir.remove()
def make_chunky_home(chunky_home_root):
    """Create the ``.chunky`` directory layout under ``chunky_home_root``.

    Ensures ``.chunky/``, ``.chunky/pipelines/`` (with an empty
    ``__init__.py``) and ``.chunky/configs/`` exist, then reports success on
    stdout.  When the root is not the user's home directory a hint about the
    CHUNKY_HOME environment variable is printed as well.

    Any OSError is reported on stderr instead of being raised.

    :param chunky_home_root: existing directory that will hold ``.chunky``.
    """
    chunky_dir = os.path.join(chunky_home_root, '.chunky')
    pipelines_dir = os.path.join(chunky_dir, 'pipelines')
    configs_dir = os.path.join(chunky_dir, 'configs')
    init_file = os.path.join(pipelines_dir, '__init__.py')

    try:
        for directory in (chunky_dir, pipelines_dir):
            if not os.path.exists(directory):
                os.mkdir(directory)
        if not os.path.isfile(init_file):
            # O_CREAT without O_EXCL creates an empty regular file with the
            # requested mode and is a no-op if the file appeared meanwhile.
            os.close(os.open(init_file, os.O_WRONLY | os.O_CREAT, 0o644))
        if not os.path.exists(configs_dir):
            os.mkdir(configs_dir)

        sys.stdout.write('ChunkyPipes successfully initialized at {}\n'.format(chunky_home_root))
        if chunky_home_root != os.path.expanduser('~'):
            sys.stdout.write('Please set a CHUNKY_HOME environment variable to {}\n'.format(chunky_home_root))
    except OSError as e:
        # Format the exception itself: OSError has no ``message`` attribute
        # on Python 3, and on Python 2 it is an empty string.
        sys.stderr.write('An error occurred initializing ChunkyPipes at {}.\n{}\n'.format(
            chunky_home_root, e
        ))
def test_mknod_dir_fd(self):
    """mknod() with dir_fd creates a FIFO relative to an open directory."""
    # Creating a FIFO is the only use of mknodat() specified by POSIX.
    support.unlink(support.TESTFN)
    fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        posix.mknod(support.TESTFN, fifo_mode, 0, dir_fd=cwd_fd)
    except OSError as exc:
        # Some old systems refuse mknod() to unprivileged users, or only
        # support creating device nodes.
        self.assertIn(exc.errno, (errno.EPERM, errno.EINVAL))
    else:
        created = posix.stat(support.TESTFN)
        self.assertTrue(stat.S_ISFIFO(created.st_mode))
    finally:
        posix.close(cwd_fd)
def get_tasklist():
    '''Read ``_task_file`` and return its task lines as lists of fields.

    Lines starting with '#' and lines of six characters or fewer (newline
    included) are skipped; every other line is split on whitespace.

    If the file cannot be opened, an empty file is created at that path,
    the notices are printed and an empty list is returned.

    :return: list of lists of strings, one inner list per task line.
    '''
    taskList = []
    try:
        fileHander = open(_task_file, 'r', encoding='utf-8')
    except IOError:
        print('????{0}???'.format(_task_file))
        print("## ????{0}??...".format(_task_file))
        # Create the empty task file; append mode works on every platform.
        open(_task_file, 'a', encoding='utf-8').close()
        print("??????'{0}'?????????".format(_task_file))
        print("???????????README.txt??")
        # Nothing was opened, and the freshly created file holds no tasks.
        return taskList

    with fileHander:
        for line in fileHander:
            if line[0] == '#':
                # Comment line.
                continue
            if len(line) > 6:
                taskList.append(line.split())
    return taskList
def test_mknod(self):
    """mknod() creates a FIFO, via positional and via keyword arguments."""
    # Creating a FIFO is the only use of mknod() specified by POSIX.
    # Some old systems refuse mknod() to unprivileged users, or only
    # support creating device nodes, hence the tolerated errno values.
    tolerated = (errno.EPERM, errno.EINVAL)

    support.unlink(support.TESTFN)
    fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
    try:
        posix.mknod(support.TESTFN, fifo_mode, 0)
    except OSError as exc:
        self.assertIn(exc.errno, tolerated)
    else:
        created = posix.stat(support.TESTFN)
        self.assertTrue(stat.S_ISFIFO(created.st_mode))

    # Keyword arguments are also supported
    support.unlink(support.TESTFN)
    try:
        posix.mknod(path=support.TESTFN, mode=fifo_mode, device=0,
                    dir_fd=None)
    except OSError as exc:
        self.assertIn(exc.errno, tolerated)
def makedev(self, type, major, minor):
    """Make a special file with specified type, and major/minor nums

    :param type: 'c' for a character device, 'b' for a block device.
    :param major: device major number.
    :param minor: device minor number.
    :raises RPathException: if ``type`` is neither 'c' nor 'b'.

    If mknod is unavailable (AttributeError) or refused with EPERM, a log
    message is emitted and ``self.touch()`` is called instead.  The method
    always finishes by calling ``self.setdata()``.
    """
    if type == 'c':
        mode = stat.S_IFCHR | 0o600
    elif type == 'b':
        mode = stat.S_IFBLK | 0o600
    else:
        raise RPathException

    try:
        self.conn.os.mknod(self.path, mode,
                           self.conn.os.makedev(major, minor))
    except (OSError, AttributeError) as e:
        if isinstance(e, AttributeError) or e.errno == errno.EPERM:
            # AttributeError will be raised by Python 2.2, which
            # doesn't have os.mknod
            log.Log("unable to mknod %s -- using touch instead" % self.path, 4)
            self.touch()
        # NOTE(review): any other OSError is ignored here, exactly as
        # before -- confirm that this best-effort behaviour is intended.
    self.setdata()
def execute_config_strategy(config):
    """Apply the config copy policy named by KOLLA_CONFIG_STRATEGY.

    COPY_ALWAYS copies ``config`` and applies permissions every time.
    COPY_ONCE does the same only when the '/configured' marker is absent,
    then creates the marker.

    :param config: configuration object passed to ``copy_config`` and
        ``handle_permissions``.
    :raises ImmutableConfig: (exit_code=0) for COPY_ONCE when '/configured'
        already exists.
    :raises InvalidConfig: when the variable holds any other value.
    """
    strategy = os.environ.get("KOLLA_CONFIG_STRATEGY")
    LOG.info("Kolla config strategy set to: %s", strategy)

    if strategy == "COPY_ALWAYS":
        copy_config(config)
        handle_permissions(config)
        return

    if strategy != "COPY_ONCE":
        raise InvalidConfig('KOLLA_CONFIG_STRATEGY is not set properly')

    if os.path.exists('/configured'):
        raise ImmutableConfig(
            "The config strategy prevents copying new configs",
            exit_code=0)

    copy_config(config)
    handle_permissions(config)
    # Marker file checked by later COPY_ONCE runs.
    os.mknod('/configured')
def makedev(self, tarinfo, targetpath):
    """Create the character or block device described by tarinfo.

    The node is created at ``targetpath`` with ``tarinfo.mode`` plus the
    block-device flag when ``tarinfo.isblk()`` is true and the
    character-device flag otherwise.

    Raises ExtractError when ``os`` lacks ``mknod`` or ``makedev``.
    """
    if not (hasattr(os, "mknod") and hasattr(os, "makedev")):
        raise ExtractError("special devices not supported by system")

    mode = tarinfo.mode
    mode |= stat.S_IFBLK if tarinfo.isblk() else stat.S_IFCHR

    device = os.makedev(tarinfo.devmajor, tarinfo.devminor)
    os.mknod(targetpath, mode, device)
def mknod(self, path, mode, dev):
    """Create a filesystem node at ``self._full_path(path)``.

    ``mode`` and ``dev`` are handed to ``os.mknod`` unchanged and its
    result is returned.
    """
    target = self._full_path(path)
    return os.mknod(target, mode, dev)
def makedev(dev_path):
    """Populate ``dev_path`` with standard stream symlinks and device nodes.

    Creates ``stdin``/``stdout``/``stderr`` symlinks to ``/proc/self/fd/0..2``
    and an ``fd`` symlink to ``/proc/self/fd``, then creates the character
    devices listed in ``DEVICES`` with mode 0o666.

    :param dev_path: existing directory that receives the entries.
    """
    for i, dev in enumerate(['stdin', 'stdout', 'stderr']):
        os.symlink('/proc/self/fd/%d' % i, os.path.join(dev_path, dev))
    os.symlink('/proc/self/fd', os.path.join(dev_path, 'fd'))

    # Add extra devices: name -> (node type, major, minor)
    DEVICES = {
        'null': (stat.S_IFCHR, 1, 3),
        'zero': (stat.S_IFCHR, 1, 5),
        'random': (stat.S_IFCHR, 1, 8),
        'urandom': (stat.S_IFCHR, 1, 9),
        'console': (stat.S_IFCHR, 136, 1),
        'tty': (stat.S_IFCHR, 5, 0),
        'full': (stat.S_IFCHR, 1, 7),
    }
    # dict.items() and the 0o666 literal work on both Python 2.6+ and 3;
    # iteritems() and the bare 0666 form are Python 2 only.
    for device, (dev_type, major, minor) in DEVICES.items():
        os.mknod(os.path.join(dev_path, device),
                 0o666 | dev_type,
                 os.makedev(major, minor))
def mknod(self, path, mode, dev):
    """Create a filesystem node at ``self.join_path(path)``.

    ``mode`` and ``dev`` are handed to ``os.mknod`` unchanged and its
    result is returned.
    """
    target = self.join_path(path)
    return os.mknod(target, mode, dev)
def setFlag(self, flag):
    """Create an empty file named ``flag`` in the 'status' sub-folder.

    The file lives under ``self.saveFolder`` joined with
    ``self.saveSubFolders['status']``.  Nothing happens if the path
    already exists.
    """
    status_dir = os.path.join(self.saveFolder, self.saveSubFolders['status'])
    marker = os.path.join(status_dir, flag)
    if os.path.exists(marker):
        return
    os.mknod(marker)