我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用stat.S_IRUSR。
def t05(w,h): pretty = '%s t5' % __file__ print(pretty) result_path = w.make_tempdir() os.chmod(result_path, stat.S_IRUSR) # user read permission only try: h.run_gtest('some_target', result_path) print( 'FAIL %s: expected error (not writable result directory): %s' % (pretty, temp_dir) ) return False except Exception as e: if e.message != 'result_path not a writable directory: %s' % result_path: print('FAIL %s: wrong error message: %s' % (pretty, str(e))) return False finally: os.removedirs(result_path) return True # successful execution of run_gtest(): check files on host, verfiy their # content and that traces from run removed from handset
def commit(self): """ Writes the configuration to disk (into '$RootDir/EXAConf') """ self.config.write() # reload in order to force type conversion # --> parameters added as lists during runtime are converted back to strings (as if they have been added manually) self.config.reload() # modify permissions try: os.chmod(self.conf_path, stat.S_IRUSR | stat.S_IWUSR) except OSError as e: raise EXAConfError("Failed to change permissions for '%s': %s" % (self.conf_path, e)) #}}} #{{{ Platform supported
def main(args): vm_name = args['<vm_name>'] # get domain from libvirt con = libvirt.open('qemu:///system') domain = con.lookupByName(vm_name) path = os.path.join(os.getcwd(), '{}.raw'.format(vm_name)) with open(path, 'w') as f: # chmod to be r/w by everyone os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH | stat.S_IWOTH) # take a ram dump flags = libvirt.VIR_DUMP_MEMORY_ONLY dumpformat = libvirt.VIR_DOMAIN_CORE_DUMP_FORMAT_RAW domain.coreDumpWithFormat(path, dumpformat, flags)
def cleanupdir(temporarydir): osgen = os.walk(temporarydir) try: while True: i = osgen.next() ## make sure all directories can be accessed for d in i[1]: if not os.path.islink(os.path.join(i[0], d)): os.chmod(os.path.join(i[0], d), stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR) for p in i[2]: try: if not os.path.islink(os.path.join(i[0], p)): os.chmod(os.path.join(i[0], p), stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR) except Exception, e: #print e pass except StopIteration: pass try: shutil.rmtree(temporarydir) except: ## nothing that can be done right now, so just give up pass
def test_read_only_bytecode(self): # When bytecode is read-only but should be rewritten, fail silently. with source_util.create_modules('_temp') as mapping: # Create bytecode that will need to be re-created. py_compile.compile(mapping['_temp']) bytecode_path = imp.cache_from_source(mapping['_temp']) with open(bytecode_path, 'r+b') as bytecode_file: bytecode_file.seek(0) bytecode_file.write(b'\x00\x00\x00\x00') # Make the bytecode read-only. os.chmod(bytecode_path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) try: # Should not raise IOError! self.import_(mapping['_temp'], '_temp') finally: # Make writable for eventual clean-up. os.chmod(bytecode_path, stat.S_IWUSR)
def test_execute_bit_not_copied(self): # Issue 6070: under posix .pyc files got their execute bit set if # the .py file had the execute bit set, but they aren't executable. with temp_umask(0o022): sys.path.insert(0, os.curdir) try: fname = TESTFN + os.extsep + "py" open(fname, 'w').close() os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)) fn = imp.cache_from_source(fname) unlink(fn) __import__(TESTFN) if not os.path.exists(fn): self.fail("__import__ did not result in creation of " "either a .pyc or .pyo file") s = os.stat(fn) self.assertEqual(stat.S_IMODE(s.st_mode), stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) finally: del sys.path[0] remove_files(TESTFN) unload(TESTFN)
def set_own_perm(usr, dir_list): uid = pwd.getpwnam(usr).pw_uid gid = grp.getgrnam(usr).gr_gid perm_mask_rw = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP perm_mask_rwx = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP for cdir in dir_list: os.chown(cdir, uid, gid) os.chmod(cdir, perm_mask_rwx) for croot, sub_dirs, cfiles in os.walk(cdir): for fs_name in sub_dirs: os.chown(os.path.join(croot, fs_name), uid, gid) os.chmod(os.path.join(croot, fs_name), perm_mask_rwx) for fs_name in cfiles: os.chown(os.path.join(croot, fs_name), uid, gid) os.chmod(os.path.join(croot, fs_name), perm_mask_rw)
def write(self): """ Writes configuration file """ # wait for file to unlock. Timeout after 5 seconds for _ in range(5): if not self._writing: self._writing = True break time.sleep(1) else: _LOGGER.error('Could not write to configuration file. It is busy.') return # dump JSON to config file and unlock encoded = self.encode() json.dump(encoded, open(self._filetmp, 'w'), sort_keys=True, indent=4, separators=(',', ': ')) os.chmod(self._filetmp, stat.S_IRUSR | stat.S_IWUSR) try: shutil.move(self._filetmp, self._file) _LOGGER.debug('Config file succesfully updated.') except shutil.Error as e: _LOGGER.error('Failed to move temp config file to original error: ' + str(e)) self._writing = False _LOGGER.debug('Wrote configuration file')
def test_execute_bit_not_copied(self): # Issue 6070: under posix .pyc files got their execute bit set if # the .py file had the execute bit set, but they aren't executable. oldmask = os.umask(022) sys.path.insert(0, os.curdir) try: fname = TESTFN + os.extsep + "py" f = open(fname, 'w').close() os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)) __import__(TESTFN) fn = fname + 'c' if not os.path.exists(fn): fn = fname + 'o' if not os.path.exists(fn): self.fail("__import__ did not result in creation of " "either a .pyc or .pyo file") s = os.stat(fn) self.assertEqual(stat.S_IMODE(s.st_mode), stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) finally: os.umask(oldmask) remove_files(TESTFN) unload(TESTFN) del sys.path[0]
def test_readonly_files(self): dir = _fname os.mkdir(dir) try: fname = os.path.join(dir, 'db') f = dumbdbm.open(fname, 'n') self.assertEqual(list(f.keys()), []) for key in self._dict: f[key] = self._dict[key] f.close() os.chmod(fname + ".dir", stat.S_IRUSR) os.chmod(fname + ".dat", stat.S_IRUSR) os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR) f = dumbdbm.open(fname, 'r') self.assertEqual(sorted(f.keys()), sorted(self._dict)) f.close() # don't write finally: test_support.rmtree(dir)
def _init_client(): global _configuration_directory def prompt(msg): sys.stdout.write('%s: ' % msg) response = sys.stdin.readline() return response.rstrip('\r\n') client_id = prompt('Client Id') client_secret = prompt('Client Secret') redirect_uri = prompt('Redirect Uri') if not os.path.exists(_configuration_directory): os.mkdir(_configuration_directory, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) client = _CommandClient(client_id, client_secret, redirect_uri) _init_oauth_process(client) # save first time, meaning configuration works client.save_configuration() return client
def create_ca(self): """ Create a new CA """ temp_ca_key_file = '/tmp/{}'.format(self.ca_name) temp_ca_cert_file = '{}.pub'.format(temp_ca_key_file) ca_key_file = self.full_path(self.ca_path, self.ca_name) ca_cert_file = '{}.pub'.format(ca_key_file) subprocess.call(['ssh-keygen', '-f', temp_ca_key_file, '-q', '-P', '']) if not os.path.exists(self.ca_path): self.mkdir_recursive(self.ca_path) # If CA was successfully created move it to the correct location and # set appropriate permissions if os.path.isfile(temp_ca_key_file) and os.path.isfile(temp_ca_cert_file): os.rename(temp_ca_cert_file, ca_cert_file) os.chmod(ca_cert_file, stat.S_IRUSR) os.rename(temp_ca_key_file, ca_key_file) os.chmod(ca_key_file, stat.S_IRUSR)
def test_execute_bit_not_copied(self): # Issue 6070: under posix .pyc files got their execute bit set if # the .py file had the execute bit set, but they aren't executable. oldmask = os.umask(0o22) sys.path.insert(0, os.curdir) try: fname = TESTFN + os.extsep + "py" f = open(fname, 'w').close() os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)) __import__(TESTFN) fn = fname + 'c' if not os.path.exists(fn): fn = fname + 'o' if not os.path.exists(fn): self.fail("__import__ did not result in creation of " "either a .pyc or .pyo file") s = os.stat(fn) assert(s.st_mode & (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)) assert(not (s.st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))) finally: os.umask(oldmask) clean_tmpfiles(fname) unload(TESTFN) del sys.path[0]
def test_mknod_dir_fd(self): # Test using mknodat() to create a FIFO (the only use specified # by POSIX). support.unlink(support.TESTFN) mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR f = posix.open(posix.getcwd(), posix.O_RDONLY) try: posix.mknod(support.TESTFN, mode, 0, dir_fd=f) except OSError as e: # Some old systems don't allow unprivileged users to use # mknod(), or only support creating device nodes. self.assertIn(e.errno, (errno.EPERM, errno.EINVAL)) else: self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) finally: posix.close(f)
def secure_file(fname, remove_existing=False): flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL # Refer to "man 2 open". mode = stat.S_IRUSR | stat.S_IWUSR # This is 0o600 in octal and 384 in decimal. if remove_existing: # For security, remove file with potentially elevated mode try: os.remove(fname) except OSError: pass # Open file descriptor umask_original = os.umask(0) try: fdesc = os.open(fname, flags, mode) finally: os.umask(umask_original) return fdesc
def testXid(self): sc = StringCodec() xid = Xid(format=0, global_id="gid", branch_id="bid") sc.write_compound(xid) assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid' dec = sc.read_compound(Xid) assert xid.__dict__ == dec.__dict__ # def testLoadReadOnly(self): # spec = "amqp.0-10-qpid-errata.xml" # f = testrunner.get_spec_file(spec) # dest = tempfile.mkdtemp() # shutil.copy(f, dest) # shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest) # os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR) # fname = os.path.join(dest, spec) # load(fname) # assert not os.path.exists("%s.pcl" % fname)
def store_index(self,ipath): self.logger.info("# indexed_fasta.store_index('%s')" % ipath) # write to tmp-file first and in the end rename in order to have this atomic # otherwise parallel building of the same index may screw it up. import tempfile tmp = tempfile.NamedTemporaryFile(mode="w",dir = os.path.dirname(ipath),delete=False) for chrom in sorted(self.chrom_stats.keys()): ofs,ldata,skip,skipchar,size = self.chrom_stats[chrom] tmp.write("%s\t%d\t%d\t%d\t%r\t%d\n" % (chrom,ofs,ldata,skip,skipchar,size)) # make sure everything is on disk os.fsync(tmp) tmp.close() # make it accessible to everyone import stat os.chmod(tmp.name, stat.S_IROTH | stat.S_IRGRP | stat.S_IRUSR) # this is atomic on POSIX as we have created tmp in the same directory, # therefore same filesystem os.rename(tmp.name,ipath)
def store_index(self,ipath): debug("# indexed_fasta.store_index('%s')" % ipath) # write to tmp-file first and in the end rename in order to have this atomic # otherwise parallel building of the same index may screw it up. import tempfile tmp = tempfile.NamedTemporaryFile(mode="w",dir = os.path.dirname(ipath),delete=False) for chrom in sorted(self.chrom_stats.keys()): ofs,ldata,skip,skipchar,size = self.chrom_stats[chrom] tmp.write("%s\t%d\t%d\t%d\t%r\t%d\n" % (chrom,ofs,ldata,skip,skipchar,size)) # make sure everything is on disk os.fsync(tmp) tmp.close() # make it accessible to everyone import stat os.chmod(tmp.name, stat.S_IROTH | stat.S_IRGRP | stat.S_IRUSR) # this is atomic on POSIX as we have created tmp in the same directory, # therefore same filesystem os.rename(tmp.name,ipath)
def login(func): """Install Python function as a LoginHook.""" def func_wrapper(hook='login'): path = '/var/root/Library/mh_%shook.py' % hook writesource(func, path) # only root should read and execute os.chown(path, 0, 0) os.chmod(path, stat.S_IXUSR | stat.S_IRUSR) if hook == 'login': hooks.login(path) else: hooks.logout(path) return path return func_wrapper
def tearDown(self): ''' Disconnect ramdisk, unloading data to pre-build location. ''' self.mbl.chownR(self.keyuser, self.tmphome + "/src") # chmod so it's readable by everyone, writable by the group self.mbl.chmodR(stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH | stat.S_IWGRP, self.tmphome + "/src", "append") self.libc.sync() self.libc.sync() # Copy back to pseudo-build directory call([self.RSYNC, "-aqp", self.tmphome + "/src/MacBuild/dmgs", self.buildHome]) call([self.RSYNC, "-aqp", self.tmphome + "/src/", self.buildHome + "/builtSrc"]) self.libc.sync() self.libc.sync() os.chdir(self.buildHome) self._exit(self.ramdisk, self.luggage, 0)
def _create_eae_zipfile(self, zip_file_name, main_file_path, data_files=None): to_zip = [] if data_files is None: data_files = [] # Handle main script to_zip.append(main_file_path) # Prepare the zip file zip_path = "/tmp/" + zip_file_name zipf = zipfile.ZipFile(zip_path, mode='w', compression=zipfile.ZIP_DEFLATED, allowZip64=True) for f in to_zip: zipf.write(f) zipf.close() # Handle other files & dirs for f in data_files: zipCommand = "zip -r -u -0 " + zip_path + " " + f call([zipCommand], shell=True) # Chmod 666 the zip file so it can be accessed os.chmod(zip_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH | stat.S_IWOTH) return zip_path
def test_create_script_perms(self): """this tests the create_script function (permissions). """ script_file = os.path.join(self.root, 'script') # Test non-default mode (+x) mode = (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) utils.create_script( script_file, 's6.run', mode=mode, user='testproid', home='home', shell='shell', _alias={ 's6_setuidgid': '/test/s6-setuidgid', } ) self.assertEqual(utils.os.stat(script_file).st_mode, 33060)
def save_app(manifest, container_dir, app_json=STATE_JSON): """Saves app manifest and freezes to object.""" # Save the manifest with allocated vip and ports in the state state_file = os.path.join(container_dir, app_json) fs.write_safe( state_file, lambda f: json.dump(manifest, f) ) # chmod for the file to be world readable. if os.name == 'posix': os.chmod( state_file, stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH ) # Freeze the app data into a namedtuple object return utils.to_obj(manifest)
def test_read_only_bytecode(self): # When bytecode is read-only but should be rewritten, fail silently. with source_util.create_modules('_temp') as mapping: # Create bytecode that will need to be re-created. py_compile.compile(mapping['_temp']) bytecode_path = self.util.cache_from_source(mapping['_temp']) with open(bytecode_path, 'r+b') as bytecode_file: bytecode_file.seek(0) bytecode_file.write(b'\x00\x00\x00\x00') # Make the bytecode read-only. os.chmod(bytecode_path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) try: # Should not raise OSError! self.import_(mapping['_temp'], '_temp') finally: # Make writable for eventual clean-up. os.chmod(bytecode_path, stat.S_IWUSR)
def test_mknod(self): # Test using mknod() to create a FIFO (the only use specified # by POSIX). support.unlink(support.TESTFN) mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR try: posix.mknod(support.TESTFN, mode, 0) except OSError as e: # Some old systems don't allow unprivileged users to use # mknod(), or only support creating device nodes. self.assertIn(e.errno, (errno.EPERM, errno.EINVAL)) else: self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) # Keyword arguments are also supported support.unlink(support.TESTFN) try: posix.mknod(path=support.TESTFN, mode=mode, device=0, dir_fd=None) except OSError as e: self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
def _get_netrc(cls): # Buffer the '.netrc' path. Use an empty file if it doesn't exist. path = cls.get_netrc_path() content = '' if os.path.exists(path): st = os.stat(path) if st.st_mode & (stat.S_IRWXG | stat.S_IRWXO): print >> sys.stderr, ( 'WARNING: netrc file %s cannot be used because its file ' 'permissions are insecure. netrc file permissions should be ' '600.' % path) with open(path) as fd: content = fd.read() # Load the '.netrc' file. We strip comments from it because processing them # can trigger a bug in Windows. See crbug.com/664664. content = '\n'.join(l for l in content.splitlines() if l.strip() and not l.strip().startswith('#')) with tempdir() as tdir: netrc_path = os.path.join(tdir, 'netrc') with open(netrc_path, 'w') as fd: fd.write(content) os.chmod(netrc_path, (stat.S_IRUSR | stat.S_IWUSR)) return cls._get_netrc_from_path(netrc_path)
def create_boilerplate(): """ Create kibitzr.yml and kibitzr-creds.yml in current directory if they do not exist. """ if not os.path.exists('kibitzr.yml'): with open('kibitzr.yml', 'wt') as fp: logger.info("Saving sample check in kibitzr.yml") fp.write(KIBITZR_YML) else: logger.info("kibitzr.yml already exists. Skipping") if not os.path.exists('kibitzr-creds.yml'): with open('kibitzr-creds.yml', 'wt') as fp: logger.info("Creating kibitzr-creds.yml") fp.write(KIBITZR_CREDS_YML) os.chmod('kibitzr-creds.yml', stat.S_IRUSR | stat.S_IWUSR) else: logger.info("kibitzr-creds.yml already exists. Skipping")
def _octal_to_perm(octal): perms = list("-" * 9) if octal & stat.S_IRUSR: perms[0] = "r" if octal & stat.S_IWUSR: perms[1] = "w" if octal & stat.S_IXUSR: perms[2] = "x" if octal & stat.S_IRGRP: perms[3] = "r" if octal & stat.S_IWGRP: perms[4] = "w" if octal & stat.S_IXGRP: perms[5] = "x" if octal & stat.S_IROTH: perms[6] = "r" if octal & stat.S_IWOTH: perms[7] = "w" if octal & stat.S_IXOTH: perms[8] = "x" return "".join(perms)
def init_workspace(cfg, is_inplace=False): log.info("init workspace is_inplace:%r",is_inplace) if is_inplace: init_workspace_inplace(cfg) else: # create a clean workspace workspace_dir = os.path.join(BASE_HOME,cfg.exec_home) if not os.path.exists(workspace_dir): os.mkdir(workspace_dir) else: shutil.rmtree(workspace_dir) os.mkdir(workspace_dir) cfg.workspace_dir = workspace_dir # copy target file into workspace if os.path.exists(cfg.target): shutil.copy(cfg.target, workspace_dir) target_abs_path = os.path.join(workspace_dir, os.path.basename(cfg.target)) cfg.target_abs_path = target_abs_path os.chmod(target_abs_path, stat.S_IRUSR) else: log.critical("%s dose not exist.", cfg.target) os._exit(1) log.info("Target absolute path: %s", cfg.target_abs_path) # I can not change dir at here, the dir will be changed everytime when analyzer starts.
def write(filename, data, undisclosed=False): """ Write JSON data to (owner only readable) file. This will also create missing directories. Args: filename (str): file path data (str): data to write undisclosed (bool): whether data should be owner only readable """ makedirs(os.path.dirname(filename)) with open(filename, 'w') as fp: if undisclosed: os.chmod(filename, stat.S_IRUSR | stat.S_IWUSR) json.dump(data, fp)
def open_for_writing(filename, permissions=None, mode="w", force=False): if permissions is None: permissions = stat.S_IRUSR | stat.S_IWUSR if force and os.path.exists(filename): os.unlink(filename) umask_original = os.umask(0) try: fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_EXCL, permissions) finally: os.umask(umask_original) # Open file handle and write to file with os.fdopen(fd, mode) as f: yield f
def printlog(message, logpath=False): # I think the logging module is great, but this will be used for the time being # Eventually, we will want to write out multiple output formats: xml,json, etc if logpath: # Next iteration of project will include a more secure logger, # for now, we will just write results to a file directly. # flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL # mode = stat.S_IRUSR | stat.S_IWUSR # umask_original = os.umask(0) # try: # fdesc = os.open(logpath, flags, mode) # # except(OSError): # # print("[-] Log file exists. Remove it, or change log filename") # # raise SystemExit # finally: # os.umask(umask_original) # with os.fdopen(fdesc, 'w') as fout: with open(logpath, 'a') as fout: fout.write("{}\n".format(message)) print("{}".format(message))
def is_file_readable(filepath): """ Check if the file given is readable to the user we are currently running at """ uid = os.getuid() euid = os.geteuid() gid = os.getgid() egid = os.getegid() # This is probably true most of the time, so just let os.access() # handle it. Avoids potential bugs in the rest of this function. if uid == euid and gid == egid: return os.access(filepath, os.R_OK) st = os.stat(filepath) if st.st_uid == euid: return st.st_mode & stat.S_IRUSR != 0 groups = os.getgroups() if st.st_gid == egid or st.st_gid in groups: return st.st_mode & stat.S_IRGRP != 0 return st.st_mode & stat.S_IROTH != 0
def test_safe_makedirs(self): from softwarecenter.utils import safe_makedirs from tempfile import mkdtemp tmp = mkdtemp() # create base dir target = os.path.join(tmp, "foo", "bar") safe_makedirs(target) # we need the patch to ensure that the code is actually executed with patch("os.path.exists") as mock_: mock_.return_value = False self.assertTrue(os.path.isdir(target)) # ensure that creating the base dir again does not crash safe_makedirs(target) self.assertTrue(os.path.isdir(target)) # ensure we still get regular errors like permission denied # (stat.S_IRUSR) os.chmod(os.path.join(tmp, "foo"), 0400) self.assertRaises(OSError, safe_makedirs, target) # set back to stat.(S_IRUSR|S_IWUSR|S_IXUSR) to make rmtree work os.chmod(os.path.join(tmp, "foo"), 0700) # cleanup shutil.rmtree(tmp)
def test_no_write_access_for_cache_dir(self): """ test for bug LP: #688682 """ # make the test cache dir non-writeable import softwarecenter.paths cache_dir = softwarecenter.paths.SOFTWARE_CENTER_CACHE_DIR # set not-writable (mode 0400) os.chmod(cache_dir, stat.S_IRUSR) self.assertFalse(os.access(cache_dir, os.W_OK)) # and then start up the logger import softwarecenter.log softwarecenter.log # pyflakes # check that the old directory was moved aside (renamed with a ".0" appended) self.assertTrue(os.path.exists(cache_dir + ".0")) self.assertFalse(os.path.exists(cache_dir + ".1")) # and check that the new directory was created and is now writable self.assertTrue(os.path.exists(cache_dir)) self.assertTrue(os.access(cache_dir, os.W_OK))
def apply_playbook(playbook, tags=None, extra_vars=None): tags = tags or [] tags = ",".join(tags) charmhelpers.contrib.templating.contexts.juju_state_to_yaml( ansible_vars_path, namespace_separator='__', allow_hyphens_in_keys=False, mode=(stat.S_IRUSR | stat.S_IWUSR)) # we want ansible's log output to be unbuffered env = os.environ.copy() env['PYTHONUNBUFFERED'] = "1" call = [ 'ansible-playbook', '-c', 'local', playbook, ] if tags: call.extend(['--tags', '{}'.format(tags)]) if extra_vars: extra = ["%s=%s" % (k, v) for k, v in extra_vars.items()] call.extend(['--extra-vars', " ".join(extra)]) subprocess.check_call(call, env=env)
def testPermission(self): with TemporaryDirectory() as tmp: d = os.path.join(tmp, "dir") os.mkdir(d) with open(os.path.join(d, "file"), "w") as f: f.write("data") os.chmod(d, stat.S_IRUSR | stat.S_IXUSR) self.assertRaises(BuildError, removePath, tmp) os.chmod(d, stat.S_IRWXU)