我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用stat.S_IRWXU。
def _setup_script_dir(self, email_list):
    """
    Create the shell-script directory inside ``self.log_dir`` and, if it is
    still pending, write the epilogue script next to it.

    Both the directory and the epilogue file are made accessible to the
    owner only.  An already-existing directory is silently accepted.

    :param email_list: list of email addresses (usually a single address)
        handed to ``self.generate_epilogue`` when rendering the epilogue
    :raises OSError: for any failure other than the directory already
        existing (a message is printed to stderr first)
    """
    target_dir = os.path.join(self.log_dir, _SHELL_SCRIPT_DIR)
    owner_only = stat.S_IRWXU
    try:
        os.makedirs(target_dir)
        os.chmod(target_dir, owner_only)
        if self.need_to_write_epilogue:
            with open(self.epilogue_filename, "w") as handle:
                handle.write(self.generate_epilogue(email_list))
            os.chmod(self.epilogue_filename, owner_only)
            self.need_to_write_epilogue = False
    except OSError as exception:
        if exception.errno == errno.EEXIST:
            # The directory is already there: nothing more to do.
            return
        print('Error while creating directory ' + target_dir, file=sys.stderr)
        raise
def get_perf(filename):
    '''
    Run the conlleval.pl perl script on *filename* to obtain
    precision/recall and F1 score.

    The script is downloaded into the current directory (and made
    executable for the owner) when it is not already present.

    :param filename: path of the prediction file fed to conlleval.pl on stdin
    :returns: dict with keys ``'p'``, ``'r'`` and ``'f1'`` (floats)
    :raises ValueError: if the script output contains no "accuracy" line
    '''
    _conlleval = 'conlleval.pl'
    if not isfile(_conlleval):
        # download('http://www-etud.iro.umontreal.ca/~mesnilgr/atis/conlleval.pl')
        os.system('wget https://www.comp.nus.edu.sg/%7Ekanmy/courses/practicalNLP_2008/packages/conlleval.pl')
        chmod('conlleval.pl', stat.S_IRWXU)  # give the execute permissions

    # Read the predictions through a context manager so the handle is closed.
    with open(filename) as predictions:
        payload = predictions.read()

    proc = subprocess.Popen(["perl", _conlleval],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            universal_newlines=True)
    stdout, _ = proc.communicate(payload)

    out = None
    for line in stdout.split('\n'):
        if 'accuracy' in line:
            out = line.split()
            break
    if out is None:
        # Without this guard a missing summary line surfaced as an opaque
        # UnboundLocalError further down.
        raise ValueError('no "accuracy" line in conlleval.pl output for %r' % (filename,))

    # out = ['accuracy:', '16.26%;', 'precision:', '0.00%;', 'recall:', '0.00%;', 'FB1:', '0.00']
    precision = float(out[3][:-2])
    recall = float(out[5][:-2])
    f1score = float(out[7])
    return {'p': precision, 'r': recall, 'f1': f1score}
def getattr(self, path, fh=None):
    """Return the FUSE attribute dictionary for ``path``.

    A ``stat`` command is sent through ``self.send_cmd`` and its reply is
    translated into ``st_*`` entries.

    :param path: path of the entry to look up
    :param fh: not used by this implementation
    :returns: dict of ``st_*`` attributes
    :raises FuseOSError: with ``ENOENT`` when the reply status is not ``'ok'``
    """
    stat = self.send_cmd(cmd='stat', path=path)
    if stat['status'] != 'ok':
        raise FuseOSError(ENOENT)
    fuse_stat = {}
    # Start from an empty mode; the type bit is OR-ed in here and the 777
    # permission bits further down.
    fuse_stat['st_mode'] = 0
    if stat['type'] == 'folder':
        fuse_stat['st_mode'] |= S_IFDIR
    else:
        # NOTE(review): size and timestamps are assumed to be filled in for
        # non-folder entries only -- confirm against the stat reply schema.
        fuse_stat['st_mode'] |= S_IFREG
        fuse_stat['st_size'] = stat.get('size', 0)
        fuse_stat['st_ctime'] = dateparse(stat['created']).timestamp()  # TODO change to local timezone
        fuse_stat['st_mtime'] = dateparse(stat['updated']).timestamp()
        # The reply's 'updated' value is reused as the access time.
        fuse_stat['st_atime'] = dateparse(stat['updated']).timestamp()  # TODO not supported ?
    # Grant read/write/execute to user, group and others.
    fuse_stat['st_mode'] |= S_IRWXU | S_IRWXG | S_IRWXO
    fuse_stat['st_nlink'] = 1
    # Every entry is reported as owned by the current process.
    fuse_stat['st_uid'] = os.getuid()
    fuse_stat['st_gid'] = os.getgid()
    return fuse_stat
def get_perf(filename):
    '''
    Run the conlleval.pl perl script (located next to this module) on
    *filename* to obtain precision/recall and F1 score.

    :param filename: path of the prediction file fed to conlleval.pl on stdin
    :returns: dict with keys ``'p'``, ``'r'`` and ``'f1'`` (floats)
    :raises ValueError: if the script output contains no "accuracy" line
    '''
    _conlleval = os.path.dirname(os.path.realpath(__file__)) + '/conlleval.pl'
    os.chmod(_conlleval, stat.S_IRWXU)  # give the execute permissions

    # Read the predictions through a context manager so the handle is closed.
    with open(filename) as predictions:
        payload = predictions.read()

    proc = subprocess.Popen(["perl", _conlleval],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE)
    stdout, _ = proc.communicate(payload)

    out = None
    for line in stdout.split('\n'):
        if 'accuracy' in line:
            out = line.split()
            break
    if out is None:
        # Without this guard a missing summary line surfaced as an opaque
        # UnboundLocalError further down.
        raise ValueError('no "accuracy" line in conlleval.pl output for %r' % (filename,))

    # Token positions of the precision, recall and FB1 values in the
    # summary line printed by the bundled script.
    precision = float(out[6][:-2])
    recall = float(out[8][:-2])
    f1score = float(out[10])
    return {'p': precision, 'r': recall, 'f1': f1score}
def makeFilesExecutable():
    """Make the RetroArch launcher files in the add-on's ``bin`` directory
    executable and report the outcome in a dialog.

    The files receive mode rwxrwxr-x.  Only filesystem errors are treated as
    a permission failure; anything else propagates to the caller.
    """
    scriptPath = xbmc.translatePath(xbmcaddon.Addon(id='emulator.tools.retroarch').getAddonInfo('path'))
    scriptPath = os.path.join(scriptPath, 'bin')
    mode = stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH
    try:
        for name in ('retroarch.sh', 'retroarch.start', 'retroarch'):
            os.chmod(os.path.join(scriptPath, name), mode)
    except OSError:
        # A bare ``except:`` here used to swallow KeyboardInterrupt/SystemExit
        # and reported dialog errors as permission failures.
        d = xbmcgui.Dialog()
        d.ok('RetroArch', 'Failed to apply permissions', 'Please try again later or do it manualy via ssh')
    else:
        d = xbmcgui.Dialog()
        d.ok('RetroArch', 'File permissions applied', 'scripts should now be executable')
def test_copy_symlinks(self):
    """shutil.copy honours ``follow_symlinks`` when the source is a symlink."""
    workdir = self.mkdtemp()
    src, dst, src_link = (
        os.path.join(workdir, name) for name in ('foo', 'bar', 'baz')
    )
    write_file(src, 'foo')
    os.symlink(src, src_link)
    can_lchmod = hasattr(os, 'lchmod')
    if can_lchmod:
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)

    # follow_symlinks=True: the target's content is copied, so the
    # destination is a regular file.
    shutil.copy(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)

    # follow_symlinks=False: the link itself is recreated at the destination.
    shutil.copy(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    if can_lchmod:
        self.assertEqual(os.lstat(src_link).st_mode,
                         os.lstat(dst).st_mode)
def build_unpack_comic(self, comic_path):
    """Empty the ``build`` temp directory and extract *comic_path* into it.

    Existing files are removed in a first bottom-up pass and the emptied
    sub-directories in a second one; each entry is made world-accessible
    before removal.  ``.cbr`` archives are opened with ``rarfile`` and
    ``.cbz`` archives with ``zipfile``; any other extension extracts nothing.
    """
    logging.info("%s unpack requested" % comic_path)
    build_dir = os.path.join(gazee.TEMP_DIR, "build")
    world_rwx = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO  # 0777

    for parent, _subdirs, filenames in os.walk(build_dir, topdown=False):
        for filename in filenames:
            stale_file = os.path.join(parent, filename)
            os.chmod(stale_file, world_rwx)
            os.remove(stale_file)

    for parent, subdirs, _filenames in os.walk(build_dir, topdown=False):
        for subdir in subdirs:
            stale_dir = os.path.join(parent, subdir)
            os.chmod(stale_dir, world_rwx)
            os.rmdir(stale_dir)

    if comic_path.endswith(".cbr"):
        archive = rarfile.RarFile(comic_path)
    elif comic_path.endswith(".cbz"):
        archive = zipfile.ZipFile(comic_path)
    else:
        return
    archive.extractall(build_dir)
    return
def user_unpack_comic(self, comic_path, user):
    """Empty the temp directory of *user* and extract *comic_path* into it.

    Existing files are removed in a first bottom-up pass and the emptied
    sub-directories in a second one; each entry is made world-accessible
    before removal.  ``.cbr`` archives are opened with ``rarfile`` and
    ``.cbz`` archives with ``zipfile``; any other extension extracts nothing.
    """
    logging.info("%s unpack requested" % comic_path)
    user_dir = os.path.join(gazee.TEMP_DIR, user)
    world_rwx = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO  # 0777

    for parent, _subdirs, filenames in os.walk(user_dir, topdown=False):
        for filename in filenames:
            stale_file = os.path.join(parent, filename)
            os.chmod(stale_file, world_rwx)
            os.remove(stale_file)

    for parent, subdirs, _filenames in os.walk(user_dir, topdown=False):
        for subdir in subdirs:
            stale_dir = os.path.join(parent, subdir)
            os.chmod(stale_dir, world_rwx)
            os.rmdir(stale_dir)

    if comic_path.endswith(".cbr"):
        archive = rarfile.RarFile(comic_path)
    elif comic_path.endswith(".cbz"):
        archive = zipfile.ZipFile(comic_path)
    else:
        return
    archive.extractall(user_dir)
    return

# This method will return a list of .jpg files in their numerical order to be fed into the reading view.
def env(self):
    """Yield a throw-away package environment rooted in a temp directory.

    The yielded object is the directory path (a ``str`` subclass) carrying a
    ``paths`` dict that maps each of 'home', 'lib', 'scripts', 'data' and
    'egg-base' to a freshly created sub-directory.  A ``.pydistutils.cfg``
    pointing ``egg-base`` at that sub-directory is written into 'home'.
    """
    # A str subclass is used so that attributes can be attached to the path.
    class Environment(str):
        pass

    with contexts.tempdir(prefix='setuptools-test.') as env_dir:
        env = Environment(env_dir)
        # Restrict the directory to its owner.
        os.chmod(env_dir, stat.S_IRWXU)
        subs = 'home', 'lib', 'scripts', 'data', 'egg-base'
        env.paths = dict(
            (dirname, os.path.join(env_dir, dirname))
            for dirname in subs
        )
        # list() forces the lazy map so every directory is actually created.
        list(map(os.mkdir, env.paths.values()))
        config = os.path.join(env.paths['home'], '.pydistutils.cfg')
        with open(config, 'w') as f:
            f.write(DALS("""
                [egg_info]
                egg-base = %(egg-base)s
                """ % env.paths
            ))
        # Yield inside the ``with`` so the temp dir outlives the consumer.
        yield env
def env(self, tmpdir):
    """
    Build a package environment (comparable to a virtualenv) under *tmpdir*
    for packages to be installed into.

    The returned object is the directory path as a ``str`` subclass whose
    ``paths`` attribute maps 'home', 'lib', 'scripts', 'data' and 'egg-base'
    to newly created sub-directories.
    """
    class Environment(str):
        pass

    package_env = Environment(tmpdir)
    tmpdir.chmod(stat.S_IRWXU)
    package_env.paths = {
        name: str(tmpdir / name)
        for name in ('home', 'lib', 'scripts', 'data', 'egg-base')
    }
    for directory in package_env.paths.values():
        os.mkdir(directory)
    return package_env
def cleanup(self):
    """Remove the temporary directory ``self.temp`` if it looks like ours.

    Nothing is deleted unless the path exists and contains the marker
    "tempxxx".  Entries that cannot be removed because of EACCES are made
    world-accessible and the removal is retried; any other error propagates.
    """
    print("Cleaning up")
    if not os.path.exists(self.temp):
        return
    # The check for "tempxxx" is to guard against bugs with path operations
    # and command-line arguments which might lead to the accidental deletion
    # of an important directory.
    if "tempxxx" not in self.temp:
        return

    def handleRemoveReadonly(func, path, exc):
        error = exc[1]
        if error.errno != errno.EACCES:
            # Re-raise the error rmtree is currently handling.
            raise
        os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # 0777
        func(path)

    print("rmtree {0}".format(self.temp))
    shutil.rmtree(self.temp, ignore_errors=False, onerror=handleRemoveReadonly)
def env(self):
    """Yield a throw-away package environment rooted in a temp directory.

    The yielded ``Environment`` wraps the directory path and carries a
    ``paths`` dict mapping each of 'home', 'lib', 'scripts', 'data' and
    'egg-base' to a freshly created sub-directory.  ``build_files`` is used
    to place a ``.pydistutils.cfg`` in 'home' that points ``egg-base`` at the
    matching sub-directory.
    """
    with contexts.tempdir(prefix='setuptools-test.') as env_dir:
        env = Environment(env_dir)
        # Restrict the directory to its owner.
        os.chmod(env_dir, stat.S_IRWXU)
        subs = 'home', 'lib', 'scripts', 'data', 'egg-base'
        env.paths = dict(
            (dirname, os.path.join(env_dir, dirname))
            for dirname in subs
        )
        # list() forces the lazy map so every directory is actually created.
        list(map(os.mkdir, env.paths.values()))
        build_files({
            env.paths['home']: {
                '.pydistutils.cfg': DALS("""
                [egg_info]
                egg-base = %(egg-base)s
                """ % env.paths)
            }
        })
        # Yield inside the ``with`` so the temp dir outlives the consumer.
        yield env
def get_perf(filename):
    '''
    Run the conlleval.pl perl script on *filename* to obtain
    precision/recall and F1 score.

    The script is expected at ``PREFIX + 'conlleval.pl'``; when missing it is
    downloaded to exactly that location and made executable for the owner.

    :param filename: path of the prediction file fed to conlleval.pl on stdin
    :returns: dict with keys ``'p'``, ``'r'`` and ``'f1'`` (floats)
    :raises ValueError: if the script output contains no "accuracy" line
    '''
    _conlleval = PREFIX + 'conlleval.pl'
    if not isfile(_conlleval):
        # download('http://www-etud.iro.umontreal.ca/~mesnilgr/atis/conlleval.pl')
        # Save to the PREFIX-ed path that is checked above and executed
        # below; the script used to land in the current directory, which is
        # a different file whenever PREFIX is non-empty.
        # str.format is used because the URL itself contains a '%'.
        os.system('wget -O {0} {1}'.format(
            _conlleval,
            'https://www.comp.nus.edu.sg/%7Ekanmy/courses/practicalNLP_2008/packages/conlleval.pl'))
        chmod(_conlleval, stat.S_IRWXU)  # give the execute permissions

    # Read the predictions through a context manager so the handle is closed.
    with open(filename, 'rb') as predictions:
        payload = predictions.read()

    proc = subprocess.Popen(["perl", _conlleval],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE)
    stdout, _ = proc.communicate(payload)

    out = None
    for line in stdout.decode("utf-8").split('\n'):
        if 'accuracy' in line:
            out = line.split()
            break
    if out is None:
        # Without this guard a missing summary line surfaced as an opaque
        # UnboundLocalError further down.
        raise ValueError('no "accuracy" line in conlleval.pl output for %r' % (filename,))

    # out = ['accuracy:', '16.26%;', 'precision:', '0.00%;', 'recall:', '0.00%;', 'FB1:', '0.00']
    precision = float(out[3][:-2])
    recall = float(out[5][:-2])
    f1score = float(out[7])
    return {'p': precision, 'r': recall, 'f1': f1score}
def get_perfo(filename):
    '''
    Work around for using a PERL script in python: run conlleval.pl through
    the shell, keep its "accuracy" line in a temporary file and parse it.

    The script is expected at ``PREFIX + 'conlleval.pl'``; when missing it is
    downloaded to exactly that location and made executable for the owner.

    :param filename: path of the prediction file redirected to the script
    :returns: dict with keys ``'p'``, ``'r'`` and ``'f1'`` (floats)
    :raises IndexError: if the script printed no "accuracy" line
    '''
    import tempfile as _tempfile  # local import: the block gained this dependency

    script = PREFIX + 'conlleval.pl'
    if not isfile(script):
        # download('http://www-etud.iro.umontreal.ca/~mesnilgr/atis/conlleval.pl')
        # Save to the PREFIX-ed path checked above (it used to land in the
        # current directory).  str.format is used because the URL contains '%'.
        os.system('wget -O {0} {1}'.format(
            script,
            'https://www.comp.nus.edu.sg/%7Ekanmy/courses/practicalNLP_2008/packages/conlleval.pl'))
        chmod(script, stat.S_IRWXU)  # give the execute permissions

    if len(PREFIX) > 0:
        chmod(script, stat.S_IRWXU)  # give the execute permissions
        runner = script
    else:
        runner = './conlleval.pl'

    # mkstemp gives a unique, private result file instead of a random
    # integer name in the current directory.
    fd, result_path = _tempfile.mkstemp(suffix='.txt')
    os.close(fd)
    try:
        cmd = '%s < %s | grep accuracy > %s' % (runner, filename, result_path)
        print(cmd)
        os.system(cmd)
        with open(result_path) as result:
            out = result.readlines()[0].split()
    finally:
        # Always drop the temporary file, even when parsing fails.
        os.remove(result_path)

    # Token positions of the precision, recall and FB1 values in the
    # summary line printed by the script.
    precision = float(out[6][:-2])
    recall = float(out[8][:-2])
    f1score = float(out[10])
    return {'p': precision, 'r': recall, 'f1': f1score}
def setUpClass(cls):
    """Prepare a clean working area for the test class.

    Creates ``cls.TEST_RUN_DIR`` if needed, makes it both the current
    directory and ``$HOME``, wipes the futoin-cid cache directory and
    ``cls.TEST_DIR`` left over from earlier runs and, when
    ``cls._create_test_dir`` is set, recreates and enters ``cls.TEST_DIR``.
    """
    print('Python: ' + sys.executable)

    try:
        os.makedirs(cls.TEST_RUN_DIR)
    except OSError:
        # Usually "already exists"; a real problem surfaces in chdir() below.
        pass

    os.chdir(cls.TEST_RUN_DIR)
    os.environ['HOME'] = cls.TEST_RUN_DIR

    cache_dir = os.path.join(os.environ['HOME'], '.cache', 'futoin-cid')

    for cleanup_dir in (cache_dir, cls.TEST_DIR):
        if os.path.exists(cleanup_dir):
            # Give the owner full access to every entry first so that
            # rmtree() can descend into and delete read-only leftovers.
            for (path, dirs, files) in os.walk(cleanup_dir):
                for entry in dirs + files:
                    try:
                        os.chmod(os.path.join(path, entry), stat.S_IRWXU)
                    except OSError:
                        # Best effort (e.g. dangling symlink); rmtree()
                        # reports anything that really blocks removal.
                        pass
            shutil.rmtree(cleanup_dir)

    if cls._create_test_dir:
        os.mkdir(cls.TEST_DIR)
        os.chdir(cls.TEST_DIR)
def _apply_operation_to_mode(user, operator, mode_to_apply, current_mode): if operator == '=': if user == 'u': mask = stat.S_IRWXU | stat.S_ISUID elif user == 'g': mask = stat.S_IRWXG | stat.S_ISGID elif user == 'o': mask = stat.S_IRWXO | stat.S_ISVTX # mask out u, g, or o permissions from current_mode and apply new permissions inverse_mask = mask ^ PERM_BITS new_mode = (current_mode & inverse_mask) | mode_to_apply elif operator == '+': new_mode = current_mode | mode_to_apply elif operator == '-': new_mode = current_mode - (current_mode & mode_to_apply) return new_mode
def testPermission(self):
    """removePath must raise BuildError when a sub-directory is not writable."""
    with TemporaryDirectory() as tmp:
        d = os.path.join(tmp, "dir")
        os.mkdir(d)
        with open(os.path.join(d, "file"), "w") as f:
            f.write("data")

        # Drop write permission so the file inside cannot be unlinked.
        os.chmod(d, stat.S_IRUSR | stat.S_IXUSR)
        try:
            self.assertRaises(BuildError, removePath, tmp)
        finally:
            # Restore access even when the assertion fails; otherwise the
            # TemporaryDirectory cleanup fails as well and hides the real
            # test failure.
            os.chmod(d, stat.S_IRWXU)
def testPermission(self):
    """emptyDirectory must raise BuildError when a sub-directory is not writable."""
    with TemporaryDirectory() as tmp:
        d = os.path.join(tmp, "dir")
        os.mkdir(d)
        with open(os.path.join(d, "file"), "w") as f:
            f.write("data")

        # Drop write permission so the file inside cannot be unlinked.
        os.chmod(d, stat.S_IRUSR | stat.S_IXUSR)
        try:
            self.assertRaises(BuildError, emptyDirectory, tmp)
        finally:
            # Restore access even when the assertion fails; otherwise the
            # TemporaryDirectory cleanup fails as well and hides the real
            # test failure.
            os.chmod(d, stat.S_IRWXU)
def _get_default_cache_dir(self): def _unsafe_dir(): raise RuntimeError('Cannot determine safe temp directory. You ' 'need to explicitly provide one.') tmpdir = tempfile.gettempdir() # On windows the temporary directory is used specific unless # explicitly forced otherwise. We can just use that. if os.name == 'nt': return tmpdir if not hasattr(os, 'getuid'): _unsafe_dir() dirname = '_jinja2-cache-%d' % os.getuid() actual_dir = os.path.join(tmpdir, dirname) try: os.mkdir(actual_dir, stat.S_IRWXU) except OSError as e: if e.errno != errno.EEXIST: raise try: os.chmod(actual_dir, stat.S_IRWXU) actual_dir_stat = os.lstat(actual_dir) if actual_dir_stat.st_uid != os.getuid() \ or not stat.S_ISDIR(actual_dir_stat.st_mode) \ or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU: _unsafe_dir() except OSError as e: if e.errno != errno.EEXIST: raise actual_dir_stat = os.lstat(actual_dir) if actual_dir_stat.st_uid != os.getuid() \ or not stat.S_ISDIR(actual_dir_stat.st_mode) \ or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU: _unsafe_dir() return actual_dir
def removeReadOnly(func, path, exc):
    """Error handler for shutil.rmtree: retry removal of a readonly entry.

    When ``os.rmdir`` or ``os.remove`` failed with EACCES the entry is made
    world-accessible and the call is retried; every other failure is turned
    into a RuntimeError.
    """
    error = exc[1]
    retryable = func in (os.rmdir, os.remove) and error.errno == errno.EACCES
    if not retryable:
        raise RuntimeError('Could not remove {0}'.format(path))
    os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # 0777
    func(path)
def create_tempdir():
    """Create a directory within the system temp directory used to create temp files.

    An existing directory is removed first.  When that is not permitted, an
    error dialog explains the ownership problem to the user.
    """
    everyone_rwx = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
    try:
        if os.path.isdir(tempdir):
            shutil.rmtree(tempdir)

        os.mkdir(tempdir)

        # Make sure the directory can be removed by anyone in case the user
        # runs ST later as another user.
        os.chmod(tempdir, everyone_rwx)

    except PermissionError:
        if sublime.platform() == 'windows':
            message = (
                'The SublimeLinter temp directory ({}) could not be reset '
                'because it belongs to a different user.'
            ).format(tempdir)
        else:
            # Name both the logged-in user and the directory owner.
            current_user = pwd.getpwuid(os.geteuid())[0]
            temp_uid = os.stat(tempdir).st_uid
            temp_user = pwd.getpwuid(temp_uid)[0]
            message = (
                'The SublimeLinter temp directory:\n\n{0}\n\ncould not be cleared '
                'because it is owned by \'{1}\' and you are logged in as \'{2}\'. '
                'Please use sudo to remove the temp directory from a terminal.'
            ).format(tempdir, temp_user, current_user)

        sublime.error_message(message)

    from . import persist
    persist.debug('temp directory:', tempdir)
def startDB(self):
    """Create the test database file inside ``self.DB_DIR``.

    The directory is first opened up to user, group and others, then the
    database is created through kinterbasdb and the connection is closed
    right away.
    """
    import kinterbasdb

    self.DB_NAME = os.path.join(self.DB_DIR, DBTestConnector.DB_NAME)
    os.chmod(self.DB_DIR, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
    statement = 'create database "%s" user "%s" password "%s"' % (
        self.DB_NAME, self.DB_USER, self.DB_PASS)
    connection = kinterbasdb.create_database(statement)
    connection.close()
def __init__(self, t):
    """Set up a private directory for the proxy socket file and start the
    local proxy thread.

    :param t: object stored on the instance as ``__t``
    """
    AgentSSH.__init__(self)
    self.__t = t
    # 'sshproxy' is passed as the suffix of the generated directory name.
    self._dir = tempfile.mkdtemp('sshproxy')
    # Restrict the directory to its owner.
    os.chmod(self._dir, stat.S_IRWXU)
    self._file = self._dir + '/sshproxy.ssh'
    # Started last, once the attributes above have been set.
    self.thread = AgentLocalProxy(self)
    self.thread.start()
def _RmTreeHandleReadOnly(func, path, exc):
    """Error handler for shutil.rmtree that copes with read-only entries.

    A failed ``os.rmdir``/``os.remove`` with EACCES is retried after opening
    up the entry's permissions; any other failure is re-raised.  This is
    necessary on Windows as os.remove will return an access error for
    read-only files, and git repos contain read-only pack/index files.
    """
    error = exc[1]
    is_removal = func in (os.rmdir, os.remove)
    if not (is_removal and error.errno == errno.EACCES):
        # Re-raise the error rmtree is currently handling.
        raise
    _LOGGER.debug('Removing read-only path: %s', path)
    os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
    func(path)
def remove_read_only(func, path, exc):
    """Error handler for shutil.rmtree: retry removal of a readonly entry.

    :param func: the function that failed (``os.rmdir`` or ``os.remove``
        are retried)
    :param path: the path *func* was called on
    :param exc: exception info triple; only the exception value is inspected
    :raises RuntimeError: when the failure is not an EACCES from a removal
    """
    failure = exc[1]
    if func not in (os.rmdir, os.remove) or failure.errno != errno.EACCES:
        raise RuntimeError('Could not remove {0}'.format(path))
    everyone_rwx = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO  # 0777
    os.chmod(path, everyone_rwx)
    func(path)
def setup(self): fd, self.executable = tempfile.mkstemp(prefix='jtc-testsuite.') os.close(fd) os.chmod(self.executable, stat.S_IRWXU)
def handleRemoveReadonly(func, path, exc):
    """Error handler for shutil.rmtree: when ``os.rmdir``/``os.remove``
    failed with EACCES, open up the entry's permissions and retry;
    otherwise re-raise the error being handled."""
    error = exc[1]
    is_removal = func in (os.rmdir, os.remove)
    if not (is_removal and error.errno == errno.EACCES):
        raise
    os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # 0777
    func(path)
def lstat(self, path):
    """Get attributes of a file, directory, or symlink

       This method looks *path* up in the virtual file system and reports
       its attributes.  Unlike :meth:`stat`, this method should return the
       attributes of a symlink itself rather than the target of that link.

       Every entry is reported as owned by the current process and as
       readable, writable and executable by everyone.

       :param bytes path:
           The path of the file, directory, or link to get attributes for

       :returns: An :class:`SFTPAttrs` containing the file attributes

       :raises: :exc:`SFTPError` to return an error to the client

    """
    try:
        entry = self._vfs.stat(path).stat
    except VFSFileNotFoundError:
        raise SFTPError(FX_NO_SUCH_FILE, 'No such file')

    if entry.type == Stat.DIRECTORY:
        file_type = S_IFDIR
    else:
        file_type = S_IFREG

    return SFTPAttrs(
        size=entry.size,
        uid=os.getuid(),
        gid=os.getgid(),
        permissions=file_type | S_IRWXU | S_IRWXG | S_IRWXO,
        atime=entry.atime,
        mtime=entry.mtime,
    )
def _get_default_cache_dir(self): tmpdir = tempfile.gettempdir() # On windows the temporary directory is used specific unless # explicitly forced otherwise. We can just use that. if os.name == 'nt': return tmpdir if not hasattr(os, 'getuid'): raise RuntimeError('Cannot determine safe temp directory. You ' 'need to explicitly provide one.') dirname = '_jinja2-cache-%d' % os.getuid() actual_dir = os.path.join(tmpdir, dirname) try: os.mkdir(actual_dir, stat.S_IRWXU) # 0o700 except OSError as e: if e.errno != errno.EEXIST: raise actual_dir_stat = os.lstat(actual_dir) if actual_dir_stat.st_uid != os.getuid() \ or not stat.S_ISDIR(actual_dir_stat.st_mode) \ or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU: raise RuntimeError('Temporary directory \'%s\' has an incorrect ' 'owner, permissions, or type.' % actual_dir) return actual_dir
def _apply_operation_to_mode(self, user, operator, mode_to_apply, current_mode): if operator == '=': if user == 'u': mask = stat.S_IRWXU | stat.S_ISUID elif user == 'g': mask = stat.S_IRWXG | stat.S_ISGID elif user == 'o': mask = stat.S_IRWXO | stat.S_ISVTX # mask out u, g, or o permissions from current_mode and apply new permissions inverse_mask = mask ^ PERM_BITS new_mode = (current_mode & inverse_mask) | mode_to_apply elif operator == '+': new_mode = current_mode | mode_to_apply elif operator == '-': new_mode = current_mode - (current_mode & mode_to_apply) return new_mode
def copy_crt(env):
    """Copy the Visual C++ and Universal CRT runtime DLLs into ``env.dll_dir``.

    Each copied DLL is made readable, writable and executable for the owner.

    :raises SystemExit: when either redistributable directory is missing
    """
    printf('Copying CRT...')
    plat = 'x64' if is64bit else 'x86'
    vc_path = os.path.join(
        r'C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\redist',
        plat, 'Microsoft.VC140.CRT')
    if not os.path.exists(vc_path):
        raise SystemExit('Visual Studio redistributable CRT not found at: %r' % vc_path)

    # We cannot use the Universal CRT DLLs that come with Visual Studio, as
    # they are broken. Have to use the ones from the Standalone SDK for
    # Windows 10. However, I don't want to install this SDK, in case it
    # messes things up, so the path below points to DLLs I got from
    # installing the SDK in a different VM. To re-create, just copy the
    # api-ms*.dll and ucrtbase.dll files from a previous calibre install.
    sdk_path = os.path.join('C:\\ucrt', plat)
    if not os.path.exists(sdk_path):
        raise SystemExit('Windows 10 Universal CRT redistributable not found at: %r' % sdk_path)

    for sdk_dll in glob.glob(os.path.join(sdk_path, '*.dll')):
        shutil.copy2(sdk_dll, env.dll_dir)
        os.chmod(os.path.join(env.dll_dir, b(sdk_dll)), stat.S_IRWXU)

    for vc_dll in glob.glob(os.path.join(vc_path, '*.dll')):
        bname = os.path.basename(vc_dll)
        # Those two DLLs are not required: vccorlib is for the CORE CLR;
        # I think concrt is the concurrency runtime for C++, which I
        # believe nothing in calibre currently uses.
        if bname.startswith('vccorlib') or bname.startswith('concrt'):
            continue
        shutil.copy(vc_dll, env.dll_dir)
        os.chmod(os.path.join(env.dll_dir, bname), stat.S_IRWXU)
def safe_delete(path):
    '''
    'Robust' delete: remove the file at the sanitized *path*, opening up its
    permissions first.  Missing files, non-files and OS errors are ignored.
    '''
    target = sanitize_path(path)
    if not os.path.isfile(target):
        return
    everyone_rwx = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
    try:
        os.chmod(target, everyone_rwx)
        os.remove(target)
    except OSError:
        # Deliberately best effort.
        pass
def cleanup(self):
    """
    Restore owner access to ``self.noaccess`` (the directory made
    non-accessible in setup()) before delegating to the base cleanup;
    otherwise deleting the workingdir will fail.
    """
    locked_dir = self.noaccess
    os.chmod(locked_dir, stat.S_IRWXU)
    TestController.cleanup(self)