The following 50 code examples, extracted from open source Python projects, illustrate how to use shutil.rmtree().
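Before the project-specific examples, here is a minimal, self-contained sketch of the common call patterns. The temporary paths and the _on_rm_error helper are purely illustrative and do not come from any of the projects below.

import os
import shutil
import stat
import tempfile

# Create a throwaway directory tree so the example is self-contained.
tmpdir = tempfile.mkdtemp()
os.makedirs(os.path.join(tmpdir, "nested", "deeper"))

# Basic usage: recursively delete a directory and everything inside it.
shutil.rmtree(os.path.join(tmpdir, "nested"))

# ignore_errors=True swallows failures such as a missing directory.
shutil.rmtree(os.path.join(tmpdir, "does-not-exist"), ignore_errors=True)

# An onerror callback can retry after clearing a read-only flag
# (a common workaround on Windows).
def _on_rm_error(func, path, exc_info):
    os.chmod(path, stat.S_IWRITE)
    func(path)

shutil.rmtree(tmpdir, onerror=_on_rm_error)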
def __close_computation(self, client=None, await_async=False, task=None):
    self.__server_locations.clear()
    if self._cur_computation:
        close_tasks = [SysTask(self.__close_node, node, self._cur_computation,
                               await_async=await_async)
                       for node in self._nodes.itervalues()]
        close_tasks.extend([SysTask(self.__close_node, node, self._cur_computation)
                            for node in self._disabled_nodes.itervalues()
                            if node.status == Scheduler.NodeDiscovered])
        for close_task in close_tasks:
            yield close_task.finish()
    if self.__cur_client_auth:
        computation_path = os.path.join(self.__dest_path, self.__cur_client_auth)
        if os.path.isdir(computation_path):
            shutil.rmtree(computation_path, ignore_errors=True)
    if self._cur_computation and self._cur_computation.status_task:
        self._cur_computation.status_task.send(DispycosStatus(Scheduler.ComputationClosed,
                                                              id(self._cur_computation)))
    self.__cur_client_auth = self._cur_computation = None
    self.__computation_sched_event.set()
    if client:
        client.send('closed')
    raise StopIteration(0)
def __close_computation(self, client=None, await_async=False, task=None):
    self.__server_locations.clear()
    if self._cur_computation:
        close_tasks = [SysTask(self.__close_node, node, self._cur_computation,
                               await_async=await_async)
                       for node in self._nodes.values()]
        close_tasks.extend([SysTask(self.__close_node, node, self._cur_computation)
                            for node in self._disabled_nodes.values()
                            if node.status == Scheduler.NodeDiscovered])
        for close_task in close_tasks:
            yield close_task.finish()
    if self.__cur_client_auth:
        computation_path = os.path.join(self.__dest_path, self.__cur_client_auth)
        if os.path.isdir(computation_path):
            shutil.rmtree(computation_path, ignore_errors=True)
    if self._cur_computation and self._cur_computation.status_task:
        self._cur_computation.status_task.send(DispycosStatus(Scheduler.ComputationClosed,
                                                              id(self._cur_computation)))
    self.__cur_client_auth = self._cur_computation = None
    self.__computation_sched_event.set()
    if client:
        client.send('closed')
    raise StopIteration(0)
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield
    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
def _delete_directory_contents(self, dirpath, filter_func):
    """Delete all files in a directory.

    :param dirpath: path to directory to clear
    :type dirpath: ``unicode`` or ``str``
    :param filter_func: function to determine whether a file shall be
        deleted or not.
    :type filter_func: ``callable``

    """
    if os.path.exists(dirpath):
        for filename in os.listdir(dirpath):
            if not filter_func(filename):
                continue
            path = os.path.join(dirpath, filename)
            if os.path.isdir(path):
                shutil.rmtree(path)
            else:
                os.unlink(path)
            self.logger.debug('Deleted : %r', path)
def generate2():
    """
    Call an external Python 2 program to retrieve the AST symbols of that
    language version
    :return:
    """
    import subprocess as sp
    import tempfile, shutil, sys, traceback

    tempdir = tempfile.mkdtemp()
    tempfile = os.path.join(tempdir, "py2_ast_code.py")
    py2_proc_out = ""

    try:
        with open(tempfile, 'w') as py2code:
            py2code.write(generate_str + WRITESYMS_CODE)
        py2_proc_out = sp.check_output(["python2", tempfile]).decode()
    finally:
        try:
            shutil.rmtree(tempdir)
        except:
            print("Warning: error trying to delete the temporal directory:", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)

    return set(py2_proc_out.splitlines())
def ensure_removed(self, path):
    if os.path.exists(path):
        if os.path.isdir(path) and not os.path.islink(path):
            logger.debug('Removing directory tree at %s', path)
            if not self.dry_run:
                shutil.rmtree(path)
            if self.record:
                if path in self.dirs_created:
                    self.dirs_created.remove(path)
        else:
            if os.path.islink(path):
                s = 'link'
            else:
                s = 'file'
            logger.debug('Removing %s %s', s, path)
            if not self.dry_run:
                os.remove(path)
            if self.record:
                if path in self.files_written:
                    self.files_written.remove(path)
def test_install():
    tempdir = mkdtemp()
    def get_supported():
        return list(wheel.pep425tags.get_supported()) + [('py3', 'none', 'win32')]
    whl = WheelFile(TESTWHEEL, context=get_supported)
    assert whl.supports_current_python(get_supported)
    try:
        locs = {}
        for key in ('purelib', 'platlib', 'scripts', 'headers', 'data'):
            locs[key] = os.path.join(tempdir, key)
            os.mkdir(locs[key])
        whl.install(overrides=locs)
        assert len(os.listdir(locs['purelib'])) == 0
        assert check(locs['platlib'], 'hello.pyd')
        assert check(locs['platlib'], 'hello', 'hello.py')
        assert check(locs['platlib'], 'hello', '__init__.py')
        assert check(locs['data'], 'hello.dat')
        assert check(locs['headers'], 'hello.dat')
        assert check(locs['scripts'], 'hello.sh')
        assert check(locs['platlib'], 'test-1.0.dist-info', 'RECORD')
    finally:
        shutil.rmtree(tempdir)
def run(self):
    self.run_command("egg_info")
    from glob import glob
    for pattern in self.match:
        pattern = self.distribution.get_name() + '*' + pattern
        files = glob(os.path.join(self.dist_dir, pattern))
        files = [(os.path.getmtime(f), f) for f in files]
        files.sort()
        files.reverse()

        log.info("%d file(s) matching %s", len(files), pattern)
        files = files[self.keep:]
        for (t, f) in files:
            log.info("Deleting %s", f)
            if not self.dry_run:
                if os.path.isdir(f):
                    shutil.rmtree(f)
                else:
                    os.unlink(f)
def build_and_install(self, setup_script, setup_base):
    args = ['bdist_egg', '--dist-dir']

    dist_dir = tempfile.mkdtemp(
        prefix='egg-dist-tmp-', dir=os.path.dirname(setup_script)
    )
    try:
        self._set_fetcher_options(os.path.dirname(setup_script))
        args.append(dist_dir)

        self.run_setup(setup_script, setup_base, args)
        all_eggs = Environment([dist_dir])
        eggs = []
        for key in all_eggs:
            for dist in all_eggs[key]:
                eggs.append(self.install_egg(dist.location, setup_base))
        if not eggs and not self.dry_run:
            log.warn("No eggs found in %s (setup script problem?)", dist_dir)
        return eggs
    finally:
        rmtree(dist_dir)
        log.set_verbosity(self.verbose)  # restore our log verbosity
def upload_to_fileshare_test(self):  # pylint: disable=no-self-use
    """Upload copies files to non-native store correctly with no progress"""
    import shutil
    import tempfile

    temp_file = tempfile.NamedTemporaryFile(dir=tempfile.mkdtemp())
    temp_src_dir = os.path.dirname(temp_file.name)
    temp_dst_dir = tempfile.mkdtemp()
    shutil_mock = MagicMock()
    shutil_mock.copyfile.return_value = None
    with patch('sfctl.custom_app.shutil', new=shutil_mock):
        sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
        shutil_mock.copyfile.assert_called_once()

    temp_file.close()
    shutil.rmtree(os.path.dirname(temp_file.name))
    shutil.rmtree(temp_dst_dir)
def create(self, data):
    if not os.path.exists(const.REPOS_DIR):
        os.mkdir(const.REPOS_DIR)
    repo_path = os.path.join(const.REPOS_DIR, data['repo_name'])
    if os.path.exists(repo_path):
        logger.debug('Repo directory exists. Removing...')
        shutil.rmtree(repo_path)
    user_key = data.get('user_key', '')
    if user_key:
        self._create_key_file(data['repo_name'], user_key)
        os.environ['GIT_SSH'] = self._get_ssh_cmd(data['repo_name'])
    repo = Repo.clone_from(data['git_url'], repo_path)
    instance = super(GitRepo, self).create(data)
    instance.repo = repo
    return instance
def tearDown(self):
    """clean up the test
    """
    shutil.rmtree(defaults.server_side_storage_path)

    # remove generic_temp_folder
    shutil.rmtree(self.temp_test_data_folder, ignore_errors=True)

    # remove repository
    shutil.rmtree(self.test_repo_path, ignore_errors=True)

    # clean up test database
    # from stalker.db.declarative import Base
    # Base.metadata.drop_all(db.DBSession.connection())
    # db.DBSession.commit()
    db.DBSession.remove()
    testing.tearDown()
def close(self):
    # TODO: should I do clean shutdown here? Do I have to?
    if self._makefile_refs < 1:
        self._closed = True
        if self.context:
            CoreFoundation.CFRelease(self.context)
            self.context = None
        if self._client_cert_chain:
            CoreFoundation.CFRelease(self._client_cert_chain)
            self._client_cert_chain = None
        if self._keychain:
            Security.SecKeychainDelete(self._keychain)
            CoreFoundation.CFRelease(self._keychain)
            shutil.rmtree(self._keychain_dir)
            self._keychain = self._keychain_dir = None
        return self.socket.close()
    else:
        self._makefile_refs -= 1
def clean(args, config):
    """ Main entrypoint for clean """

    lock_file = join(HACKSPORTS_ROOT, "deploy.lock")

    # remove staging directories
    if os.path.isdir(STAGING_ROOT):
        logger.info("Removing the staging directories")
        shutil.rmtree(STAGING_ROOT)

    # remove lock file
    if os.path.isfile(lock_file):
        logger.info("Removing the stale lock file")
        os.remove(lock_file)

    # TODO: potentially perform more cleaning
def _test_Valgrind(self, valgrind):
    # Clear the device cache to prevent false positives
    deviceCacheDir = os.path.join(scatest.getSdrCache(), ".ExecutableDevice_node", "ExecutableDevice1")
    shutil.rmtree(deviceCacheDir, ignore_errors=True)

    os.environ['VALGRIND'] = valgrind
    try:
        # Checking that the node and device launch as expected
        nb, devMgr = self.launchDeviceManager("/nodes/test_ExecutableDevice_node/DeviceManager.dcd.xml")
    finally:
        del os.environ['VALGRIND']

    self.assertFalse(devMgr is None)
    self.assertEquals(len(devMgr._get_registeredDevices()), 1, msg='device failed to launch with valgrind')
    children = getChildren(nb.pid)
    self.assertEqual(len(children), 1)
    devMgr.shutdown()

    # Check that a valgrind logfile exists
    logfile = os.path.join(deviceCacheDir, 'valgrind.%s.log' % children[0])
    self.assertTrue(os.path.exists(logfile))
def createTestDomain():
    domainName = getTestDomainName()
    print domainName
    domainPath = os.path.join(getSdrPath(), "dom", "domain")
    templatePath = os.path.join(getSdrPath(), "templates", "domain")

    # Create a test domain
    if os.path.isdir(domainPath):
        shutil.rmtree(domainPath)

    # Copy the template over
    shutil.copytree(templatePath, domainPath)

    # Open the DMD file and replace the name, using a very naive method
    dmd = open(os.path.join(domainPath, "DomainManager.dmd.xml"), "r")
    lines = dmd.read()
    dmd.close()
    lines = lines.replace("${DOMAINNAME}", domainName)
    dmd = open(os.path.join(domainPath, "DomainManager.dmd.xml"), "w+")
    dmd.write(lines)
    dmd.close()

    setupDeviceAndDomainMgrPackage()
def _demo():
    info("hi")
    debug("shouldn't appear")
    set_level(DEBUG)
    debug("should appear")
    dir = "/tmp/testlogging"
    if os.path.exists(dir):
        shutil.rmtree(dir)
    with session(dir=dir):
        record_tabular("a", 3)
        record_tabular("b", 2.5)
        dump_tabular()
        record_tabular("b", -2.5)
        record_tabular("a", 5.5)
        dump_tabular()
        info("^^^ should see a = 5.5")
        record_tabular("b", -2.5)
        dump_tabular()
        record_tabular("a", "longasslongasslongasslongasslongasslongassvalue")
        dump_tabular()
def start_ab3(tmp_dir_loc, repo_dir, pkg_info, rm_abdir=False):
    start_time = int(time.time())
    os.chdir(tmp_dir_loc)
    if not copy_abd(tmp_dir_loc, repo_dir, pkg_info):
        return False
    # For logging support: ptyprocess.PtyProcessUnicode.spawn(['autobuild'])
    shadow_defines_loc = os.path.abspath(os.path.curdir)
    if not parser_pass_through(pkg_info, shadow_defines_loc):
        return False
    try:
        subprocess.check_call(['autobuild'])
    except:
        return False
    time_span = int(time.time()) - start_time
    print('>>>>>>>>>>>>>>>>>> Time for building\033[36m {} \033[0m:\033[36m {} \033[0mseconds'.format(
        pkg_info['NAME'], time_span))
    if rm_abdir is True:
        shutil.rmtree(os.path.abspath(os.path.curdir) + '/autobuild/')
    # Will get better display later
    return True
def build(**args):
    freezer = Freezer(app)
    if args.get('base_url'):
        app.config['FREEZER_BASE_URL'] = args.get('base_url')
    app.config['mathjax_node'] = args.get('node_mathjax', False)
    app.config['MATHJAX_WHOLEBOOK'] = args.get('node_mathjax', False)
    app.config['FREEZER_DESTINATION'] = os.path.join(curdir, "build")
    app.config['freeze'] = True
    freezer.freeze()
    if args.get('copy_mathjax'):
        mathjax_postfix = os.path.join('assets', "js", "mathjax")
        mathjax_from = os.path.join(scriptdir, mathjax_postfix)
        mathjax_to = os.path.join(curdir, "build", mathjax_postfix)
        try:
            shutil.rmtree(mathjax_to)
        except FileNotFoundError:
            pass
        shutil.copytree(mathjax_from, mathjax_to)
def archive_context(filename): """ Unzip filename to a temporary directory, set to the cwd. The unzipped target is cleaned up after. """ tmpdir = tempfile.mkdtemp() log.warn('Extracting in %s', tmpdir) old_wd = os.getcwd() try: os.chdir(tmpdir) with ContextualZipFile(filename) as archive: archive.extractall() # going in the directory subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0]) os.chdir(subdir) log.warn('Now working in %s', subdir) yield finally: os.chdir(old_wd) shutil.rmtree(tmpdir)
def createDir(name, force=False):
    if os.path.exists(name):
        if force:
            shutil.rmtree(name)
        else:
            response = raw_input('%s already exists. Do you wish to overwrite it? (y/n) ' % name)
            if response.lower() == 'y' or response.lower() == 'yes':
                shutil.rmtree(name)
            elif response.lower() == 'n' or response.lower() == 'no':
                print 'Modeler aborted.'
                exit(0)
            else:
                print 'Response not understood.'
                print 'Modeler aborted.'
                exit(1)
    os.mkdir(name)
def delete_module(self, name):
    try:
        file = open("{directory}/route.py".format(directory=BASE), "r+")
        readfile = file.readlines()
        file.seek(0)
        # delete = ("\n \nfrom openedoo.{module} import {module}".format(module=name))
        for line in readfile:
            if str(name) not in line:
                file.writelines(line)
        file.truncate()
        file.close()
        shutil.rmtree('{dir_file}/modules/{name}'.format(dir_file=BASE_DIR, name=name))

        file = open("{directory}/tables.py".format(directory=BASE), "r+")
        readfile = file.readlines()
        file.seek(0)
        # delete = ("\n \nfrom openedoo.{module} import {module}".format(module=name))
        for line in readfile:
            if str(name) not in line:
                file.writelines(line)
        file.truncate()
        file.close()
    except Exception as e:
        print e
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with ContextualZipFile(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield
    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
def clean_project_files(path_or_glob, logger):
    """ Resolve file name references and ensure they are properly deleted """
    if "*" in path_or_glob:
        files_to_clean = glob.glob(path_or_glob)
    else:
        files_to_clean = [os.path.expanduser(path_or_glob)]

    for file_to_clean in files_to_clean:
        if not os.path.exists(file_to_clean):
            continue
        if os.path.isdir(file_to_clean):
            logger.info("Removing directory {}".format(file_to_clean))
            shutil.rmtree(file_to_clean)
        else:
            logger.info("Removing file {}".format(file_to_clean))
            os.remove(file_to_clean)
def cleanup_dir(tmpdir, keep_data_files=False, ignore_errors=False):
    if keep_data_files:
        return

    # Remove our tmpdir, but don't fail the test if it doesn't remove
    try:
        shutil.rmtree(tmpdir, ignore_errors=ignore_errors)
    except OSError as oe:
        error = ""
        if oe.errno:
            error = "%s: " % oe.errno
        if oe.strerror:
            error += oe.strerror
        if oe.filename:
            error += " (filename: %s)" % oe.filename
        log.warning("Unable to remove powstream temporary directory %s due to error reported by OS: %s" % (tmpdir, error))
    except:
        log.warning("Unable to remove powstream temporary directory %s: %s" % (tmpdir, sys.exc_info()[0]))

##
# Called by signal handlers to clean-up then exit
def test_get_picture(self):
    picture_root = os.path.join(
        MockRunMod.load_config(None)[helper.DATA_ROOT], '..', 'pictures')

    try:
        shutil.rmtree(picture_root)
    except:
        pass

    try:
        os.makedirs(picture_root)
    except:
        pass

    with open(os.path.join(picture_root, 'test.jpg'), 'wb') as out:
        out.write(b'abcde')

    rv = self.app.get('/picture?name=test.jpg&mediatype=image/jpeg')

    expected = b'abcde'
    actual = rv.data

    self.assertEqual(expected, actual)
def test_simple(self):
    mock_pipeline = test_helper.get_mock_pipeline([])

    data_root = os.path.join('local_data', 'unittests')

    if os.path.exists(data_root):
        shutil.rmtree(data_root)

    _copy = copy_file.Subscriber(mock_pipeline)
    _copy.setup({
        helper.DATA_ROOT: data_root,
        'workers': 1,
        'tag': 'default',
        helper.COPY_EXT: ['xyz']
    })

    _copy.consume(document.get_document('mock.xyz'), BytesIO(b'juba.'))
    _copy.consume(document.get_document('ignore.doc'), BytesIO(b'mock'))

    expected = ['39bbf948-mock.xyz']
    actual = os.listdir(os.path.join(data_root, 'files', 'xyz'))

    self.assertEqual(expected, actual)
def test_simple(self):
    mock_pipeline = test_helper.get_mock_pipeline([])

    data_root = os.path.join('local_data', 'unittests')

    if os.path.exists(data_root):
        shutil.rmtree(data_root)

    _store_text = store_text.Subscriber(mock_pipeline)
    _store_text.setup({
        helper.DATA_ROOT: data_root,
        'workers': 1
    })

    doc = document.get_document('mock')
    doc.text = 'mock-mock-mock'

    _store_text.consume(doc, None)

    expected = 'local_data/unittests/text/17404a59-mock'
    actual = doc.meta['text_file']

    self.assertEquals(expected, actual)
def test_event_logging():
    logdir = './experiment/'
    summary_writer = FileWriter(logdir)
    scalar_value = 1.0
    s = scalar('test_scalar', scalar_value)
    summary_writer.add_summary(s, global_step=1)
    summary_writer.close()
    assert os.path.isdir(logdir)
    assert len(os.listdir(logdir)) == 1

    summary_writer = FileWriter(logdir)
    scalar_value = 1.0
    s = scalar('test_scalar', scalar_value)
    summary_writer.add_summary(s, global_step=1)
    summary_writer.close()
    assert os.path.isdir(logdir)
    assert len(os.listdir(logdir)) == 2

    # clean up.
    shutil.rmtree(logdir)
def clean(mcu_switch=None, supress_output=False):
    cmd = TOOLCHAIN_BASIC_CONFIGURE + ' '
    if mcu_switch is None:
        sphinx_build_dir = os.path.join('build', 'sphinx')
        if os.path.isdir(sphinx_build_dir):
            shutil.rmtree(sphinx_build_dir)
            print "Successfully removed sphinx documentation"
        else:
            print 'Nothing to clean...'
        return
    elif mcu_switch == '-p' or mcu_switch == '-s' or mcu_switch == '-b':
        cmd += ' ' + mcu_switch + ' ' + 'clean'
    else:
        print 'Invalid clean argument: \'{}\''.format(mcu_switch)
        sys.exit(1)
    start_process(cmd, supress_output)
def neo4j_test_ws_dir(datafiles):
    return datafiles

# @pytest.fixture(scope="session")
# def workspace(request, data_directory):
#     wsconf_file = data_directory.join("workspace.yaml")
#     temp_root = tempfile.mkdtemp()
#     ws = Workspace("saapy-test-ws",
#                    temp_root,
#                    "saapy-test-ws",
#                    configuration_text=wsconf_file.read_text("utf-8"))
#
#     def fin():
#         shutil.rmtree(temp_root)
#
#     request.addfinalizer(fin)
#     return ws  # provide the fixture value
def stop(self):
    print("Cleaning up self %s." % self.getName())
    # kill the running self
    _uncheckedDockerCommand(["kill", self.getName()])
    # remove self
    _uncheckedDockerCommand(["rm", self.getName()])
    # remove image -- later may be a preference to keep it
    #_uncheckedDockerCommand(["rmi", self.getImageName()])
    # remove shared directory
    try:
        if "BP_LEAVE_FILES" in os.environ:
            pass
        else:
            shutil.rmtree(self.getSharedDir())
    except OSError:
        print("Warning: failed to remove shared directory.", file=sys.stderr)
def compare_component_output(self, input_path, expected_output_path):
    rendering_engine = self.get_rendering_engine()
    temp_dir = tempfile.gettempdir()
    output_dir = os.path.join(temp_dir, str(uuid.uuid4()))
    process_sketch_archive(zip_path=input_path, compress_zip=False,
                           output_path=output_dir, engine=rendering_engine)
    self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    shutil.rmtree(output_dir)
    storage.clear()
    output_zip = os.path.join(temp_dir, "{}.zip".format(str(uuid.uuid4())))
    process_sketch_archive(zip_path=input_path, compress_zip=True,
                           output_path=output_zip, engine=rendering_engine)
    z = zipfile.ZipFile(output_zip)
    z.extractall(output_dir)
    self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    shutil.rmtree(output_dir)
    os.remove(output_zip)
def __init__(self, **kwargs):
    self.__class__._instance = self
    self._nodes = {}
    self._disabled_nodes = {}
    self._avail_nodes = set()
    self._nodes_avail = pycos.Event()
    self._nodes_avail.clear()
    self._shared = False

    self._cur_computation = None
    self.__cur_client_auth = None
    self.__cur_node_allocations = []
    self.__pulse_interval = kwargs.pop('pulse_interval', MaxPulseInterval)
    self.__ping_interval = kwargs.pop('ping_interval', 0)
    self.__zombie_period = kwargs.pop('zombie_period', 100 * MaxPulseInterval)
    self._node_port = kwargs.pop('dispycosnode_port', 51351)
    self.__server_locations = set()
    self.__job_scheduler_task = None

    kwargs['name'] = 'dispycos_scheduler'
    clean = kwargs.pop('clean', False)
    nodes = kwargs.pop('nodes', [])
    self.pycos = pycos.Pycos.instance(**kwargs)
    self.__dest_path = os.path.join(self.pycos.dest_path, 'dispycos', 'dispycosscheduler')
    if clean:
        shutil.rmtree(self.__dest_path)
    self.pycos.dest_path = self.__dest_path

    self.__computation_sched_event = pycos.Event()
    self.__computation_scheduler_task = SysTask(self.__computation_scheduler_proc, nodes)
    self.__client_task = SysTask(self.__client_proc)
    self.__timer_task = SysTask(self.__timer_proc)
    Scheduler.__status_task = self.__status_task = SysTask(self.__status_proc)
    self.__client_task.register('dispycos_scheduler')
    self.pycos.discover_peers(port=self._node_port)
def sync_directory(src, dest, opts=None):
    if os.path.exists(dest):
        logging.debug('Removing existing directory: %s' % dest)
        shutil.rmtree(dest)
    logging.info('Syncing directory: %s -> %s.' % (src, dest))

    shutil.copytree(src, dest, ignore=get_filter(opts))
    ensure_init(dest)
def rmdir(d):
    '''Remove an existing directory'''
    if os.path.exists(d):
        shutil.rmtree(d)
def gen_makeself(conf_dir, alias):
    mkself_tmp = os.path.join(conf_dir, 'tmp')
    conf_mkself = os.path.join(conf_dir, 'Installers')
    if not os.path.exists(conf_mkself):
        os.makedirs(conf_mkself)
    if not os.path.exists(mkself_tmp):
        os.makedirs(mkself_tmp)
    if sys.platform.startswith('darwin'):
        alias_app = os.path.join(conf_dir, '{}.app'.format(alias))
        if os.path.exists(alias_app):
            run_command('cp -R {} {}'.format(alias_app, mkself_tmp))
        gen_osx_plist(alias, mkself_tmp)
        gen_st_setup(alias, mkself_tmp)
        mkself_installer = 'bash "{}" "{}" "{}/{}_Installer" "Stitch" bash st_setup.sh'.format(mkself_exe, mkself_tmp, conf_mkself, alias)
        st_log.info(mkself_installer)
        st_log.info(run_command(mkself_installer))
        shutil.rmtree(mkself_tmp)
    else:
        binry_dir = os.path.join(conf_dir, 'Binaries')
        alias_dir = os.path.join(binry_dir, alias)
        if os.path.exists(alias_dir):
            run_command('cp -R {} {}'.format(alias_dir, mkself_tmp))
        gen_lnx_daemon(alias, mkself_tmp)
        gen_st_setup(alias, mkself_tmp)
        mkself_installer = 'bash "{}" "{}" "{}/{}_Installer" "Stitch" bash st_setup.sh'.format(mkself_exe, mkself_tmp, conf_mkself, alias)
        st_log.info(mkself_installer)
        st_log.info(run_command(mkself_installer))
        shutil.rmtree(mkself_tmp)
def tearDown(self):
    """ Destroy the app """
    if self.app:
        self.app.frame.Hide()
        self.app.OnCloseFrame(wx.PyEvent())
        self.app.frame.Destroy()
    del os.environ['PYUPDATER_FILESERVER_DIR']
    del os.environ['WXUPDATEDEMO_TESTING']
    shutil.rmtree(self.fileServerDir)