我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.rmdir()。
def rollback(self): if not self.dry_run: for f in list(self.files_written): if os.path.exists(f): os.remove(f) # dirs should all be empty now, except perhaps for # __pycache__ subdirs # reverse so that subdirs appear before their parents dirs = sorted(self.dirs_created, reverse=True) for d in dirs: flist = os.listdir(d) if flist: assert flist == ['__pycache__'] sd = os.path.join(d, flist[0]) os.rmdir(sd) os.rmdir(d) # should fail if non-empty self._init_record()
def _test_NoAccessDir(self, nodeName): devBooter, devMgr = self.launchDeviceManager("/nodes/%s/DeviceManager.dcd.xml" % nodeName) device = devMgr._get_registeredDevices()[0] fileMgr = self._domMgr._get_fileMgr() dirname = '/noaccess' testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname) if not os.path.exists(testdir): os.mkdir(testdir, 0000) else: os.chmod(testdir, 0000) try: self.assertFalse(os.access(testdir, os.R_OK|os.X_OK), 'Current user can still access directory') self.assertRaises(CF.LoadableDevice.LoadFail, device.load, fileMgr, dirname, CF.LoadableDevice.SHARED_LIBRARY) finally: os.rmdir(testdir)
def test_comp_macro_directories_config_python(self): file_loc = os.getcwd() self.comp = sb.launch(self.cname, impl="python", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} ) fp = None try: fp = open('foo/bar/test.log','r') except: pass try: os.remove('foo/bar/test.log') except: pass try: os.rmdir('foo/bar') except: pass try: os.rmdir('foo') except: pass self.assertNotEquals(fp, None)
def test_comp_macro_directories_config_cpp(self): file_loc = os.getcwd() self.comp = sb.launch(self.cname, impl="cpp", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} ) fp = None try: fp = open('foo/bar/test.log','r') except: pass try: os.remove('foo/bar/test.log') except: pass try: os.rmdir('foo/bar') except: pass try: os.rmdir('foo') except: pass self.assertNotEquals(fp, None)
def test_comp_macro_directories_config_java(self): file_loc = os.getcwd() self.comp = sb.launch(self.cname, impl="java", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} ) fp = None try: fp = open('foo/bar/test.log','r') except: pass try: os.remove('foo/bar/test.log') except: pass try: os.rmdir('foo/bar') except: pass try: os.rmdir('foo') except: pass self.assertNotEquals(fp, None)
def test_ExistsException(self): self.assertNotEqual(self._domMgr, None) fileMgr = self._domMgr._get_fileMgr() # Makes sure that FileSystem::exists() throws correct exception and # doesn't kill domain for files in directories it cannot access dirname = '/noaccess' testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname) if not os.path.exists(testdir): os.mkdir(testdir, 0644) else: os.chmod(testdir, 0644) try: self.assertFalse(os.access(testdir, os.R_OK|os.X_OK), 'Current user can still access directory') self.assertRaises(CF.InvalidFileName, fileMgr.exists, os.path.join(dirname, 'testfile')) finally: os.rmdir(testdir)
def remove_folder(self, folder): """Delete the named folder, which must be empty.""" path = os.path.join(self._path, '.' + folder) for entry in os.listdir(os.path.join(path, 'new')) + \ os.listdir(os.path.join(path, 'cur')): if len(entry) < 1 or entry[0] != '.': raise NotEmptyError('Folder contains message(s): %s' % folder) for entry in os.listdir(path): if entry != 'new' and entry != 'cur' and entry != 'tmp' and \ os.path.isdir(os.path.join(path, entry)): raise NotEmptyError("Folder contains subdirectory '%s': %s" % (folder, entry)) for root, dirs, files in os.walk(path, topdown=False): for entry in files: os.remove(os.path.join(root, entry)) for entry in dirs: os.rmdir(os.path.join(root, entry)) os.rmdir(path)
def reformat(self, sourcefile, destfile, configfile): # type: (str, str, str) -> None """Reformats sourcefile according to configfile and writes it to destfile. This method is only used for testing. """ tmpdir = tempfile.mkdtemp(prefix='whatstyle_') cfg = os.path.join(tmpdir, self.configfilename) copyfile(configfile, cfg) tmpfilename = os.path.join(tmpdir, os.path.basename(sourcefile)) copyfile(sourcefile, tmpfilename) cmdargs = [tmpfilename] exeresult = run_executable(self.exe, cmdargs) writebinary(destfile, exeresult.stdout) os.remove(tmpfilename) os.remove(cfg) os.rmdir(tmpdir)
def test_replace_loggroup_section(mocked_configparser_class): config_dir = api_utils.user_config_dir(cli.lecli.__name__) if not os.path.exists(api_utils.CONFIG_FILE_PATH): if not os.path.exists(config_dir): os.makedirs(config_dir) loggroups_section = api_utils.LOGGROUPS_SECTION config_parser_mock = mocked_configparser_class.return_value config_parser_mock.add_section(loggroups_section) with patch.object(api_utils.CONFIG, 'items', return_value=[('test-log-group-favs', ["test-log-key1", "test-log-key2"])]): api_utils.replace_loggroup_section() assert not api_utils.CONFIG.has_section(loggroups_section) assert config_parser_mock.has_section(api_utils.CLI_FAVORITES_SECTION) try: os.remove(api_utils.CONFIG_FILE_PATH) os.rmdir(config_dir) except OSError: pass
def cleanup(self): """Close and remove all created temporary files. For the purposes of debugging, files are not removed if BAP finished with a positive nonzero code. I.e., they are removed only if BAP terminated normally, or was killed by a signal (terminated). All opened file descriptros are closed in any case.""" for desc in self.fds: desc.close() if not self.DEBUG and (self.proc is None or self.proc.returncode <= 0): for path in os.listdir(self.tmpdir): os.remove(os.path.join(self.tmpdir, path)) os.rmdir(self.tmpdir)
def remove_directory_files(path, remove_directory=False, remove_path=False): "Remove directory at path, respecting the flags." for root, dirs, files in os.walk(path, False): # Ignore path if remove_path is false. if remove_path or root != path: for name in files: filename = os.path.join(root, name) try: os.remove(filename) except OSError: pass # Ignore directory if remove_directory is false. if remove_directory: try: os.rmdir(root) except OSError: pass
def unlock(self): '''does not raise an exception, safe to unlock as often as you want it may just do nothing''' if self.locked.has_key(self.d2): #we're the ones that unlocked it, #if time matched if self.locked[self.d2]==\ os.stat(self.d2)[8]: try: del(self.locked[self.d2]) os.rmdir(self.d2) os.rmdir(self.d) return 1 except: return 0 else: del(self.locked[self.d2]) return 0 else: return 0
def prepare_working_dir(directory_path, purge=False): folders = ["/findings/", "/corpus", "/findings/panic", "/findings/kasan", "/findings/timeout", "/rbuf", "/evaluation"] if purge: if os.path.isdir(directory_path): for root, dirs, files in os.walk(directory_path, topdown=False): for name in files: os.remove(os.path.join(root, name)) for name in dirs: os.rmdir(os.path.join(root, name)) if os.path.exists("/dev/shm/kafl_filter0"): os.remove("/dev/shm/kafl_filter0") if os.path.exists("/dev/shm/kafl_tfilter"): os.remove("/dev/shm/kafl_tfilter") if len(os.listdir(directory_path)) == 0: for folder in folders: os.makedirs(directory_path + folder) else: for folder in folders: if not os.path.isdir(directory_path + folder): return False return True
def test_log_file_with_timed_rotating(self): tmpdir = tempfile.mkdtemp() try: self.options.log_file_prefix = tmpdir + '/test_log' self.options.log_rotate_mode = 'time' enable_pretty_logging(options=self.options, logger=self.logger) self.logger.error('hello') self.logger.handlers[0].flush() filenames = glob.glob(tmpdir + '/test_log*') self.assertEqual(1, len(filenames)) with open(filenames[0]) as f: self.assertRegexpMatches( f.read(), r'^\[E [^]]*\] hello$') finally: for handler in self.logger.handlers: handler.flush() handler.close() for filename in glob.glob(tmpdir + '/test_log*'): os.unlink(filename) os.rmdir(tmpdir)
def test_log_file(self): tmpdir = tempfile.mkdtemp() try: self.options.log_file_prefix = tmpdir + '/test_log' enable_pretty_logging(options=self.options, logger=self.logger) self.assertEqual(1, len(self.logger.handlers)) self.logger.error('hello') self.logger.handlers[0].flush() filenames = glob.glob(tmpdir + '/test_log*') self.assertEqual(1, len(filenames)) with open(filenames[0]) as f: self.assertRegexpMatches(f.read(), r'^\[E [^]]*\] hello$') finally: for handler in self.logger.handlers: handler.flush() handler.close() for filename in glob.glob(tmpdir + '/test_log*'): os.unlink(filename) os.rmdir(tmpdir)
def retrive_osd_details(device_name): osd_details = {} if device_name is None: return None try: tmpd = tempfile.mkdtemp() log.info("Create temp directory %s" %(tmpd)) try: out_mnt = utils.execute_local_command(['mount',device_name,tmpd]) if out_mnt['retcode'] == 0: osd_details = _retrive_osd_details_from_dir(tmpd) finally: utils.execute_local_command(['umount',tmpd]) finally: log.info("Destroy temp directory %s" %(tmpd)) os.rmdir(tmpd) return osd_details
def paired_files(): root = tempfile.mkdtemp() files = [[root, 'file1.mp3'], [root, 'file1.jams'], [root, 'file2.ogg'], [root, 'file2.jams'], [root, 'file3.wav'], [root, 'file3.jams'], [root, 'file4.flac'], [root, 'file4.jams']] files = [os.sep.join(_) for _ in files] for fname in files: with open(fname, 'w'): pass yield root, files for fname in files: try: os.remove(fname) except FileNotFoundError: pass os.rmdir(root)
def symlinks_supported(): """ A function to check if creating symlinks are supported in the host platform and/or if they are allowed to be created (e.g. on Windows it requires admin permissions). """ tmpdir = tempfile.mkdtemp() original_path = os.path.join(tmpdir, 'original') symlink_path = os.path.join(tmpdir, 'symlink') os.makedirs(original_path) try: os.symlink(original_path, symlink_path) supported = True except (OSError, NotImplementedError, AttributeError): supported = False else: os.remove(symlink_path) finally: os.rmdir(original_path) os.rmdir(tmpdir) return supported
def test_forge_globus_download(): f = forge.Forge() # Simple case res1 = f.globus_download(example_result1) assert os.path.exists("./test_fetch.txt") os.remove("./test_fetch.txt") # With dest and preserve_dir dest_path = os.path.expanduser("~/mdf") f.globus_download(example_result1, dest=dest_path, preserve_dir=True) assert os.path.exists(os.path.join(dest_path, "test", "test_fetch.txt")) os.remove(os.path.join(dest_path, "test", "test_fetch.txt")) os.rmdir(os.path.join(dest_path, "test")) # With multiple files f.globus_download(example_result2, dest=dest_path) assert os.path.exists(os.path.join(dest_path, "test_fetch.txt")) assert os.path.exists(os.path.join(dest_path, "test_multifetch.txt")) os.remove(os.path.join(dest_path, "test_fetch.txt")) os.remove(os.path.join(dest_path, "test_multifetch.txt"))
def remove(self, rec=1, ignore_errors=False): """ remove a file or directory (or a directory tree if rec=1). if ignore_errors is True, errors while removing directories will be ignored. """ if self.check(dir=1, link=0): if rec: # force remove of readonly files on windows if iswin32: self.chmod(448, rec=1) # octcal 0700 py.error.checked_call(py.std.shutil.rmtree, self.strpath, ignore_errors=ignore_errors) else: py.error.checked_call(os.rmdir, self.strpath) else: if iswin32: self.chmod(448) # octcal 0700 py.error.checked_call(os.remove, self.strpath)
def testFindBrainClasses(self): brainDir = "tmp-test-brains" brainModule1 = "barBrain" brainClass1 = "BarBrain" brainModule2 = "fooBrain" brainClass2 = "FooBrain" brainFile1 = os.path.join(brainDir, "%s.py" % brainModule1) brainFile2 = os.path.join(brainDir, "%s.py" % brainModule2) os.mkdir(brainDir) open(brainFile1, "w+").write("\n") open(brainFile2, "w+").write("\n") expected = [brainModule1 + "." + brainClass1, brainModule2 + "." + brainClass2] try: self.assertEqual(sorted(expected), sorted(simLauncher.SimulatorLauncher.findBrainClasses(brainDir))) finally: os.unlink(brainFile1) os.unlink(brainFile2) os.rmdir(brainDir)
def testWriteMapFile_writesStringIntoFile(self): dirname = "testWriteMapFile" filename = "test.txt" filepath = os.path.join(dirname, filename) os.mkdir(dirname) mObj = gameMap.GameMap() mObj.loadMapFile("maps/test-room2-l-shape.txt") expected = mObj.toText() mObj.writeMapFile(filepath) actual = None if os.path.exists(filepath): actual = open(filepath, "r").read() try: self.assertEqual(expected, actual) finally: if os.path.exists(filepath): os.unlink(filepath) os.rmdir(dirname)
def cleanAll(path=None): trash = getTrashFolder(path) if not os.path.isdir(trash): print "[Trashcan] No trash.", trash return 0 for root, dirs, files in os.walk(trash, topdown=False): for name in files: fn = os.path.join(root, name) try: enigma.eBackgroundFileEraser.getInstance().erase(fn) except Exception, e: print "[Trashcan] Failed to erase %s:"% name, e # Remove empty directories if possible for name in dirs: try: os.rmdir(os.path.join(root, name)) except: pass
def destroy(self): """Removes any files in this storage object and then removes the storage object's directory. What happens if any of the files or the directory are in use depends on the underlying platform. """ # Remove all files self.clean() try: # Try to remove the directory os.rmdir(self.folder) except IOError: e = sys.exc_info()[1] if e.errno == errno.ENOENT: pass else: raise e
def __del__(self): if os.path.isfile(os.path.join(self.rootdir, 'GAMIT.status')): os.remove(os.path.join(self.rootdir, 'GAMIT.status')) if os.path.isfile(os.path.join(self.rootdir, 'GAMIT.fatal')): os.remove(os.path.join(self.rootdir, 'GAMIT.fatal')) if os.path.isfile(os.path.join(self.rootdir, 'grdtab.out')): os.remove(os.path.join(self.rootdir, 'grdtab.out')) if os.path.isfile(os.path.join(self.rootdir, 'harpos.' + self.StationCode)): os.remove(os.path.join(self.rootdir, 'harpos.' + self.StationCode)) if os.path.isfile(os.path.join(self.rootdir, 'otl.grid')): os.remove(os.path.join(self.rootdir, 'otl.grid')) if os.path.isfile(os.path.join(self.rootdir, 'ufile.' + self.StationCode)): os.remove(os.path.join(self.rootdir, 'ufile.' + self.StationCode)) if os.path.isdir(self.rootdir): os.rmdir(self.rootdir)
def remove_empty_folders(folder): for dirpath, _, files in os.walk(folder, topdown=False): # Listing the files for file in files: if file.endswith('DS_Store'): # delete the stupid mac files try: os.remove(os.path.join(dirpath, file)) except: sys.exc_clear() if dirpath == folder: break try: os.rmdir(dirpath) except OSError: sys.exc_clear() return
def contain(command, image_name, image_dir, container_id, container_dir): linux.unshare(linux.CLONE_NEWNS) # create a new mount namespace linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None) new_root = create_container_root( image_name, image_dir, container_id, container_dir) print('Created a new root fs for our container: {}'.format(new_root)) _create_mounts(new_root) old_root = os.path.join(new_root, 'old_root') os.makedirs(old_root) linux.pivot_root(new_root, old_root) os.chdir('/') linux.umount2('/old_root', linux.MNT_DETACH) # umount old root os.rmdir('/old_root') # rmdir the old_root dir os.execvp(command[0], command)
def contain(command, image_name, image_dir, container_id, container_dir): linux.unshare(linux.CLONE_NEWNS) # create a new mount namespace # TODO: switch to a new UTS namespace, change hostname to container_id # HINT: use linux.sethostname() linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None) new_root = create_container_root( image_name, image_dir, container_id, container_dir) print('Created a new root fs for our container: {}'.format(new_root)) _create_mounts(new_root) old_root = os.path.join(new_root, 'old_root') os.makedirs(old_root) linux.pivot_root(new_root, old_root) os.chdir('/') linux.umount2('/old_root', linux.MNT_DETACH) # umount old root os.rmdir('/old_root') # rmdir the old_root dir os.execvp(command[0], command)
def contain(command, image_name, image_dir, container_id, container_dir): linux.sethostname(container_id) # change hostname to container_id linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None) new_root = create_container_root( image_name, image_dir, container_id, container_dir) print('Created a new root fs for our container: {}'.format(new_root)) _create_mounts(new_root) old_root = os.path.join(new_root, 'old_root') os.makedirs(old_root) linux.pivot_root(new_root, old_root) os.chdir('/') linux.umount2('/old_root', linux.MNT_DETACH) # umount old root os.rmdir('/old_root') # rmdir the old_root dir os.execvp(command[0], command)
def clear_cache_directory(): for root, dirs, files in os.walk(download_cache_directory): for d in dirs: if d == '.git': dirpath = os.path.join(root,d) # print('rm dir ' + dirpath) try: os.rmdir(dirpath) except: pass for f in files: if f.endswith('.h') or f.endswith('.m'): continue filepath = os.path.join(root, f) #remove try: # print('remove ' + filepath) os.remove(filepath) except: pass
def tree_delete_selected(self): try: item = self.parent.widget_tree.item(self.parent.widget_tree.focus())["tags"][0] os.remove(item) print("{} | Deleting: {}".format(datetime.now().strftime("%H:%M:%S"), item)) except FileNotFoundError: for root, dirs, files in os.walk(self.parent.widget_tree.item(self.parent.widget_tree.focus())["tags"][1], topdown=False): for name in files: os.remove(os.path.join(root, name)) print("{} | Deleting: {}".format(datetime.now().strftime("%H:%M:%S"), name)) for name in dirs: os.rmdir(os.path.join(root, name)) print("{} | Deleting: {}".format(datetime.now().strftime("%H:%M:%S"), name)) item = self.parent.widget_tree.item(self.parent.widget_tree.focus())["tags"][1] os.rmdir(item) print("{} | Deleting: {}".format(datetime.now().strftime("%H:%M:%S"), item)) self.parent.cmd.tree_refresh()
def run(parser, args): tmpdir = tempfile.mkdtemp(dir='.') cmd = ("porechop --verbosity 2 --untrimmed -i \"%s\" -b %s --barcode_threshold 80 --threads %s --check_reads 10000 --barcode_diff 5 --require_two_barcodes > %s.demultiplexreport.txt" % (args.fasta, tmpdir, args.threads, args.fasta)) print >>sys.stderr, cmd os.system(cmd) a, b = os.path.split(args.fasta) prefix, ext = os.path.splitext(b) for fn in os.listdir(tmpdir): newfn = "%s-%s" % (prefix, os.path.basename(fn)) shutil.move(tmpdir + '/' + fn, newfn) os.system("gunzip -f %s" % (newfn,)) if not args.no_remove_directory: os.rmdir(tmpdir)
def get_key(self, name): try: # pull it down from ipfs self.api.get('{}/names/{}'.format(self.root_hash, name)) # stores in cwdir, so load the files into memory and delete them n = self.file_to_memory('{}/{}/n'.format(os.getcwd(), name)) e = self.file_to_memory('{}/{}/e'.format(os.getcwd(), name)) os.rmdir('{}/{}'.format(os.getcwd(), name)) return (n, e) except: return False
def release(self): if not self.is_locked(): raise NotLocked("%s is not locked" % self.path) elif not os.path.exists(self.unique_name): raise NotMyLock("%s is locked, but not by me" % self.path) os.unlink(self.unique_name) os.rmdir(self.lock_file)