我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 stat.S_IRWXO。
def write_code(id, pid, language, code):
    """Write submitted source code into a fresh per-submission work directory.

    A directory named after ``id`` is created under ``config.work_dir`` and
    ``code`` is stored in it, UTF-8 encoded, under the file name that
    ``config.file_name`` maps to ``language``.

    :param id: submission identifier; names the work directory.
    :param pid: accepted for interface compatibility; not used in this body.
    :param language: key into ``config.file_name``.
    :param code: source text to write.
    :returns: ``True`` on success; ``False`` when the directory cannot be
        created or the code cannot be encoded/written (the error is logged).
    """
    try:
        work_path = os.path.join(config.work_dir, str(id))
        os.mkdir(work_path)
    except OSError as e:
        logging.error(e)
        return False
    real_path = os.path.join(work_path, config.file_name[language])
    # Binary mode: the payload below is already-encoded bytes, which a
    # text-mode handle rejects on Python 3.
    with open(real_path, 'wb') as f:
        try:
            f.write(code.encode('utf-8', 'ignore'))
        except Exception as e:
            logging.error("%s not write code to file\n%s" % (id, e))
            return False
    # os.chmod(work_path, stat.S_IRWXO)
    # os.chmod(real_path, stat.S_IRWXO)
    return True
def test_copy_symlinks(self):
    """shutil.copy() copies the link target or the link itself per follow_symlinks."""
    base = self.mkdtemp()
    src, dst, src_link = (os.path.join(base, name)
                          for name in ('foo', 'bar', 'baz'))
    write_file(src, 'foo')
    os.symlink(src, src_link)
    can_lchmod = hasattr(os, 'lchmod')
    if can_lchmod:
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)

    # follow_symlinks=True: the link is dereferenced, dst becomes a plain copy
    shutil.copy(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)

    # follow_symlinks=False: the link itself is duplicated
    shutil.copy(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    if can_lchmod:
        self.assertEqual(os.lstat(src_link).st_mode, os.lstat(dst).st_mode)
def build_unpack_comic(self, comic_path):
    """Empty the shared "build" temp directory and unpack a comic into it.

    Everything under ``gazee.TEMP_DIR/build`` is chmod-ed to full access and
    deleted (files first, then directories), after which ``comic_path`` is
    extracted there.  Only ``.cbr`` (RAR) and ``.cbz`` (ZIP) archives are
    extracted; for any other extension nothing is unpacked.

    :param comic_path: path of the comic archive to unpack.
    """
    logging.info("%s unpack requested" % comic_path)
    build_dir = os.path.join(gazee.TEMP_DIR, "build")
    full_access = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO  # 0777

    # Bottom-up so that every directory is already empty when it is removed.
    for root, dirs, files in os.walk(build_dir, topdown=False):
        for f in files:
            file_path = os.path.join(root, f)
            os.chmod(file_path, full_access)
            os.remove(file_path)
    for root, dirs, files in os.walk(build_dir, topdown=False):
        for d in dirs:
            dir_path = os.path.join(root, d)
            os.chmod(dir_path, full_access)
            os.rmdir(dir_path)

    # Context managers release the archive handle even if extraction fails.
    if comic_path.endswith(".cbr"):
        with rarfile.RarFile(comic_path) as opened_rar:
            opened_rar.extractall(build_dir)
    elif comic_path.endswith(".cbz"):
        with zipfile.ZipFile(comic_path) as opened_zip:
            opened_zip.extractall(build_dir)
    return
def user_unpack_comic(self, comic_path, user):
    """Empty a user's temp directory and unpack a comic archive into it.

    Everything under ``gazee.TEMP_DIR/<user>`` is chmod-ed to full access and
    deleted (files first, then directories), after which ``comic_path`` is
    extracted there.  Only ``.cbr`` (RAR) and ``.cbz`` (ZIP) archives are
    extracted; for any other extension nothing is unpacked.

    :param comic_path: path of the comic archive to unpack.
    :param user: name of the per-user subdirectory of ``gazee.TEMP_DIR``.
    """
    logging.info("%s unpack requested" % comic_path)
    user_dir = os.path.join(gazee.TEMP_DIR, user)
    full_access = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO  # 0777

    # Bottom-up so that every directory is already empty when it is removed.
    for root, dirs, files in os.walk(user_dir, topdown=False):
        for f in files:
            file_path = os.path.join(root, f)
            os.chmod(file_path, full_access)
            os.remove(file_path)
    for root, dirs, files in os.walk(user_dir, topdown=False):
        for d in dirs:
            dir_path = os.path.join(root, d)
            os.chmod(dir_path, full_access)
            os.rmdir(dir_path)

    # Context managers release the archive handle even if extraction fails.
    if comic_path.endswith(".cbr"):
        with rarfile.RarFile(comic_path) as opened_rar:
            opened_rar.extractall(user_dir)
    elif comic_path.endswith(".cbz"):
        with zipfile.ZipFile(comic_path) as opened_zip:
            opened_zip.extractall(user_dir)
    return

# This method will return a list of .jpg files in their numerical order to be fed into the reading view.
def _get_netrc(cls):
    """Parse the user's netrc via a comment-stripped temporary copy.

    The file named by ``cls.get_netrc_path()`` is read (an empty string is
    used when it does not exist), blank and ``#`` comment lines are dropped,
    and the remainder is written to a temporary file that is chmod-ed to
    owner read/write and handed to ``cls._get_netrc_from_path``.

    :returns: whatever ``cls._get_netrc_from_path`` returns for the copy.
    """
    # Buffer the '.netrc' path. Use an empty file if it doesn't exist.
    path = cls.get_netrc_path()
    content = ''
    if os.path.exists(path):
        st = os.stat(path)
        if st.st_mode & (stat.S_IRWXG | stat.S_IRWXO):
            warning = (
                'WARNING: netrc file %s cannot be used because its file '
                'permissions are insecure. netrc file permissions should be '
                '600.' % path)
            # sys.stderr.write works on Python 2 and 3 alike, unlike the
            # ``print >> sys.stderr`` statement form.
            sys.stderr.write(warning + '\n')
        # NOTE(review): the file is still read after the warning above.
        with open(path) as fd:
            content = fd.read()

    # Load the '.netrc' file. We strip comments from it because processing them
    # can trigger a bug in Windows. See crbug.com/664664.
    content = '\n'.join(l for l in content.splitlines()
                        if l.strip() and not l.strip().startswith('#'))
    with tempdir() as tdir:
        netrc_path = os.path.join(tdir, 'netrc')
        with open(netrc_path, 'w') as fd:
            fd.write(content)
        os.chmod(netrc_path, (stat.S_IRUSR | stat.S_IWUSR))
        return cls._get_netrc_from_path(netrc_path)
def cleanup(self):
    """Remove the scratch directory ``self.temp``, unlocking entries on EACCES."""
    print("Cleaning up")
    if not os.path.exists(self.temp):
        return
    # The check for "tempxxx" is to guard against bugs with path operations
    # and command-line arguments which might lead to the accidental deletion
    # of an important directory.
    if "tempxxx" not in self.temp:
        return

    def handleRemoveReadonly(func, path, exc):
        # Anything other than a permission problem is re-raised unchanged.
        if exc[1].errno != errno.EACCES:
            raise
        os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # 0777
        func(path)

    print("rmtree {0}".format(self.temp))
    shutil.rmtree(self.temp, ignore_errors=False, onerror=handleRemoveReadonly)
def _apply_operation_to_mode(user, operator, mode_to_apply, current_mode): if operator == '=': if user == 'u': mask = stat.S_IRWXU | stat.S_ISUID elif user == 'g': mask = stat.S_IRWXG | stat.S_ISGID elif user == 'o': mask = stat.S_IRWXO | stat.S_ISVTX # mask out u, g, or o permissions from current_mode and apply new permissions inverse_mask = mask ^ PERM_BITS new_mode = (current_mode & inverse_mask) | mode_to_apply elif operator == '+': new_mode = current_mode | mode_to_apply elif operator == '-': new_mode = current_mode - (current_mode & mode_to_apply) return new_mode
def removeReadOnly(func, path, exc):
    """shutil.rmtree error hook: unlock ``path`` and retry the failed removal.

    Only ``os.rmdir`` / ``os.remove`` failures with ``errno.EACCES`` are
    retried; anything else raises ``RuntimeError``.
    """
    error = exc[1]
    if func not in (os.rmdir, os.remove) or error.errno != errno.EACCES:
        raise RuntimeError('Could not remove {0}'.format(path))
    os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # 0777
    func(path)
def create_tempdir():
    """Recreate the module's ``tempdir`` directory and open it up to all users.

    When a ``PermissionError`` prevents this, an explanatory message is shown
    through ``sublime.error_message`` instead of propagating the error.
    """
    try:
        if os.path.isdir(tempdir):
            shutil.rmtree(tempdir)
        os.mkdir(tempdir)

        # Make sure the directory can be removed by anyone in case the user
        # runs ST later as another user.
        everyone = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
        os.chmod(tempdir, everyone)
    except PermissionError:
        if sublime.platform() == 'windows':
            message = (
                'The SublimeLinter temp directory ({}) could not be reset '
                'because it belongs to a different user.'
            ).format(tempdir)
        else:
            # Name both the owner of the directory and the current user.
            current_user = pwd.getpwuid(os.geteuid())[0]
            owner_uid = os.stat(tempdir).st_uid
            temp_user = pwd.getpwuid(owner_uid)[0]
            message = (
                'The SublimeLinter temp directory:\n\n{0}\n\ncould not be cleared '
                'because it is owned by \'{1}\' and you are logged in as \'{2}\'. '
                'Please use sudo to remove the temp directory from a terminal.'
            ).format(tempdir, temp_user, current_user)

        sublime.error_message(message)

    from . import persist
    persist.debug('temp directory:', tempdir)
def startDB(self):
    """Create the test database file inside ``self.DB_DIR`` via kinterbasdb.

    ``self.DB_NAME`` is set to the full database path, the directory is made
    accessible to everyone, and the new connection is closed right away.
    """
    import kinterbasdb
    self.DB_NAME = os.path.join(self.DB_DIR, DBTestConnector.DB_NAME)
    os.chmod(self.DB_DIR, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
    sql = 'create database "%s" user "%s" password "%s"' % (
        self.DB_NAME, self.DB_USER, self.DB_PASS)
    conn = kinterbasdb.create_database(sql)
    conn.close()
def _RmTreeHandleReadOnly(func, path, exc): """An error handling function for use with shutil.rmtree. This will detect failures to remove read-only files, and will change their properties prior to removing them. This is necessary on Windows as os.remove will return an access error for read-only files, and git repos contain read-only pack/index files. """ excvalue = exc[1] if func in (os.rmdir, os.remove) and excvalue.errno == errno.EACCES: _LOGGER.debug('Removing read-only path: %s', path) os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) func(path) else: raise
def remove_read_only(func, path, exc):
    """Called by shutil.rmtree when it encounters a readonly file.

    :param func: the function that failed; only os.rmdir and os.remove are retried
    :param path: the path ``func`` was called with
    :param exc: exception triple; ``exc[1]`` must carry an ``errno`` attribute
    """
    failure = exc[1]
    retry = func in (os.rmdir, os.remove) and failure.errno == errno.EACCES
    if not retry:
        raise RuntimeError('Could not remove {0}'.format(path))
    os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # 0777
    func(path)
def validate_file_permissions(filename):
    """Validate ``filename`` and log a warning if "other" users have any access."""
    validate_file(filename)
    mode_bits = stat.S_IMODE(os.stat(filename).st_mode)
    if mode_bits & stat.S_IRWXO != 0:
        logger.warning('Unsafe permissions on credentials configuration file: %s', filename)
def handleRemoveReadonly(func, path, exc):
    """shutil.rmtree error hook: chmod ``path`` to 0777 and retry on EACCES.

    Failures that are not an EACCES from os.rmdir/os.remove are re-raised with
    a bare ``raise``, which requires an exception to be currently handled.
    """
    err = exc[1]
    if func not in (os.rmdir, os.remove) or err.errno != errno.EACCES:
        raise
    os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # 0777
    func(path)
def lstat(self, path):
    """Get attributes of a file, directory, or symlink

    Looks ``path`` up in the virtual file system and reports it as either a
    directory or a regular file with all permission bits set, owned by the
    uid/gid of the current process.

    :param bytes path: The path of the file, directory, or link to get
        attributes for
    :returns: An :class:`SFTPAttrs` containing the file attributes
    :raises: :exc:`SFTPError` (``FX_NO_SUCH_FILE``) when the path is unknown
        to the virtual file system
    """
    try:
        entry = self._vfs.stat(path).stat
    except VFSFileNotFoundError:
        raise SFTPError(FX_NO_SUCH_FILE, 'No such file')

    if entry.type == Stat.DIRECTORY:
        file_type = S_IFDIR
    else:
        file_type = S_IFREG
    return SFTPAttrs(
        size=entry.size,
        uid=os.getuid(),
        gid=os.getgid(),
        permissions=file_type | S_IRWXU | S_IRWXG | S_IRWXO,
        atime=entry.atime,
        mtime=entry.mtime,
    )
def _apply_operation_to_mode(self, user, operator, mode_to_apply, current_mode): if operator == '=': if user == 'u': mask = stat.S_IRWXU | stat.S_ISUID elif user == 'g': mask = stat.S_IRWXG | stat.S_ISGID elif user == 'o': mask = stat.S_IRWXO | stat.S_ISVTX # mask out u, g, or o permissions from current_mode and apply new permissions inverse_mask = mask ^ PERM_BITS new_mode = (current_mode & inverse_mask) | mode_to_apply elif operator == '+': new_mode = current_mode | mode_to_apply elif operator == '-': new_mode = current_mode - (current_mode & mode_to_apply) return new_mode
def safe_delete(path):
    """Best-effort delete of a regular file.

    The path is sanitized first; non-files are ignored, and any OSError from
    the chmod or the removal is deliberately swallowed.
    """
    path = sanitize_path(path)
    if not os.path.isfile(path):
        return
    try:
        # Open up the permissions first so a read-only flag cannot block removal.
        os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
        os.remove(path)
    except OSError:
        pass
def cleanup_test_droppings(testname, verbose):
    """Delete well-known leftovers (``support.TESTFN``, ``db_home``) of a test.

    :param testname: name of the test that just ran; used in messages only.
    :param verbose: when true, report each leftover that is found.
    :raises SystemError: if a leftover exists but is neither file nor directory.
    """
    import shutil
    import stat
    import gc

    # First kill any dangling references to open files etc.
    # This can also issue some ResourceWarnings which would otherwise get
    # triggered during the following test run, and possibly produce failures.
    gc.collect()

    # Try to clean up junk commonly left behind.  While tests shouldn't leave
    # any files or directories behind, when a test fails that can be tedious
    # for it to arrange.  The consequences can be especially nasty on Windows,
    # since if a test leaves a file open, it cannot be deleted by name (while
    # there's nothing we can do about that here either, we can display the
    # name of the offending test, which is a real help).
    for leftover in (support.TESTFN, "db_home"):
        if os.path.exists(leftover):
            if os.path.isdir(leftover):
                kind = "directory"
                remover = shutil.rmtree
            elif os.path.isfile(leftover):
                kind = "file"
                remover = os.unlink
            else:
                raise SystemError("os.path says %r exists but is neither "
                                  "directory nor file" % leftover)

            if verbose:
                print("%r left behind %s %r" % (testname, kind, leftover))
            try:
                # if we have chmod, fix possible permissions problems
                # that might prevent cleanup
                if hasattr(os, 'chmod'):
                    os.chmod(leftover, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
                remover(leftover)
            except Exception as msg:
                report = ("%r left behind %s %r and it couldn't be "
                          "removed: %s" % (testname, kind, leftover, msg))
                print(report, file=sys.stderr)
def test_get_app_data_home(self):
    """get_data_dir() returns an existing, owner-only 'http-prompt' directory."""
    path = xdg.get_data_dir()
    data_home = os.environ[self.homes['data']]
    self.assertEqual(path, os.path.join(data_home, 'http-prompt'))
    self.assertTrue(os.path.exists(path))

    if sys.platform == 'win32':
        return
    # Owner must have some access; group and others must have none.
    mode = stat.S_IMODE(os.stat(path).st_mode)
    self.assertTrue(mode & stat.S_IRWXU)
    self.assertFalse(mode & stat.S_IRWXG)
    self.assertFalse(mode & stat.S_IRWXO)
def test_get_app_config_home(self):
    """get_config_dir() returns an existing, owner-only 'http-prompt' directory."""
    path = xdg.get_config_dir()
    config_home = os.environ[self.homes['config']]
    self.assertEqual(path, os.path.join(config_home, 'http-prompt'))
    self.assertTrue(os.path.exists(path))

    if sys.platform == 'win32':
        return
    # Owner must have some access; group and others must have none.
    mode = stat.S_IMODE(os.stat(path).st_mode)
    self.assertTrue(mode & stat.S_IRWXU)
    self.assertFalse(mode & stat.S_IRWXG)
    self.assertFalse(mode & stat.S_IRWXO)
def cleanup_test_droppings(testname, verbose):
    """Delete well-known leftovers (``test_support.TESTFN``, ``db_home``).

    Output goes through ``sys.stdout.write`` / ``sys.stderr.write`` and the
    handler uses ``except ... as`` so that the function parses on Python 3 as
    well as Python 2; the emitted text is unchanged.

    :param testname: name of the test that just ran; used in messages only.
    :param verbose: when true, report each leftover that is found.
    :raises SystemError: if a leftover exists but is neither file nor directory.
    """
    import stat
    import gc

    # First kill any dangling references to open files etc.
    gc.collect()

    # Try to clean up junk commonly left behind.  While tests shouldn't leave
    # any files or directories behind, when a test fails that can be tedious
    # for it to arrange.  The consequences can be especially nasty on Windows,
    # since if a test leaves a file open, it cannot be deleted by name (while
    # there's nothing we can do about that here either, we can display the
    # name of the offending test, which is a real help).
    for name in (test_support.TESTFN, "db_home"):
        if not os.path.exists(name):
            continue

        if os.path.isdir(name):
            kind, nuker = "directory", shutil.rmtree
        elif os.path.isfile(name):
            kind, nuker = "file", os.unlink
        else:
            raise SystemError("os.path says %r exists but is neither "
                              "directory nor file" % name)

        if verbose:
            sys.stdout.write("%r left behind %s %r" % (testname, kind, name) + "\n")
        try:
            # if we have chmod, fix possible permissions problems
            # that might prevent cleanup
            if hasattr(os, 'chmod'):
                os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            nuker(name)
        except Exception as msg:
            sys.stderr.write(("%r left behind %s %r and it couldn't be "
                              "removed: %s" % (testname, kind, name, msg)) + "\n")
def test_mode(self):
    """chmod results are reported back unchanged through the mode helpers."""
    with open(TESTFN, 'w'):
        pass

    if os.name != 'posix':
        # Elsewhere only the file-type part of the mode is checked.
        os.chmod(TESTFN, 0o700)
        st_mode = self.get_mode()
        self.assertS_IS("REG", st_mode)
        self.assertEqual(stat.S_IFMT(st_mode), stat.S_IFREG)
        return

    cases = (
        (0o700, stat.S_IRWXU),
        (0o070, stat.S_IRWXG),
        (0o007, stat.S_IRWXO),
        (0o444, 0o444),
    )
    for requested, expected in cases:
        os.chmod(TESTFN, requested)
        st_mode = self.get_mode()
        self.assertS_IS("REG", st_mode)
        self.assertEqual(stat.S_IMODE(st_mode), expected)
def test_copymode_follow_symlinks(self):
    """shutil.copymode() follows symlinks on the source and destination side.

    Each case resets ``dst`` to a different mode, copies the mode of ``src``
    (directly or through a link) and checks that both files end up equal.
    """
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    dst_link = os.path.join(tmp_dir, 'quux')
    write_file(src, 'foo')
    write_file(dst, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)
    # file to file
    os.chmod(dst, stat.S_IRWXO)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    shutil.copymode(src, dst)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    # follow src link
    os.chmod(dst, stat.S_IRWXO)
    shutil.copymode(src_link, dst)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    # follow dst link
    os.chmod(dst, stat.S_IRWXO)
    shutil.copymode(src, dst_link)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    # follow both links -- both arguments must be links here; passing the
    # plain ``dst`` would merely repeat the "follow src link" case.
    os.chmod(dst, stat.S_IRWXO)
    shutil.copymode(src_link, dst_link)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
def test_copymode_symlink_to_symlink(self):
    """copymode(follow_symlinks=False) acts on links when both ends are links."""
    base = self.mkdtemp()
    src, dst, src_link, dst_link = (
        os.path.join(base, name) for name in ('foo', 'bar', 'baz', 'quux'))
    write_file(src, 'foo')
    write_file(dst, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)
    os.chmod(dst, stat.S_IRWXU)
    os.lchmod(src_link, stat.S_IRWXO | stat.S_IRWXG)

    # link to link: the links end up equal, the target files stay different
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)
    self.assertEqual(os.lstat(src_link).st_mode,
                     os.lstat(dst_link).st_mode)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # src link - use chmod
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src_link, dst, follow_symlinks=False)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # dst link - use chmod
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src, dst_link, follow_symlinks=False)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
def test_copystat_symlinks(self):
    """shutil.copystat() honours follow_symlinks for times, mode and flags."""
    base = self.mkdtemp()
    src, dst, src_link, dst_link = (
        os.path.join(base, name) for name in ('foo', 'bar', 'baz', 'qux'))
    write_file(src, 'foo')
    src_stat = os.stat(src)
    # ensure different mtimes
    os.utime(src, (src_stat.st_atime, src_stat.st_mtime - 42.0))
    write_file(dst, 'bar')
    self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    has_lchmod = hasattr(os, 'lchmod')
    if has_lchmod:
        os.lchmod(src_link, stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_link_stat = os.lstat(src_link)

    # follow
    if has_lchmod:
        shutil.copystat(src_link, dst_link, follow_symlinks=True)
        self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)

    # don't follow
    shutil.copystat(src_link, dst_link, follow_symlinks=False)
    dst_link_stat = os.lstat(dst_link)
    if os.utime in os.supports_follow_symlinks:
        for attr in ('st_atime', 'st_mtime'):
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(src_link_stat, attr),
                                 getattr(dst_link_stat, attr) + 1)
    if has_lchmod:
        self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
    if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
        self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)

    # tell to follow but dst is not a link
    shutil.copystat(src_link, dst, follow_symlinks=False)
    mtime_gap = abs(os.stat(src).st_mtime - os.stat(dst).st_mtime)
    self.assertTrue(mtime_gap < 0.1)
def test_copy2_symlinks(self):
    """shutil.copy2() copies data or the link (with metadata) per follow_symlinks."""
    base = self.mkdtemp()
    src, dst, src_link = (os.path.join(base, name)
                          for name in ('foo', 'bar', 'baz'))
    write_file(src, 'foo')
    os.symlink(src, src_link)
    has_lchmod = hasattr(os, 'lchmod')
    if has_lchmod:
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_stat = os.stat(src)
    src_link_stat = os.lstat(src_link)

    # follow
    shutil.copy2(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)

    # don't follow
    shutil.copy2(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    dst_stat = os.lstat(dst)
    if os.utime in os.supports_follow_symlinks:
        for attr in ('st_atime', 'st_mtime'):
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(src_link_stat, attr),
                                 getattr(dst_stat, attr) + 1)
    if has_lchmod:
        self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
        self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
    if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
        self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
def test_mode(self):
    """chmod results are reported back both numerically and as a mode string."""
    with open(TESTFN, 'w'):
        pass

    if os.name != 'posix':
        # Elsewhere only the leading '-rw' and the file type are checked.
        os.chmod(TESTFN, 0o700)
        st_mode, modestr = self.get_mode()
        self.assertEqual(modestr[:3], '-rw')
        self.assertS_IS("REG", st_mode)
        self.assertEqual(stat.S_IFMT(st_mode), stat.S_IFREG)
        return

    single_class_cases = (
        (0o700, '-rwx------', stat.S_IRWXU),
        (0o070, '----rwx---', stat.S_IRWXG),
        (0o007, '-------rwx', stat.S_IRWXO),
    )
    for requested, expected_str, expected_bits in single_class_cases:
        os.chmod(TESTFN, requested)
        st_mode, modestr = self.get_mode()
        self.assertEqual(modestr, expected_str)
        self.assertS_IS("REG", st_mode)
        self.assertEqual(stat.S_IMODE(st_mode), expected_bits)

    # Read-only for everyone; kept separate because the file-type check
    # comes first in this case.
    os.chmod(TESTFN, 0o444)
    st_mode, modestr = self.get_mode()
    self.assertS_IS("REG", st_mode)
    self.assertEqual(modestr, '-r--r--r--')
    self.assertEqual(stat.S_IMODE(st_mode), 0o444)
def check_ssh_private_key_filemode(ssh_private_key):
    # type: (pathlib.Path) -> bool
    """Check SSH private key filemode

    Always passes on Windows; elsewhere the key passes only when no group or
    other permission bit is set on the file.

    :param pathlib.Path ssh_private_key: SSH private key
    :rtype: bool
    :return: private key filemode is ok
    """
    if util.on_windows():
        return True
    mode = ssh_private_key.stat().st_mode
    group_or_other = stat.S_IRWXG | stat.S_IRWXO
    return not (mode & group_or_other)
def validate_ssh_key_path(ssh_key_path):
    """Check that an SSH private key exists, is owner-only and is unencrypted.

    The checks raise ``AssertionError`` explicitly rather than through
    ``assert`` statements, so they still run under ``python -O`` while callers
    that catch ``AssertionError`` keep working.

    :param ssh_key_path: path of the private key file.
    :raises AssertionError: if the path is not a regular file, if any group
        or other permission bit is set, or if the file contains the text
        ``ENCRYPTED``.
    """
    if not os.path.isfile(ssh_key_path):
        raise AssertionError('could not find ssh private key: {}'.format(ssh_key_path))

    mode = stat.S_IMODE(os.stat(ssh_key_path).st_mode)
    if mode & (stat.S_IRWXG | stat.S_IRWXO) != 0:
        raise AssertionError(
            'ssh_key_path must be only read / write / executable by the owner. It may not be read / write / executable '
            'by group, or other.')

    with open(ssh_key_path) as fh:
        if 'ENCRYPTED' in fh.read():
            raise AssertionError('Encrypted SSH keys (which contain passphrases) '
                                 'are not allowed. Use a key without a passphrase.')
def check(self):
    """Verify correct permissions and ownership of database files.

    Does nothing unless ``settings.DB_URI`` uses the ``file`` scheme and the
    file exists.  A file owned by a different user than the effective uid is
    fatal (``sys.exit(1)``); a different group is only logged.  Finally, any
    permission bits granted to "other" are removed from the file; a failure
    to do so is logged at debug level and otherwise ignored.
    """
    scheme, _, db_path, _, _ = urlsplit(settings.DB_URI)
    if scheme != 'file':
        log.warning('DB_URI does not point to a file but %s -- forgot to set "BUILTIN_ZEO = False"?', scheme)
        return

    try:
        st = os.stat(db_path)
    except FileNotFoundError:
        # Database not created yet, not an error.
        return

    error = False
    if st.st_uid != os.geteuid():
        log.error('Database file %s has wrong owner: %d (file) vs %d (daemon user)',
                  db_path, st.st_uid, os.geteuid())
        error = True
    if st.st_gid != os.getegid():
        log.error('Database file %s has wrong group: %d (file) vs %d (daemon user)',
                  db_path, st.st_gid, os.getegid())
        # not a critical error, could be, theoretically, on purpose
    if error:
        sys.exit(1)

    # Fix perms, if any
    perms = stat.S_IMODE(st.st_mode)
    perms_should_be = perms & ~stat.S_IRWXO
    if perms != perms_should_be:
        try:
            os.chmod(db_path, perms_should_be)
        except OSError as ose:
            # One placeholder per argument: the previous message had a single
            # %s for two arguments, which makes logging fail to format it.
            log.debug('Tried to fix permissions on database file %s, but it didn\'t work: %s',
                      db_path, ose)
def get_tree(self, path):
    """Describe the entries of one remote directory, listed over SFTP.

    Regular files, directories, symlinks and FIFOs are added to the returned
    ``Tree`` together with their uid, gid, permission bits (rwx for
    user/group/other plus the sticky bit), mtime and size; every other file
    type is skipped.

    :param path: directory path (bytes) relative to ``self.root``; leading
        separators are stripped.
    :returns: the populated ``Tree``.
    """
    path = path.lstrip(os.sep.encode('utf-8'))
    directory = os.path.join(self.root, path)
    perm_mask = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO | stat.S_ISVTX
    tree = Tree()
    for fattr in self._sftp.listdir_attr_b(directory):
        filename = fattr.filename
        if stat.S_ISREG(fattr.st_mode):
            item = {'type': 'blob', 'filetype': 'regular'}
        elif stat.S_ISDIR(fattr.st_mode):
            item = {'type': 'tree', 'filetype': 'directory'}
        elif stat.S_ISLNK(fattr.st_mode):
            target = self._sftp.readlink(os.path.join(directory, filename))
            item = {'filetype': 'link', 'link': target.encode('utf8')}
        elif stat.S_ISFIFO(fattr.st_mode):
            item = {'filetype': 'fifo'}
        else:
            continue  # FIXME: Warn
        item['uid'] = fattr.st_uid
        item['gid'] = fattr.st_gid
        item['mode'] = fattr.st_mode & perm_mask
        item['mtime'] = int(fattr.st_mtime)
        item['size'] = fattr.st_size
        tree.add(filename, item)
    return tree
def get_tree(self, path):
    """Describe the entries of one local directory.

    Regular files, directories, symlinks and FIFOs are added to the returned
    ``Tree`` together with their uid, gid, permission bits (rwx for
    user/group/other plus the sticky bit), the three timestamps and the size;
    every other file type is skipped.

    :param path: directory path (bytes) relative to ``self.root``; leading
        separators are stripped.
    :returns: the populated ``Tree``.
    :raises RemoteOperationError: if the directory cannot be listed.
    """
    path = path.lstrip(os.sep.encode('utf-8'))
    directory = os.path.join(self.root, path)
    perm_mask = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO | stat.S_ISVTX
    tree = Tree()
    try:
        directory_items = os.listdir(directory)
    except OSError as err:
        raise RemoteOperationError(err.strerror)

    for filename in directory_items:
        assert isinstance(filename, bytes)
        fullname = os.path.join(directory, filename)
        fstat = os.lstat(fullname)
        if stat.S_ISREG(fstat.st_mode):
            item = {'type': 'blob', 'filetype': 'regular'}
        elif stat.S_ISDIR(fstat.st_mode):
            item = {'type': 'tree', 'filetype': 'directory'}
        elif stat.S_ISLNK(fstat.st_mode):
            item = {'filetype': 'link',
                    'link': os.readlink(os.path.join(directory, filename))}
        elif stat.S_ISFIFO(fstat.st_mode):
            item = {'filetype': 'fifo'}
        else:
            continue  # FIXME: Warn
        item['uid'] = fstat.st_uid
        item['gid'] = fstat.st_gid
        item['mode'] = fstat.st_mode & perm_mask
        item['atime'] = int(fstat.st_atime)
        item['mtime'] = int(fstat.st_mtime)
        item['ctime'] = int(fstat.st_ctime)
        item['size'] = fstat.st_size
        tree.add(filename, item)
    return tree
def test_ensure_key_pair_nonexist_local(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info):
    """With no local key present, the pair is recreated and saved owner-only."""
    fake_ec2_client.create_key_pair.return_value = {'KeyMaterial': 'test key'}
    resource_patch = mock.patch('boto3.resource')
    client_patch = mock.patch('boto3.client')
    ssh_path_patch = mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function)
    with resource_patch as mocked_resource, client_patch as mocked_client, ssh_path_patch:
        mocked_resource.return_value = fake_ec2_resource
        mocked_client.return_value = fake_ec2_client
        result = runner.invoke(dummy, input='\n')

    assert result.exit_code == exit_codes.SUCCESS
    fake_ec2_resource.KeyPair.assert_called_once_with('test')
    fake_key_pair_info.delete.assert_called_once_with()
    fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test')
    with open(fake_ssh_path_function('test'), 'r') as key_file:
        assert key_file.read() == 'test key'

    mode = os.stat(fake_ssh_path_function('test')).st_mode
    # owner: read and write, no execute
    assert mode & stat.S_IRUSR
    assert mode & stat.S_IWUSR
    assert not mode & stat.S_IXUSR
    # group
    assert not mode & stat.S_IRWXG
    # others
    assert not mode & stat.S_IRWXO
def test_ensure_key_pair_nonexist_local_private(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info):
    """Only a local private key file exists: the pair is recreated owner-only."""
    fake_ec2_client.create_key_pair.return_value = {'KeyMaterial': 'test key'}
    type(fake_key_pair_info).key_fingerprint = mock.PropertyMock(side_effect=FakeClientError)
    # Pre-create the private key file; os.system returns 0 on success.
    assert not os.system('touch {}'.format(fake_ssh_path_function('test')))
    resource_patch = mock.patch('boto3.resource')
    client_patch = mock.patch('boto3.client')
    ssh_path_patch = mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function)
    with resource_patch as mocked_resource, client_patch as mocked_client, ssh_path_patch:
        mocked_resource.return_value = fake_ec2_resource
        mocked_client.return_value = fake_ec2_client
        result = runner.invoke(dummy, input='\n')

    assert result.exit_code == exit_codes.SUCCESS
    fake_ec2_resource.KeyPair.assert_called_once_with('test')
    fake_key_pair_info.delete.assert_called_once_with()
    fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test')
    with open(fake_ssh_path_function('test'), 'r') as key_file:
        assert key_file.read() == 'test key'

    mode = os.stat(fake_ssh_path_function('test')).st_mode
    # owner: read and write, no execute
    assert mode & stat.S_IRUSR
    assert mode & stat.S_IWUSR
    assert not mode & stat.S_IXUSR
    # group
    assert not mode & stat.S_IRWXG
    # others
    assert not mode & stat.S_IRWXO
def test_ensure_key_pair_nonexist_local_public(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info):
    """Only a local ``.pub`` file exists: the pair is recreated owner-only."""
    fake_ec2_client.create_key_pair.return_value = {'KeyMaterial': 'test key'}
    # Pre-create the public key file; os.system returns 0 on success.
    assert not os.system('touch {}.pub'.format(fake_ssh_path_function('test')))
    resource_patch = mock.patch('boto3.resource')
    client_patch = mock.patch('boto3.client')
    ssh_path_patch = mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function)
    with resource_patch as mocked_resource, client_patch as mocked_client, ssh_path_patch:
        mocked_resource.return_value = fake_ec2_resource
        mocked_client.return_value = fake_ec2_client
        result = runner.invoke(dummy, input='\n')

    assert result.exit_code == exit_codes.SUCCESS
    fake_ec2_resource.KeyPair.assert_called_once_with('test')
    fake_key_pair_info.delete.assert_called_once_with()
    fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test')
    with open(fake_ssh_path_function('test'), 'r') as key_file:
        assert key_file.read() == 'test key'

    mode = os.stat(fake_ssh_path_function('test')).st_mode
    # owner: read and write, no execute
    assert mode & stat.S_IRUSR
    assert mode & stat.S_IWUSR
    assert not mode & stat.S_IXUSR
    # group
    assert not mode & stat.S_IRWXG
    # others
    assert not mode & stat.S_IRWXO
def test_copymode_follow_symlinks(self):
    """shutil.copymode() follows symlinks on the source and destination side."""
    base = self.mkdtemp()
    src, dst, src_link, dst_link = (
        os.path.join(base, name) for name in ('foo', 'bar', 'baz', 'quux'))
    write_file(src, 'foo')
    write_file(dst, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)

    # file to file
    os.chmod(dst, stat.S_IRWXO)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    shutil.copymode(src, dst)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # On Windows, os.chmod does not follow symlinks (issue #15411)
    if os.name == 'nt':
        return

    # follow src link
    os.chmod(dst, stat.S_IRWXO)
    shutil.copymode(src_link, dst)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # follow dst link
    os.chmod(dst, stat.S_IRWXO)
    shutil.copymode(src, dst_link)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # follow both links
    os.chmod(dst, stat.S_IRWXO)
    shutil.copymode(src_link, dst_link)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)