我们从Python开源项目中，提取了以下48个代码示例，用于说明如何使用stat.S_IRGRP。
def main(args):
    """Dump the RAM of a libvirt domain to ``<vm_name>.raw`` in the CWD.

    ``args`` is a mapping that must provide the key ``'<vm_name>'``.
    """
    name = args['<vm_name>']
    # Resolve the domain through the system QEMU connection.
    connection = libvirt.open('qemu:///system')
    domain = connection.lookupByName(name)

    dump_path = os.path.join(os.getcwd(), '{}.raw'.format(name))
    world_rw = (stat.S_IRUSR | stat.S_IWUSR
                | stat.S_IRGRP | stat.S_IWGRP
                | stat.S_IROTH | stat.S_IWOTH)
    with open(dump_path, 'w'):
        # Pre-create the target and make it read/write for everyone.
        os.chmod(dump_path, world_rw)
        # Memory-only core dump in raw format, written to the same path.
        domain.coreDumpWithFormat(
            dump_path,
            libvirt.VIR_DOMAIN_CORE_DUMP_FORMAT_RAW,
            libvirt.VIR_DUMP_MEMORY_ONLY,
        )
def test_read_only_bytecode(self):
    """Importing must not raise when stale bytecode is read-only."""
    read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    with source_util.create_modules('_temp') as mapping:
        source_path = mapping['_temp']
        # Compile, then overwrite the first four bytes so the bytecode
        # needs to be regenerated on import.
        py_compile.compile(source_path)
        bytecode_path = imp.cache_from_source(source_path)
        with open(bytecode_path, 'r+b') as handle:
            handle.seek(0)
            handle.write(b'\x00\x00\x00\x00')
        os.chmod(bytecode_path, read_only)
        try:
            # Should not raise IOError!
            self.import_(source_path, '_temp')
        finally:
            # Give the owner write permission back for clean-up.
            os.chmod(bytecode_path, stat.S_IWUSR)
def test_execute_bit_not_copied(self):
    """Bytecode files must not inherit the execute bits of their source.

    Issue 6070: under posix .pyc files got their execute bit set if the
    .py file had the execute bit set, but they aren't executable.
    """
    read_all = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    exec_all = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    with temp_umask(0o022):
        sys.path.insert(0, os.curdir)
        try:
            source_name = TESTFN + os.extsep + "py"
            open(source_name, 'w').close()
            os.chmod(source_name, read_all | exec_all)
            cached = imp.cache_from_source(source_name)
            # Remove any stale bytecode so the import has to create it.
            unlink(cached)
            __import__(TESTFN)
            if not os.path.exists(cached):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
            cached_stat = os.stat(cached)
            self.assertEqual(stat.S_IMODE(cached_stat.st_mode), read_all)
        finally:
            del sys.path[0]
            remove_files(TESTFN)
            unload(TESTFN)
def set_own_perm(usr, dir_list):
    """Recursively give ``usr`` ownership of every tree in ``dir_list``.

    The uid comes from the passwd entry for ``usr`` and the gid from the
    group entry of the same name. Directories get rwx for user and group
    (0770); files get rw for user and group (0660).
    """
    uid = pwd.getpwnam(usr).pw_uid
    gid = grp.getgrnam(usr).gr_gid
    file_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP
    dir_mode = (stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
                | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP)

    def _claim(target, mode):
        # Ownership first, then permissions, as in a manual chown+chmod.
        os.chown(target, uid, gid)
        os.chmod(target, mode)

    for top in dir_list:
        _claim(top, dir_mode)
        for parent, dirnames, filenames in os.walk(top):
            for entry in dirnames:
                _claim(os.path.join(parent, entry), dir_mode)
            for entry in filenames:
                _claim(os.path.join(parent, entry), file_mode)
def test_execute_bit_not_copied(self):
    """Bytecode files must not inherit the execute bits of their source.

    Issue 6070: under posix .pyc files got their execute bit set if the
    .py file had the execute bit set, but they aren't executable.
    """
    # ``0o22`` is accepted by Python 2.6+ and Python 3; the bare ``022``
    # spelling is a syntax error on Python 3.
    oldmask = os.umask(0o22)
    sys.path.insert(0, os.curdir)
    try:
        fname = TESTFN + os.extsep + "py"
        # Create an empty source file with read+execute for everyone.
        open(fname, 'w').close()
        os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
                         stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
        __import__(TESTFN)
        # The interpreter writes either a .pyc or a .pyo next to the source.
        fn = fname + 'c'
        if not os.path.exists(fn):
            fn = fname + 'o'
            if not os.path.exists(fn):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
        s = os.stat(fn)
        self.assertEqual(stat.S_IMODE(s.st_mode),
                         stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
    finally:
        os.umask(oldmask)
        remove_files(TESTFN)
        unload(TESTFN)
        del sys.path[0]
def test_execute_bit_not_copied(self):
    """Bytecode files must be readable but never executable.

    Issue 6070: under posix .pyc files got their execute bit set if the
    .py file had the execute bit set, but they aren't executable.
    """
    read_all = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    exec_all = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    oldmask = os.umask(0o22)
    sys.path.insert(0, os.curdir)
    try:
        fname = TESTFN + os.extsep + "py"
        open(fname, 'w').close()
        os.chmod(fname, read_all | exec_all)
        __import__(TESTFN)
        # Prefer the .pyc; fall back to the .pyo before giving up.
        compiled = fname + 'c'
        if not os.path.exists(compiled):
            compiled = fname + 'o'
            if not os.path.exists(compiled):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
        compiled_mode = os.stat(compiled).st_mode
        assert compiled_mode & read_all
        assert not (compiled_mode & exec_all)
    finally:
        os.umask(oldmask)
        clean_tmpfiles(fname)
        unload(TESTFN)
        del sys.path[0]
def store_index(self,ipath):
    """Write the chromosome index to ``ipath`` atomically.

    One tab-separated line per entry of ``self.chrom_stats`` (sorted by
    chromosome name) is written to a temporary file in the same directory
    as ``ipath``. The file is flushed, fsync'ed, made read-only for
    everyone and finally renamed over ``ipath``. Writing to a temp file
    first keeps parallel builds of the same index from clobbering each
    other with partial content.

    If anything fails before the rename, the temporary file is removed
    and the exception is re-raised.
    """
    import stat
    import tempfile

    self.logger.info("# indexed_fasta.store_index('%s')" % ipath)

    tmp = tempfile.NamedTemporaryFile(mode="w",dir = os.path.dirname(ipath),delete=False)
    try:
        try:
            for chrom in sorted(self.chrom_stats.keys()):
                ofs,ldata,skip,skipchar,size = self.chrom_stats[chrom]
                tmp.write("%s\t%d\t%d\t%d\t%r\t%d\n" % (chrom,ofs,ldata,skip,skipchar,size))

            # fsync only covers data the OS already has, so Python's own
            # buffer must be flushed first.
            tmp.flush()
            os.fsync(tmp.fileno())
        finally:
            tmp.close()

        # make it accessible to everyone
        os.chmod(tmp.name, stat.S_IROTH | stat.S_IRGRP | stat.S_IRUSR)

        # this is atomic on POSIX as we have created tmp in the same directory,
        # therefore same filesystem
        os.rename(tmp.name,ipath)
    except Exception:
        # Do not leave an orphaned temp file next to the index.
        if os.path.exists(tmp.name):
            os.remove(tmp.name)
        raise
def store_index(self,ipath):
    """Write the chromosome index to ``ipath`` atomically.

    One tab-separated line per entry of ``self.chrom_stats`` (sorted by
    chromosome name) is written to a temporary file in the same directory
    as ``ipath``. The file is flushed, fsync'ed, made read-only for
    everyone and finally renamed over ``ipath``. Writing to a temp file
    first keeps parallel builds of the same index from clobbering each
    other with partial content.

    If anything fails before the rename, the temporary file is removed
    and the exception is re-raised.
    """
    import stat
    import tempfile

    debug("# indexed_fasta.store_index('%s')" % ipath)

    tmp = tempfile.NamedTemporaryFile(mode="w",dir = os.path.dirname(ipath),delete=False)
    try:
        try:
            for chrom in sorted(self.chrom_stats.keys()):
                ofs,ldata,skip,skipchar,size = self.chrom_stats[chrom]
                tmp.write("%s\t%d\t%d\t%d\t%r\t%d\n" % (chrom,ofs,ldata,skip,skipchar,size))

            # fsync only covers data the OS already has, so Python's own
            # buffer must be flushed first.
            tmp.flush()
            os.fsync(tmp.fileno())
        finally:
            tmp.close()

        # make it accessible to everyone
        os.chmod(tmp.name, stat.S_IROTH | stat.S_IRGRP | stat.S_IRUSR)

        # this is atomic on POSIX as we have created tmp in the same directory,
        # therefore same filesystem
        os.rename(tmp.name,ipath)
    except Exception:
        # Do not leave an orphaned temp file next to the index.
        if os.path.exists(tmp.name):
            os.remove(tmp.name)
        raise
def tearDown(self):
    '''
    Disconnect ramdisk, unloading data to pre-build location.
    '''
    src_root = self.tmphome + "/src"
    self.mbl.chownR(self.keyuser, src_root)

    # chmod so it's readable by everyone, writable by the group
    added_bits = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH | stat.S_IWGRP
    self.mbl.chmodR(added_bits, src_root, "append")
    self.libc.sync()
    self.libc.sync()

    # Copy back to pseudo-build directory
    call([self.RSYNC, "-aqp", src_root + "/MacBuild/dmgs", self.buildHome])
    call([self.RSYNC, "-aqp", src_root + "/", self.buildHome + "/builtSrc"])
    self.libc.sync()
    self.libc.sync()

    os.chdir(self.buildHome)
    self._exit(self.ramdisk, self.luggage, 0)
def _create_eae_zipfile(self, zip_file_name, main_file_path, data_files=None): to_zip = [] if data_files is None: data_files = [] # Handle main script to_zip.append(main_file_path) # Prepare the zip file zip_path = "/tmp/" + zip_file_name zipf = zipfile.ZipFile(zip_path, mode='w', compression=zipfile.ZIP_DEFLATED, allowZip64=True) for f in to_zip: zipf.write(f) zipf.close() # Handle other files & dirs for f in data_files: zipCommand = "zip -r -u -0 " + zip_path + " " + f call([zipCommand], shell=True) # Chmod 666 the zip file so it can be accessed os.chmod(zip_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH | stat.S_IWOTH) return zip_path
def test_create_script_perms(self):
    """Check the file mode produced by create_script for an explicit mode."""
    script_file = os.path.join(self.root, 'script')
    # Ask for read-only access for user, group and other (0444).
    requested_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    aliases = {
        's6_setuidgid': '/test/s6-setuidgid',
    }

    utils.create_script(
        script_file,
        's6.run',
        mode=requested_mode,
        user='testproid',
        home='home',
        shell='shell',
        _alias=aliases
    )

    # 33060 == 0o100444: a regular file with mode r--r--r--.
    actual_mode = utils.os.stat(script_file).st_mode
    self.assertEqual(actual_mode, 33060)
def save_app(manifest, container_dir, app_json=STATE_JSON):
    """Persist the app manifest as JSON and return it as an object.

    The manifest is written to ``container_dir/app_json`` through
    ``fs.write_safe``. On POSIX the file is then set to 0644. The return
    value is ``utils.to_obj(manifest)``.
    """
    state_file = os.path.join(container_dir, app_json)

    def _dump_manifest(f):
        return json.dump(manifest, f)

    # Save the manifest with allocated vip and ports in the state
    fs.write_safe(state_file, _dump_manifest)

    # World readable, writable by the owner only.
    if os.name == 'posix':
        world_readable = (stat.S_IWUSR | stat.S_IRUSR
                          | stat.S_IRGRP | stat.S_IROTH)
        os.chmod(state_file, world_readable)

    # Freeze the app data into a namedtuple object
    return utils.to_obj(manifest)
def test_read_only_bytecode(self):
    """Importing must not raise when stale bytecode is read-only."""
    read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    with source_util.create_modules('_temp') as mapping:
        source_path = mapping['_temp']
        # Compile, then overwrite the first four bytes so the bytecode
        # needs to be regenerated on import.
        py_compile.compile(source_path)
        bytecode_path = self.util.cache_from_source(source_path)
        with open(bytecode_path, 'r+b') as handle:
            handle.seek(0)
            handle.write(b'\x00\x00\x00\x00')
        os.chmod(bytecode_path, read_only)
        try:
            # Should not raise OSError!
            self.import_(source_path, '_temp')
        finally:
            # Give the owner write permission back for clean-up.
            os.chmod(bytecode_path, stat.S_IWUSR)
def _octal_to_perm(octal): perms = list("-" * 9) if octal & stat.S_IRUSR: perms[0] = "r" if octal & stat.S_IWUSR: perms[1] = "w" if octal & stat.S_IXUSR: perms[2] = "x" if octal & stat.S_IRGRP: perms[3] = "r" if octal & stat.S_IWGRP: perms[4] = "w" if octal & stat.S_IXGRP: perms[5] = "x" if octal & stat.S_IROTH: perms[6] = "r" if octal & stat.S_IWOTH: perms[7] = "w" if octal & stat.S_IXOTH: perms[8] = "x" return "".join(perms)
def gunzip_file(gz_path, new_path):
    """Unzips from gz_path into new_path.

    Does nothing when ``new_path`` already exists. Otherwise the directory
    containing ``new_path`` is chmod'ed and the gzip stream is copied into
    ``new_path`` line by line.

    Args:
      gz_path: path to the zipped file.
      new_path: path to where the file will be unzipped.
    """
    if tf.gfile.Exists(new_path):
        tf.logging.info("File %s already exists, skipping unpacking" % new_path)
        return
    tf.logging.info("Unpacking %s to %s" % (gz_path, new_path))
    # We may be unpacking into a newly created directory, add write mode.
    # Permission flags must be combined with bitwise OR; boolean ``or``
    # short-circuits and yields only the first flag (S_IRWXU).
    mode = stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH
    os.chmod(os.path.dirname(new_path), mode)
    with gzip.open(gz_path, "rb") as gz_file:
        with tf.gfile.GFile(new_path, mode="wb") as new_file:
            for line in gz_file:
                new_file.write(line)
def is_file_readable(filepath):
    """
    Check if the file given is readable to the user we are currently
    running at
    """
    real_uid, effective_uid = os.getuid(), os.geteuid()
    real_gid, effective_gid = os.getgid(), os.getegid()

    # The common case: real and effective ids agree, so os.access() (which
    # checks against the real ids) gives the right answer directly.
    if real_uid == effective_uid and real_gid == effective_gid:
        return os.access(filepath, os.R_OK)

    info = os.stat(filepath)

    # Pick the one permission class that applies to the effective ids:
    # owner first, then group, then everybody else.
    if info.st_uid == effective_uid:
        read_bit = stat.S_IRUSR
    elif info.st_gid == effective_gid or info.st_gid in os.getgroups():
        read_bit = stat.S_IRGRP
    else:
        read_bit = stat.S_IROTH

    return (info.st_mode & read_bit) != 0
def kill_task(task):
    """Stop ``task`` and report the resulting status string.

    Returns ``'cancelled'`` for a pending task, the unchanged status for a
    task that is not running, and ``'killed'`` after the pulse file of a
    running task has been made read-only.
    """
    from stat import S_IREAD, S_IRGRP, S_IROTH

    status = check_task(task)
    if status == 'pending':
        return 'cancelled'

    task_dir = os.path.join(os.path.expanduser('~'), '.sos', 'tasks')

    # remove job file as well
    job_file = os.path.join(task_dir, task + '.sh')
    if os.path.isfile(job_file):
        try:
            os.remove(job_file)
        except Exception:
            # Best effort: a job file that cannot be removed is ignored.
            pass

    if status != 'running':
        return status

    # job is running: make its pulse file read-only
    pulse_file = os.path.join(task_dir, task + '.pulse')
    os.chmod(pulse_file, S_IREAD | S_IRGRP | S_IROTH)
    return 'killed'
def set_executable(top):
    """Normalise execute bits for everything below ``top``.

    Regular files whose content starts with ``#!`` or carries an ``ELF``
    signature at bytes 1-3 get an execute bit for every permission class
    that already has read access. All other files lose all execute bits.
    Symbolic links to files are skipped. Every directory found by the walk
    is treated like an executable (x added where r is set). ``top`` itself
    is not modified.

    Returns:
        The number of files that could not be processed.
    """
    error_count = 0

    def set_exec(name):
        # Mirror each read bit into the matching execute bit.
        mode = os.stat(name).st_mode
        new_mode = mode
        if new_mode & stat.S_IRUSR:
            new_mode = new_mode | stat.S_IXUSR
        if new_mode & stat.S_IRGRP:
            new_mode = new_mode | stat.S_IXGRP
        if new_mode & stat.S_IROTH:
            new_mode = new_mode | stat.S_IXOTH
        if mode != new_mode:
            print("Setting exec for '%s' (mode %o => %o)" % (name, mode, new_mode))
            os.chmod(name, new_mode)

    def unset_exec(name):
        mode = os.stat(name).st_mode
        new_mode = mode & ~(stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
        if mode != new_mode:
            print("Unsetting exec for '%s' (mode %o => %o)" % (name, mode, new_mode))
            os.chmod(name, new_mode)

    for root, dirs, files in os.walk(top):
        for name in files:
            complete_name = os.path.join(root, name)
            if os.path.islink(complete_name):
                continue
            try:
                # Binary mode: the header of an arbitrary file is not text,
                # and the context manager closes it even if read() fails.
                with open(complete_name, 'rb') as f:
                    header = f.read(4)
                if header[0:2] == b'#!' or header[1:4] == b'ELF':
                    set_exec(complete_name)
                else:
                    unset_exec(complete_name)
            except Exception as e:
                # Keep going; the caller gets the failure count.
                print("%s: %s" % (complete_name, str(e)))
                error_count += 1
        for name in dirs:
            complete_name = os.path.join(root, name)
            set_exec(complete_name)
    return error_count
def begin_handover(self, fdtx_dir='/tmp/ave'):
    """Prepare this broker for being replaced by a new one.

    To be called on the broker that is going away. It stops listening for
    clients, stops sharing, drops all shares, stops the listers and
    disables further allocation. It then opens a UNIX domain socket below
    ``fdtx_dir`` for the later file descriptor transfer.

    Returns:
        A tuple ``(serialized_state, config, socket_path)``.

    Raises:
        Exception: if ``fdtx_dir`` is an existing directory that is not
            readable, writable and searchable.
    """
    # Check fdtx_dir up front: without full access to it the socket cannot
    # be created and the whole handover would fail later.
    needed_access = os.R_OK | os.X_OK | os.W_OK
    if os.path.isdir(fdtx_dir) and not os.access(fdtx_dir, needed_access):
        raise Exception('directory not writable: %s' % fdtx_dir)

    self.stop_listening()
    self.stop_sharing()
    self.drop_all_shares()
    self.stop_listers()
    self.allocating = False

    self.fdtx = FdTx(None)
    uds_path = self.fdtx.listen(fdtx_dir, 'handover-%s' % rand_authkey())

    # The caller must be able to talk to the new socket, so make it
    # readable and writable for owner, group and others (0666).
    owner_rw = stat.S_IRUSR | stat.S_IWUSR
    group_rw = stat.S_IRGRP | stat.S_IWGRP
    other_rw = stat.S_IROTH | stat.S_IWOTH
    os.chmod(uds_path, owner_rw | group_rw | other_rw)

    return self.serialize(), self.config, uds_path
def assert_mode_644(mode):
    """Assert that ``mode`` has the permission bits expected for 0644.

    Checked: group read and other read are set; owner read and owner write
    are set; owner execute is clear. No other bits are examined.
    """
    group_other_read = stat.S_IRGRP | stat.S_IROTH
    assert mode & group_other_read == group_other_read

    owner_read_write = stat.S_IRUSR | stat.S_IWUSR
    assert mode & owner_read_write == owner_read_write and not mode & stat.S_IXUSR
def assert_mode_755(mode):
    """Assert that ``mode`` has the permission bits expected for 0755.

    Checked: read and execute are set for group and other; read, write and
    execute are set for the owner. No other bits are examined.
    """
    group_other_rx = (stat.S_IRGRP | stat.S_IXGRP
                      | stat.S_IROTH | stat.S_IXOTH)
    assert mode & group_other_rx == group_other_rx

    owner_rwx = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
    assert mode & owner_rwx == owner_rwx
def run(self):
    """Runs the default installer and sets rights on log directory."""
    install.run(self)

    # Hand the log directory over to the argosd account.
    account = getpwnam('argosd')
    os.chown(settings.LOG_PATH, account.pw_uid, account.pw_gid)

    # Owner gets full access, group and others may only read (0744).
    log_dir_mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH
    os.chmod(settings.LOG_PATH, log_dir_mode)
def file_groupreadable(path):
    """Report a problem code when ``path`` is not group readable.

    ``path`` must offer ``stat()`` returning an object with a ``mode``
    attribute. Returns ``'PROB_FILE_NOT_GRPRD'`` when the group-read bit
    is missing and ``None`` otherwise.
    """
    mode = path.stat().mode
    if mode & stat.S_IRGRP:
        return None
    return 'PROB_FILE_NOT_GRPRD'
def statinfo(st):
    """Flatten a stat result into a plain dictionary.

    The result holds the permission bits as a zero-padded octal string
    (``mode``), the file-type predicates (``isdir`` ... ``issock``), the
    ownership/size/inode/timestamp fields, one boolean per rwx permission
    bit, and the set-uid / set-gid flags.
    """
    mode = st.st_mode
    info = {'mode': "%04o" % stat.S_IMODE(mode)}

    # File-type predicates from the stat module.
    type_checks = (
        ('isdir', stat.S_ISDIR), ('ischr', stat.S_ISCHR),
        ('isblk', stat.S_ISBLK), ('isreg', stat.S_ISREG),
        ('isfifo', stat.S_ISFIFO), ('islnk', stat.S_ISLNK),
        ('issock', stat.S_ISSOCK),
    )
    for key, check in type_checks:
        info[key] = check(mode)

    # Fields copied verbatim from the stat result.
    copied = (
        ('uid', 'st_uid'), ('gid', 'st_gid'), ('size', 'st_size'),
        ('inode', 'st_ino'), ('dev', 'st_dev'), ('nlink', 'st_nlink'),
        ('atime', 'st_atime'), ('mtime', 'st_mtime'), ('ctime', 'st_ctime'),
    )
    for key, attribute in copied:
        info[key] = getattr(st, attribute)

    # One boolean per individual mode bit.
    bit_flags = (
        ('wusr', stat.S_IWUSR), ('rusr', stat.S_IRUSR), ('xusr', stat.S_IXUSR),
        ('wgrp', stat.S_IWGRP), ('rgrp', stat.S_IRGRP), ('xgrp', stat.S_IXGRP),
        ('woth', stat.S_IWOTH), ('roth', stat.S_IROTH), ('xoth', stat.S_IXOTH),
        ('isuid', stat.S_ISUID), ('isgid', stat.S_ISGID),
    )
    for key, bit in bit_flags:
        info[key] = bool(mode & bit)

    return info
def test_tmp_files():
    """Check which temp files RDF serialization creates, and their modes."""
    tmp_dir = tempfile.mkdtemp(dir="/tmp")
    assert len(os.listdir(tmp_dir)) == 0

    # Constructing the converter must not create temp files yet.
    csvw = CSVW(csv_path="./tests/books.csv",
                metadata_path="./tests/books.csv-metadata.json",
                temp_dir=tmp_dir)
    assert len(os.listdir(tmp_dir)) == 0

    csvw.to_rdf(fmt="nt")
    created_files = os.listdir(tmp_dir)
    assert len(created_files) == 1, "nt serialization should generate only 1 temp file"
    assert created_files[0].endswith(".nt")

    os.remove(os.path.join(tmp_dir, created_files[0]))
    assert len(os.listdir(tmp_dir)) == 0

    csvw.to_rdf(fmt="turtle")
    created_files = os.listdir(tmp_dir)
    assert len(created_files) == 2, "ttl serialization should generate two temps file"
    assert any([name.endswith(".nt") for name in created_files])
    assert any([name.endswith(".ttl") for name in created_files])

    # Check permissions: readable by all, writable by nobody.
    read_write_pairs = [
        (stat.S_IRUSR, stat.S_IWUSR),
        (stat.S_IRGRP, stat.S_IWGRP),
        (stat.S_IROTH, stat.S_IWOTH),
    ]
    for name in created_files:
        file_mode = os.stat(os.path.join(tmp_dir, name)).st_mode
        for read_bit, write_bit in read_write_pairs:
            assert bool(file_mode & read_bit)
            assert not bool(file_mode & write_bit)

    # Closing the converter removes its temp files.
    csvw.close()
    assert len(os.listdir(tmp_dir)) == 0
def load_package_by_remote(self):
    """Clone the score package from its remote git repository.

    When the configured deploy key file exists, it is restricted to the
    owner (0600) if group or others can read it, and git is told to use it
    through ``GIT_SSH_COMMAND``. After cloning, the configured branch is
    checked out unless it is the master branch.

    :return: True
    """
    # score package clone from remote repository
    repository_url = self.__score_base + ':' + self.__score_package + '.git'

    logging.debug("git Path :"+self.__package_path)
    git = Git(os.getcwd())

    # check deploy key
    key_path = conf.DEFAULT_SCORE_REPOSITORY_KEY
    if os.path.exists(key_path):
        key_mode = os.stat(key_path).st_mode

        # The key must be accessible by its owner only.
        readable_by_others = bool(key_mode & stat.S_IRGRP or key_mode & stat.S_IROTH)
        if readable_by_others:
            os.chmod(conf.DEFAULT_SCORE_REPOSITORY_KEY, 0o600)

        ssh_cmd = 'ssh -o StrictHostKeyChecking=no -i '+conf.DEFAULT_SCORE_REPOSITORY_KEY
        logging.debug("SSH KEY COMMAND : "+ssh_cmd)
        git.custom_environment(GIT_SSH_COMMAND=ssh_cmd)

    logging.debug(f"load_package_by_remote repository_url({repository_url}) package_path({self.__package_path})")
    self.__package_repository = Repo._clone(git, repository_url, self.__package_path, GitCmdObjectDB, None)
    logging.debug(f"load_package_by_remote result({self.__package_repository})")

    wants_other_branch = conf.DEFAULT_SCORE_BRANCH != conf.DEFAULT_SCORE_BRANCH_MASTER
    if wants_other_branch:
        self.__package_repository.git.checkout(conf.DEFAULT_SCORE_BRANCH)

    return True
def save_code(path, code):
    """Write ``code`` to ``path`` as a bash script and make it runnable.

    A ``#!/usr/bin/env bash`` line is put in front of ``code``. The file
    mode becomes 0744 (owner rwx, group and others read-only).
    """
    script_text = "#!/usr/bin/env bash\n" + code
    with open(path, 'w') as script:
        script.write(script_text)
    script_mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH
    chmod(path, script_mode)
def close(self):
    """Finish the file: compress it, index it and make it read-only.

    Does nothing when no file handle is open. Otherwise the handle is
    closed, the base file is compressed with the bgzip executable and
    renamed to ``self.filename``, the file is opened once with
    ``FastaFile`` to create the indexes, and the data file plus its
    ``.fai`` and ``.gzi`` companions are set to 0444.
    """
    if not self._fh:
        return

    self._fh.close()
    self._fh = None

    subprocess.check_call([bgzip_exe, "--force", self._basepath])
    os.rename(self._basepath + ".gz", self.filename)

    # open file with FastaFile to create indexes, then make all read-only
    indexer = FastaFile(self.filename)
    indexer.close()

    read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    for suffix in ("", ".fai", ".gzi"):
        os.chmod(self.filename + suffix, read_only)

    logger.info("{} written; added {} sequences".format(self.filename, len(self._added)))
def main(): parser = argparse.ArgumentParser( description='Generate a script that invokes the Dart tester') parser.add_argument('--out', help='Path to the invocation file to generate', required=True) parser.add_argument('--source-dir', help='Path to test sources', required=True) parser.add_argument('--dot-packages', help='Path to the .packages file', required=True) parser.add_argument('--test-runner', help='Path to the test runner', required=True) parser.add_argument('--flutter-shell', help='Path to the Flutter shell', required=True) args = parser.parse_args() test_file = args.out test_path = os.path.dirname(test_file) if not os.path.exists(test_path): os.makedirs(test_path) script_template = string.Template('''#!/bin/sh $test_runner \\ --packages=$dot_packages \\ --shell=$flutter_shell \\ --test-directory=$source_dir ''') with open(test_file, 'w') as file: file.write(script_template.substitute(args.__dict__)) permissions = (stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP | stat.S_IROTH) os.chmod(test_file, permissions)
def test_path_no_perms(self):
    """A directory without any write bit must be reported as not writeable."""
    read_only = stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH
    with utils.NamedTemporaryDirectory() as tmp_d:
        os.chmod(tmp_d, read_only)
        writeable = utils.is_writeable(tmp_d)
        self.assertFalse(writeable)
def _selinuxCreateFauxFilesystems(): (fd, path) = tempfile.mkstemp(prefix="mock-selinux-plugin.") with os.fdopen(fd, 'w') as out: with open("/proc/filesystems") as host: for line in host: if "selinuxfs" not in line: out.write(line) os.chmod(path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) return path
def setup_uwsgi_log(self):
    """Make sure the uwsgi log file for this array exists with mode 0660.

    The file is ``/var/log/uwsgi/uwsgi_<array_name>.log``. It is created
    when missing (existing content is kept) and made readable and
    writable for owner and group.
    """
    output_file = os.path.join('/var/log/uwsgi',
                               'uwsgi_{}.log'.format(self.array_name))
    # Opening in append mode creates the file without truncating it.
    with open(output_file, 'a'):
        pass
    os.chmod(output_file,
             stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP)
    # Parenthesised single-argument form works on Python 2 and Python 3.
    print('[INFO] Updated: {}'.format(output_file))
def stats_to_str(s):
    """Format the mode of stat result ``s`` as four concatenated parts.

    The first part comes from ``dbpcs``. The next three come from ``rwx``,
    called with the read/write/execute bits of user, group and other.
    """
    mode = s.st_mode
    kind = dbpcs(mode)
    user = rwx(mode, stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR)
    group = rwx(mode, stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP)
    other = rwx(mode, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH)
    return "%s%s%s%s" % (kind, user, group, other)
def add_exec(file_path):
    """Set the mode of ``file_path`` to 0755.

    Owner gets read, write and execute; group and others get read and
    execute. The previous mode is replaced, not extended.
    """
    print("change permissions")
    # Build the mode by OR-ing the constants from the stat module.
    owner_bits = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
    group_bits = stat.S_IRGRP | stat.S_IXGRP
    other_bits = stat.S_IROTH | stat.S_IXOTH
    os.chmod(file_path, owner_bits | group_bits | other_bits)
# global vars are very bad but I think the only solution for this
def make_pidfile(self):
    """Write this process's pid file, killing matching older processes.

    The pid directory is created when missing. Every existing
    ``<pid>_<model>_<class>.pid`` file whose model and class name match
    this instance is passed to ``self.kill_process``. Finally
    ``<name>.pid`` is written with the pid of ``self.process`` and set to
    0666.
    """
    if not os.path.isdir(PIDFILE_PATH):
        os.mkdir(PIDFILE_PATH)
    pidfilename = os.path.join(PIDFILE_PATH, "{}.pid".format(self.name))

    for existing in os.listdir(PIDFILE_PATH):
        # Only names of the form a_b_c.pid are candidates.
        if existing.endswith('.pid') and existing.count('_') == 2:
            pid, model, clsname = existing.split('_')
            same_class = clsname == self.__class__.__name__ + '.pid'
            if same_class and model == self.model_name:
                self.kill_process(existing)

    with open(pidfilename, 'w+') as pidfile:
        pidfile.write(str(self.process.pid))

    everyone_rw = (stat.S_IRUSR | stat.S_IWUSR
                   | stat.S_IRGRP | stat.S_IWGRP
                   | stat.S_IROTH | stat.S_IWOTH)
    os.chmod(pidfilename, everyone_rw)
def checkFixPermissionsForPath(pth):
    """Make ``pth`` group and other readable, reporting what was missing.

    When either read bit is absent, a message naming the missing bits is
    printed and the mode is updated with them. Nothing happens when both
    bits are already set.
    """
    mode = os.stat(pth).st_mode
    missing = []
    for bit, complaint in ((stat.S_IRGRP, ' not group readable'),
                           (stat.S_IROTH, ' not other readable')):
        if bit & mode != bit:
            missing.append(complaint)
            mode |= bit

    msg = ''.join(missing)
    if len(msg.strip()) > 0:
        print("%s, fixing permissions for %s" % (msg, pth))
        os.chmod(pth, mode)
def test_consoleloggingreporter_dir_not_writeable(consolelogger, serial_driver, tmpdir):
    """Reading from the serial driver must work with a read-only tmpdir."""
    read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    os.chmod(str(tmpdir), read_only)

    def return_test(self, size=1, timeout=0.0):
        # Stand-in for the serial port's read(): always yields b"test".
        return b"test"

    # Pretend four bytes are waiting and route reads to the stand-in.
    serial_driver.serial.in_waiting = 4
    serial_driver.serial.read = return_test

    serial_driver.read()
def is_executable_file(path):
    """Checks that path is an executable regular file (or a symlink to a file).

    This is roughly ``os.path isfile(path) and os.access(path, os.X_OK)``, but
    on some platforms :func:`os.access` gives us the wrong answer, so this
    checks permission bits directly.

    Returns True when the file is readable and executable by others, or by
    a group the process belongs to, or by its owner when that owner is the
    effective user. Returns False otherwise, including for non-files.
    """
    # follow symlinks,
    fpath = os.path.realpath(path)

    # return False for non-files (directories, fifo, etc.)
    if not os.path.isfile(fpath):
        return False

    # On Solaris, etc., "If the process has appropriate privileges, an
    # implementation may indicate success for X_OK even if none of the
    # execute file permission bits are set."
    #
    # For this reason, it is necessary to explicitly check st_mode.
    # Stat once so mode, owner and group all describe the same moment.
    info = os.stat(fpath)
    mode = info.st_mode

    # check if `other', that is anybody, may read and execute.
    if mode & stat.S_IROTH and mode & stat.S_IXOTH:
        return True

    # get current user's group ids, and check if `group',
    # when matching ours, may read and execute.
    user_gids = os.getgroups() + [os.getgid()]
    if (info.st_gid in user_gids and
            mode & stat.S_IRGRP and mode & stat.S_IXGRP):
        return True

    # finally, if file owner matches our effective userid,
    # check if `user', may read and execute.
    if (info.st_uid == os.geteuid() and
            mode & stat.S_IRUSR and mode & stat.S_IXUSR):
        return True

    return False
def test_set_config_value_file_permissions(self):
    """The written config file must be rw for the owner and nothing else."""
    # Each permission bit paired with whether it is expected to be set.
    expectations = (
        (stat.S_IRUSR, True), (stat.S_IWUSR, True), (stat.S_IXUSR, False),
        (stat.S_IRGRP, False), (stat.S_IWGRP, False), (stat.S_IXGRP, False),
        (stat.S_IROTH, False), (stat.S_IWOTH, False), (stat.S_IXOTH, False),
    )
    with mock.patch('azure.cli.core._config.GLOBAL_CONFIG_DIR', self.config_dir), \
            mock.patch('azure.cli.core._config.GLOBAL_CONFIG_PATH', self.config_path):
        set_global_config_value('test_section', 'test_option', 'a_value')
        file_mode = os.stat(self.config_path).st_mode
        for bit, should_be_set in expectations:
            bit_is_set = bool(file_mode & bit)
            if should_be_set:
                self.assertTrue(bit_is_set)
            else:
                self.assertFalse(bit_is_set)
def __init__(self, log, log_level, logger_name, logger_section=None):
    '''
    Set up the logger runtime state.

    ``log`` is the log file path; a false value selects a NullHandler.
    With a ``logger_section`` the name becomes ``<name>-<section>`` and
    the record format embeds both; otherwise the plain logger name and
    the ``%(name)s`` based format are used.
    '''
    super().__init__()

    self.log_file = log
    self.log_level = log_level
    self.logger_name = logger_name
    self.logger_section = logger_section

    if logger_section:
        self.name = "%s-%s" % (logger_name, logger_section)
        record_format = str.format(
            "%(asctime)s {0}: <{1}> [%(levelname)s] %(message)s",
            logger_name,
            logger_section,
        )
    else:
        self.name = logger_name
        record_format = "%(asctime)s %(name)s: [%(levelname)s] %(message)s"
    self.description = "logger '%s'" % self.name

    self.logger_formatter = logging.Formatter(record_format)

    if not self.log_file:
        self.logger_handler = logging.NullHandler()
    else:
        util.directory_create(os.path.dirname(self.log_file))
        self.logger_handler = logging.FileHandler(self.log_file)
        # Log file: rw for the owner, read-only for group and others.
        os.chmod(self.log_file, stat.S_IRUSR|stat.S_IWUSR|stat.S_IRGRP|stat.S_IROTH)

    self.logger_handler.setFormatter(self.logger_formatter)

    # Initialised in start().
    self.logger = None
def _rename_file(src, dst):
    """Rename ``src`` to ``dst`` and make the result world readable.

    On Windows ``dst`` is removed first. After the rename, read permission
    is added for user, group and other; execute permission is added for
    all three as well when ``_is_executable(dst)`` is true.
    """
    if os.name == 'nt':
        # TODO: check that fs.rm_safe works on windows, and if not, fix
        #       fs.rm_safe.
        fs.rm_safe(dst)
    os.rename(src, dst)

    read_all = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    exec_all = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

    new_mode = os.stat(dst).st_mode | read_all
    if _is_executable(dst):
        new_mode = new_mode | exec_all
    os.chmod(dst, new_mode)