我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.chown()。
def mkdir(path, owner='root', group='root', perms=0o555, force=False): """Create a directory""" log("Making dir {} {}:{} {:o}".format(path, owner, group, perms)) uid = pwd.getpwnam(owner).pw_uid gid = grp.getgrnam(group).gr_gid realpath = os.path.abspath(path) path_exists = os.path.exists(realpath) if path_exists and force: if not os.path.isdir(realpath): log("Removing non-directory file {} prior to mkdir()".format(path)) os.unlink(realpath) os.makedirs(realpath, perms) elif not path_exists: os.makedirs(realpath, perms) os.chown(realpath, uid, gid) os.chmod(realpath, perms)
def chown(self, tarinfo, targetpath): """Set owner of targetpath according to tarinfo. """ if pwd and hasattr(os, "geteuid") and os.geteuid() == 0: # We have to be root to do so. try: g = grp.getgrnam(tarinfo.gname)[2] except KeyError: g = tarinfo.gid try: u = pwd.getpwnam(tarinfo.uname)[2] except KeyError: u = tarinfo.uid try: if tarinfo.issym() and hasattr(os, "lchown"): os.lchown(targetpath, u, g) else: if sys.platform != "os2emx": os.chown(targetpath, u, g) except EnvironmentError as e: raise ExtractError("could not change owner")
def extract_zip(input_stream, output_path, chown_to_user=True): try: zipfile = ZipFile(BytesIO(input_stream)) zipfile.extractall(output_path) file_list = zipfile.namelist() if chown_to_user: # chown the extracted files to the current user, instead of root for file_name in file_list: file_path = os.path.join(output_path, file_name) chown_path_to_user(file_path) return True except Exception as ex: logger.error(ex) return False
def place_data_on_block_device(blk_device, data_src_dst): """Migrate data in data_src_dst to blk_device and then remount.""" # mount block device into /mnt mount(blk_device, '/mnt') # copy data to /mnt copy_files(data_src_dst, '/mnt') # umount block device umount('/mnt') # Grab user/group ID's from original source _dir = os.stat(data_src_dst) uid = _dir.st_uid gid = _dir.st_gid # re-mount where the data should originally be # TODO: persist is currently a NO-OP in core.host mount(blk_device, data_src_dst, persist=True) # ensure original ownership of new mount. os.chown(data_src_dst, uid, gid)
def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'): """Ensures a ceph keyring is created for a named service and optionally ensures user and group ownership. Returns False if no ceph key is available in relation state. """ key = None for rid in relation_ids(relation): for unit in related_units(rid): key = relation_get('key', rid=rid, unit=unit) if key: break if not key: return False create_keyring(service=service, key=key) keyring = _keyring_path(service) if user and group: check_call(['chown', '%s.%s' % (user, group), keyring]) return True
def ensure_ceph_keyring(service, user=None, group=None, relation='ceph', key=None): """Ensures a ceph keyring is created for a named service and optionally ensures user and group ownership. @returns boolean: Flag to indicate whether a key was successfully written to disk based on either relation data or a supplied key """ if not key: for rid in relation_ids(relation): for unit in related_units(rid): key = relation_get('key', rid=rid, unit=unit) if key: break if not key: return False create_keyring(service=service, key=key) keyring = _keyring_path(service) if user and group: check_call(['chown', '%s.%s' % (user, group), keyring]) return True
def default(): """ Remove the .../stack/defaults directory. Preserve available_roles """ # Keep yaml human readable/editable friendly_dumper = yaml.SafeDumper friendly_dumper.ignore_aliases = lambda self, data: True preserve = {} content = None pathname = "/srv/pillar/ceph/stack/default/{}/cluster.yml".format('ceph') with open(pathname, "r") as sls_file: content = yaml.safe_load(sls_file) preserve['available_roles'] = content['available_roles'] stack_default = "/srv/pillar/ceph/stack/default" shutil.rmtree(stack_default) os.makedirs("{}/{}".format(stack_default, 'ceph')) with open(pathname, "w") as sls_file: sls_file.write(yaml.dump(preserve, Dumper=friendly_dumper, default_flow_style=False)) uid = pwd.getpwnam("salt").pw_uid gid = grp.getgrnam("salt").gr_gid for path in [stack_default, "{}/{}".format(stack_default, 'ceph'), pathname]: os.chown(path, uid, gid)
def get_role_key(user, role): """ ??role?key???????????? ansible????????600???????????? :param user: :param role: :return: self key path """ user_role_key_dir = os.path.join(KEY_DIR, 'user') user_role_key_path = os.path.join(user_role_key_dir, '%s_%s.pem' % (user.username, role.name)) mkdir(user_role_key_dir, mode=777) if not os.path.isfile(user_role_key_path): with open(os.path.join(role.key_path, 'id_rsa')) as fk: with open(user_role_key_path, 'w') as fu: fu.write(fk.read()) logger.debug(u"????????key %s, Owner: %s" % (user_role_key_path, user.username)) chown(user_role_key_path, user.username) os.chmod(user_role_key_path, 0600) return user_role_key_path
def copystat(cls, src, dest, copy_own=True, copy_xattr=True): """ Copy all stat info (mode bits, atime, mtime, flags) from `src` to `dest`. If `copy_own=True`, the uid and gid are also copied. If `copy_xattr=True`, the extended attributes are also copied (only available on Linux). """ st = os.stat(src) mode = stat.S_IMODE(st.st_mode) os.chmod(dest, mode=mode) os.utime(dest, ns=(st.st_atime_ns, st.st_mtime_ns)) if hasattr(st, "st_flags"): os.chflags(dest, flags=st.st_flags) if copy_own: os.chown(dest, uid=st.st_uid, gid=st.st_gid) if copy_xattr: cls.copyxattr(src, dest)
def chown(self, tarinfo, targetpath): """Set owner of targetpath according to tarinfo. """ if pwd and hasattr(os, "geteuid") and os.geteuid() == 0: # We have to be root to do so. try: g = grp.getgrnam(tarinfo.gname)[2] except KeyError: g = tarinfo.gid try: u = pwd.getpwnam(tarinfo.uname)[2] except KeyError: u = tarinfo.uid try: if tarinfo.issym() and hasattr(os, "lchown"): os.lchown(targetpath, u, g) else: if sys.platform != "os2emx": os.chown(targetpath, u, g) except EnvironmentError, e: raise ExtractError("could not change owner")
def handle(self): local_item_tmp_path = self.local_parent_path + get_tmp_filename(self.item_name) try: with open(local_item_tmp_path, 'wb') as f: self.drive.download_file(file=f, size=self._item.size, item_id=self._item.id) local_sha1 = hasher.hash_value(local_item_tmp_path) item_sha1 = None if self._item.file_props is not None and self._item.file_props.hashes is not None: item_sha1 = self._item.file_props.hashes.sha1 if item_sha1 is None: self.logger.warn('Remote file %s has not sha1 property, we keep the file but cannot check correctness of it', self.local_path) elif local_sha1 != item_sha1: self.logger.error('Mismatch hash of download file %s : remote:%s,%d local:%s %d', self.local_path, self._item.file_props.hashes.sha1, self._item.size, local_sha1, os.path.getsize(local_item_tmp_path)) return os.rename(local_item_tmp_path, self.local_path) t = datetime_to_timestamp(self._item.modified_time) os.utime(self.local_path, (t, t)) os.chown(self.local_path, OS_USER_ID, OS_USER_GID) self.items_store.update_item(self._item, ItemRecordStatuses.DOWNLOADED) except (IOError, OSError) as e: self.logger.error('An IO error occurred when downloading "%s":\n%s.', self.local_path, traceback.format_exc()) except errors.OneDriveError as e: self.logger.error('An API error occurred when downloading "%s":\n%s.', self.local_path, traceback.format_exc())
def chown(self, path, uid, gid): """ Change the owner (C{uid}) and group (C{gid}) of a file. As with python's C{os.chown} function, you must pass both arguments, so if you only want to change one, use L{stat} first to retrieve the current owner and group. @param path: path of the file to change the owner and group of @type path: str @param uid: new owner's uid @type uid: int @param gid: new group id @type gid: int """ path = self._adjust_cwd(path) self._log(DEBUG, 'chown(%r, %r, %r)' % (path, uid, gid)) attr = SFTPAttributes() attr.st_uid, attr.st_gid = uid, gid self._request(CMD_SETSTAT, path, attr)
def copystats(from_file, to_file): """ Copy stat info from ``from_file`` to ``to_file`` using `shutil.copystat`. If possible, also copy the user and/or group ownership information. """ shutil.copystat(from_file, to_file) if hasattr(os, 'chown'): st = os.stat(from_file) # Based on GNU sed's behavior: try: os.chown(to_file, st.st_uid, st.st_gid) except EnvironmentError: try: os.chown(to_file, -1, st.st_gid) except EnvironmentError: pass