我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.W_OK。
def check_path_owner(path):
    """Return whether the current user may write to *path*.

    The check is made on *path* itself if it exists, otherwise on its
    nearest existing ancestor directory.

    Args:
        path: Filesystem path to inspect; it does not need to exist.

    Returns:
        True when the platform offers no ``os.geteuid`` (ownership is then
        assumed), the result of the ownership/writability check on the first
        existing ancestor otherwise, and False when no ancestor exists.
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                # NOTE(review): get_path_uid is defined elsewhere; presumably
                # it returns the uid owning the path - confirm.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        # Not there yet: climb one level. dirname() reaches a fixed point at
        # the root (or at '' for relative paths), which ends the loop.
        previous, path = path, os.path.dirname(path)

    # No existing ancestor was found: assume we don't own the path instead of
    # implicitly returning None.
    return False
def get_config_file_path(cls):
    """Return the path of the config file named by ``cls.CONFIG_FILE_NAME``.

    A non-global config lives directly in the current directory. A global
    config lives in a ``.polyaxon`` directory under the user's home (or
    under ``/tmp`` when the home directory is not writable); that directory
    is created when missing, and a failure to create it is only logged.
    """
    if cls.IS_GLOBAL:
        root_dir = os.path.expanduser('~')
        if not os.access(root_dir, os.W_OK):
            root_dir = '/tmp'

        config_dir = os.path.join(root_dir, '.polyaxon')
        if not os.path.exists(config_dir):
            try:
                os.makedirs(config_dir)
            except OSError:
                # Except permission denied and potential race conditions
                # in multi-threaded environments.
                logger.error('Could not create config directory `{}`'.format(config_dir))
    else:
        # local to this directory
        config_dir = os.path.join('.')

    return os.path.join(config_dir, cls.CONFIG_FILE_NAME)
def safeInstall():
    """Create the Factorio directory tree if it is missing, then update.

    When the install directory does not exist it is created (through
    ``sudo`` if its parent is not writable), ``saves`` and ``config``
    sub-directories are added, and the shell-completion line is appended to
    ``~/.bashrc`` unless already present. ``updateFactorio()`` is called
    afterwards. An ``IOError`` prints a message and exits with status 1.
    """
    factorio_path = getFactorioPath()
    completion_line = "eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n"

    try:
        if not os.path.isdir("%s" % (factorio_path)):
            parent_writable = os.access("%s/.." % (factorio_path), os.W_OK)
            if parent_writable:
                os.mkdir(factorio_path, 0o777)
            else:
                # Parent is not writable by us: create as root, then hand
                # the directory over to the invoking user.
                subprocess.call(['sudo', 'mkdir', '-p', factorio_path])
                subprocess.call(['sudo', 'chown', getpass.getuser(), factorio_path])

            for subdir in ("saves", "config"):
                os.mkdir(os.path.join(factorio_path, subdir))

            with open("%s/.bashrc" % (os.path.expanduser("~")), "r+") as bashrc:
                # read() leaves the file position at EOF, so the write below
                # appends to the file.
                contents = bashrc.read()
                if completion_line not in contents:
                    bashrc.write(completion_line)
                    print("You'll want to restart your shell for command autocompletion. Tab is your friend.")

        updateFactorio()
    except IOError as e:
        print("Cannot make %s. Please check permissions. Error %s" % (factorio_path, e))
        sys.exit(1)
def _test_resource_paths(my):
    """Check the resource paths reported by ``my.server``.

    Verifies that the admin resource path contains ``etc/admin.tacticrc``,
    that the per-login rc file for the current system user is among the
    created resource paths, and that the server login is ``admin``.
    """
    path = my.server.get_resource_path('admin')
    # not a very accurate test
    my.assertEquals(True, 'etc/admin.tacticrc' in path)

    paths = my.server.create_resource_paths()
    sys_login = getpass.getuser()

    home_dir = my.server.get_home_dir()
    # Test for a missing home dir *before* touching the filesystem:
    # os.access()/os.path.isdir() raise TypeError when handed None.
    home_usable = (bool(home_dir)
                   and os.path.isdir(home_dir)
                   and os.access(home_dir, os.W_OK))
    if home_usable:
        rc_dir = "%s/.tactic/etc" % home_dir
    elif os.name == 'nt':
        rc_dir = 'C:/sthpw/etc'
    else:
        rc_dir = '/tmp/sthpw/etc'

    compared = '%s/%s.tacticrc' % (rc_dir, sys_login) in paths
    my.assertEquals(True, compared)

    # since we use admin to get resource path , my.login should also be admin
    my.assertEquals('admin', my.server.get_login())
def check_access(filename, write_required=True):
    """
    Check whether the user has read (and optionally write) access to a file.

    When POSIX ACL support is available, the ACL entries for the effective
    user and for the user's groups are consulted first; if none of them
    grants the required rights, plain ``os.access`` is used.
    Returns True only if all required access rights are present.
    """
    def _grants(permset):
        # Read is always required; write only when the caller asks for it.
        return permset.test(posix1e.ACL_READ) and (
            not write_required or permset.test(posix1e.ACL_WRITE))

    if HAVE_POSIX1E:
        for entry in posix1e.ACL(file=filename):
            if entry.tag_type == posix1e.ACL_USER:
                applies = entry.qualifier == os.geteuid()
            elif entry.tag_type == posix1e.ACL_GROUP:
                applies = entry.qualifier in os.getgroups()
            else:
                applies = False
            if applies and _grants(entry.permset):
                return True

    required_mode = (os.R_OK | os.W_OK) if write_required else os.R_OK
    return os.access(filename, required_mode)
def verify_path(self, val):
    """Validate a user-supplied directory path.

    Args:
        val: The path as entered; a leading ``~`` is expanded.

    Returns:
        The expanded path when it names an existing directory that
        ``writable_dir`` accepts, otherwise False (a warning is logged).
    """
    try:
        n_path = os.path.expanduser(val)
    except Exception:
        # Expansion failed (e.g. val is not path-like): fall back to the raw
        # value and let the checks below reject it. A bare ``except:`` here
        # would also swallow KeyboardInterrupt/SystemExit.
        n_path = val
    else:
        logger.debug("Expanded user path.")

    if not n_path:
        logger.warning("Please specify a path.")
        return False
    if not os.path.isdir(n_path):
        logger.warning("This is not a valid path.")
        return False
    if not writable_dir(n_path):
        logger.warning("Do not have write access to '{}'.".format(n_path))
        return False
    return n_path
def save_to_disk(self):
    """
    Commit the data from memory to disk.

    ``self.data`` is written to ``self.file_path`` as pretty-printed JSON
    with sorted keys.

    Returns:
        True when the data was written, False when the file is not writable.
    """
    # Serialize before touching the file so that unserializable data cannot
    # leave a truncated (empty) file behind.
    payload = json.dumps(self.data, sort_keys=True, indent=4)

    # Create file if it doesn't exist.
    if not os.path.exists(self.file_path):
        with open(self.file_path, "w"):
            pass

    if not os.access(self.file_path, os.W_OK):
        return False

    # Write new data to specified file.
    with open(self.file_path, "w+") as handle:
        handle.write(payload)
    return True
def test_dir_isWritable(self, path_mocker_stopall):
    """isWritable() on a directory asks os.access for write permission."""
    mocker = path_mocker_stopall

    # Stand-ins for os.access / os.path.isdir, both answering True.
    access_mock = mocker.MagicMock(name="mock_os_access", return_value=True)
    isdir_mock = mocker.MagicMock(name="mock_os_path_isdir", return_value=True)

    # Install them on the os module referenced by the module under test.
    mocker.patch.object(scarlett_os.internal.path.os, 'access', access_mock)
    mocker.patch.object(scarlett_os.internal.path.os.path, 'isdir', isdir_mock)

    target = 'file:///tmp'

    # run test
    result = s_path.isWritable(target)

    # tests
    isdir_mock.assert_called_once_with('file:///tmp')
    access_mock.assert_called_once_with('file:///tmp', os.W_OK)
    assert result == True
def isWritable(path):
    """Returns whether the file/path is writable.

    Args:
        path: An existing directory, or the path of a (possibly not yet
            existing) file.

    Returns:
        For a directory, whether it is writable. For anything else, whether
        its parent directory is writable. None if a UnicodeDecodeError
        occurred (after ``unicode_error_dialog()`` has been called).
    """
    try:
        if os.path.isdir(path):
            # The given path is an existing directory.
            # To properly check if it is writable, you need to use os.access.
            return os.access(path, os.W_OK)

        # The given path is supposed to be a file.
        # Avoid using open(path, "w"), as it might corrupt existing files.
        # And yet, even if the parent directory is actually writable,
        # open(path, "rw") will IOError if the file doesn't already exist.
        # Therefore, simply check the directory permissions instead.
        # A bare filename has an empty dirname; it lives in the current
        # directory, so check that rather than '' (which never is writable).
        parent = os.path.dirname(path) or os.curdir
        return os.access(parent, os.W_OK)
    except UnicodeDecodeError:
        unicode_error_dialog()
        return None
def setup(self):
    """
    Defers loading until needed.

    Prepares the storage (on-disk directory or shared in-memory store),
    builds the schema and query parser, then creates or opens the index and
    marks setup as complete.

    Raises:
        IOError: if file storage is used and the index path is not writable.
    """
    from haystack import connections

    created_directory = False

    if self.use_file_storage:
        # Make sure the index is there.
        if not os.path.exists(self.path):
            os.makedirs(self.path)
            created_directory = True

        if not os.access(self.path, os.W_OK):
            raise IOError("The path to your Whoosh index '%s' is not writable for the current user/group." % self.path)

        self.storage = FileStorage(self.path)
    else:
        # One RAM store is kept on LOCALS and reused across calls.
        if getattr(LOCALS, 'RAM_STORE', None) is None:
            LOCALS.RAM_STORE = RamStorage()
        self.storage = LOCALS.RAM_STORE

    unified_index = connections[self.connection_alias].get_unified_index()
    self.content_field_name, self.schema = self.build_schema(unified_index.all_searchfields())
    self.parser = QueryParser(self.content_field_name, schema=self.schema)

    if created_directory is True:
        self.index = self.storage.create_index(self.schema)
    else:
        try:
            self.index = self.storage.open_index(schema=self.schema)
        except index.EmptyIndexError:
            # Storage exists but holds no index yet.
            self.index = self.storage.create_index(self.schema)

    self.setup_complete = True
def ExecRecursiveMirror(self, source, dest):
    """Emulation of rm -rf out && cp -af in out.

    Any existing *dest* (file or directory tree) is removed, read-only
    entries included, and *source* is then copied to *dest*.

    Raises:
        Exception: if *dest* does not exist after the copy.
    """
    def _ensure_writable(target):
        # Read-only entries cannot be deleted on some platforms; flip the
        # write bit first.
        if not os.access(target, os.W_OK):
            os.chmod(target, stat.S_IWRITE)

    def _retry_after_chmod(func, failed_path, _exc_info):
        # The operation failed, possibly because the file is set to
        # read-only. If that's why, make it writable and try the op again.
        _ensure_writable(failed_path)
        func(failed_path)

    if os.path.exists(dest):
        if os.path.isdir(dest):
            shutil.rmtree(dest, onerror=_retry_after_chmod)
        else:
            # Attempt to make the file writable before deleting it.
            _ensure_writable(dest)
            os.unlink(dest)

    copier = shutil.copytree if os.path.isdir(source) else shutil.copy2
    copier(source, dest)

    # Try to diagnose crbug.com/741603
    if not os.path.exists(dest):
        raise Exception("Copying of %s to %s failed" % (source, dest))
def run():
    """Generate an ``AppIcon.appiconset`` from the selected image.

    Writes ``Contents.json`` (from ``json_data``) into the icon-set
    directory under ``Data.save_path``, then saves one resized PNG per entry
    of its ``images`` list. Progress and errors are reported through
    ``output_label``.
    """
    if Data.image_path == "":
        output_label["text"] = "Choose the image file first."
        return

    iconset_dir = "{}/AppIcon.appiconset".format(Data.save_path)
    contents_path = "{}/AppIcon.appiconset/Contents.json".format(Data.save_path)

    # Create the directory only when it is actually missing. Testing
    # writability instead would make mkdir fail on an existing read-only
    # directory.
    if not os.path.isdir(iconset_dir):
        os.mkdir(iconset_dir)

    # Context managers guarantee the handle is closed even if dump fails.
    with open(contents_path, mode="w") as contents_file:
        json.dump(json_data, contents_file, indent=4, sort_keys=True, separators=(',', ':'))

    with open(contents_path, mode="r") as data_file:
        data = json.load(data_file)

    images = data["images"]

    try:
        im = Image.open(Data.image_path)
        output_label["text"] = "Image specs: {} {} {}\n{}\n".format(im.format, im.size, im.mode, "-"*35)

        for image in images:
            size = get_size(image)
            # NOTE(review): Image.ANTIALIAS was removed in Pillow 10; switch
            # to Image.LANCZOS once the supported Pillow range is confirmed.
            out = im.resize(size, Image.ANTIALIAS)
            out.save("{}/AppIcon.appiconset/{}".format(Data.save_path, image["filename"]), format="PNG")
            output_label["text"] += "Image generated: {}\n".format(size)

        open_saved_button.grid(row=1, column=0, columnspan=2, sticky=tk.W+tk.E)
    except IOError:
        output_label["text"] = "Please select a supported image file."
def apply(self, software, theme):
    """Merge the theme's files for *software* into the user's config files.

    For every themed file name that this software supports, the candidate
    paths are tried in order; the first writable one is backed up if
    necessary and overwritten with the merge of the original file and the
    theme file.
    """
    themed_files = theme.files[software]
    supported = self.get_supported_software()[software]

    for name, theme_path in themed_files.items():
        if name not in supported:
            continue

        for candidate in supported[name]:
            target = os.path.expanduser(candidate)
            if not os.access(target, os.W_OK):
                continue

            theme.themes.backup_if_necessary(software, name, target)
            # reload orig files because the original theme might have
            # changed (in case a backup was necessary)
            originals = theme.themes.original.files[software]
            with open(target, "w") as f:
                logging.debug("Merging %s with %s" % (originals[name], theme_path))
                f.writelines(self.merge(software, name, originals[name], theme_path))
            # Only the first writable candidate is themed.
            break
def try_init_cgroup():
    """Create any cgroup root that is missing or not writable.

    As root the directories are created directly; otherwise, when stdin is
    a terminal, they are created and chown'ed via ``sudo``. With neither
    option available an error is logged.
    """
    euid = geteuid()
    # Keep the roots that are not an existing, writable directory.
    cgroups_to_init = [
        root
        for root in (CPUACCT_CGROUP_ROOT, MEMORY_CGROUP_ROOT, PIDS_CGROUP_ROOT)
        if not (path.isdir(root) and access(root, W_OK))
    ]
    if not cgroups_to_init:
        return

    if euid == 0:
        logger.info('Initializing cgroup: %s', ', '.join(cgroups_to_init))
        for cgroup_to_init in cgroups_to_init:
            makedirs(cgroup_to_init, exist_ok=True)
    elif __stdin__.isatty():
        logger.info('Initializing cgroup: %s', ', '.join(cgroups_to_init))
        # The joined list expands to several quoted arguments inside "{1}".
        quoted_roots = '" "'.join(cgroups_to_init)
        shell_command = 'mkdir -p "{1}" && chown -R "{0}" "{1}"'.format(euid, quoted_roots)
        call(['sudo', 'sh', '-c', shell_command])
    else:
        logger.error('Cgroup not initialized')
def is_admin(user=lib.global_parameters['USER'], activeroot='/'):
    """Check if the specified user has administrative privileges on given system.

    Returns True for a Linux user whose uid is 0 or a Windows user in the
    local ``Administrators`` group; False otherwise, including for
    unsupported platforms and active roots not starting with ``/`` (a
    warning is logged for those two cases).
    """
    platform = get_system_type_from_active_root(activeroot)
    ip = get_address_from_active_root(activeroot)

    if not activeroot.startswith('/'):
        log.warn('Cannot check root privileges, remote system analysis (%s) not supported yet.' % activeroot)
        return False

    if platform.startswith('linux'):
        # on linux check whether the user's uid is 0
        from pwd import getpwnam
        return getpwnam(user).pw_uid == 0

    if platform.startswith('win'):
        from win32net import NetUserGetLocalGroups
        # should work on remote systems too
        # (An alternative once considered: only admins can write to
        # %SystemRoot%\temp, so an os.access(..., os.W_OK) probe would do.)
        return 'Administrators' in NetUserGetLocalGroups(ip, user)

    log.warn('Cannot check root privileges, platform is not fully supported (%s).' % platform)
    return False
def CheckOutputFile(filename):
    """Check that the given file does not exist and can be opened for writing.

    Args:
      filename: The name of the file.

    Raises:
      FileExistsError: if the given filename already exists.
      FileNotWritableError: if the directory that would contain the file is
        not writable.
    """
    full_path = os.path.abspath(filename)
    if os.path.exists(full_path):
        raise FileExistsError('%s: output file exists' % filename)

    # The file itself is absent, so writability is decided by its directory.
    parent_dir = os.path.dirname(full_path)
    if not os.access(parent_dir, os.W_OK):
        raise FileNotWritableError('%s: not writable' % parent_dir)
def ExecRecursiveMirror(self, source, dest):
    """Emulation of rm -rf out && cp -af in out.

    Removes whatever currently sits at *dest* (making read-only entries
    writable when needed) and copies *source* there.
    """
    def _chmod_and_retry(operation, target, _unused_excinfo):
        # The operation failed, possibly because the file is set to
        # read-only. If that's why, make it writable and try the op again.
        if not os.access(target, os.W_OK):
            os.chmod(target, stat.S_IWRITE)
        operation(target)

    if os.path.isdir(dest):
        shutil.rmtree(dest, onerror=_chmod_and_retry)
    elif os.path.exists(dest):
        if not os.access(dest, os.W_OK):
            # Attempt to make the file writable before deleting it.
            os.chmod(dest, stat.S_IWRITE)
        os.unlink(dest)

    if os.path.isdir(source):
        shutil.copytree(source, dest)
        return
    shutil.copy2(source, dest)
def export(self):
    ''' Export visible layers and layer groups to CoaSprite

    Aborts (after showing an error) when the destination is an existing
    file or the parent path is not writable. Otherwise each visible layer
    of a duplicate of the original image is exported - non-empty layer
    groups as sprite sheets, plain layers as sprites - and the JSON
    description is written. The duplicate image is always deleted again.
    '''
    destination = os.path.join(self.path, self.name)

    if os.path.isfile(destination):
        show_error_msg('ABORTING!\nDestination is not a folder.\n {path}/{name}'.format(path=self.path, name=self.name))
        return
    if not os.access(self.path, os.W_OK):
        show_error_msg('ABORTING!\nDestination is not a writable.\n {path}'.format(path=self.path))
        return
    if os.path.isdir(destination):
        # Not fatal: the export continues into the existing folder.
        show_error_msg('Destination exists, I may have overwritten something in {path}/{name}'.format(path=self.path, name=self.name))
    self.mkdir()

    # Work on a duplicate so cropping does not touch the original image.
    self.img = self.original_img.duplicate()
    self.img.undo_group_start()
    try:
        # Loop through visible layers
        for layer in self.img.layers:
            if not layer.visible:
                continue
            name = '{name}.png'.format(name=layer.name)
            pdb.gimp_image_set_active_layer(self.img, layer)
            # Crop and the layer position
            pdb.plug_in_autocrop_layer(self.img, layer)
            z = 0 - pdb.gimp_image_get_item_position(self.img, layer)
            if isinstance(layer, gimp.GroupLayer):
                if len(layer.children) > 0:
                    self.sprites.append(self.export_sprite_sheet(layer, name, layer.offsets, z))
            else:
                self.sprites.append(self.export_sprite(layer, name, layer.offsets, z))
        self.write_json()
    finally:
        # Release the working copy even when an export step raised, so a
        # failed run does not leave an orphaned image behind.
        self.img.undo_group_end()
        pdb.gimp_image_delete(self.img)
def is_writable(self, path):
    """Return whether the nearest existing ancestor of *path* is writable.

    Walks up from *path* until something exists and reports
    ``os.access(..., os.W_OK)`` for it; returns False when the walk reaches
    a path whose dirname is itself without finding anything.
    """
    while not os.path.exists(path):
        parent = os.path.dirname(path)
        if parent == path:
            # dirname() no longer changes the path: nothing left to try.
            return False
        path = parent
    return os.access(path, os.W_OK)
def get_cache_base(suffix=None):
    """
    Return the default base location for distlib caches.

    The parent directory is LOCALAPPDATA on Windows when that variable is
    defined, otherwise the user's home directory
    (``os.path.expanduser('~')``). If the parent does not exist it is
    created; if it cannot be created or is not writable, a fresh temporary
    directory is used instead (with a warning).

    The result is the path ``suffix`` inside that parent, where ``suffix``
    defaults to '.distlib'. The returned directory itself is not created
    here.
    """
    if suffix is None:
        suffix = '.distlib'

    use_local_appdata = os.name == 'nt' and 'LOCALAPPDATA' in os.environ
    if use_local_appdata:
        parent = os.path.expandvars('$localappdata')
    else:
        # Assume posix, or old Windows
        parent = os.path.expanduser('~')

    # we use 'isdir' instead of 'exists', because we want to
    # fail if there's a file with that name
    if not os.path.isdir(parent):
        try:
            os.makedirs(parent)
        except OSError:
            logger.warning('Unable to create %s', parent, exc_info=True)
            usable = False
        else:
            usable = True
    else:
        usable = os.access(parent, os.W_OK)
        if not usable:
            logger.warning('Directory exists but is not writable: %s', parent)

    if not usable:
        parent = tempfile.mkdtemp()
        logger.warning('Default location unusable, using %s', parent)
    return os.path.join(parent, suffix)
def _mkstemp_inner(dir, pre, suf, flags, output_type):
    """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile.

    Tries up to TMP_MAX candidate names of the form ``pre + name + suf``
    inside *dir*, opening each with *flags* and mode 0o600. Candidate names
    are fs-encoded when *output_type* is bytes.

    Returns:
        A ``(fd, absolute_path)`` tuple for the first name that opens.

    Raises:
        FileExistsError: if no candidate name was usable.
        PermissionError: re-raised unless it is the Windows name-collision
            case described below.
    """
    candidates = _get_candidate_names()
    if output_type is bytes:
        candidates = map(_os.fsencode, candidates)

    for _attempt in range(TMP_MAX):
        candidate_path = _os.path.join(dir, pre + next(candidates) + suf)
        try:
            fd = _os.open(candidate_path, flags, 0o600)
        except FileExistsError:
            continue    # try again
        except PermissionError:
            # This exception is thrown when a directory with the chosen name
            # already exists on windows.
            name_collision = (_os.name == 'nt' and _os.path.isdir(dir) and
                              _os.access(dir, _os.W_OK))
            if not name_collision:
                raise
            continue
        return (fd, _os.path.abspath(candidate_path))

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary file name found")

# User visible interfaces.
def test_get_default_keystore_path(self):
    """
    Makes sure the default keystore directory exists, creating it if needed.
    Verifies the path is correct and that we have read/write access to it.
    """
    keystore_dir = PyWalib.get_default_keystore_path()
    if not os.path.exists(keystore_dir):
        os.makedirs(keystore_dir)

    # checks path correctness
    self.assertTrue(keystore_dir.endswith(".config/pyethapp/keystore/"))

    # checks read/write access
    for mode in (os.R_OK, os.W_OK):
        self.assertEqual(os.access(keystore_dir, mode), True)
def check_folder_writable(checkfolder):
    """Exit the program with status 1 unless *checkfolder* is writable.

    An error is logged before exiting; nothing happens when the folder is
    writable.
    """
    if os.access(checkfolder, os.W_OK):
        return
    logging.error("You don't have access to %s" % checkfolder)
    sys.exit(1)

# Function that gets protected packages or returns an empty dictionary
def make_writeable(self, filename):
    """
    Ensure *filename* is writable by adding the owner-write bit when the
    file is not currently writable. Useful if our source is read-only.
    """
    if sys.platform.startswith('java'):
        # On Jython there is no os.access()
        return
    if os.access(filename, os.W_OK):
        return
    # Keep the existing permission bits and add write permission for the owner.
    current_mode = stat.S_IMODE(os.stat(filename).st_mode)
    os.chmod(filename, current_mode | stat.S_IWUSR)
def main(args, outs):
    """Run the preflight checks for a bcl2fastq run.

    Checks, in order: the run folder, RunInfo.xml, the library path
    environment, the barcode whitelist, optionally the bcl2fastq
    executable, open file handles, the output directories and the thread
    count. A failed check ends the stage through ``martian.exit`` (or
    inside the ``tk_preflight`` helper that performs it).
    """
    hostname = socket.gethostname()

    # Single-argument print(...) behaves the same on Python 2 and 3,
    # unlike the print statement, which is a syntax error on Python 3.
    print("Checking run folder...")
    tk_preflight.check_rta_complete(args.run_path)

    print("Checking RunInfo.xml...")
    tk_preflight.check_runinfo_xml(args.run_path)

    print("Checking system environment...")
    ok, msg = tk_preflight.check_ld_library_path()
    if not ok:
        martian.exit(msg)

    print("Checking barcode whitelist...")
    tk_preflight.check_barcode_whitelist(args.barcode_whitelist)

    if args.check_executables:
        print("Checking bcl2fastq...")
        (rta_version, rc_i2_read, bcl_params) = tk_bcl.get_rta_version(args.run_path)
        martian.log_info("RTA Version: %s" % rta_version)
        martian.log_info("BCL Params: %s" % str(bcl_params))

        (major_ver, full_ver) = tk_bcl.check_bcl2fastq(hostname, rta_version)
        martian.log_info("Running bcl2fastq mode: %s. Version: %s" % (major_ver, full_ver))

    ok, msg = tk_preflight.check_open_fh()
    if not ok:
        martian.exit(msg)

    # Output folders must allow both writing and traversal.
    if args.output_path is not None:
        tk_preflight.check_folder_or_create("--output-dir", args.output_path, hostname, permission=os.W_OK|os.X_OK)

    if args.interop_output_path is not None:
        tk_preflight.check_folder_or_create("--interop-dir", args.interop_output_path, hostname, permission=os.W_OK|os.X_OK)

    if args.max_bcl2fastq_threads < 1:
        msg = "Cannot run bcl2fastq with zero threads."
        martian.exit(msg)