我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用stat.S_IWUSR。
def check_is_readonly(self, fname): """ Return `True` if @fname is read-only on the filesystem. @fname Path to a file. """ if not fname: return try: mode = os.stat(fname) read_only = (stat.S_IMODE(mode.st_mode) & stat.S_IWUSR != stat.S_IWUSR) except FileNotFoundError: return return read_only
def commit(self): """ Writes the configuration to disk (into '$RootDir/EXAConf') """ self.config.write() # reload in order to force type conversion # --> parameters added as lists during runtime are converted back to strings (as if they have been added manually) self.config.reload() # modify permissions try: os.chmod(self.conf_path, stat.S_IRUSR | stat.S_IWUSR) except OSError as e: raise EXAConfError("Failed to change permissions for '%s': %s" % (self.conf_path, e)) #}}} #{{{ Platform supported
def rmtree(path): """Remove the given recursively. :note: we use shutil rmtree but adjust its behaviour to see whether files that couldn't be deleted are read-only. Windows will not remove them in that case""" def onerror(func, path, exc_info): # Is the error an access error ? os.chmod(path, stat.S_IWUSR) try: func(path) # Will scream if still not possible to delete. except Exception as ex: if HIDE_WINDOWS_KNOWN_ERRORS: raise SkipTest("FIXME: fails with: PermissionError\n %s", ex) else: raise return shutil.rmtree(path, False, onerror)
def main(args): vm_name = args['<vm_name>'] # get domain from libvirt con = libvirt.open('qemu:///system') domain = con.lookupByName(vm_name) path = os.path.join(os.getcwd(), '{}.raw'.format(vm_name)) with open(path, 'w') as f: # chmod to be r/w by everyone os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH | stat.S_IWOTH) # take a ram dump flags = libvirt.VIR_DUMP_MEMORY_ONLY dumpformat = libvirt.VIR_DOMAIN_CORE_DUMP_FORMAT_RAW domain.coreDumpWithFormat(path, dumpformat, flags)
def test_read_only_bytecode(self): # When bytecode is read-only but should be rewritten, fail silently. with source_util.create_modules('_temp') as mapping: # Create bytecode that will need to be re-created. py_compile.compile(mapping['_temp']) bytecode_path = imp.cache_from_source(mapping['_temp']) with open(bytecode_path, 'r+b') as bytecode_file: bytecode_file.seek(0) bytecode_file.write(b'\x00\x00\x00\x00') # Make the bytecode read-only. os.chmod(bytecode_path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) try: # Should not raise IOError! self.import_(mapping['_temp'], '_temp') finally: # Make writable for eventual clean-up. os.chmod(bytecode_path, stat.S_IWUSR)
def set_own_perm(usr, dir_list): uid = pwd.getpwnam(usr).pw_uid gid = grp.getgrnam(usr).gr_gid perm_mask_rw = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP perm_mask_rwx = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP for cdir in dir_list: os.chown(cdir, uid, gid) os.chmod(cdir, perm_mask_rwx) for croot, sub_dirs, cfiles in os.walk(cdir): for fs_name in sub_dirs: os.chown(os.path.join(croot, fs_name), uid, gid) os.chmod(os.path.join(croot, fs_name), perm_mask_rwx) for fs_name in cfiles: os.chown(os.path.join(croot, fs_name), uid, gid) os.chmod(os.path.join(croot, fs_name), perm_mask_rw)
def write(self): """ Writes configuration file """ # wait for file to unlock. Timeout after 5 seconds for _ in range(5): if not self._writing: self._writing = True break time.sleep(1) else: _LOGGER.error('Could not write to configuration file. It is busy.') return # dump JSON to config file and unlock encoded = self.encode() json.dump(encoded, open(self._filetmp, 'w'), sort_keys=True, indent=4, separators=(',', ': ')) os.chmod(self._filetmp, stat.S_IRUSR | stat.S_IWUSR) try: shutil.move(self._filetmp, self._file) _LOGGER.debug('Config file succesfully updated.') except shutil.Error as e: _LOGGER.error('Failed to move temp config file to original error: ' + str(e)) self._writing = False _LOGGER.debug('Wrote configuration file')
def merge_blanksky(infiles, outfile, clobber=False): """ Merge multiple blanksky files (e.g., 4 blanksky files for ACIS-I) into a single file. """ if len(infiles) == 1: logger.info("Only one blanksky file, no need to merge.") if os.path.exists(outfile) and (not clobber): raise OSError("File already exists: %s" % outfile) shutil.move(infiles[0], outfile) else: logger.info("Merge multiple blanksky files ...") clobber = "yes" if clobber else "no" subprocess.check_call(["punlearn", "dmmerge"]) subprocess.check_call([ "dmmerge", "infile=%s" % ",".join(infiles), "outfile=%s" % outfile, "clobber=%s" % clobber ]) write_keyword(outfile, keyword="DETNAM", value="ACIS-0123") for f in infiles: os.remove(f) # Add write permission to the background file st = os.stat(outfile) os.chmod(outfile, st.st_mode | stat.S_IWUSR)
def _init_client(): global _configuration_directory def prompt(msg): sys.stdout.write('%s: ' % msg) response = sys.stdin.readline() return response.rstrip('\r\n') client_id = prompt('Client Id') client_secret = prompt('Client Secret') redirect_uri = prompt('Redirect Uri') if not os.path.exists(_configuration_directory): os.mkdir(_configuration_directory, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) client = _CommandClient(client_id, client_secret, redirect_uri) _init_oauth_process(client) # save first time, meaning configuration works client.save_configuration() return client
def test_no_read_directory(self): # Issue #16730 tempdir = tempfile.TemporaryDirectory() original_mode = os.stat(tempdir.name).st_mode def cleanup(tempdir): """Cleanup function for the temporary directory. Since we muck with the permissions, we want to set them back to their original values to make sure the directory can be properly cleaned up. """ os.chmod(tempdir.name, original_mode) # If this is not explicitly called then the __del__ method is used, # but since already mucking around might as well explicitly clean # up. tempdir.__exit__(None, None, None) self.addCleanup(cleanup, tempdir) os.chmod(tempdir.name, stat.S_IWUSR | stat.S_IXUSR) finder = self.get_finder(tempdir.name) self.assertEqual((None, []), finder.find_loader('doesnotexist'))
def secure_file(fname, remove_existing=False): flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL # Refer to "man 2 open". mode = stat.S_IRUSR | stat.S_IWUSR # This is 0o600 in octal and 384 in decimal. if remove_existing: # For security, remove file with potentially elevated mode try: os.remove(fname) except OSError: pass # Open file descriptor umask_original = os.umask(0) try: fdesc = os.open(fname, flags, mode) finally: os.umask(umask_original) return fdesc
def remove_safe_file(self, path): if not SideBarSelection().isNone(path): try: os.remove(path) except: try: if not os.access(path, os.W_OK): import stat os.chmod(path, stat.S_IWUSR) os.remove(path) except: # raise error in case we were unable to delete. if os.path.exists(path): print("Unable to remove file:\n"+path) os.remove(path) else: print('path is none') print(path)
def _create_eae_zipfile(self, zip_file_name, main_file_path, data_files=None): to_zip = [] if data_files is None: data_files = [] # Handle main script to_zip.append(main_file_path) # Prepare the zip file zip_path = "/tmp/" + zip_file_name zipf = zipfile.ZipFile(zip_path, mode='w', compression=zipfile.ZIP_DEFLATED, allowZip64=True) for f in to_zip: zipf.write(f) zipf.close() # Handle other files & dirs for f in data_files: zipCommand = "zip -r -u -0 " + zip_path + " " + f call([zipCommand], shell=True) # Chmod 666 the zip file so it can be accessed os.chmod(zip_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH | stat.S_IWOTH) return zip_path
def save_app(manifest, container_dir, app_json=STATE_JSON): """Saves app manifest and freezes to object.""" # Save the manifest with allocated vip and ports in the state state_file = os.path.join(container_dir, app_json) fs.write_safe( state_file, lambda f: json.dump(manifest, f) ) # chmod for the file to be world readable. if os.name == 'posix': os.chmod( state_file, stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH ) # Freeze the app data into a namedtuple object return utils.to_obj(manifest)
def test_no_read_directory(self): # Issue #16730 tempdir = tempfile.TemporaryDirectory() original_mode = os.stat(tempdir.name).st_mode def cleanup(tempdir): """Cleanup function for the temporary directory. Since we muck with the permissions, we want to set them back to their original values to make sure the directory can be properly cleaned up. """ os.chmod(tempdir.name, original_mode) # If this is not explicitly called then the __del__ method is used, # but since already mucking around might as well explicitly clean # up. tempdir.__exit__(None, None, None) self.addCleanup(cleanup, tempdir) os.chmod(tempdir.name, stat.S_IWUSR | stat.S_IXUSR) finder = self.get_finder(tempdir.name) found = self._find(finder, 'doesnotexist') self.assertEqual(found, self.NOT_FOUND)
def test_read_only_bytecode(self): # When bytecode is read-only but should be rewritten, fail silently. with source_util.create_modules('_temp') as mapping: # Create bytecode that will need to be re-created. py_compile.compile(mapping['_temp']) bytecode_path = self.util.cache_from_source(mapping['_temp']) with open(bytecode_path, 'r+b') as bytecode_file: bytecode_file.seek(0) bytecode_file.write(b'\x00\x00\x00\x00') # Make the bytecode read-only. os.chmod(bytecode_path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) try: # Should not raise OSError! self.import_(mapping['_temp'], '_temp') finally: # Make writable for eventual clean-up. os.chmod(bytecode_path, stat.S_IWUSR)
def test_mknod(self): # Test using mknod() to create a FIFO (the only use specified # by POSIX). support.unlink(support.TESTFN) mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR try: posix.mknod(support.TESTFN, mode, 0) except OSError as e: # Some old systems don't allow unprivileged users to use # mknod(), or only support creating device nodes. self.assertIn(e.errno, (errno.EPERM, errno.EINVAL)) else: self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) # Keyword arguments are also supported support.unlink(support.TESTFN) try: posix.mknod(path=support.TESTFN, mode=mode, device=0, dir_fd=None) except OSError as e: self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
def test_mknod_dir_fd(self): # Test using mknodat() to create a FIFO (the only use specified # by POSIX). support.unlink(support.TESTFN) mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR f = posix.open(posix.getcwd(), posix.O_RDONLY) try: posix.mknod(support.TESTFN, mode, 0, dir_fd=f) except OSError as e: # Some old systems don't allow unprivileged users to use # mknod(), or only support creating device nodes. self.assertIn(e.errno, (errno.EPERM, errno.EINVAL)) else: self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) finally: posix.close(f)
def _get_netrc(cls): # Buffer the '.netrc' path. Use an empty file if it doesn't exist. path = cls.get_netrc_path() content = '' if os.path.exists(path): st = os.stat(path) if st.st_mode & (stat.S_IRWXG | stat.S_IRWXO): print >> sys.stderr, ( 'WARNING: netrc file %s cannot be used because its file ' 'permissions are insecure. netrc file permissions should be ' '600.' % path) with open(path) as fd: content = fd.read() # Load the '.netrc' file. We strip comments from it because processing them # can trigger a bug in Windows. See crbug.com/664664. content = '\n'.join(l for l in content.splitlines() if l.strip() and not l.strip().startswith('#')) with tempdir() as tdir: netrc_path = os.path.join(tdir, 'netrc') with open(netrc_path, 'w') as fd: fd.write(content) os.chmod(netrc_path, (stat.S_IRUSR | stat.S_IWUSR)) return cls._get_netrc_from_path(netrc_path)
def create_boilerplate(): """ Create kibitzr.yml and kibitzr-creds.yml in current directory if they do not exist. """ if not os.path.exists('kibitzr.yml'): with open('kibitzr.yml', 'wt') as fp: logger.info("Saving sample check in kibitzr.yml") fp.write(KIBITZR_YML) else: logger.info("kibitzr.yml already exists. Skipping") if not os.path.exists('kibitzr-creds.yml'): with open('kibitzr-creds.yml', 'wt') as fp: logger.info("Creating kibitzr-creds.yml") fp.write(KIBITZR_CREDS_YML) os.chmod('kibitzr-creds.yml', stat.S_IRUSR | stat.S_IWUSR) else: logger.info("kibitzr-creds.yml already exists. Skipping")
def _octal_to_perm(octal): perms = list("-" * 9) if octal & stat.S_IRUSR: perms[0] = "r" if octal & stat.S_IWUSR: perms[1] = "w" if octal & stat.S_IXUSR: perms[2] = "x" if octal & stat.S_IRGRP: perms[3] = "r" if octal & stat.S_IWGRP: perms[4] = "w" if octal & stat.S_IXGRP: perms[5] = "x" if octal & stat.S_IROTH: perms[6] = "r" if octal & stat.S_IWOTH: perms[7] = "w" if octal & stat.S_IXOTH: perms[8] = "x" return "".join(perms)
def rmtree_remove_readonly_files(func, path, exc_info): """ Error handler for ``shutil.rmtree``. If the error is due to an access error (read only file) it attempts to add write permission and then retries. If the error is for another reason it re-raises the error. Usage : ``shutil.rmtree(path, onerror=rmtree_remove_readonly_files)`` This code was copied from http://stackoverflow.com/questions/2656322/shutil-rmtree-fails-on-windows-with-access-is-denied """ import stat if not os.access(path, os.W_OK): # Is the error an access error ? os.chmod(path, stat.S_IWUSR) func(path) else: raise
def check_is_readonly(self, fname): ''' Returns `True` if @fname is read-only on the filesystem. @fname Path to a file. ''' if not fname: return try: mode = os.stat(fname) read_only = (stat.S_IMODE(mode.st_mode) & stat.S_IWUSR != stat.S_IWUSR) except FileNotFoundError: return return read_only
def write(filename, data, undisclosed=False): """ Write JSON data to (owner only readable) file. This will also create missing directories. Args: filename (str): file path data (str): data to write undisclosed (bool): whether data should be owner only readable """ makedirs(os.path.dirname(filename)) with open(filename, 'w') as fp: if undisclosed: os.chmod(filename, stat.S_IRUSR | stat.S_IWUSR) json.dump(data, fp)
def open_for_writing(filename, permissions=None, mode="w", force=False): if permissions is None: permissions = stat.S_IRUSR | stat.S_IWUSR if force and os.path.exists(filename): os.unlink(filename) umask_original = os.umask(0) try: fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_EXCL, permissions) finally: os.umask(umask_original) # Open file handle and write to file with os.fdopen(fd, mode) as f: yield f
def printlog(message, logpath=False): # I think the logging module is great, but this will be used for the time being # Eventually, we will want to write out multiple output formats: xml,json, etc if logpath: # Next iteration of project will include a more secure logger, # for now, we will just write results to a file directly. # flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL # mode = stat.S_IRUSR | stat.S_IWUSR # umask_original = os.umask(0) # try: # fdesc = os.open(logpath, flags, mode) # # except(OSError): # # print("[-] Log file exists. Remove it, or change log filename") # # raise SystemExit # finally: # os.umask(umask_original) # with os.fdopen(fdesc, 'w') as fout: with open(logpath, 'a') as fout: fout.write("{}\n".format(message)) print("{}".format(message))
def make_writeable(self, filename): """ Make sure that the file is writeable. Useful if our source is read-only. """ if sys.platform.startswith('java'): # On Jython there is no os.access() return if not os.access(filename, os.W_OK): st = os.stat(filename) new_permissions = stat.S_IMODE(st.st_mode) | stat.S_IWUSR os.chmod(filename, new_permissions)
def updateFactorio(): file_name = "/tmp/latestFactorio.tar.gz" print("Downloading %s" % file_name) r = requests.get(DOWNLOADURL, stream=True) total_length = int(r.headers.get('content-length')) if not os.path.isfile(file_name) or total_length != os.path.getsize(file_name): with open(file_name, 'wb') as f: for chunk in progress.bar(r.iter_content(chunk_size=1024), expected_size=(total_length/1024) + 1): if chunk: f.write(chunk) f.flush() #os.chmod(file_name, stat.S_IWUSR | stat.S_IRUSR) else: print("File already exists and file sizes match. Skipping download.") if os.access(FACTORIOPATH, os.W_OK): if os.path.isfile(file_name): tar = tarfile.open(file_name, "r:gz") tar.extractall(path="/tmp") tar.close() copytree("/tmp/factorio", FACTORIOPATH) print("Success.") else: print("Help! Can't find %s, but I should have!" % (file_name)) sys.exit(1) else: print("Can't write to %s" % (FACTORIOPATH)) sys.exit(1)
def write_pid_file(pid_file, pid): import fcntl import stat try: fd = os.open(pid_file, os.O_RDWR | os.O_CREAT, stat.S_IRUSR | stat.S_IWUSR) except OSError as e: shell.print_exception(e) return -1 flags = fcntl.fcntl(fd, fcntl.F_GETFD) assert flags != -1 flags |= fcntl.FD_CLOEXEC r = fcntl.fcntl(fd, fcntl.F_SETFD, flags) assert r != -1 # There is no platform independent way to implement fcntl(fd, F_SETLK, &fl) # via fcntl.fcntl. So use lockf instead try: fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET) except IOError: r = os.read(fd, 32) if r: logging.error('already started at pid %s' % common.to_str(r)) else: logging.error('already started') os.close(fd) return -1 os.ftruncate(fd, 0) os.write(fd, common.to_bytes(str(pid))) return 0
def begin_handover(self, fdtx_dir='/tmp/ave'): # call this function on the broker that is going to be replaced. # steps: # stop listening for new connections from clients, stop the notifier, # disconnect all shares, disable further allocation (a session will # receive a Restarting exception if it manages to allocate at precisely # this moment and is expected to try again once or twice), create a # UNIX domain socket to transfer file descriptors (in a separate step), # serialize the state of the local allocator and all live sessions # (PIDs and RPC keys). finally return serialized data and the path to # UNIX domain socket. # first of all check that fdtx_dir is writable. otherwise the socket # will not be created in it and everything fails if os.path.exists(fdtx_dir) and os.path.isdir(fdtx_dir): if not os.access(fdtx_dir, os.R_OK | os.X_OK | os.W_OK): raise Exception('directory not writable: %s' % fdtx_dir) self.stop_listening() self.stop_sharing() self.drop_all_shares() self.stop_listers() self.allocating = False self.fdtx = FdTx(None) uds_path = self.fdtx.listen(fdtx_dir, 'handover-%s' % rand_authkey()) # make sure the caller will be able to interact with the new socket by # making it world readable and world writable mode = (stat.S_IRUSR # owner has read permission | stat.S_IWUSR # owner has write permission | stat.S_IRGRP # group has read permission | stat.S_IWGRP # group has write permission | stat.S_IROTH # others have read permission | stat.S_IWOTH) # others have write permission os.chmod(uds_path, mode) return self.serialize(), self.config, uds_path
def delete_extension(extension): def onerror(func, path, exc_info): """`shutil.rmtree` error handler that helps deleting read-only files on Windows.""" if not os.access(path, os.W_OK): os.chmod(path, stat.S_IWUSR) func(path) else: raise return shutil.rmtree('dwarf/' + extension, onerror=onerror)
def rmtree(path): """shutil.rmtree() with error handler. Handle 'access denied' from trying to delete read-only files. """ def onerror(func, path, exc_info): if not os.access(path, os.W_OK): os.chmod(path, stat.S_IWUSR) func(path) else: raise return shutil.rmtree(path, onerror=onerror)
def write_pid_file(pid_file, pid): try: fd = os.open(pid_file, os.O_RDWR | os.O_CREAT, stat.S_IRUSR | stat.S_IWUSR) except OSError as e: LOG.exception(e) return -1 flags = fcntl.fcntl(fd, fcntl.F_GETFD) assert flags != -1 flags |= fcntl.FD_CLOEXEC r = fcntl.fcntl(fd, fcntl.F_SETFD, flags) assert r != -1 # There is no platform independent way to implement fcntl(fd, F_SETLK, &fl) # via fcntl.fcntl. So use lockf instead try: fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET) except IOError: r = os.read(fd, 32) if r: logging.error('already started at pid %s' % utils.to_str(r)) else: logging.error('already started') os.close(fd) return -1 os.ftruncate(fd, 0) os.write(fd, utils.to_bytes(str(pid))) return 0
def _MakeWritable(dir_path): for root, dirs, files in os.walk(dir_path): for path in itertools.chain(dirs, files): st = os.stat(os.path.join(root, path)) os.chmod(os.path.join(root, path), st.st_mode | stat.S_IWUSR) # E.g. turn "base_1p" into "base"
def RmTree(dir): """Delete dir.""" def ChmodAndRetry(func, path, _): # Subversion can leave read-only files around. if not os.access(path, os.W_OK): os.chmod(path, stat.S_IWUSR) return func(path) raise shutil.rmtree(dir, onerror=ChmodAndRetry)
def copy_template_file(self, src, dest, ctx={}): template = self.env.get_template(src) template.stream(ctx).dump(dest, encoding='utf-8') # Make new file writable. if os.access(dest, os.W_OK): st = os.stat(dest) new_permissions = stat.S_IMODE(st.st_mode) | stat.S_IWUSR os.chmod(dest, new_permissions)