我们从 Python 开源项目中提取了以下 24 个代码示例，用于说明如何使用 shutil.chown()。
def daemote(pid_file, user, group):
    """Drop privileges by switching the process group and user.

    The pid file is re-owned first so that it can still be cleaned up
    after the switch. Either ``user`` or ``group`` may be passed as
    ``None`` to leave that identity unchanged.

    Raises:
        OSError: if the module-level ``_SUPPORTED_PLATFORM`` flag is falsy.
    """
    if not _SUPPORTED_PLATFORM:
        raise OSError('Daemotion is unsupported on your platform.')

    # Re-owning the pid file needs no special handling, and it doubles as
    # validation: a bad user or group name fails here, before any
    # privilege has actually been dropped.
    shutil.chown(pid_file, user=user, group=group)

    # Switch the group first, then the user.
    _setgroup(group)
    _setuser(user)
def install_service_dir(path, owner, mode):
    """Merge the service directory ``path`` into ``/var/prologin/<name>``.

    ``<name>`` is the basename of ``path``. After the rsync merge, the
    destination directory and every directory and file beneath it are
    given the owner ``owner`` (a ``"user:group"`` string) and the
    permission bits ``mode``.
    """
    if not os.path.exists('/var/prologin'):
        mkdir('/var/prologin', mode=0o755, owner='root:root')

    name = os.path.basename(path)  # strip service kind from path (eg. django)
    destination = '/var/prologin/%s' % name

    # Nothing in Python allows merging two directories together, so shell
    # out to rsync(1). Be careful with its arguments: the trailing slash on
    # the source is what makes it merge the directory contents.
    system('rsync -rv %s/ /var/prologin/%s' % (path, name))

    user, group = owner.split(':')
    shutil.chown(destination, user, group)
    os.chmod(destination, mode)

    for parent, subdirs, filenames in os.walk(destination):
        # Directories first, then files, both with the same owner and mode.
        for entry in subdirs + filenames:
            entry_path = os.path.join(parent, entry)
            shutil.chown(entry_path, user, group)
            os.chmod(entry_path, mode)
def save_data(self, data):
    """Write ``data`` to ``self.Z_DATA`` as ``path|rank|timestamp`` lines.

    When ``data`` is sized and longer than ``self.GROOM_THRESHOLD``, every
    entry is first replaced in place by a ``ZEntry`` whose rank is scaled
    by ``self.GROOM_LEVEL`` (aging). The lines are written to a temporary
    file, which is optionally re-owned to ``self.Z_OWNER`` and then copied
    over ``self.Z_DATA``.
    """
    # Age data
    needs_grooming = hasattr(data, '__len__') and len(data) > self.GROOM_THRESHOLD
    if needs_grooming:
        for position, entry in enumerate(data):
            aged_rank = int(entry.rank * self.GROOM_LEVEL)
            data[position] = ZEntry(entry.path, aged_rank, entry.time)

    # Use a temporary file to minimize time the file is open and minimize
    # clobbering
    from tempfile import NamedTemporaryFile
    with NamedTemporaryFile('wt', encoding=sys.getfilesystemencoding()) as tmp:
        for entry in data:
            line = "{}|{}|{}\n".format(
                entry.path, int(entry.rank), int(entry.time.timestamp()))
            tmp.write(line)
        # Make sure everything is on disk before the file is copied by name.
        tmp.flush()

        if self.Z_OWNER:
            shutil.chown(tmp.name, user=self.Z_OWNER)

        # On POSIX, rename() is atomic and will clobber
        # On Windows, neither of these is true, so remove first.
        from xonsh.platform import ON_WINDOWS
        if ON_WINDOWS and os.path.exists(self.Z_DATA):
            os.remove(self.Z_DATA)
        # Copy while the temporary file still exists (inside the ``with``).
        shutil.copy(tmp.name, self.Z_DATA)
def setup_volumes(self):
    """Copy the boot images into ``constants.LIBVIRT_VOLUME_PATH``.

    The kernel, the initrd and ``self.image_name`` are read from
    ``self.image_path``. The main image is stored as ``undercloud.qcow2``;
    the other two keep their names. An existing destination file is
    replaced, and each copy is owned by ``qemu:qemu`` with mode ``0o744``.

    Raises:
        ApexUndercloudException: if a source image is not a regular file.
    """
    image_files = ('overcloud-full.vmlinuz', 'overcloud-full.initrd',
                   self.image_name)
    for img_file in image_files:
        src_img = os.path.join(self.image_path, img_file)
        # Only the main image is renamed on its way into the volume path.
        dest_name = 'undercloud.qcow2' if img_file == self.image_name else img_file
        dest_img = os.path.join(constants.LIBVIRT_VOLUME_PATH, dest_name)

        if not os.path.isfile(src_img):
            raise ApexUndercloudException(
                "Required source file does not exist:{}".format(src_img))

        if os.path.exists(dest_img):
            os.remove(dest_img)
        shutil.copyfile(src_img, dest_img)
        shutil.chown(dest_img, user='qemu', group='qemu')
        os.chmod(dest_img, 0o0744)
    # TODO(trozet):check if resize needed right now size is 50gb
    # there is a lib called vminspect which has some dependencies and is
    # not yet available in pip. Consider switching to this lib later.
def inject_auth(self):
    """Queue the credential setup operations and apply them to ``self.volume``.

    Builds an ordered list of single-key dicts: an optional root password
    operation (only when ``self.root_pw`` is truthy), the SSH key upload,
    and the shell commands that fix permissions and copy the key to the
    ``stack`` user. The list is handed to ``virt_utils.virt_customize``.
    """
    virt_ops = []

    # virt-customize keys/pws
    if self.root_pw:
        virt_ops.append(
            {constants.VIRT_PW: "password:{}".format(self.root_pw)})

    # ssh key setup
    virt_ops.append({constants.VIRT_RUN_CMD: 'mkdir -p /root/.ssh'})
    virt_ops.append({
        constants.VIRT_UPLOAD:
            '/root/.ssh/id_rsa.pub:/root/.ssh/authorized_keys'
    })

    run_cmds = (
        'chmod 600 /root/.ssh/authorized_keys',
        'restorecon /root/.ssh/authorized_keys',
        'cp /root/.ssh/authorized_keys /home/stack/.ssh/',
        'chown stack:stack /home/stack/.ssh/authorized_keys',
        'chmod 600 /home/stack/.ssh/authorized_keys',
    )
    # One run-command operation per shell command, in the order listed.
    virt_ops += [{constants.VIRT_RUN_CMD: cmd} for cmd in run_cmds]

    virt_utils.virt_customize(virt_ops, self.volume)
def init_db():
    """Initialize the database.

    Recreates ``$SNAP_COMMON/db`` from scratch, makes sure
    ``$SNAP_COMMON/log/postgresql.log`` exists, gives both to
    ``nobody:nogroup``, then runs ``initdb`` and the database creation
    through ``run_with_drop_privileges``.
    """
    config_data = get_config_data()

    # Always start from an empty data directory.
    db_path = os.path.join(os.environ['SNAP_COMMON'], 'db')
    if os.path.exists(db_path):
        shutil.rmtree(db_path)
    os.mkdir(db_path)
    shutil.chown(db_path, user='nobody', group='nogroup')

    log_path = os.path.join(os.environ['SNAP_COMMON'], 'log', 'postgresql.log')
    if not os.path.exists(log_path):
        # Create an empty log file; append mode never truncates.
        with open(log_path, 'a'):
            pass
    shutil.chown(log_path, user='nobody', group='nogroup')

    def _init_db():
        initdb_cmd = [
            os.path.join(os.environ['SNAP'], 'bin', 'initdb'),
            '-D', os.path.join(os.environ['SNAP_COMMON'], 'db'),
            '-U', 'postgres',
            '-E', 'UTF8',
            '--locale=C',
        ]
        subprocess.check_output(initdb_cmd, stderr=subprocess.STDOUT)
        with with_postgresql():
            create_db(config_data)

    run_with_drop_privileges(_init_db)
def change_owner(full_path, user_name=None, group_name=None):
    """Change the owner and/or group of ``full_path``.

    Thin wrapper around ``shutil.chown``. ``user_name`` and ``group_name``
    are passed through unchanged; any exception raised by ``shutil.chown``
    propagates to the caller (for example ``ValueError`` when both are
    ``None``).
    """
    # The previous bare ``except: raise`` only re-raised, so it was dropped.
    shutil.chown(full_path, user_name, group_name)
def chown(path, filename):
    """Give ``path/filename`` the same owner and group as ``path`` itself."""
    parent_info = os.stat(path)
    target = os.path.join(path, filename)
    os.chown(target, parent_info.st_uid, parent_info.st_gid)
def chown_dir(directory, username):
    """Recursively set both owner and group of ``directory`` to ``username``.

    The directory itself is changed first, then every sub-directory and
    file found by ``os.walk``. A log line is emitted once the walk is done.
    """
    shutil.chown(directory, user=username, group=username)
    for parent, subdirs, filenames in os.walk(directory):
        for entry in (*subdirs, *filenames):
            shutil.chown(os.path.join(parent, entry),
                         user=username, group=username)
    logger.info("{} chown'd to {}".format(directory, username))
def handle_sonarr(torrent):
    """Extract every ``rar`` file of ``torrent`` and move the results.

    Each archive is unpacked into a temporary directory. Every extracted
    path is given the group ``plex_group`` and mode ``0o664``, then moved
    into ``drone_factory``. Files that do not end in ``rar`` are ignored.
    """
    for _index, entry in torrent.files().items():
        file_path = os.path.join(torrent.downloadDir, entry['name'])
        if not file_path.endswith('rar'):
            continue

        with tempfile.TemporaryDirectory() as temp_dir:
            extracted = extract(file_path, temp_dir)
            if not extracted:
                continue
            # Move extracted files to sonarr drone factory
            for extracted_path in extracted:
                shutil.chown(extracted_path, group=plex_group)
                os.chmod(extracted_path, 0o664)
                shutil.move(extracted_path, drone_factory)
def mkdir(path, mode, owner='root:root'):
    """Ensure ``path`` exists with exactly ``mode`` and the given owner.

    Args:
        path: directory to create, or to adjust if it already exists.
        mode: permission bits the directory must end up with.
        owner: ``"user:group"`` string passed on to ``shutil.chown``.
    """
    if not os.path.exists(path):
        os.mkdir(path, mode)
    # os.mkdir() masks ``mode`` with the process umask, so a freshly created
    # directory could end up with fewer bits than an existing one that was
    # chmod'ed. Always chmod so both paths give the same result.
    os.chmod(path, mode)
    user, group = owner.split(':')
    shutil.chown(path, user, group)
def copy(old, new, mode=0o600, owner='root:root'):
    """Copy ``old`` to ``new``, then set the copy's mode and owner.

    ``owner`` is a ``"user:group"`` string. A one-line summary of the
    operation is printed before anything is copied.
    """
    summary = 'Copying %s -> %s (mode: %o) (own: %s)' % (old, new, mode, owner)
    print(summary)

    shutil.copy(old, new)
    os.chmod(new, mode)

    owner_user, owner_group = owner.split(':')
    shutil.chown(new, user=owner_user, group=owner_group)
def copytree(old, new, dir_mode=0o700, file_mode=0o600, owner='root:root'):
    """Copy the tree ``old`` to ``new`` and normalize modes and ownership.

    Args:
        old: source directory.
        new: destination directory, created by ``shutil.copytree``.
        dir_mode: permission bits applied to every directory, including
            ``new`` itself.
        file_mode: permission bits applied to every file.
        owner: ``"user:group"`` string applied to every entry.
    """
    print('Copying %s -> %s (file mode: %o) (dir mode: %o) (own: %s)'
          % (old, new, file_mode, dir_mode, owner))
    shutil.copytree(old, new)
    user, group = owner.split(':')

    def _apply(path, mode):
        # Set the permission bits first, then hand the entry to its owner.
        os.chmod(path, mode)
        shutil.chown(path, user, group)

    # os.walk() only reports the children of each directory, so the
    # top-level destination has to be handled explicitly; otherwise it
    # keeps the mode copied from ``old`` and the invoking user's ownership.
    _apply(new, dir_mode)
    for root, dirs, files in os.walk(new):
        for name in dirs:
            _apply(os.path.join(root, name), dir_mode)
        for name in files:
            _apply(os.path.join(root, name), file_mode)
def touch(path, mode=0o600, owner='root:root'):
    """Create ``path`` if needed, refresh its timestamps, set mode and owner.

    The file is opened in append mode, so existing content is preserved.
    ``owner`` is a ``"user:group"`` string.
    """
    print('Touching %s' % path)

    # Append mode creates the file when missing without truncating it.
    with open(path, 'a'):
        os.utime(path, None)

    os.chmod(path, mode)
    owner_user, owner_group = owner.split(':')
    shutil.chown(path, user=owner_user, group=owner_group)
def chown_to_user(path):
    """Set the owner and group of ``path`` to the values returned by
    ``get_user_uid()`` and ``get_user_gid()``."""
    uid = get_user_uid()
    gid = get_user_gid()
    shutil.chown(path, uid, gid)
def test_module_all_attribute(self):
    """Check that ``shutil.__all__`` exists and lists exactly the expected API.

    ``disk_usage`` is only expected where ``os.statvfs`` exists or the
    platform is Windows (``os.name == 'nt'``).
    """
    self.assertTrue(hasattr(shutil, '__all__'))

    expected_api = {
        'copyfileobj', 'copyfile', 'copymode', 'copystat', 'copy', 'copy2',
        'copytree', 'move', 'rmtree', 'Error', 'SpecialFileError',
        'ExecError', 'make_archive', 'get_archive_formats',
        'register_archive_format', 'unregister_archive_format',
        'get_unpack_formats', 'register_unpack_format',
        'unregister_unpack_format', 'unpack_archive', 'ignore_patterns',
        'chown', 'which', 'get_terminal_size', 'SameFileError',
    }
    if hasattr(os, 'statvfs') or os.name == 'nt':
        expected_api.add('disk_usage')

    self.assertEqual(set(shutil.__all__), expected_api)
def changeOwnerAndGrpToLoggedInUser(directory, raiseEx=False):
    """Set owner and group of ``directory`` to the logged-in user.

    The name returned by ``getLoggedInUser()`` is used for both user and
    group. Failures are ignored unless ``raiseEx`` is true, in which case
    the exception is re-raised.
    """
    login_name = getLoggedInUser()
    try:
        shutil.chown(directory, login_name, login_name)
    except Exception as err:
        # Best effort by default: only surface the error on request.
        if raiseEx:
            raise err
def chgrp(fname, grpname):
    """Change the group of ``fname`` to ``grpname``.

    Raises:
        RuntimeError: when ``shutil.chown`` fails with an ``OSError``.
            Other exception types propagate unchanged.
    """
    try:
        shutil.chown(fname, group=grpname)
    except OSError as err:
        message = 'chown(%s) failed: %s' % (fname, err)
        raise RuntimeError(message)
def test_chown(self):
    """Exercise ``shutil.chown`` argument validation and ownership changes.

    Invalid calls must raise (no user/group, unknown names, wrong types).
    Valid calls are made on both a file and its directory, using numeric
    ids and then the matching user/group names, and the resulting
    ownership is verified with ``os.stat``.
    """
    # cleaned-up automatically by TestShutil.tearDown method
    dirname = self.mkdtemp()
    filename = tempfile.mktemp(dir=dirname)
    write_file(filename, 'testing chown function')

    with self.assertRaises(ValueError):
        shutil.chown(filename)

    with self.assertRaises(LookupError):
        shutil.chown(filename, user='non-exising username')

    with self.assertRaises(LookupError):
        shutil.chown(filename, group='non-exising groupname')

    with self.assertRaises(TypeError):
        shutil.chown(filename, b'spam')

    with self.assertRaises(TypeError):
        shutil.chown(filename, 3.14)

    uid = os.getuid()
    gid = os.getgid()

    def check_chown(path, uid=None, gid=None):
        # Stat the path that was passed in. The previous version always
        # stat'ed ``filename``, so the directory checks never looked at
        # the directory.
        s = os.stat(path)
        if uid is not None:
            self.assertEqual(uid, s.st_uid)
        if gid is not None:
            self.assertEqual(gid, s.st_gid)

    # Numeric ids: same sequence of calls for the file, then the directory.
    for target in (filename, dirname):
        shutil.chown(target, uid, gid)
        check_chown(target, uid, gid)
        shutil.chown(target, uid)
        check_chown(target, uid)
        shutil.chown(target, user=uid)
        check_chown(target, uid)
        shutil.chown(target, group=gid)
        check_chown(target, gid=gid)

    # Names resolved from the current ids must have the same effect.
    user = pwd.getpwuid(uid)[0]
    group = grp.getgrgid(gid)[0]
    for target in (filename, dirname):
        shutil.chown(target, user, group)
        check_chown(target, uid, gid)
def handle_manual(torrent):
    """Process a manually added torrent and send a notification.

    Media files (per ``check_extension``, excluding paths containing
    "sample") at or above the size threshold are handed to
    ``handle_media`` in place. ``rar`` archives are extracted into a
    temporary directory and each extracted path is processed with
    ``move=True``. ``pb_notify`` is always called at the end; the message
    says whether anything was auto-processed.
    """
    auto_processed = False

    def handle_media(path, move):
        # Dispatch on guessit's classification. Other types are ignored and
        # leave ``auto_processed`` untouched.
        nonlocal auto_processed
        guess = guessit.guessit(path)
        if guess['type'] == 'episode':
            move_episode(path, guess, move)
            auto_processed = True
        elif guess['type'] == 'movie':
            move_movie(path, guess, move)
            auto_processed = True

    # Raw string: ``\d`` in a normal literal is an invalid escape sequence
    # and triggers a warning on modern Python. The pattern value is unchanged.
    part_regex = re.compile(r'.*part(\d+).rar', re.IGNORECASE)

    for _index, file_info in torrent.files().items():
        file_path = os.path.join(torrent.downloadDir, file_info['name'])
        if check_extension(file_path) and 'sample' not in file_path.lower():
            # Log and ignore mkv files of less than ~92MiB
            try:
                if os.path.getsize(file_path) >= 96811278:
                    handle_media(file_path, False)
                else:
                    syslog.syslog(
                        syslog.LOG_ERR,
                        'Detected false media file, skipping'
                    )
            except FileNotFoundError:
                syslog.syslog(syslog.LOG_ERR, 'Torrent file missing, skipping')
        elif file_path.endswith('rar'):
            # Ignore parts beyond the first in a rar series
            match = part_regex.match(file_path)
            if match and int(match.group(1)) > 1:
                continue
            with tempfile.TemporaryDirectory() as temp_dir:
                paths = extract(file_path, temp_dir)
                if paths:
                    for path in paths:
                        shutil.chown(path, group=plex_group)
                        os.chmod(path, 0o664)
                        handle_media(path, True)

    if auto_processed:
        pb_notify(
            textwrap.dedent(
                ''' Manually added torrent {0} finished downloading and was auto-processed '''.format(torrent.name)
            ).strip()
        )
    else:
        pb_notify(
            'Manually added torrent {0} finished downloading'.format(
                torrent.name
            )
        )