我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用stat.S_IRWXG。
def getattr(self, path, fh=None): stat = self.send_cmd(cmd='stat', path=path) if stat['status'] != 'ok': raise FuseOSError(ENOENT) fuse_stat = {} # Set it to 777 access fuse_stat['st_mode'] = 0 if stat['type'] == 'folder': fuse_stat['st_mode'] |= S_IFDIR else: fuse_stat['st_mode'] |= S_IFREG fuse_stat['st_size'] = stat.get('size', 0) fuse_stat['st_ctime'] = dateparse(stat['created']).timestamp() # TODO change to local timezone fuse_stat['st_mtime'] = dateparse(stat['updated']).timestamp() fuse_stat['st_atime'] = dateparse(stat['updated']).timestamp() # TODO not supported ? fuse_stat['st_mode'] |= S_IRWXU | S_IRWXG | S_IRWXO fuse_stat['st_nlink'] = 1 fuse_stat['st_uid'] = os.getuid() fuse_stat['st_gid'] = os.getgid() return fuse_stat
def build_unpack_comic(self, comic_path): logging.info("%s unpack requested" % comic_path) for root, dirs, files in os.walk(os.path.join(gazee.TEMP_DIR, "build"), topdown=False): for f in files: os.chmod(os.path.join(root, f), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 0777 os.remove(os.path.join(root, f)) for root, dirs, files in os.walk(os.path.join(gazee.TEMP_DIR, "build"), topdown=False): for d in dirs: os.chmod(os.path.join(root, d), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 0777 os.rmdir(os.path.join(root, d)) if comic_path.endswith(".cbr"): opened_rar = rarfile.RarFile(comic_path) opened_rar.extractall(os.path.join(gazee.TEMP_DIR, "build")) elif comic_path.endswith(".cbz"): opened_zip = zipfile.ZipFile(comic_path) opened_zip.extractall(os.path.join(gazee.TEMP_DIR, "build")) return
def user_unpack_comic(self, comic_path, user): logging.info("%s unpack requested" % comic_path) for root, dirs, files in os.walk(os.path.join(gazee.TEMP_DIR, user), topdown=False): for f in files: os.chmod(os.path.join(root, f), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 0777 os.remove(os.path.join(root, f)) for root, dirs, files in os.walk(os.path.join(gazee.TEMP_DIR, user), topdown=False): for d in dirs: os.chmod(os.path.join(root, d), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 0777 os.rmdir(os.path.join(root, d)) if comic_path.endswith(".cbr"): opened_rar = rarfile.RarFile(comic_path) opened_rar.extractall(os.path.join(gazee.TEMP_DIR, user)) elif comic_path.endswith(".cbz"): opened_zip = zipfile.ZipFile(comic_path) opened_zip.extractall(os.path.join(gazee.TEMP_DIR, user)) return # This method will return a list of .jpg files in their numberical order to be fed into the reading view.
def _get_netrc(cls): # Buffer the '.netrc' path. Use an empty file if it doesn't exist. path = cls.get_netrc_path() content = '' if os.path.exists(path): st = os.stat(path) if st.st_mode & (stat.S_IRWXG | stat.S_IRWXO): print >> sys.stderr, ( 'WARNING: netrc file %s cannot be used because its file ' 'permissions are insecure. netrc file permissions should be ' '600.' % path) with open(path) as fd: content = fd.read() # Load the '.netrc' file. We strip comments from it because processing them # can trigger a bug in Windows. See crbug.com/664664. content = '\n'.join(l for l in content.splitlines() if l.strip() and not l.strip().startswith('#')) with tempdir() as tdir: netrc_path = os.path.join(tdir, 'netrc') with open(netrc_path, 'w') as fd: fd.write(content) os.chmod(netrc_path, (stat.S_IRUSR | stat.S_IWUSR)) return cls._get_netrc_from_path(netrc_path)
def cleanup(self): print("Cleaning up") if os.path.exists(self.temp): # The check for "tempxxx" is to guard against bugs with path operations # and command-line arguments which might lead to the accidental deletion # of an important directory. if "tempxxx" in self.temp: def handleRemoveReadonly(func, path, exc): excvalue = exc[1] if excvalue.errno == errno.EACCES: os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 0777 func(path) else: raise print("rmtree {0}".format(self.temp)); shutil.rmtree(self.temp, ignore_errors=False, onerror=handleRemoveReadonly)
def _apply_operation_to_mode(user, operator, mode_to_apply, current_mode): if operator == '=': if user == 'u': mask = stat.S_IRWXU | stat.S_ISUID elif user == 'g': mask = stat.S_IRWXG | stat.S_ISGID elif user == 'o': mask = stat.S_IRWXO | stat.S_ISVTX # mask out u, g, or o permissions from current_mode and apply new permissions inverse_mask = mask ^ PERM_BITS new_mode = (current_mode & inverse_mask) | mode_to_apply elif operator == '+': new_mode = current_mode | mode_to_apply elif operator == '-': new_mode = current_mode - (current_mode & mode_to_apply) return new_mode
def removeReadOnly(func, path, exc): """Called by shutil.rmtree when it encounters a readonly file. """ excvalue = exc[1] if func in (os.rmdir, os.remove) and excvalue.errno == errno.EACCES: os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 0777 func(path) else: raise RuntimeError('Could not remove {0}'.format(path))
def create_tempdir(): """Create a directory within the system temp directory used to create temp files.""" try: if os.path.isdir(tempdir): shutil.rmtree(tempdir) os.mkdir(tempdir) # Make sure the directory can be removed by anyone in case the user # runs ST later as another user. os.chmod(tempdir, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) except PermissionError: if sublime.platform() != 'windows': current_user = pwd.getpwuid(os.geteuid())[0] temp_uid = os.stat(tempdir).st_uid temp_user = pwd.getpwuid(temp_uid)[0] message = ( 'The SublimeLinter temp directory:\n\n{0}\n\ncould not be cleared ' 'because it is owned by \'{1}\' and you are logged in as \'{2}\'. ' 'Please use sudo to remove the temp directory from a terminal.' ).format(tempdir, temp_user, current_user) else: message = ( 'The SublimeLinter temp directory ({}) could not be reset ' 'because it belongs to a different user.' ).format(tempdir) sublime.error_message(message) from . import persist persist.debug('temp directory:', tempdir)
def startDB(self): import kinterbasdb self.DB_NAME = os.path.join(self.DB_DIR, DBTestConnector.DB_NAME) os.chmod(self.DB_DIR, stat.S_IRWXU + stat.S_IRWXG + stat.S_IRWXO) sql = 'create database "%s" user "%s" password "%s"' sql %= (self.DB_NAME, self.DB_USER, self.DB_PASS); conn = kinterbasdb.create_database(sql) conn.close()
def _RmTreeHandleReadOnly(func, path, exc): """An error handling function for use with shutil.rmtree. This will detect failures to remove read-only files, and will change their properties prior to removing them. This is necessary on Windows as os.remove will return an access error for read-only files, and git repos contain read-only pack/index files. """ excvalue = exc[1] if func in (os.rmdir, os.remove) and excvalue.errno == errno.EACCES: _LOGGER.debug('Removing read-only path: %s', path) os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) func(path) else: raise
def remove_read_only(func, path, exc): """Called by shutil.rmtree when it encounters a readonly file. :param func: :param path: :param exc: """ excvalue = exc[1] if func in (os.rmdir, os.remove) and excvalue.errno == errno.EACCES: os.chmod(path, stat.S_IRWXU| stat.S_IRWXG| stat.S_IRWXO) # 0777 func(path) else: raise RuntimeError('Could not remove {0}'.format(path))
def handleRemoveReadonly(func, path, exc): excvalue = exc[1] if func in (os.rmdir, os.remove) and excvalue.errno == errno.EACCES: os.chmod(path, stat.S_IRWXU| stat.S_IRWXG| stat.S_IRWXO) # 0777 func(path) else: raise
def lstat(self, path): """Get attributes of a file, directory, or symlink This method queries the attributes of a file, directory, or symlink. Unlike :meth:`stat`, this method should return the attributes of a symlink itself rather than the target of that link. :param bytes path: The path of the file, directory, or link to get attributes for :returns: An :class:`SFTPAttrs` or an os.stat_result containing the file attributes :raises: :exc:`SFTPError` to return an error to the client """ try: stat = self._vfs.stat(path).stat except VFSFileNotFoundError: raise SFTPError(FX_NO_SUCH_FILE, 'No such file') mod = S_IFDIR if stat.type == Stat.DIRECTORY else S_IFREG return SFTPAttrs(**{ 'size': stat.size, 'uid': os.getuid(), 'gid': os.getgid(), 'permissions': mod | S_IRWXU | S_IRWXG | S_IRWXO, 'atime': stat.atime, 'mtime': stat.mtime })
def _apply_operation_to_mode(self, user, operator, mode_to_apply, current_mode): if operator == '=': if user == 'u': mask = stat.S_IRWXU | stat.S_ISUID elif user == 'g': mask = stat.S_IRWXG | stat.S_ISGID elif user == 'o': mask = stat.S_IRWXO | stat.S_ISVTX # mask out u, g, or o permissions from current_mode and apply new permissions inverse_mask = mask ^ PERM_BITS new_mode = (current_mode & inverse_mask) | mode_to_apply elif operator == '+': new_mode = current_mode | mode_to_apply elif operator == '-': new_mode = current_mode - (current_mode & mode_to_apply) return new_mode
def safe_delete(path): ''' 'Robust' delete. ''' path = sanitize_path(path) if os.path.isfile(path): try: os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) os.remove(path) except OSError: pass
def cleanup_test_droppings(testname, verbose): import shutil import stat import gc # First kill any dangling references to open files etc. # This can also issue some ResourceWarnings which would otherwise get # triggered during the following test run, and possibly produce failures. gc.collect() # Try to clean up junk commonly left behind. While tests shouldn't leave # any files or directories behind, when a test fails that can be tedious # for it to arrange. The consequences can be especially nasty on Windows, # since if a test leaves a file open, it cannot be deleted by name (while # there's nothing we can do about that here either, we can display the # name of the offending test, which is a real help). for name in (support.TESTFN, "db_home", ): if not os.path.exists(name): continue if os.path.isdir(name): kind, nuker = "directory", shutil.rmtree elif os.path.isfile(name): kind, nuker = "file", os.unlink else: raise SystemError("os.path says %r exists but is neither " "directory nor file" % name) if verbose: print("%r left behind %s %r" % (testname, kind, name)) try: # if we have chmod, fix possible permissions problems # that might prevent cleanup if (hasattr(os, 'chmod')): os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) nuker(name) except Exception as msg: print(("%r left behind %s %r and it couldn't be " "removed: %s" % (testname, kind, name, msg)), file=sys.stderr)
def test_get_app_data_home(self): path = xdg.get_data_dir() expected_path = os.path.join(os.environ[self.homes['data']], 'http-prompt') self.assertEqual(path, expected_path) self.assertTrue(os.path.exists(path)) if sys.platform != 'win32': # Make sure permission for the directory is 700 mask = stat.S_IMODE(os.stat(path).st_mode) self.assertTrue(mask & stat.S_IRWXU) self.assertFalse(mask & stat.S_IRWXG) self.assertFalse(mask & stat.S_IRWXO)
def test_get_app_config_home(self): path = xdg.get_config_dir() expected_path = os.path.join(os.environ[self.homes['config']], 'http-prompt') self.assertEqual(path, expected_path) self.assertTrue(os.path.exists(path)) if sys.platform != 'win32': # Make sure permission for the directory is 700 mask = stat.S_IMODE(os.stat(path).st_mode) self.assertTrue(mask & stat.S_IRWXU) self.assertFalse(mask & stat.S_IRWXG) self.assertFalse(mask & stat.S_IRWXO)
def cleanup_test_droppings(testname, verbose): import stat import gc # First kill any dangling references to open files etc. gc.collect() # Try to clean up junk commonly left behind. While tests shouldn't leave # any files or directories behind, when a test fails that can be tedious # for it to arrange. The consequences can be especially nasty on Windows, # since if a test leaves a file open, it cannot be deleted by name (while # there's nothing we can do about that here either, we can display the # name of the offending test, which is a real help). for name in (test_support.TESTFN, "db_home", ): if not os.path.exists(name): continue if os.path.isdir(name): kind, nuker = "directory", shutil.rmtree elif os.path.isfile(name): kind, nuker = "file", os.unlink else: raise SystemError("os.path says %r exists but is neither " "directory nor file" % name) if verbose: print "%r left behind %s %r" % (testname, kind, name) try: # if we have chmod, fix possible permissions problems # that might prevent cleanup if (hasattr(os, 'chmod')): os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) nuker(name) except Exception, msg: print >> sys.stderr, ("%r left behind %s %r and it couldn't be " "removed: %s" % (testname, kind, name, msg))
def test_mode(self): with open(TESTFN, 'w'): pass if os.name == 'posix': os.chmod(TESTFN, 0o700) st_mode = self.get_mode() self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IMODE(st_mode), stat.S_IRWXU) os.chmod(TESTFN, 0o070) st_mode = self.get_mode() self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IMODE(st_mode), stat.S_IRWXG) os.chmod(TESTFN, 0o007) st_mode = self.get_mode() self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IMODE(st_mode), stat.S_IRWXO) os.chmod(TESTFN, 0o444) st_mode = self.get_mode() self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IMODE(st_mode), 0o444) else: os.chmod(TESTFN, 0o700) st_mode = self.get_mode() self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IFMT(st_mode), stat.S_IFREG)
def test_mode(self): with open(TESTFN, 'w'): pass if os.name == 'posix': os.chmod(TESTFN, 0o700) st_mode, modestr = self.get_mode() self.assertEqual(modestr, '-rwx------') self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IMODE(st_mode), stat.S_IRWXU) os.chmod(TESTFN, 0o070) st_mode, modestr = self.get_mode() self.assertEqual(modestr, '----rwx---') self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IMODE(st_mode), stat.S_IRWXG) os.chmod(TESTFN, 0o007) st_mode, modestr = self.get_mode() self.assertEqual(modestr, '-------rwx') self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IMODE(st_mode), stat.S_IRWXO) os.chmod(TESTFN, 0o444) st_mode, modestr = self.get_mode() self.assertS_IS("REG", st_mode) self.assertEqual(modestr, '-r--r--r--') self.assertEqual(stat.S_IMODE(st_mode), 0o444) else: os.chmod(TESTFN, 0o700) st_mode, modestr = self.get_mode() self.assertEqual(modestr[:3], '-rw') self.assertS_IS("REG", st_mode) self.assertEqual(stat.S_IFMT(st_mode), stat.S_IFREG)
def check_ssh_private_key_filemode(ssh_private_key): # type: (pathlib.Path) -> bool """Check SSH private key filemode :param pathlib.Path ssh_private_key: SSH private key :rtype: bool :return: private key filemode is ok """ def _mode_check(fstat, flag): return bool(fstat & flag) if util.on_windows(): return True fstat = ssh_private_key.stat().st_mode modes = frozenset((stat.S_IRWXG, stat.S_IRWXO)) return not any([_mode_check(fstat, x) for x in modes])
def validate_ssh_key_path(ssh_key_path): assert os.path.isfile(ssh_key_path), 'could not find ssh private key: {}'.format(ssh_key_path) assert stat.S_IMODE( os.stat(ssh_key_path).st_mode) & (stat.S_IRWXG + stat.S_IRWXO) == 0, ( 'ssh_key_path must be only read / write / executable by the owner. It may not be read / write / executable ' 'by group, or other.') with open(ssh_key_path) as fh: assert 'ENCRYPTED' not in fh.read(), ('Encrypted SSH keys (which contain passphrases) ' 'are not allowed. Use a key without a passphrase.')
def get_tree(self, path): path = path.lstrip(os.sep.encode('utf-8')) directory = os.path.join(self.root, path) tree = Tree() directory_items = self._sftp.listdir_attr_b(directory) for fattr in directory_items: filename = fattr.filename item = {} if stat.S_ISREG(fattr.st_mode): item['type'] = 'blob' item['filetype'] = 'regular' elif stat.S_ISDIR(fattr.st_mode): item['type'] = 'tree' item['filetype'] = 'directory' elif stat.S_ISLNK(fattr.st_mode): item['filetype'] = 'link' item['link'] = self._sftp.readlink(os.path.join(directory, filename)).encode('utf8') elif stat.S_ISFIFO(fattr.st_mode): item['filetype'] = 'fifo' else: continue # FIXME: Warn fmode = fattr.st_mode & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO | stat.S_ISVTX) item['uid'] = fattr.st_uid item['gid'] = fattr.st_gid item['mode'] = fmode item['mtime'] = int(fattr.st_mtime) item['size'] = fattr.st_size tree.add(filename, item) return tree
def get_tree(self, path): path = path.lstrip(os.sep.encode('utf-8')) directory = os.path.join(self.root, path) tree = Tree() try: directory_items = os.listdir(directory) except OSError as err: raise RemoteOperationError(err.strerror) for filename in directory_items: assert isinstance(filename, bytes) item = {} fullname = os.path.join(directory, filename) fstat = os.lstat(fullname) if stat.S_ISREG(fstat.st_mode): item['type'] = 'blob' item['filetype'] = 'regular' elif stat.S_ISDIR(fstat.st_mode): item['type'] = 'tree' item['filetype'] = 'directory' elif stat.S_ISLNK(fstat.st_mode): item['filetype'] = 'link' item['link'] = os.readlink(os.path.join(directory, filename)) elif stat.S_ISFIFO(fstat.st_mode): item['filetype'] = 'fifo' else: continue # FIXME: Warn fmode = fstat.st_mode & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO | stat.S_ISVTX) item['uid'] = fstat.st_uid item['gid'] = fstat.st_gid item['mode'] = fmode item['atime'] = int(fstat.st_atime) item['mtime'] = int(fstat.st_mtime) item['ctime'] = int(fstat.st_ctime) item['size'] = fstat.st_size tree.add(filename, item) return tree
def test_ensure_key_pair_nonexist_local(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info): fake_ec2_client.create_key_pair.return_value = { 'KeyMaterial': 'test key' } with mock.patch('boto3.resource') as mocked_resource, mock.patch('boto3.client') as mocked_client, mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function): mocked_resource.return_value = fake_ec2_resource mocked_client.return_value = fake_ec2_client result = runner.invoke(dummy, input='\n') assert result.exit_code == exit_codes.SUCCESS fake_ec2_resource.KeyPair.assert_called_once_with('test') fake_key_pair_info.delete.assert_called_once_with() fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test') with open(fake_ssh_path_function('test'), 'r') as f: assert f.read() == 'test key' s = os.stat(fake_ssh_path_function('test')).st_mode # owner assert bool(s & stat.S_IRUSR) assert bool(s & stat.S_IWUSR) assert not bool(s & stat.S_IXUSR) # group assert not bool(s & stat.S_IRWXG) # others assert not bool(s & stat.S_IRWXO)
def test_ensure_key_pair_nonexist_local_private(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info): fake_ec2_client.create_key_pair.return_value = { 'KeyMaterial': 'test key' } type(fake_key_pair_info).key_fingerprint = mock.PropertyMock(side_effect=FakeClientError) assert not os.system('touch {}'.format(fake_ssh_path_function('test'))) with mock.patch('boto3.resource') as mocked_resource, mock.patch('boto3.client') as mocked_client, mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function): mocked_resource.return_value = fake_ec2_resource mocked_client.return_value = fake_ec2_client result = runner.invoke(dummy, input='\n') assert result.exit_code == exit_codes.SUCCESS fake_ec2_resource.KeyPair.assert_called_once_with('test') fake_key_pair_info.delete.assert_called_once_with() fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test') with open(fake_ssh_path_function('test'), 'r') as f: assert f.read() == 'test key' s = os.stat(fake_ssh_path_function('test')).st_mode # owner assert bool(s & stat.S_IRUSR) assert bool(s & stat.S_IWUSR) assert not bool(s & stat.S_IXUSR) # group assert not bool(s & stat.S_IRWXG) # others assert not bool(s & stat.S_IRWXO)
def test_ensure_key_pair_nonexist_local_public(runner, fake_ec2_resource, fake_ec2_client, fake_ssh_path_function, fake_key_pair_info): fake_ec2_client.create_key_pair.return_value = { 'KeyMaterial': 'test key' } assert not os.system('touch {}.pub'.format(fake_ssh_path_function('test'))) with mock.patch('boto3.resource') as mocked_resource, mock.patch('boto3.client') as mocked_client, mock.patch('deploy.utils.decorators._ssh_path', new=fake_ssh_path_function): mocked_resource.return_value = fake_ec2_resource mocked_client.return_value = fake_ec2_client result = runner.invoke(dummy, input='\n') assert result.exit_code == exit_codes.SUCCESS fake_ec2_resource.KeyPair.assert_called_once_with('test') fake_key_pair_info.delete.assert_called_once_with() fake_ec2_client.create_key_pair.assert_called_once_with(KeyName='test') with open(fake_ssh_path_function('test'), 'r') as f: assert f.read() == 'test key' s = os.stat(fake_ssh_path_function('test')).st_mode # owner assert bool(s & stat.S_IRUSR) assert bool(s & stat.S_IWUSR) assert not bool(s & stat.S_IXUSR) # group assert not bool(s & stat.S_IRWXG) # others assert not bool(s & stat.S_IRWXO)
def cli_generate_key(ctx, key_file, passphrase): """Generate Public and Private Keys.""" if os.path.isfile(key_file): raise click.UsageError("%s already exists." % key_file) key_file_pub = "%s.pub" % key_file if os.path.isfile(key_file_pub): raise click.UsageError("%s already exists." % key_file_pub) if len(passphrase) < 8: raise click.UsageError("Passphrase too short") key_file_dir = os.path.dirname(key_file) if key_file_dir and not os.path.isdir(key_file_dir): os.mkdir(key_file_dir, 0o700) if not key_file_dir: # take currently dir if none was specified key_file_dir = "." if not os.access(key_file_dir, os.W_OK): raise click.UsageError("Directory '%s' not read and writeable" % key_file_dir) output_file_dir_stat = os.stat(key_file_dir).st_mode if output_file_dir_stat & stat.S_IRWXG: raise click.UsageError("Directory '%s' has group permissions and is insecure" % key_file_dir) if output_file_dir_stat & stat.S_IRWXO: raise click.UsageError("Directory '%s' has other permissions and is insecure" % key_file_dir) p = nacl.signing.SigningKey.generate() save_public_keyfile(key_file_pub, p.verify_key) save_private_keyfile(key_file, p, passphrase)
def test_save(self, tmpdir): first = PrivateKey.create() fh = tmpdir.join('somefile') first.save(fh.strpath) second = PrivateKey.load(filename=fh.strpath) assert first.dump() == second.dump() bits = os.stat(fh.strpath).st_mode perms = bits & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) assert perms == (stat.S_IRUSR | stat.S_IWUSR)
def test_store_pubkey(self, chown_mock): store_pubkey('foo', self.tempdir, 'abc') chown_mock.assert_called_once_with('-R', 'foo:foo', self.tempdir) authorized_keys_file = os.path.join(self.tempdir, '.ssh/authorized_keys') authorized_keys = open(authorized_keys_file).read() self.assertEqual(authorized_keys, 'abc\n') file_stat = os.stat(authorized_keys_file).st_mode access = file_stat & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) self.assertEqual(access, 0o0600) ssh_dir = os.path.join(self.tempdir, '.ssh') filestat_ssh = os.stat(ssh_dir).st_mode access = filestat_ssh & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) self.assertEqual(access, 0o0700)
def _get_octal_mode_from_symbolic_perms(self, path_stat, user, perms): prev_mode = stat.S_IMODE(path_stat.st_mode) is_directory = stat.S_ISDIR(path_stat.st_mode) has_x_permissions = (prev_mode & EXEC_PERM_BITS) > 0 apply_X_permission = is_directory or has_x_permissions # Permission bits constants documented at: # http://docs.python.org/2/library/stat.html#stat.S_ISUID if apply_X_permission: X_perms = { 'u': {'X': stat.S_IXUSR}, 'g': {'X': stat.S_IXGRP}, 'o': {'X': stat.S_IXOTH} } else: X_perms = { 'u': {'X': 0}, 'g': {'X': 0}, 'o': {'X': 0} } user_perms_to_modes = { 'u': { 'r': stat.S_IRUSR, 'w': stat.S_IWUSR, 'x': stat.S_IXUSR, 's': stat.S_ISUID, 't': 0, 'u': prev_mode & stat.S_IRWXU, 'g': (prev_mode & stat.S_IRWXG) << 3, 'o': (prev_mode & stat.S_IRWXO) << 6 }, 'g': { 'r': stat.S_IRGRP, 'w': stat.S_IWGRP, 'x': stat.S_IXGRP, 's': stat.S_ISGID, 't': 0, 'u': (prev_mode & stat.S_IRWXU) >> 3, 'g': prev_mode & stat.S_IRWXG, 'o': (prev_mode & stat.S_IRWXO) << 3 }, 'o': { 'r': stat.S_IROTH, 'w': stat.S_IWOTH, 'x': stat.S_IXOTH, 's': 0, 't': stat.S_ISVTX, 'u': (prev_mode & stat.S_IRWXU) >> 6, 'g': (prev_mode & stat.S_IRWXG) >> 3, 'o': prev_mode & stat.S_IRWXO } } # Insert X_perms into user_perms_to_modes for key, value in X_perms.items(): user_perms_to_modes[key].update(value) or_reduce = lambda mode, perm: mode | user_perms_to_modes[user][perm] return reduce(or_reduce, perms, 0)