我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用importlib.invalidate_caches()。
def test_include_1(): sub_ffi = FFI() sub_ffi.cdef("static const int k2 = 121212;") sub_ffi.include(original_ffi) assert 'macro FOOBAR' in original_ffi._parser._declarations assert 'macro FOOBAZ' in original_ffi._parser._declarations sub_ffi.set_source('re_python_pysrc', None) sub_ffi.emit_python_code(str(tmpdir.join('_re_include_1.py'))) # if sys.version_info[:2] >= (3, 3): import importlib importlib.invalidate_caches() # issue 197 (but can't reproduce myself) # from _re_include_1 import ffi assert ffi.integer_const('FOOBAR') == -42 assert ffi.integer_const('FOOBAZ') == -43 assert ffi.integer_const('k2') == 121212 lib = ffi.dlopen(extmod) # <- a random unrelated library would be fine assert lib.FOOBAR == -42 assert lib.FOOBAZ == -43 assert lib.k2 == 121212 # p = ffi.new("bar_t *", [5, b"foobar"]) assert p.a[4] == ord('a')
def addPyFile(self, path): """ Add a .py or .zip dependency for all tasks to be executed on this SparkContext in the future. The C{path} passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI. """ self.addFile(path) (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix if filename[-4:].lower() in self.PACKAGE_EXTENSIONS: self._python_includes.append(filename) # for tests in local mode sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) if sys.version > '3': import importlib importlib.invalidate_caches()
def _raw_import(path): # First, invalidate caches! # From the docs: # "If you are dynamically importing a module that was created since the interpreter began execution # (e.g., created a Python source file), you may need to call invalidate_caches() in order for the # new module to be noticed by the import system." # Since we're dealing with config files, we need to support re-reading config files # which are modified after the interpreter started. # See: # https://docs.python.org/3/library/importlib.html#importlib.import_module # https://docs.python.org/3/library/importlib.html#importlib.invalidate_caches try: invalidate_caches = importlib.invalidate_caches except AttributeError: # python2 pass else: # python3 invalidate_caches() # Now can safely import the module return importlib.import_module(path)
def import_arbitrary(fileName, modName): """ Returns a module reference. This function acts like a Python import statement, but it will import an arbitrary Python file located at any path. """ importlib.invalidate_caches() loader = ilmac.SourceFileLoader(modName.replace('.', '_'), fileName) spec = ilutil.spec_from_loader(loader.name, loader) mod = ilutil.module_from_spec(spec) loader.exec_module(mod) return mod # End import_arbitrary
def test_failing_import_sticks(self): source = TESTFN + ".py" with open(source, "w") as f: print("a = 1/0", file=f) # New in 2.4, we shouldn't be able to import that no matter how often # we try. sys.path.insert(0, os.curdir) importlib.invalidate_caches() if TESTFN in sys.modules: del sys.modules[TESTFN] try: for i in [1, 2, 3]: self.assertRaises(ZeroDivisionError, __import__, TESTFN) self.assertNotIn(TESTFN, sys.modules, "damaged module in sys.modules on %i try" % i) finally: del sys.path[0] remove_files(TESTFN)
def test_file_to_source(self): # check if __file__ points to the source file where available source = TESTFN + ".py" with open(source, "w") as f: f.write("test = None\n") sys.path.insert(0, os.curdir) try: mod = __import__(TESTFN) self.assertTrue(mod.__file__.endswith('.py')) os.remove(source) del sys.modules[TESTFN] make_legacy_pyc(source) importlib.invalidate_caches() mod = __import__(TESTFN) base, ext = os.path.splitext(mod.__file__) self.assertIn(ext, ('.pyc', '.pyo')) finally: del sys.path[0] remove_files(TESTFN) if TESTFN in sys.modules: del sys.modules[TESTFN]
def test_package___cached__(self): # Like test___cached__ but for packages. def cleanup(): rmtree('pep3147') unload('pep3147.foo') unload('pep3147') os.mkdir('pep3147') self.addCleanup(cleanup) # Touch the __init__.py with open(os.path.join('pep3147', '__init__.py'), 'w'): pass with open(os.path.join('pep3147', 'foo.py'), 'w'): pass importlib.invalidate_caches() m = __import__('pep3147.foo') init_pyc = imp.cache_from_source( os.path.join('pep3147', '__init__.py')) self.assertEqual(m.__cached__, os.path.join(os.curdir, init_pyc)) foo_pyc = imp.cache_from_source(os.path.join('pep3147', 'foo.py')) self.assertEqual(sys.modules['pep3147.foo'].__cached__, os.path.join(os.curdir, foo_pyc))
def setUp(self): test.support.rmtree(self.tagged) test.support.rmtree(self.package_name) self.orig_sys_path = sys.path[:] # create a sample package; imagine you have a package with a tag and # you want to symbolically link it from its untagged name. os.mkdir(self.tagged) self.addCleanup(test.support.rmtree, self.tagged) init_file = os.path.join(self.tagged, '__init__.py') test.support.create_empty_file(init_file) assert os.path.exists(init_file) # now create a symlink to the tagged package # sample -> sample-tagged os.symlink(self.tagged, self.package_name, target_is_directory=True) self.addCleanup(test.support.unlink, self.package_name) importlib.invalidate_caches() self.assertEqual(os.path.isdir(self.package_name), True) assert os.path.isfile(os.path.join(self.package_name, '__init__.py'))
def test_file_parse(self): # issue1134: all encodings outside latin-1 and utf-8 fail on # multiline strings and long lines (>512 columns) unload(TESTFN) filename = TESTFN + ".py" f = open(filename, "w", encoding="cp1252") sys.path.insert(0, os.curdir) try: with f: f.write("# -*- coding: cp1252 -*-\n") f.write("'''A short string\n") f.write("'''\n") f.write("'A very long string %s'\n" % ("X" * 1000)) importlib.invalidate_caches() __import__(TESTFN) finally: del sys.path[0] unlink(filename) unlink(filename + "c") unlink(filename + "o") unload(TESTFN)
def test_side_effect_import(self): code = """if 1: import threading def target(): import random t = threading.Thread(target=target) t.start() t.join()""" sys.path.insert(0, os.curdir) self.addCleanup(sys.path.remove, os.curdir) filename = TESTFN + ".py" with open(filename, "wb") as f: f.write(code.encode('utf-8')) self.addCleanup(unlink, filename) self.addCleanup(forget, TESTFN) importlib.invalidate_caches() __import__(TESTFN)
def setUp(self): self.pkgname = os.path.join(self.longname) self.subpkgname = os.path.join(self.longname, self.longname) # Make the package and subpackage shutil.rmtree(self.pkgname, ignore_errors=True) os.mkdir(self.pkgname) create_empty_file(os.path.join(self.pkgname, '__init__.py')) shutil.rmtree(self.subpkgname, ignore_errors=True) os.mkdir(self.subpkgname) create_empty_file(os.path.join(self.subpkgname, '__init__.py')) # Remember where we are self.here = os.getcwd() sys.path.insert(0, self.here) # When regrtest is run with its -j option, this command alone is not # enough. importlib.invalidate_caches()
def test_method(self): self._check_path_limitations('qux') eq = self.assertEqual write_file(os.path.join(self.subpkgname, 'qux.py'), '''\ class aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa: def amethod(self): pass ''') importlib.invalidate_caches() from areallylongpackageandmodulenametotestreprtruncation.areallylongpackageandmodulenametotestreprtruncation import qux # Unbound methods first r = repr(qux.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.amethod) self.assertTrue(r.startswith('<function aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.amethod'), r) # Bound method next iqux = qux.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa() r = repr(iqux.amethod) self.assertTrue(r.startswith( '<bound method aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.amethod of <%s.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa object at 0x' \ % (qux.__name__,) ), r)
def test_package___cached__(self): # Like test___cached__ but for packages. def cleanup(): rmtree('pep3147') unload('pep3147.foo') unload('pep3147') os.mkdir('pep3147') self.addCleanup(cleanup) # Touch the __init__.py with open(os.path.join('pep3147', '__init__.py'), 'w'): pass with open(os.path.join('pep3147', 'foo.py'), 'w'): pass importlib.invalidate_caches() m = __import__('pep3147.foo') init_pyc = importlib.util.cache_from_source( os.path.join('pep3147', '__init__.py')) self.assertEqual(m.__cached__, os.path.join(os.curdir, init_pyc)) foo_pyc = importlib.util.cache_from_source(os.path.join('pep3147', 'foo.py')) self.assertEqual(sys.modules['pep3147.foo'].__cached__, os.path.join(os.curdir, foo_pyc))
def test_forget(self): mod_filename = TESTFN + '.py' with open(mod_filename, 'w') as f: print('foo = 1', file=f) sys.path.insert(0, os.curdir) importlib.invalidate_caches() try: mod = __import__(TESTFN) self.assertIn(TESTFN, sys.modules) support.forget(TESTFN) self.assertNotIn(TESTFN, sys.modules) finally: del sys.path[0] support.unlink(mod_filename) support.rmtree('__pycache__')
def test_side_effect_import(self): code = """if 1: import threading def target(): import random t = threading.Thread(target=target) t.start() t.join()""" sys.path.insert(0, os.curdir) self.addCleanup(sys.path.remove, os.curdir) filename = TESTFN + ".py" with open(filename, "wb") as f: f.write(code.encode('utf-8')) self.addCleanup(unlink, filename) self.addCleanup(forget, TESTFN) self.addCleanup(rmtree, '__pycache__') importlib.invalidate_caches() __import__(TESTFN)
def _possibly_invalidate_import_caches(self): # invalidate caches if we can (py33 and above) try: import importlib except ImportError: pass else: if hasattr(importlib, "invalidate_caches"): importlib.invalidate_caches()
def memories(): "Load python modules from memory folder" # Invalidates current cache importlib.invalidate_caches() # Path where the modules are stored memory_path = os.path.join(os.path.dirname(__file__), MEMORY_DIR) knowledge = list() # If the folder exists, get the files if os.path.isdir(memory_path): memories = os.listdir(memory_path) else: logger.warn("%s missing, i'm useless :(" % memory_path) return knowledge # For each .py file, get name and load the module for memory in memories: if not memory.startswith("__") and memory.endswith(".py"): pypos = memory.find(".py") memory_name = memory[:pypos] try: memory = importlib.import_module( "{}.{}.{}".format(__package__, MEMORY_DIR, memory_name)) knowledge.append(importlib.reload(memory)) except Exception as err: logger.warn("%s is confusing, skipping" % (memory_name)) logger.error("%s: %s" % (memory_name, err)) return knowledge
def clean_cache(): """ Python won't realise that new module has appeared in the runtime We need to clean the cache of module finders. Hacking again :return: """ import importlib try: # Python ver < 3.3 vermod = importlib.import_module("versioneer") globals()["versioneer"] = vermod except ImportError: importlib.invalidate_caches()
def setUp(self): remove_files(TESTFN) importlib.invalidate_caches()
def test_module_with_large_stack(self, module='longlist'): # Regression test for http://bugs.python.org/issue561858. filename = module + '.py' # Create a file with a list of 65000 elements. with open(filename, 'w') as f: f.write('d = [\n') for i in range(65000): f.write('"",\n') f.write(']') try: # Compile & remove .py file; we only need .pyc (or .pyo). # Bytecode must be relocated from the PEP 3147 bytecode-only location. py_compile.compile(filename) finally: unlink(filename) # Need to be able to load from current dir. sys.path.append('') importlib.invalidate_caches() try: make_legacy_pyc(filename) # This used to crash. exec('import ' + module) finally: # Cleanup. del sys.path[-1] unlink(filename + 'c') unlink(filename + 'o')
def setUp(self): self.sys_path = sys.path[:] self.orig_module = sys.modules.pop(self.module_name, None) os.mkdir(self.dir_name) with open(self.file_name, "w") as f: f.write(self.module_source) sys.path.insert(0, self.dir_name) importlib.invalidate_caches()
def test_module_without_source(self): target = "another_module.py" py_compile.compile(self.file_name, dfile=target) os.remove(self.file_name) pyc_file = make_legacy_pyc(self.file_name) importlib.invalidate_caches() mod = self.import_module() self.assertEqual(mod.module_filename, pyc_file) self.assertEqual(mod.code_filename, target) self.assertEqual(mod.func_filename, target)
def test_UNC_path(self): with open(os.path.join(self.path, 'test_unc_path.py'), 'w') as f: f.write("testdata = 'test_unc_path'") importlib.invalidate_caches() # Create the UNC path, like \\myhost\c$\foo\bar. path = os.path.abspath(self.path) import socket hn = socket.gethostname() drive = path[0] unc = "\\\\%s\\%s$"%(hn, drive) unc += path[2:] try: os.listdir(unc) except OSError as e: if e.errno in (errno.EPERM, errno.EACCES): # See issue #15338 self.skipTest("cannot access administrative share %r" % (unc,)) raise sys.path.insert(0, unc) try: mod = __import__("test_unc_path") except ImportError as e: self.fail("could not import 'test_unc_path' from %r: %r" % (unc, e)) self.assertEqual(mod.testdata, 'test_unc_path') self.assertTrue(mod.__file__.startswith(unc), mod.__file__) unload("test_unc_path")
def setUp(self): self.source = TESTFN + '.py' self._clean() with open(self.source, 'w') as fp: print('# This is a test file written by test_import.py', file=fp) sys.path.insert(0, os.curdir) importlib.invalidate_caches()
def test_missing_source_legacy(self): # Like test_missing_source() except that for backward compatibility, # when the pyc file lives where the py file would have been (and named # without the tag), it is importable. The __file__ of the imported # module is the pyc location. __import__(TESTFN) # pyc_file gets removed in _clean() via tearDown(). pyc_file = make_legacy_pyc(self.source) os.remove(self.source) unload(TESTFN) importlib.invalidate_caches() m = __import__(TESTFN) self.assertEqual(m.__file__, os.path.join(os.curdir, os.path.relpath(pyc_file)))
def test_package___cached___from_pyc(self): # Like test___cached__ but ensuring __cached__ when imported from a # PEP 3147 pyc file. def cleanup(): rmtree('pep3147') unload('pep3147.foo') unload('pep3147') os.mkdir('pep3147') self.addCleanup(cleanup) # Touch the __init__.py with open(os.path.join('pep3147', '__init__.py'), 'w'): pass with open(os.path.join('pep3147', 'foo.py'), 'w'): pass importlib.invalidate_caches() m = __import__('pep3147.foo') unload('pep3147.foo') unload('pep3147') importlib.invalidate_caches() m = __import__('pep3147.foo') init_pyc = imp.cache_from_source( os.path.join('pep3147', '__init__.py')) self.assertEqual(m.__cached__, os.path.join(os.curdir, init_pyc)) foo_pyc = imp.cache_from_source(os.path.join('pep3147', 'foo.py')) self.assertEqual(sys.modules['pep3147.foo'].__cached__, os.path.join(os.curdir, foo_pyc))
def create_module(self, mod, contents, ext=".py"): fname = os.path.join(TESTFN, mod + ext) with open(fname, "w") as f: f.write(contents) self.addCleanup(unload, mod) importlib.invalidate_caches() return fname
def _setup_broken_package(self, parent, child): pkg_name = "_parent_foo" self.addCleanup(unload, pkg_name) pkg_path = os.path.join(TESTFN, pkg_name) os.mkdir(pkg_path) # Touch the __init__.py init_path = os.path.join(pkg_path, '__init__.py') with open(init_path, 'w') as f: f.write(parent) bar_path = os.path.join(pkg_path, 'bar.py') with open(bar_path, 'w') as f: f.write(child) importlib.invalidate_caches() return init_path, bar_path
def test_main(): run_unittest(PkgutilTests, PkgutilPEP302Tests, ExtendPathTests, NestedNamespacePackageTest, ImportlibMigrationTests) # this is necessary if test is run repeated (like when finding leaks) import zipimport import importlib zipimport._zip_directory_cache.clear() importlib.invalidate_caches()
def _make_test_script(script_dir, script_basename, source=test_source): to_return = make_script(script_dir, script_basename, source) importlib.invalidate_caches() return to_return
def _make_test_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, source=test_source, depth=1): to_return = make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, source, depth) importlib.invalidate_caches() return to_return # There's no easy way to pass the script directory in to get # -m to work (avoiding that is the whole point of making # directories and zipfiles executable!) # So we fake it for testing purposes with a custom launch script
def _make_launch_script(script_dir, script_basename, module_name, path=None): if path is None: path = "os.path.dirname(__file__)" else: path = repr(path) source = launch_source % (path, module_name) to_return = make_script(script_dir, script_basename, source) importlib.invalidate_caches() return to_return
def _check_module(self, depth, alter_sys=False): pkg_dir, mod_fname, mod_name = ( self._make_pkg(example_source, depth)) forget(mod_name) expected_ns = example_namespace.copy() expected_ns.update({ "__name__": mod_name, "__file__": mod_fname, "__package__": mod_name.rpartition(".")[0], }) if alter_sys: expected_ns.update({ "run_argv0": mod_fname, "run_name_in_sys_modules": True, "module_in_sys_modules": True, }) def create_ns(init_globals): return run_module(mod_name, init_globals, alter_sys=alter_sys) try: if verbose > 1: print("Running from source:", mod_name) self.check_code_execution(create_ns, expected_ns) importlib.invalidate_caches() __import__(mod_name) os.remove(mod_fname) if not sys.dont_write_bytecode: make_legacy_pyc(mod_fname) unload(mod_name) # In case loader caches paths importlib.invalidate_caches() if verbose > 1: print("Running from compiled:", mod_name) self._fix_ns_for_legacy_pyc(expected_ns, alter_sys) self.check_code_execution(create_ns, expected_ns) finally: self._del_pkg(pkg_dir, depth, mod_name) if verbose > 1: print("Module executed successfully")
def _check_package(self, depth, alter_sys=False): pkg_dir, mod_fname, mod_name = ( self._make_pkg(example_source, depth, "__main__")) pkg_name = mod_name.rpartition(".")[0] forget(mod_name) expected_ns = example_namespace.copy() expected_ns.update({ "__name__": mod_name, "__file__": mod_fname, "__package__": pkg_name, }) if alter_sys: expected_ns.update({ "run_argv0": mod_fname, "run_name_in_sys_modules": True, "module_in_sys_modules": True, }) def create_ns(init_globals): return run_module(pkg_name, init_globals, alter_sys=alter_sys) try: if verbose > 1: print("Running from source:", pkg_name) self.check_code_execution(create_ns, expected_ns) importlib.invalidate_caches() __import__(mod_name) os.remove(mod_fname) if not sys.dont_write_bytecode: make_legacy_pyc(mod_fname) unload(mod_name) # In case loader caches paths if verbose > 1: print("Running from compiled:", pkg_name) importlib.invalidate_caches() self._fix_ns_for_legacy_pyc(expected_ns, alter_sys) self.check_code_execution(create_ns, expected_ns) finally: self._del_pkg(pkg_dir, depth, pkg_name) if verbose > 1: print("Package executed successfully")
def _check_relative_imports(self, depth, run_name=None): contents = r"""\ from __future__ import absolute_import from . import sibling from ..uncle.cousin import nephew """ pkg_dir, mod_fname, mod_name = ( self._make_pkg(contents, depth)) if run_name is None: expected_name = mod_name else: expected_name = run_name try: self._add_relative_modules(pkg_dir, contents, depth) pkg_name = mod_name.rpartition('.')[0] if verbose > 1: print("Running from source:", mod_name) d1 = run_module(mod_name, run_name=run_name) # Read from source self.assertEqual(d1["__name__"], expected_name) self.assertEqual(d1["__package__"], pkg_name) self.assertIn("sibling", d1) self.assertIn("nephew", d1) del d1 # Ensure __loader__ entry doesn't keep file open importlib.invalidate_caches() __import__(mod_name) os.remove(mod_fname) if not sys.dont_write_bytecode: make_legacy_pyc(mod_fname) unload(mod_name) # In case the loader caches paths if verbose > 1: print("Running from compiled:", mod_name) importlib.invalidate_caches() d2 = run_module(mod_name, run_name=run_name) # Read from bytecode self.assertEqual(d2["__name__"], expected_name) self.assertEqual(d2["__package__"], pkg_name) self.assertIn("sibling", d2) self.assertIn("nephew", d2) del d2 # Ensure __loader__ entry doesn't keep file open finally: self._del_pkg(pkg_dir, depth, mod_name) if verbose > 1: print("Module executed successfully")
def test_package___file__(self): try: m = __import__('pep3147') except ImportError: pass else: self.fail("pep3147 module already exists: %r" % (m,)) # Test that a package's __file__ points to the right source directory. os.mkdir('pep3147') sys.path.insert(0, os.curdir) def cleanup(): if sys.path[0] == os.curdir: del sys.path[0] shutil.rmtree('pep3147') self.addCleanup(cleanup) # Touch the __init__.py file. support.create_empty_file('pep3147/__init__.py') importlib.invalidate_caches() expected___file__ = os.sep.join(('.', 'pep3147', '__init__.py')) m = __import__('pep3147') self.assertEqual(m.__file__, expected___file__, (m.__file__, m.__path__, sys.path, sys.path_importer_cache)) # Ensure we load the pyc file. support.unload('pep3147') m = __import__('pep3147') support.unload('pep3147') self.assertEqual(m.__file__, expected___file__, (m.__file__, m.__path__, sys.path, sys.path_importer_cache))