我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用stat.S_IWRITE。
def rmtree_errorhandler(func, path, exc_info): """On Windows, the files in .svn are read-only, so when rmtree() tries to remove them, an exception is thrown. We catch that here, remove the read-only attribute, and hopefully continue without problems.""" exctype, value = exc_info[:2] if not ((exctype is WindowsError and value.args[0] == 5) or #others (exctype is OSError and value.args[0] == 13) or #python2.4 (exctype is PermissionError and value.args[3] == 5) #python3.3 ): raise # file type should currently be read only if ((os.stat(path).st_mode & stat.S_IREAD) != stat.S_IREAD): raise # convert to read/write os.chmod(path, stat.S_IWRITE) # use the original function to repeat the operation func(path)
def delete(self): """ WindowsError is raised if the file cannot be deleted """ if self.isfile(): selfStr = str(self) try: os.remove(selfStr) except WindowsError: os.chmod(selfStr, stat.S_IWRITE) os.remove(selfStr) elif self.isdir(): selfStr = str(self.asDir()) for f in self.files(recursive=True): f.delete() shutil.rmtree(selfStr, True)
def ExecRecursiveMirror(self, source, dest): """Emulation of rm -rf out && cp -af in out.""" if os.path.exists(dest): if os.path.isdir(dest): def _on_error(fn, path, dummy_excinfo): # The operation failed, possibly because the file is set to # read-only. If that's why, make it writable and try the op again. if not os.access(path, os.W_OK): os.chmod(path, stat.S_IWRITE) fn(path) shutil.rmtree(dest, onerror=_on_error) else: if not os.access(dest, os.W_OK): # Attempt to make the file writable before deleting it. os.chmod(dest, stat.S_IWRITE) os.unlink(dest) if os.path.isdir(source): shutil.copytree(source, dest) else: shutil.copy2(source, dest) # Try to diagnose crbug.com/741603 if not os.path.exists(dest): raise Exception("Copying of %s to %s failed" % (source, dest))
def _CopyRuntimeImpl(target, source, verbose=True): """Copy |source| to |target| if it doesn't already exist or if it needs to be updated (comparing last modified time as an approximate float match as for some reason the values tend to differ by ~1e-07 despite being copies of the same file... https://crbug.com/603603). """ if (os.path.isdir(os.path.dirname(target)) and (not os.path.isfile(target) or abs(os.stat(target).st_mtime - os.stat(source).st_mtime) >= 0.01)): if verbose: print 'Copying %s to %s...' % (source, target) if os.path.exists(target): # Make the file writable so that we can delete it now, and keep it # readable. os.chmod(target, stat.S_IWRITE | stat.S_IREAD) os.unlink(target) shutil.copy2(source, target) # Make the file writable so that we can overwrite or delete it later, # keep it readable. os.chmod(target, stat.S_IWRITE | stat.S_IREAD)
def test_sync_works(self): try: synced_file = os.path.join(self.workspace_root, "microDAQ", "exportLocation") config = { KEYS.USER: "riormt", KEYS.P4_PORT: "penguin.natinst.com:1666", KEYS.PERFORCE_PATH: "//microDAQ/exportLocation", KEYS.CLIENT_WORKSPACE: self.workspace, KEYS.FORCE: True } sync_perforce_path(config, self.status) self.assertEqual(self.export_location_path, self.status.status["local_path"]) self.assertTrue(os.path.exists(synced_file)) self.assertTrue(self.status.status["succeeded"]) finally: try: os.chmod(self.export_location_path, stat.S_IWRITE) shutil.rmtree(self.workspace_root) except OSError: self.fail("Directory was not synced from perforce...")
def ExecRecursiveMirror(self, source, dest): """Emulation of rm -rf out && cp -af in out.""" if os.path.exists(dest): if os.path.isdir(dest): def _on_error(fn, path, dummy_excinfo): # The operation failed, possibly because the file is set to # read-only. If that's why, make it writable and try the op again. if not os.access(path, os.W_OK): os.chmod(path, stat.S_IWRITE) fn(path) shutil.rmtree(dest, onerror=_on_error) else: if not os.access(dest, os.W_OK): # Attempt to make the file writable before deleting it. os.chmod(dest, stat.S_IWRITE) os.unlink(dest) if os.path.isdir(source): shutil.copytree(source, dest) else: shutil.copy2(source, dest)
def _CopyRuntimeImpl(target, source, verbose=True): """Copy |source| to |target| if it doesn't already exist or if it needs to be updated (comparing last modified time as an approximate float match as for some reason the values tend to differ by ~1e-07 despite being copies of the same file... https://crbug.com/603603). """ if (os.path.isdir(os.path.dirname(target)) and (not os.path.isfile(target) or abs(os.stat(target).st_mtime - os.stat(source).st_mtime) >= 0.01)): if verbose: print 'Copying %s to %s...' % (source, target) if os.path.exists(target): # Make the file writable so that we can delete it now. os.chmod(target, stat.S_IWRITE) os.unlink(target) shutil.copy2(source, target) # Make the file writable so that we can overwrite or delete it later. os.chmod(target, stat.S_IWRITE)
def save_token_file(token): token_file = raw_input("> Enter token file location [~/.sympy/release-token] ") token_file = token_file or "~/.sympy/release-token" token_file_expand = os.path.expanduser(token_file) token_file_expand = os.path.abspath(token_file_expand) token_folder, _ = os.path.split(token_file_expand) try: if not os.path.isdir(token_folder): os.mkdir(token_folder, 0o700) with open(token_file_expand, 'w') as f: f.write(token + '\n') os.chmod(token_file_expand, stat.S_IREAD | stat.S_IWRITE) except OSError as e: print("> Unable to create folder for token file: ", e) return except IOError as e: print("> Unable to save token file: ", e) return return token_file
def ExecRecursiveMirror(self, source, dest): """Emulation of rm -rf out && cp -af in out.""" if os.path.exists(dest): if os.path.isdir(dest): def _on_error(fn, path, excinfo): # The operation failed, possibly because the file is set to # read-only. If that's why, make it writable and try the op again. if not os.access(path, os.W_OK): os.chmod(path, stat.S_IWRITE) fn(path) shutil.rmtree(dest, onerror=_on_error) else: if not os.access(dest, os.W_OK): # Attempt to make the file writable before deleting it. os.chmod(dest, stat.S_IWRITE) os.unlink(dest) if os.path.isdir(source): shutil.copytree(source, dest) else: shutil.copy2(source, dest)
def CacheRetrieveFunc(target, source, env): t = target[0] fs = t.fs cd = env.get_CacheDir() cachedir, cachefile = cd.cachepath(t) if not fs.exists(cachefile): cd.CacheDebug('CacheRetrieve(%s): %s not in cache\n', t, cachefile) return 1 cd.CacheDebug('CacheRetrieve(%s): retrieving from %s\n', t, cachefile) if SCons.Action.execute_actions: if fs.islink(cachefile): fs.symlink(fs.readlink(cachefile), t.get_internal_path()) else: env.copy_from_cache(cachefile, t.get_internal_path()) st = fs.stat(cachefile) fs.chmod(t.get_internal_path(), stat.S_IMODE(st[stat.ST_MODE]) | stat.S_IWRITE) return 0
def copyFunc(dest, source, env): """Install a source file or directory into a destination by copying, (including copying permission/mode bits).""" if os.path.isdir(source): if os.path.exists(dest): if not os.path.isdir(dest): raise SCons.Errors.UserError("cannot overwrite non-directory `%s' with a directory `%s'" % (str(dest), str(source))) else: parent = os.path.split(dest)[0] if not os.path.exists(parent): os.makedirs(parent) scons_copytree(source, dest) else: shutil.copy2(source, dest) st = os.stat(source) os.chmod(dest, stat.S_IMODE(st[stat.ST_MODE]) | stat.S_IWRITE) return 0 # # Functions doing the actual work of the InstallVersionedLib Builder. #
def copyFuncVersionedLib(dest, source, env): """Install a versioned library into a destination by copying, (including copying permission/mode bits) and then creating required symlinks.""" if os.path.isdir(source): raise SCons.Errors.UserError("cannot install directory `%s' as a version library" % str(source) ) else: # remove the link if it is already there try: os.remove(dest) except: pass shutil.copy2(source, dest) st = os.stat(source) os.chmod(dest, stat.S_IMODE(st[stat.ST_MODE]) | stat.S_IWRITE) installShlibLinks(dest, source, env) return 0
def mirror_pic(output_data_path, _dir, pic, img): fname, fextension = os.path.splitext(pic) mirror_x_img = img[:, ::-1, :] mirror_x_img_gray = rgb2gray(mirror_x_img) mirror_y_img = img[::-1, :, :] mirror_y_img_gray = rgb2gray(mirror_y_img) mirror_xy_img = img[::-1, ::-1, :] mirror_xy_img_gray = rgb2gray(mirror_xy_img) misc.imsave(os.path.join(output_data_path, _dir, (fname + '_mirror_x' + fextension)), mirror_x_img_gray) os.chmod(os.path.join(output_data_path, _dir, (fname + '_mirror_x' + fextension)), stat.S_IWRITE) misc.imsave(os.path.join(output_data_path, _dir, (fname + '_mirror_y' + fextension)), mirror_y_img_gray) os.chmod(os.path.join(output_data_path, _dir, (fname + '_mirror_y' + fextension)), stat.S_IWRITE) misc.imsave(os.path.join(output_data_path, _dir, (fname + '_mirror_xy' + fextension)), mirror_xy_img_gray) os.chmod(os.path.join(output_data_path, _dir, (fname + '_mirror_xy' + fextension)), stat.S_IWRITE) return mirror_x_img, mirror_y_img, mirror_xy_img
def _write(self): if threading.currentThread().isDaemon(): log.warning('daemon thread cannot write wallet') return if not self.modified: return s = json.dumps(self.data, indent=4, sort_keys=True) temp_path = "%s.tmp.%s" % (self.path, os.getpid()) with open(temp_path, "w") as f: f.write(s) f.flush() os.fsync(f.fileno()) if os.path.exists(self.path): mode = os.stat(self.path).st_mode else: mode = stat.S_IREAD | stat.S_IWRITE # perform atomic write on POSIX systems try: os.rename(temp_path, self.path) except: os.remove(self.path) os.rename(temp_path, self.path) os.chmod(self.path, mode) self.modified = False
def force_delete(self, path=None): path = os.path.abspath(path or self.path) if path is None or os.path.exists(path) == False : return if os.path.isfile(path): os.chmod(path, stat.S_IWRITE) os.remove(path) return try : shutil.rmtree(path) except : del_dir_paths = [] for dirpath, dirnames, filenames in os.walk(path): try : shutil.rmtree(dirpath) except : del_dir_paths.append(dirpath) for filename in filenames: del_file_path = os.path.join(dirpath, filename) os.chmod(del_file_path, stat.S_IWRITE) os.remove(del_file_path) del_dir_paths.sort(reverse=True) for del_dir_path in del_dir_paths: os.chmod(del_dir_path, stat.S_IWRITE) os.rmdir(del_dir_path)
def rmtree_errorhandler(func, path, exc_info): """On Windows, the files in .svn are read-only, so when rmtree() tries to remove them, an exception is thrown. We catch that here, remove the read-only attribute, and hopefully continue without problems.""" # if file type currently read only if os.stat(path).st_mode & stat.S_IREAD: # convert to read/write os.chmod(path, stat.S_IWRITE) # use the original function to repeat the operation func(path) return else: raise
def auto_chmod(func, arg, exc): if func in [os.unlink, os.remove] and os.name == 'nt': chmod(arg, stat.S_IWRITE) return func(arg) et, ev, _ = sys.exc_info() six.reraise(et, (ev[0], ev[1] + (" %s %s" % (func, arg))))
def execute(self, name, options, parameters): try: self._cmdLock.acquire() self._log.debug("execute(%s, %s, %s)", name, options, parameters) if not name.startswith("/"): raise CF.InvalidFileName(CF.CF_EINVAL, "Filename must be absolute") if self.isLocked(): raise CF.Device.InvalidState("System is locked down") if self.isDisabled(): raise CF.Device.InvalidState("System is disabled") priority = 0 stack_size = 4096 invalidOptions = [] for option in options: val = option.value.value() if option.id == CF.ExecutableDevice.PRIORITY_ID: if ((not isinstance(val, int)) and (not isinstance(val, long))): invalidOptions.append(option) else: priority = val elif option.id == CF.ExecutableDevice.STACK_SIZE_ID: if ((not isinstance(val, int)) and (not isinstance(val, long))): invalidOptions.append(option) else: stack_size = val if len(invalidOptions) > 0: self._log.error("execute() received invalid options %s", invalidOptions) raise CF.ExecutableDevice.InvalidOptions(invalidOptions) command = name[1:] # This is relative to our CWD self._log.debug("Running %s %s", command, os.getcwd()) if not os.path.isfile(command): raise CF.InvalidFileName(CF.CF_EINVAL, "File could not be found %s" % command) os.chmod(command, os.stat(command)[0] | stat.S_IEXEC | stat.S_IREAD | stat.S_IWRITE) finally: self._cmdLock.release() return self._execute(command, options, parameters)
def rmtree_readonly(directory): if os.path.islink(directory): os.remove(directory) else: def remove_readonly(func, path, _): os.chmod(path, stat.S_IWRITE) func(path) shutil.rmtree(directory, onerror=remove_readonly) # Directory navigation
def remove(path): def remove_readonly(func, path, _): os.chmod(path, stat.S_IWRITE) func(path) shutil.rmtree(path, onerror=remove_readonly)
def auto_chmod(func, arg, exc): if func is os.remove and os.name=='nt': chmod(arg, stat.S_IWRITE) return func(arg) et, ev, _ = sys.exc_info() reraise(et, (ev[0], ev[1] + (" %s %s" % (func,arg))))
def _remove_dir(target): #on windows this seems to a problem for dir_path, dirs, files in os.walk(target): os.chmod(dir_path, stat.S_IWRITE) for filename in files: os.chmod(os.path.join(dir_path, filename), stat.S_IWRITE) shutil.rmtree(target)
def remove_readonly(func, path, excinfo): os.chmod(path, stat.S_IWRITE) func(path)
def auto_chmod(func, arg, exc): if func is os.remove and os.name == 'nt': chmod(arg, stat.S_IWRITE) return func(arg) et, ev, _ = sys.exc_info() six.reraise(et, (ev[0], ev[1] + (" %s %s" % (func, arg))))
def auto_chmod(func, arg, exc): if func is os.remove and os.name == 'nt': chmod(arg, stat.S_IWRITE) return func(arg) et, ev, _ = sys.exc_info() reraise(et, (ev[0], ev[1] + (" %s %s" % (func, arg))))
def setWritable(self, state=True): """ sets the writeable flag (ie: !readonly) """ try: setTo = stat.S_IREAD if state: setTo = stat.S_IWRITE os.chmod(self, setTo) except: pass