我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 os.chmod()。
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create a directory and apply ownership and permissions to it.

    The path is made absolute before use.  If it does not exist it is
    created together with any missing parents.  If it exists and ``force``
    is true, a non-directory entry at that path is unlinked and replaced by
    a directory; an existing directory is left in place.  In every case the
    resulting path is finally chown'ed to ``owner``/``group`` and chmod'ed
    to ``perms``.

    :param path: directory to create.
    :param owner: user name looked up with ``pwd.getpwnam``.
    :param group: group name looked up with ``grp.getgrnam``.
    :param perms: permission bits used for creation and the final chmod.
    :param force: replace an existing non-directory entry when true.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if os.path.exists(target):
        # Only a forced call is allowed to replace a non-directory entry.
        if force and not os.path.isdir(target):
            log("Removing non-directory file {} prior to mkdir()".format(path))
            os.unlink(target)
            os.makedirs(target, perms)
    else:
        os.makedirs(target, perms)

    os.chown(target, uid, gid)
    os.chmod(target, perms)
def postprocess(self, tempname, filename):
    """Apply platform-specific post-processing to a freshly extracted file.

    This is the hook where Mac header rewrites would belong; other
    platforms need nothing special beyond what is done here.

    Resource providers should call this ONLY after successfully extracting
    a compressed resource, and must NOT call it for resources that already
    live in the filesystem.

    `tempname` is the current (temporary) name of the file; `filename` is
    the name the caller will rename it to once this routine returns.
    """
    if os.name != 'posix':
        return
    # Make the resource readable and executable for everyone while keeping
    # only the permission bits of the existing mode.
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
def test_zipfile_attributes():
    """Check that archived members keep their mode bits and are deflated.

    Two files with different modes are written into a temporary directory,
    packed with ``wheel.archive.make_wheelfile_inner`` and then inspected
    through the resulting archive's ``ZipInfo`` entries.
    """
    # With the change from ZipFile.write() to .writestr(), we need to manually
    # set member attributes.
    with temporary_directory() as tempdir:
        files = (('foo', 0o644), ('bar', 0o755))

        for member_name, member_mode in files:
            member_path = os.path.join(tempdir, member_name)
            with codecs.open(member_path, 'w', encoding='utf-8') as handle:
                handle.write(member_name + '\n')
            os.chmod(member_path, member_mode)

        archive_base = os.path.join(tempdir, 'dummy')
        archive_path = wheel.archive.make_wheelfile_inner(archive_base, tempdir)

        with readable_zipfile(archive_path) as archive:
            for member_name, member_mode in files:
                entry = archive.getinfo(os.path.join(tempdir, member_name))
                # The Unix mode sits in the upper 16 bits of external_attr.
                assert entry.external_attr == (member_mode | 0o100000) << 16
                assert entry.compress_type == zipfile.ZIP_DEFLATED
def copymode(src, dst, *, follow_symlinks=True):
    """Copy the permission bits of `src` onto `dst`.

    Symlinks are left unfollowed only when `follow_symlinks` is false AND
    both `src` and `dst` are symlinks; in that case the link itself is
    changed through ``os.lchmod``.  When the required function
    (``os.lchmod`` for that case, ``os.chmod`` otherwise) does not exist
    on this platform, the call silently does nothing.
    """
    if not follow_symlinks and os.path.islink(src) and os.path.islink(dst):
        if not hasattr(os, 'lchmod'):
            return
        read_stat, apply_mode = os.lstat, os.lchmod
    elif hasattr(os, 'chmod'):
        read_stat, apply_mode = os.stat, os.chmod
    else:
        return

    apply_mode(dst, stat.S_IMODE(read_stat(src).st_mode))
def _test_NoAccessDir(self, nodeName):
    """Check that loading from an inaccessible directory raises LoadFail.

    Launches the device manager for ``nodeName``, takes its first
    registered device, makes ``<sdr>/dom/noaccess`` a directory with no
    permission bits, and asserts that ``device.load`` on that directory
    raises ``CF.LoadableDevice.LoadFail``.  The directory is removed
    afterwards regardless of the outcome.
    """
    devBooter, devMgr = self.launchDeviceManager(
        "/nodes/%s/DeviceManager.dcd.xml" % nodeName)
    device = devMgr._get_registeredDevices()[0]
    fileMgr = self._domMgr._get_fileMgr()

    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    # Strip every permission bit, creating the directory first if needed.
    if os.path.exists(testdir):
        os.chmod(testdir, 0o000)
    else:
        os.mkdir(testdir, 0o000)

    try:
        self.assertFalse(os.access(testdir, os.R_OK | os.X_OK),
                         'Current user can still access directory')
        self.assertRaises(CF.LoadableDevice.LoadFail, device.load, fileMgr,
                          dirname, CF.LoadableDevice.SHARED_LIBRARY)
    finally:
        os.rmdir(testdir)
def test_ExistsException(self):
    """Check that exists() on an inaccessible directory raises InvalidFileName.

    ``<sdr>/dom/noaccess`` is given mode 0o644 (no execute/search bit), the
    test asserts the current user cannot read+search it, and then asserts
    that ``fileMgr.exists`` on a file inside it raises
    ``CF.InvalidFileName``.  The directory is removed afterwards regardless
    of the outcome.
    """
    self.assertNotEqual(self._domMgr, None)
    fileMgr = self._domMgr._get_fileMgr()

    # Makes sure that FileSystem::exists() throws correct exception and
    # doesn't kill domain for files in directories it cannot access
    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    # 0o644 is the portable spelling of the octal mode; the bare ``0644``
    # form is a syntax error on Python 3.
    if not os.path.exists(testdir):
        os.mkdir(testdir, 0o644)
    else:
        os.chmod(testdir, 0o644)

    try:
        self.assertFalse(os.access(testdir, os.R_OK | os.X_OK),
                         'Current user can still access directory')
        self.assertRaises(CF.InvalidFileName, fileMgr.exists,
                          os.path.join(dirname, 'testfile'))
    finally:
        os.rmdir(testdir)
def handle(self, *args, **options):
    """Install the commit-msg hook snippet and mark the hook executable.

    Reads the existing ``commit-msg`` file under ``self.HOOK_PATH`` (or
    starts from a bash shebang when there is none), appends
    ``COMMIT_MSG_HOOK`` unless the ``ZERODOWNTIME_COMMIT_MSG_HOOK`` marker
    is already present, writes the result back and adds the owner-execute
    bit to the file's mode.
    """
    hook_file = os.path.join(self.HOOK_PATH, 'commit-msg')

    if os.path.exists(hook_file):
        with open(hook_file, 'r') as fp:
            script = fp.read()
    else:
        script = '#!/usr/bin/env bash\n\n'

    if 'ZERODOWNTIME_COMMIT_MSG_HOOK' not in script:
        script += COMMIT_MSG_HOOK

    # NOTE(review): the nesting of the write/chmod below was reconstructed
    # from whitespace-flattened source; confirm it is not meant to run only
    # when the snippet was appended.
    with open(hook_file, 'w') as fp:
        fp.write(script)

    current_mode = os.stat(hook_file).st_mode
    os.chmod(hook_file, current_mode | stat.S_IEXEC)
def _clean_upgrade(binary_ok, binary_path, path, temp_path): if binary_ok: import stat # save the permissions from the current binary old_stat = os.stat(binary_path) # rename the current binary in order to replace it with the latest os.rename(binary_path, path + "/old") os.rename(temp_path, binary_path) # set the same permissions that had the previous binary os.chmod(binary_path, old_stat.st_mode | stat.S_IEXEC) # delete the old binary os.remove(path + "/old") print("mongoaudit updated, restarting...") os.execl(binary_path, binary_path, *sys.argv) else: os.remove(temp_path) print("couldn't download the latest binary")
def download_driver_file(whichbin, url, base_path):
    """Download a driver archive, unpack it and mark the driver executable.

    The archive is fetched with ``download_file`` to ``/tmp/pwr_temp<ext>``
    where ``<ext>`` is ``.tar.gz`` when ``url`` ends with that suffix and
    ``.zip`` otherwise, then extracted into ``base_path``.  Afterwards
    ``geckodriver`` (when ``whichbin == 'wires'``) or ``chromedriver``
    (any other value) inside ``base_path`` is chmod'ed to 0o775.

    :param whichbin: ``'wires'`` selects geckodriver, anything else
        chromedriver.
    :param url: location of the ``.tar.gz`` or ``.zip`` archive.
    :param base_path: directory the archive is extracted into.
    """
    import tarfile
    import zipfile

    ext = '.tar.gz' if url.endswith('.tar.gz') else '.zip'
    archive_path = '/tmp/pwr_temp{}'.format(ext)
    extract_dir = '{}/'.format(base_path)

    print("Downloading from: {}".format(url))
    download_file(url, archive_path)

    # NOTE(review): extractall() trusts member paths inside the downloaded
    # archive; confirm the download sources are trusted.
    if ext == '.tar.gz':
        # The context manager closes the archive even if extraction fails.
        with tarfile.open(archive_path, "r:gz") as tar:
            tar.extractall(extract_dir)
    else:
        with zipfile.ZipFile(archive_path, "r") as z:
            z.extractall(extract_dir)

    driver_name = 'geckodriver' if whichbin == 'wires' else 'chromedriver'
    os.chmod('{}/{}'.format(base_path, driver_name), 0o775)
def execute(self):
    """Apply an octal permission mode to every selected file.

    The mode text comes from ``self.rest(1)``, falling back to
    ``str(self.quantifier)`` when that is empty.  It must parse as an octal
    number in the range 0..0o777; otherwise the user is notified and
    nothing is changed.  A failure on an individual file is reported via
    ``self.fm.notify`` and does not stop the remaining files.  Finally the
    current directory listing is reloaded on a best-effort basis.
    """
    mode_text = self.rest(1) or str(self.quantifier)

    try:
        mode = int(mode_text, 8)
        if not (0 <= mode <= 0o777):
            raise ValueError
    except ValueError:
        self.fm.notify("Need an octal number between 0 and 777!", bad=True)
        return

    for entry in self.fm.thistab.get_selection():
        try:
            os.chmod(entry.path, mode)
        except Exception as ex:
            self.fm.notify(ex)

    try:
        # reloading directory.  maybe its better to reload the selected
        # files only.
        self.fm.thisdir.load_content()
    except Exception:
        # A failed refresh is deliberately ignored.
        pass
def postprocess(self, tempname, filename):
    """Run platform-specific post-processing on an extracted resource.

    Mac header rewrites would be done here; no other platform needs
    anything beyond the permission change below.

    Resource providers should invoke this ONLY once a compressed resource
    has been extracted successfully, and must NOT invoke it for resources
    that are already present in the filesystem.

    `tempname` is the file's current (temporary) name and `filename` is the
    name the caller renames it to after this routine returns.
    """
    if os.name == 'posix':
        # Make the resource executable: OR in r-x for user/group/other
        # (0o555) and keep only the permission bits (0o7777).
        existing = os.stat(tempname).st_mode
        widened = existing | 0o555
        os.chmod(tempname, widened & 0o7777)
def rmtree_errorhandler(func, path, exc_info):
    """Retry a failed rmtree() step after clearing the read-only attribute.

    On Windows, the files in .svn are read-only, so when rmtree() tries to
    remove them, an exception is thrown.  This handler recognises the
    access-denied errors listed below, makes the path writable and repeats
    the failed operation.  Anything else is re-raised with a bare ``raise``,
    so the handler must be called while the original exception is being
    handled (as ``shutil.rmtree`` does for its ``onerror`` callback).

    :param func: the function that failed; called again as ``func(path)``.
    :param path: the path that could not be processed.
    :param exc_info: ``sys.exc_info()``-style tuple; only the type and the
        value are used.
    """
    exctype, value = exc_info[:2]
    args = getattr(value, 'args', ())

    # ``WindowsError`` exists only on Windows and ``PermissionError`` only
    # on Python 3.3+.  Resolve both lazily so that merely naming them does
    # not raise NameError on other platforms/versions.
    try:
        windows_error = WindowsError
    except NameError:
        windows_error = None
    try:
        permission_error = PermissionError
    except NameError:
        permission_error = None

    access_denied = (
        # others
        (windows_error is not None and exctype is windows_error
         and args[:1] == (5,))
        # python2.4
        or (exctype is OSError and args[:1] == (13,))
        # python3.3 -- the Windows error code is the fourth argument, which
        # is absent for errors raised without one.
        or (permission_error is not None and exctype is permission_error
            and len(args) > 3 and args[3] == 5)
    )
    if not access_denied:
        raise

    # file type should currently be read only
    if (os.stat(path).st_mode & stat.S_IREAD) != stat.S_IREAD:
        raise

    # convert to read/write
    os.chmod(path, stat.S_IWRITE)
    # use the original function to repeat the operation
    func(path)
def decrypt_file(file, key):
    """
    Decrypts the file ``file``.

    The encrypted file is assumed to end with the ``.enc`` extension. The
    decrypted file is saved to the same location without the ``.enc``
    extension.

    The permissions on the decrypted file are automatically set to 0o600.
    A newly created output file is opened with mode 0o600 from the start,
    so the decrypted content is not exposed with umask-default permissions
    before the final chmod.

    See also :func:`doctr.local.encrypt_file`.

    :param file: path of the encrypted file; must end with ``.enc``.
    :param key: key handed to ``Fernet``.
    :raises ValueError: if ``file`` does not end with ``.enc``.
    """
    if not file.endswith('.enc'):
        raise ValueError("%s does not end with .enc" % file)

    fer = Fernet(key)

    with open(file, 'rb') as f:
        decrypted_file = fer.decrypt(f.read())

    target = file[:-4]
    # O_BINARY only exists on Windows; elsewhere it contributes nothing.
    flags = (os.O_WRONLY | os.O_CREAT | os.O_TRUNC
             | getattr(os, 'O_BINARY', 0))
    fd = os.open(target, flags, 0o600)
    with os.fdopen(fd, 'wb') as f:
        f.write(decrypted_file)

    # The creation mode is ignored for a file that already existed (and is
    # filtered by the umask), so enforce the final permissions explicitly.
    os.chmod(target, 0o600)
def write_torque_script(command, outfile, walltime, queue, name, out, err,
                        print_exec=True):
    """Write an executable PBS/Torque job script and return its path.

    The script consists of ``#PBS`` directives for the walltime, queue, job
    name and stdout/stderr files (``<out>.o`` and ``<err>.e``), a ``cd``
    into ``${PBS_O_WORKDIR}`` and finally ``command`` written verbatim (no
    newline is appended).  The file is then chmod'ed to 0o755.

    :param command: shell text placed at the end of the script.
    :param outfile: path of the script to write.
    :param walltime: value for ``-l walltime=``.
    :param queue: queue name for ``-q``.  Previously this argument was
        ignored and ``regular`` was always written.
    :param name: job name for ``-N``.
    :param out: stem of the stdout file.
    :param err: stem of the stderr file.
    :param print_exec: when true, print the ``qsub`` line for the script.
    :returns: ``outfile``.
    """
    with open(outfile, 'w') as script:
        script.write('#PBS -l walltime={}\n'.format(walltime))
        script.write('#PBS -q {}\n'.format(queue))
        script.write('#PBS -N {}\n'.format(name))
        script.write('#PBS -o {}.o\n'.format(out))
        script.write('#PBS -e {}.e\n'.format(err))
        script.write('cd ${PBS_O_WORKDIR}\n')
        script.write(command)

    os.chmod(outfile, 0o755)

    if print_exec:
        print('qsub {};'.format(outfile))

    return outfile
def make_run_all_script(wd):
    """Write an executable ``run_all.sh`` into ``wd`` and return its path.

    The generated bash script loops over every ``*.pdb`` file in its
    working directory, copies it to ``temp.pdb``, runs ``tleap -f leaprc``
    and moves the ``temp_mod.*`` outputs into ``xleap_modified/`` and
    ``amber_minimized/`` under a name derived from the input file.

    NOTE(review): the line breaks and indentation inside the script text
    were reconstructed from whitespace-flattened source; confirm against
    the original file.
    """
    script_text = """#!/bin/bash
for i in *.pdb; do
    echo "Running ${i}..."
    seq=`echo ${i} | cut -d. -f1`
    curr_dir=`pwd`
    cp ${i} temp.pdb
    tleap -f leaprc
    rm temp.pdb
    mv temp_mod.pdb xleap_modified/${seq}_xleap.pdb
    mv temp_mod.prmtop amber_minimized/${seq}.prmtop
    mv temp_mod.inpcrd amber_minimized/${seq}.inpcrd
done
"""
    script_path = op.join(wd, 'run_all.sh')
    with open(script_path, 'w') as handle:
        handle.write(script_text)
    os.chmod(script_path, 0o755)
    return script_path
def get_stream(playlist, directory, own_name):
    """Record one HLS stream with livestreamer and queue its conversion.

    Steps, in order:

    * ``own_name`` is added to ``modellist`` for the duration of the call.
    * An ``ffmpeg`` conversion command and a ``chmod 666`` line for the
      resulting ``.mp4`` are appended to ``<directory>ts_to_mp4.sh`` and to
      ``oneclick_file``; both scripts are chmod'ed to 0o777.
    * ``own_name`` is appended to ``<cwd>baitlist.txt``.
    * Unless the ``.ts`` file already exists, ``livestreamer`` is run to
      download the stream and the result is chmod'ed to 0o666.

    ``modellist``, ``encoded``, ``oneclick_file``, ``cwd`` and
    ``numcpucores`` are names resolved outside this function (not visible
    here).

    :param playlist: playlist address, prefixed with ``hlsvariant://``.
    :param directory: prefix (including trailing separator) for the script
        and the ``.ts`` file.
    :param own_name: name used for the output files and bookkeeping lists.
    :raises subprocess.CalledProcessError: if livestreamer exits non-zero;
        in that case ``own_name`` is not removed from ``modellist``.
    """
    modellist.append(str(own_name))

    # Read the clock once so hour and minute always describe the same
    # instant (two separate now() calls can straddle an hour boundary).
    now = datetime.datetime.now()
    start_time = "_" + str(now.hour) + "-" + str(now.minute)

    ffs_script = directory + "ts_to_mp4.sh"
    merged_file = encoded + own_name + start_time + ".mp4"
    stream_file = directory + own_name + start_time + ".ts"

    with open(ffs_script, "a+", encoding="utf8") as ffs:
        ffs.write("\nffmpeg -i " + stream_file + " -strict -2 -c:v copy " + merged_file + "\n")
        ffs.write("chmod 666 " + merged_file + "\n")
    os.chmod(ffs_script, 0o777)

    with open(oneclick_file, "a+", encoding="utf8") as ocf:
        ocf.write("\nffmpeg -i " + stream_file + " -strict -2 -c:v copy " + merged_file + "\n")
        ocf.write("chmod 666 " + merged_file + "\n")
    os.chmod(oneclick_file, 0o777)

    hlsvar = "hlsvariant://" + playlist
    print("Retrieving the Streamfile for " + str(own_name))

    baitlist_file = cwd + "baitlist.txt"
    with open(baitlist_file, "a+", encoding="utf8") as bl:
        bl.write(own_name + "\n")

    if not os.path.isfile(stream_file):
        subprocess.check_call(["livestreamer", "--hls-segment-threads", str(numcpucores), "--retry-streams", "5", "--retry-open", "5", "--hls-segment-attempts", "5", "--hls-segment-timeout", "20", "--http-header", "User-Agent=Mozilla/5.0 (X11; Fedora; Linux x86_64; rv:50.0) Gecko/20100101 Firefox/50.0", "-o", stream_file , hlsvar, "best"])
        # NOTE(review): nesting reconstructed from whitespace-flattened
        # source; confirm the chmod belongs inside this branch.
        os.chmod(stream_file, 0o666)

    modellist.remove(str(own_name))
    return
def apply_copy(self):
    """Create one 'copy' task per entry in ``self.source``.

    Calls ``Utils.def_attrs(self, fun=copy_func)``, sets
    ``default_install_path`` to 0 and removes ``'process_source'`` from
    ``self.meths``.  Each source name is resolved with
    ``self.path.find_resource``; the output node is ``self.target`` unless
    that is empty or there is more than one source, in which case the
    input node's own name is used.  Every task receives ``self.fun`` and a
    ``chmod`` value (``self.chmod`` if set, else ``Utils.O644``).

    Raises ``Errors.WafError`` when an input cannot be found or when a
    created task has no environment.
    """
    Utils.def_attrs(self, fun=copy_func)
    self.default_install_path = 0

    sources = self.to_list(self.source)
    self.meths.remove('process_source')

    for filename in sources:
        src_node = self.path.find_resource(filename)
        if not src_node:
            raise Errors.WafError('cannot find input file %s for processing' % filename)

        target = self.target
        if not target or len(sources) > 1:
            target = src_node.name

        # TODO the file path may be incorrect
        dst_node = self.path.find_or_declare(target)

        task = self.create_task('copy', src_node, dst_node)
        task.fun = self.fun
        task.chmod = getattr(self, 'chmod', Utils.O644)

        if not task.env:
            task.debug()
            raise Errors.WafError('task without an environment')
def test_binary(self, enable_randomness=True, times=1, timeout=15):
    """
    Test the binary generated

    The binary is dumped to a temporary file under ``/tmp``, made
    executable (0o755) and handed to ``CGCPovSimulator.test_binary_pov``
    together with ``self.crash.binary``.  The temporary file is removed
    afterwards, including when the simulator raises.

    :param enable_randomness: forwarded to ``test_binary_pov``.
    :param times: forwarded to ``test_binary_pov``.
    :param timeout: forwarded to ``test_binary_pov``.
    :returns: whatever ``test_binary_pov`` returns.
    """
    # dump the binary code
    # NOTE(review): tempfile.mktemp() only picks a name and is race-prone;
    # kept because it is unclear whether dump_binary() accepts a
    # pre-existing file -- confirm before switching to mkstemp().
    pov_binary_filename = tempfile.mktemp(dir='/tmp', prefix='rex-pov-')
    self.dump_binary(filename=pov_binary_filename)
    # ``0o755`` is the portable octal spelling; a bare ``0755`` is a syntax
    # error on Python 3.
    os.chmod(pov_binary_filename, 0o755)

    try:
        pov_tester = CGCPovSimulator()
        result = pov_tester.test_binary_pov(
            pov_binary_filename,
            self.crash.binary,
            enable_randomness=enable_randomness,
            timeout=timeout,
            times=times)
    finally:
        # remove the generated pov
        os.remove(pov_binary_filename)

    return result
def upload_file(self, filename, content, mode=None):
    """Write ``content`` to the remote ``filename`` over SFTP.

    An SFTP session is obtained from ``self.client.open_sftp()``, the
    remote file is opened for writing, filled and closed, and -- when
    ``mode`` is truthy -- the uploaded file is chmod'ed to ``mode``.  The
    SFTP session is closed even when a step fails.

    Earlier versions ignored both arguments for the chmod and always
    applied 0o744 to ``self.script_filename``.  For callers that used
    ``mode`` as a plain flag, ``mode=True`` still means 0o744.

    :param filename: remote path to write.
    :param content: data passed to the remote file's ``write``.
    :param mode: permission bits for the uploaded file, or ``None``/0 to
        leave the permissions untouched.
    """
    if mode is True:
        # Flag-style call: keep the permission bits that used to be
        # hard-coded.
        mode = 0o744

    ftp = self.client.open_sftp()
    try:
        remote = ftp.file(filename, 'w', -1)
        try:
            remote.write(content)
            remote.flush()
        finally:
            # The handle used to be left open.
            remote.close()
        if mode:
            ftp.chmod(filename, mode)
    finally:
        ftp.close()
def run_script(self, script):
    """Write ``script`` to a temporary file and start it as a subprocess.

    The temporary path is stored in ``self.tmp_script_filename``, the file
    is chmod'ed to 0o744 and launched through the shell with stderr merged
    into stdout.  The ``Popen`` object is kept in ``self.process`` and its
    stdout pipe in ``self.stream``.  The temporary file is not removed
    here.
    """
    handle, self.tmp_script_filename = tempfile.mkstemp()
    # Only the name is needed; the descriptor from mkstemp() is released
    # and the file is reopened in text mode below.
    os.close(handle)

    with open(self.tmp_script_filename, 'w') as script_file:
        script_file.write(script)
    os.chmod(self.tmp_script_filename, 0o744)

    self.process = subprocess.Popen(
        self.tmp_script_filename,
        shell=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        close_fds=True)
    self.stream = self.process.stdout
def extractall(self, path=".", members=None):
    """Extract every member of the archive below `path`.

    `path' defaults to the current working directory.  `members' is
    optional and must be a subset of the list returned by getmembers();
    when omitted, the archive itself is iterated.

    Directories are first created with a safe mode (0o700) and without
    attributes; their real owner, modification time and permissions are
    applied afterwards, in reverse name order.  An ExtractError from that
    final step is re-raised only when ``self.errorlevel`` is greater than
    1, otherwise it is passed to ``self._dbg``.
    """
    if members is None:
        members = self

    pending_dirs = []
    for member in members:
        if member.isdir():
            # Extract directories with a safe mode.
            pending_dirs.append(member)
            member = copy.copy(member)
            member.mode = 0o700
        # Do not set_attrs directories, as we will do that further down
        self.extract(member, path, set_attrs=not member.isdir())

    # Set correct owner, mtime and filemode on directories, walking them
    # in reverse-sorted name order.
    for dirinfo in reversed(sorted(pending_dirs, key=lambda item: item.name)):
        dirpath = os.path.join(path, dirinfo.name)
        try:
            self.chown(dirinfo, dirpath)
            self.utime(dirinfo, dirpath)
            self.chmod(dirinfo, dirpath)
        except ExtractError as e:
            if self.errorlevel > 1:
                raise
            self._dbg(1, "tarfile: %s" % e)
def chmod(self, tarinfo, targetpath):
    """Apply the permission bits stored in `tarinfo` to `targetpath`.

    Does nothing on platforms without ``os.chmod``.  Any
    ``EnvironmentError`` from the call is reported as an ``ExtractError``.
    """
    if not hasattr(os, 'chmod'):
        return
    try:
        os.chmod(targetpath, tarinfo.mode)
    except EnvironmentError:
        raise ExtractError("could not change mode")
def copymode(src, dst):
    """Give `dst` the same permission bits as `src`.

    Silently does nothing on platforms without ``os.chmod``.
    """
    if not hasattr(os, 'chmod'):
        return
    source_mode = os.stat(src).st_mode
    os.chmod(dst, stat.S_IMODE(source_mode))
def copystat(src, dst):
    """Copy stat information (atime, mtime, mode bits, flags) to `dst`.

    Each piece is applied only when the platform offers the matching
    ``os`` function.  A failure of ``os.chflags`` is ignored when its errno
    is ``EOPNOTSUPP``; any other ``OSError`` from it propagates.
    """
    info = os.stat(src)
    permission_bits = stat.S_IMODE(info.st_mode)

    if hasattr(os, 'utime'):
        os.utime(dst, (info.st_atime, info.st_mtime))

    if hasattr(os, 'chmod'):
        os.chmod(dst, permission_bits)

    if hasattr(os, 'chflags') and hasattr(info, 'st_flags'):
        try:
            os.chflags(dst, info.st_flags)
        except OSError as why:
            # Only "operation not supported" may be swallowed.
            unsupported = (hasattr(errno, 'EOPNOTSUPP')
                           and why.errno == errno.EOPNOTSUPP)
            if not unsupported:
                raise
def rmtree_errorhandler(func, path, exc_info):
    """Retry a failed rmtree() operation after making `path` writable.

    On Windows, the files in .svn are read-only, so when rmtree() tries to
    remove them, an exception is thrown.  This handler changes the mode of
    `path` to ``stat.S_IWRITE`` and calls `func` on it again.  When the
    mode test below fails, the exception currently being handled is
    re-raised with a bare ``raise``.
    """
    # NOTE(review): the intent is "file is read-only", but the test is on
    # the owner-read bit (S_IREAD), not on the write bit -- confirm.
    if not (os.stat(path).st_mode & stat.S_IREAD):
        raise

    # convert to read/write
    os.chmod(path, stat.S_IWRITE)
    # use the original function to repeat the operation
    func(path)
    return
def unzip_file(filename, location, flatten=True):
    """
    Unzip the archive at `filename` into the directory `location`.

    Files are written with system defaults and the umask (permissions are
    not preserved), except that regular file members carrying any execute
    permission (user, group, or world) get "chmod +x" applied after being
    written.  When `flatten` is true and all members share one leading
    directory, that directory is stripped from the output paths.

    Note that for windows, any execute changes using os.chmod are no-ops
    per the python docs.
    """
    ensure_dir(location)
    with open(filename, 'rb') as archive_fp:
        archive = zipfile.ZipFile(archive_fp, allowZip64=True)
        strip_leading = has_leading_dir(archive.namelist()) and flatten

        for member in archive.infolist():
            member_name = member.filename
            payload = archive.read(member_name)

            relative = member_name
            if strip_leading:
                relative = split_leading_dir(member_name)[1]
            dest = os.path.join(location, relative)
            parent = os.path.dirname(dest)

            if dest.endswith(('/', '\\')):
                # A directory
                ensure_dir(dest)
                continue

            ensure_dir(parent)
            out = open(dest, 'wb')
            try:
                out.write(payload)
            finally:
                out.close()
                # NOTE(review): placement of the chmod inside ``finally``
                # was reconstructed from whitespace-flattened source;
                # confirm against the original file.
                mode = member.external_attr >> 16
                # if mode and regular file and any execute permissions for
                # user/group/world?
                if mode and stat.S_ISREG(mode) and mode & 0o111:
                    # make dest file have execute for user/group/world
                    # (chmod +x) no-op on windows per python docs
                    os.chmod(dest, (0o777 - current_umask() | 0o111))
def chmod(path, mode):
    """Change the mode of `path`, logging rather than raising on failure.

    The attempt is logged at debug level before ``_chmod`` is called; an
    ``os.error`` from it is logged at debug level and suppressed.
    """
    log.debug("changing mode of %s to %o", path, mode)
    try:
        _chmod(path, mode)
    except os.error as err:
        # Best effort: a failed chmod is only reported.
        log.debug("chmod failed: %s", err)
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack the zip archive `filename` into `extract_dir`.

    Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as
    determined by ``zipfile.is_zipfile()``).  See ``unpack_archive()`` for
    an explanation of the `progress_filter` argument.

    Members with an absolute name or a ``..`` path component are skipped,
    as are members for which `progress_filter` returns a false value.
    When a member records Unix attributes, they are applied to the
    extracted path with ``os.chmod``.
    """
    if not zipfile.is_zipfile(filename):
        raise UnrecognizedFormat("%s is not a zip file" % (filename,))

    with ContextualZipFile(filename) as archive:
        for member in archive.infolist():
            name = member.filename
            parts = name.split('/')

            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in parts:
                continue

            target = progress_filter(name, os.path.join(extract_dir, *parts))
            if not target:
                continue

            # Both branches make sure the containing directory exists.
            ensure_directory(target)
            if not name.endswith('/'):
                # file
                payload = archive.read(member.filename)
                with open(target, 'wb') as out:
                    out.write(payload)

            unix_attributes = member.external_attr >> 16
            if unix_attributes:
                os.chmod(target, unix_attributes)
def extractall(self, path=".", members=None, *, numeric_owner=False):
    """Extract every member of the archive below `path`.

    `path' defaults to the current working directory.  `members' is
    optional and must be a subset of the list returned by getmembers();
    when omitted, the archive itself is iterated.  If `numeric_owner` is
    True, only the numbers for user/group names are used and not the
    names.

    Directories are first created with a safe mode (0o700) and without
    attributes; their real owner, modification time and permissions are
    applied afterwards, in reverse name order.  An ExtractError from that
    final step is re-raised only when ``self.errorlevel`` is greater than
    1, otherwise it is passed to ``self._dbg``.
    """
    if members is None:
        members = self

    pending_dirs = []
    for member in members:
        if member.isdir():
            # Extract directories with a safe mode.
            pending_dirs.append(member)
            member = copy.copy(member)
            member.mode = 0o700
        # Do not set_attrs directories, as we will do that further down
        self.extract(member, path, set_attrs=not member.isdir(),
                     numeric_owner=numeric_owner)

    # Set correct owner, mtime and filemode on directories, walking them
    # in reverse-sorted name order.
    for dirinfo in reversed(sorted(pending_dirs, key=lambda item: item.name)):
        dirpath = os.path.join(path, dirinfo.name)
        try:
            self.chown(dirinfo, dirpath, numeric_owner=numeric_owner)
            self.utime(dirinfo, dirpath)
            self.chmod(dirinfo, dirpath)
        except ExtractError as e:
            if self.errorlevel > 1:
                raise
            self._dbg(1, "tarfile: %s" % e)
def _setupSock(self):
    """Create, bind and start listening on ``self.sock``.

    ``self.sockStr`` selects the transport: a value starting with
    ``inet:`` (case-insensitive) is split on ``:`` into exactly three
    parts and bound as a TCP socket with ``SO_REUSEADDR``; anything else
    is treated as a Unix-domain socket path, which is unlinked first if it
    exists and chmod'ed to ``self.sockChmod`` after binding.  In both
    cases the socket gets a 3 second timeout and listens with a backlog of
    ``self.listenq``.
    """
    address = self.sockStr

    if address.lower().startswith('inet:'):
        _scheme, host, port = address.split(':')
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((host, int(port)))
    else:
        # Remove a leftover socket file so bind() can recreate it.
        if os.path.exists(address):
            os.unlink(address)
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.sock.bind(address)
        os.chmod(address, self.sockChmod)

    self.sock.settimeout(3)
    self.sock.listen(self.listenq)
def _create_key_file(self, repo_name, data): key_path = self._get_key_path(repo_name) with open(key_path, 'w') as key_file: key_file.write(data) os.chmod(key_path, 0o600)