我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 imp.new_module()。
def _handle_ns(packageName, path_item):
    """Make sure the named package's ``__path__`` covers ``path_item``.

    Returns the subpath reported by the namespace handler registered for
    the importer of ``path_item``, or ``None`` when there is no importer,
    no loader for ``packageName``, or no subpath to add.

    Raises ``TypeError`` when ``packageName`` is already imported but has
    no ``__path__`` attribute.
    """
    finder = get_importer(path_item)
    if finder is None:
        return None

    pkg_loader = finder.find_module(packageName)
    if pkg_loader is None:
        return None

    ns_module = sys.modules.get(packageName)
    if ns_module is None:
        # First sighting of this package: register an empty package module.
        ns_module = imp.new_module(packageName)
        sys.modules[packageName] = ns_module
        ns_module.__path__ = []
        _set_parent_ns(packageName)
    elif not hasattr(ns_module, '__path__'):
        raise TypeError("Not a package:", packageName)

    adapter = _find_adapter(_namespace_handlers, finder)
    subpath = adapter(finder, path_item, packageName, ns_module)
    if subpath is None:
        return subpath

    search_path = ns_module.__path__
    search_path.append(subpath)
    pkg_loader.load_module(packageName)
    # Put the extended list back after load_module() has run.
    ns_module.__path__ = search_path
    return subpath
def main():
    """Build a synthetic ``livetests`` module and run it with unittest.

    The host configuration comes from ``parse_argv()``; the namespace
    reported by ``probe_host()`` is stored on it before the test classes
    are registered.
    """
    host_config = parse_argv()
    host_config.namespace = probe_host(host_config)

    live_test_mod = imp.new_module('livetests')
    sys.modules['livetests'] = live_test_mod

    def add_test_class(klass, name=None):
        # Register klass on the synthetic module, optionally renaming it.
        if name is not None:
            if not PY3:
                name = name.encode('ascii')
            klass.__name__ = name
        else:
            name = klass.__name__
        setattr(live_test_mod, name, klass)

    TestGeneral.conf = host_config
    add_test_class(TestGeneral)
    for use_uid, class_name in ((True, 'TestWithUIDs'),
                                (False, 'TestWithoutUIDs')):
        add_test_class(createUidTestClass(host_config, use_uid=use_uid),
                       class_name)

    unittest.main(module='livetests')
def test_wrap_module(self):
    """Exercise ``Plugin.wrap_module`` on an in-memory module.

    A module whose ``execute`` takes ``(ctx, arg)`` must wrap into a plugin
    named after the module; a module whose ``execute`` takes no arguments
    must be rejected with ``InvalidPluginException``.
    """
    plugin_module = imp.new_module("test_plugin")
    plugin_module.execute = lambda ctx, arg: "execute {}".format(arg)

    plugin = Plugin.wrap_module(plugin_module)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(plugin.name, "test_plugin")
    plugin.sys_prepare(None)
    self.assertEqual(plugin.execute(None, "cat"), "execute cat")

    # Replace execute with a callable of the wrong arity.
    plugin_module.execute = lambda: "execute"
    self.failUnlessException(
        InvalidPluginException, Plugin.wrap_module, plugin_module
    )
def load_module(self, name):
    """Return the module called ``name``, creating it from cached code.

    An entry already present in ``sys.modules`` is returned unchanged.
    Otherwise the ``(code, pyc)`` pair stored in ``self.modules`` is
    popped, a new module is registered and the code is executed in it.
    """
    # If there is an existing module object named 'fullname' in
    # sys.modules, the loader must use that existing module. (Otherwise,
    # the reload() builtin will not work correctly.)
    if name in sys.modules:
        return sys.modules[name]

    code_obj, pyc_path = self.modules.pop(name)
    # imp.load_compiled would be simpler, but __file__ has to be set
    # properly. In Python 3.2+, load_compiled would handle all of this.
    module = imp.new_module(name)
    sys.modules[name] = module
    try:
        module.__file__ = code_obj.co_filename
        # Normally, this attribute is 3.2+.
        module.__cached__ = pyc_path
        module.__loader__ = self
        py.builtin.exec_(code_obj, module.__dict__)
    except:
        # Drop the half-initialised module, then re-raise whatever failed.
        del sys.modules[name]
        raise
    return sys.modules[name]
def __init__(self, mod_name='__main__'):
    """Set up the execution namespace and the execute-item lock.

    :param mod_name: name of the module to create and register in
        ``sys.modules``; ``None`` reuses the existing ``__main__`` module.
    """
    import threading
    ReplBackend.__init__(self)

    if mod_name is None:
        self.exec_mod = sys.modules['__main__']
    elif sys.platform == 'cli':
        # On the 'cli' platform a Scope object stands in for a module.
        self.exec_mod = Scope()
        self.exec_mod.__name__ = '__main__'
    else:
        fresh_module = imp.new_module(mod_name)
        sys.modules[mod_name] = fresh_module
        self.exec_mod = fresh_module

    self.code_flags = 0
    self.execute_item = None
    self.execute_item_lock = threading.Lock()
    # lock starts acquired (we use it like manual reset event)
    self.execute_item_lock.acquire()
def load_module(self, fullname):
    """Return a package-like placeholder module for ``fullname``.

    The module is taken from ``sys.modules`` when present, otherwise a new
    one is created and registered there. In both cases the import-related
    attributes are (re)assigned before the module is returned.
    """
    print('loading {}'.format(fullname))

    try:
        module = sys.modules[fullname]
    except KeyError:
        module = sys.modules.setdefault(fullname, imp.new_module(fullname))

    # Set a few properties required by PEP 302
    module.__file__ = fullname
    module.__name__ = fullname
    # always looks like a package
    module.__path__ = ['path-entry-goes-here']
    module.__loader__ = self
    parent_parts = fullname.split('.')[:-1]
    module.__package__ = '.'.join(parent_parts)
    return module

# Install the meta-path finder
def load_ai(filename):
    """Load an AI script from ``filename`` into a fresh module named 'ai'.

    The script is executed with a ``getboard(board, x, y)`` helper
    pre-defined in its namespace and the resulting module is returned.

    Raises ``Exception`` when the source text contains one of the
    forbidden import statements.

    WARNING: this is not a real sandbox. The check is a plain substring
    blacklist and the code is then run with ``exec``; do not use it on
    untrusted input.
    """
    # Read through a context manager so the file handle is always closed.
    with open(filename) as source_file:
        code = source_file.read()

    module = imp.new_module('ai')
    mdict = module.__dict__

    def getboard(board, x, y):
        # Bounds-checked accessor for an 8x8 board; None when off-board.
        if x in range(8) and y in range(8):
            return board[y][x]
        return None

    mdict['getboard'] = getboard

    # Pop the obvious exploits. These only remove names from the module's
    # own namespace; they do not touch the builtins.
    mdict.pop('exec', 0)
    mdict.pop('eval', 0)

    # Lexical analyzer
    # TODO: replace the substring blacklist with something robust
    forbidden = ['import subprocess', 'import sys', 'import os']
    for word in forbidden:
        if word in code:
            raise Exception(
                'Found security issue in {} while looking for {}'.format(
                    filename, word))

    exec(code, mdict)
    return module
def load_pupyimporter(self):
    """Install the ``pupyimporter`` module on the remote side if missing.

    When ``"pupyimporter"`` is not in the ``sys.modules`` seen through
    ``self.conn``, the local ``packages/all/pupyimporter.py`` source is
    read, a bootstrap function is defined via ``self.conn.execute`` and
    that function is called with the source.
    """
    if "pupyimporter" not in self.conn.modules.sys.modules:
        pupyimporter_code=""
        with open(os.path.join(ROOT, "packages","all","pupyimporter.py"),'rb') as f:
            pupyimporter_code=f.read()
        # NOTE(review): the snippet below is Python 2 syntax (``exec``
        # statement); presumably it runs in the interpreter behind
        # self.conn — confirm. The doubled backslashes are unescaped once
        # by this string literal and once more by the executed snippet.
        self.conn.execute(textwrap.dedent(
        """
        import imp
        import sys
        def pupyimporter_preimporter(code):
            mod = imp.new_module("pupyimporter")
            mod.__name__="pupyimporter"
            mod.__file__="<memimport>\\\\pupyimporter"
            mod.__package__="pupyimporter"
            sys.modules["pupyimporter"]=mod
            exec code+"\\n" in mod.__dict__
            mod.install()
        """))
        # Call the function that was just defined, with the module source.
        self.conn.namespace["pupyimporter_preimporter"](pupyimporter_code)
def _setup_package(self, context_id, parent_ids):
    """Create the synthetic ``mitogen`` package around the running script.

    A new ``mitogen`` module is bound to the module-level global, filled
    with the context identifiers, and the current ``__main__`` module is
    re-registered as ``mitogen.core``. ``__main__`` is then removed from
    ``sys.modules``.
    """
    global mitogen
    mitogen = imp.new_module('mitogen')
    package = mitogen

    # Package identity.
    package.__package__ = 'mitogen'
    package.__path__ = []
    package.__loader__ = self.importer

    # Context bookkeeping.
    package.is_master = False
    package.context_id = context_id
    package.parent_ids = parent_ids
    package.parent_id = parent_ids[0]

    # The module currently known as __main__ becomes mitogen.core.
    package.core = sys.modules['__main__']
    core_module = package.core
    core_module.__file__ = 'x/mitogen/core.py'  # For inspect.getsource()
    core_module.__loader__ = self.importer

    sys.modules['mitogen'] = package
    sys.modules['mitogen.core'] = core_module
    del sys.modules['__main__']
def __init__(self, expr):
    """Compile ``expr`` into a function ``f(x, y)`` inside a private module.

    The generated source star-imports numpy and then math, defines the
    placeholders ``avg``, ``sd``, ``xmin``, ``xmax``, ``ymin`` and ``ymax``
    as ``None`` and binds ``f = lambda x, y: <expr>``. The code object is
    kept in ``self.code`` and the module in ``self.mod``.

    NOTE: ``expr`` is executed as Python code; never pass untrusted text.
    """
    # solution found in:
    # http://stackoverflow.com/questions/10303248/true-dynamic-and-anonymous-functions-possible-in-python
    self.code = compile(
        "from numpy import *\n" +
        "from math import *\n" +
        "avg = None\n" +
        "sd = None\n" +
        "xmin = None\n" +
        "xmax = None\n" +
        "ymin = None\n" +
        "ymax = None\n" +
        "f = lambda x, y: " + expr,
        'dyn-mark-string',
        'exec')
    self.mod = imp.new_module("dyn_marker_mod")
    # Function-call form of exec: valid on Python 3 and accepted by
    # Python 2, unlike the original ``exec ... in ...`` statement.
    exec(self.code, self.mod.__dict__)
def get_env_dict_from_string(s):
    """Execute ``s`` in a throwaway module and return its namespace.

    Returns a copy of the module's ``__dict__`` after execution, or an
    empty dict when ``s`` is empty or executing it raises. Failures are
    reported through ``sublime.error_message``.

    http://stackoverflow.com/questions/5362771/load-module-from-string-in-python
    """
    # this avoids a subtle bug, DON'T REMOVE
    sys.modules.pop('requester.env', None)

    if not s:
        return {}

    env_module = imp.new_module('requester.env')
    namespace = env_module.__dict__
    try:
        exec(s, namespace)
    except Exception as e:
        sublime.error_message('EnvBlock Error:\n{}'.format(e))
        return {}
    return dict(namespace)
def test_state_after_failure(self): # A failed reload should leave the original module intact. attributes = ('__file__', '__path__', '__package__') value = '<test>' name = '_temp' with source_util.create_modules(name) as mapping: orig_module = imp.new_module(name) for attr in attributes: setattr(orig_module, attr, value) with open(mapping[name], 'w') as file: file.write('+++ bad syntax +++') loader = _bootstrap._SourceFileLoader('_temp', mapping['_temp']) with self.assertRaises(SyntaxError): loader.load_module(name) for attr in attributes: self.assertEqual(getattr(orig_module, attr), value) # [syntax error]
def __init__(self, *names, module_code={}):
    """Create one mock module per entry in ``names``.

    A name ending in ``.__init__`` denotes a package: it is registered
    under the name without that suffix and receives a ``__path__``.
    Entries of ``module_code`` whose key matches a registered import name
    are copied into ``self.module_code``.
    """
    init_suffix = '.__init__'
    self.modules = {}
    self.module_code = {}
    for name in names:
        is_package = name.endswith(init_suffix)
        import_name = name[:-len(init_suffix)] if is_package else name

        if '.' not in name:
            package = None
        elif is_package:
            package = import_name
        else:
            package = name.rsplit('.', 1)[0]

        mock_module = imp.new_module(import_name)
        mock_module.__loader__ = self
        mock_module.__file__ = '<mock __file__>'
        mock_module.__package__ = package
        mock_module.attr = name
        if is_package:
            mock_module.__path__ = ['<mock __path__>']

        self.modules[import_name] = mock_module
        if import_name in module_code:
            self.module_code[import_name] = module_code[import_name]
def make_plugin_module(version=None, name="test_plugin_name", extra_body=None):
    """Create an in-memory plugin module and register it in ``sys.modules``.

    :param version: when not ``None``, the module body starts with
        ``version = <version>`` (formatted with ``%s``).
    :param name: name of the module and its key in ``sys.modules``.
    :param extra_body: optional extra source appended after the version
        line, separated by a blank line.
    :returns: ``(name, module)``.
    """
    # Create a plugin encoded with a version
    plugin_module = imp.new_module(name)
    if version is not None:
        plugin_module_body = "version = %s" % version
    else:
        plugin_module_body = ""
    if extra_body is not None:
        plugin_module_body = "%s\n\n%s" % (plugin_module_body, extra_body)

    # Function-call form of exec: valid on Python 3 and accepted by
    # Python 2, unlike the original ``exec ... in ...`` statement.
    exec(plugin_module_body, plugin_module.__dict__)

    # Simulate imported
    sys.modules[name] = plugin_module
    return name, plugin_module
def __init__(self, *names):
    """Populate ``self.modules`` with one mock module per given name.

    Names ending in ``.__init__`` are treated as packages: the suffix is
    dropped from the import name and the module gets a ``__path__``.
    """
    suffix = '.__init__'
    registry = {}
    self.modules = registry
    for name in names:
        if name.endswith(suffix):
            import_name = name[:-len(suffix)]
        else:
            import_name = name
        names_a_package = import_name != name

        # Top-level names have no package; packages are their own package;
        # plain submodules belong to their parent.
        if '.' not in name:
            package = None
        elif names_a_package:
            package = import_name
        else:
            package = name.rsplit('.', 1)[0]

        module = imp.new_module(import_name)
        module.__loader__ = self
        module.__file__ = '<mock __file__>'
        module.__package__ = package
        module.attr = name
        if names_a_package:
            module.__path__ = ['<mock __path__>']
        registry[import_name] = module
def load_module(self, fullname):
    """Return the module for ``fullname``, mocking it when not yet imported.

    Called by Python if CustomImporter.find_module does not return None.
    ``fullname`` is the fully-qualified name of the requested
    module/package. A module already in ``sys.modules`` is returned as-is;
    otherwise a ``MockObject`` is created, given the usual module
    attributes, registered in ``sys.modules`` and returned.
    """
    if fullname in sys.modules:
        return sys.modules[fullname]

    logger.debug('Importing Mock Module: {}'.format(fullname))
    mock_module = MockObject(fullname=fullname)
    mock_module.__loader__ = self
    mock_module.__file__ = fullname
    mock_module.__path__ = [fullname]
    mock_module.__name__ = fullname
    sys.modules[fullname] = mock_module
    return mock_module

# This gives errors
def save_main_module(file, module_name):
    """Swap in a fresh ``__main__`` module and return it.

    The current ``__main__`` module is re-registered (and renamed) as
    ``module_name``; the new module inherits its ``__loader__`` when one
    is present and gets ``__file__`` set to ``file``.
    """
    # patch provided by: Scott Schlesier - when script is run, it does not
    # use globals from pydevd:
    # This will prevent the pydevd script from contaminating the namespace
    # for the script to be debugged.
    # Pretend pydevd is not the main module, and convince the file to be
    # debugged that it was loaded as main.
    sys.modules[module_name] = sys.modules['__main__']
    sys.modules[module_name].__name__ = module_name

    from imp import new_module
    fresh_main = new_module('__main__')
    sys.modules['__main__'] = fresh_main

    previous_main = sys.modules[module_name]
    if hasattr(previous_main, '__loader__'):
        fresh_main.__loader__ = getattr(previous_main, '__loader__')
    fresh_main.__file__ = file
    return fresh_main
def test_state_after_failure(self): # A failed reload should leave the original module intact. attributes = ('__file__', '__path__', '__package__') value = '<test>' name = '_temp' with source_util.create_modules(name) as mapping: orig_module = imp.new_module(name) for attr in attributes: setattr(orig_module, attr, value) with open(mapping[name], 'w') as file: file.write('+++ bad syntax +++') loader = machinery.SourceFileLoader('_temp', mapping['_temp']) with self.assertRaises(SyntaxError): loader.load_module(name) for attr in attributes: self.assertEqual(getattr(orig_module, attr), value) # [syntax error]
def db_migrate():
    """Generate the next sqlalchemy-migrate script and upgrade the database.

    The configuration is chosen by the ``FLASK_CONFIG`` environment
    variable (``'default'`` when unset or empty). The model produced by
    ``api.create_model`` is executed in a temporary module, an update
    script is generated against the current ``metadata``, written to
    ``<repo>/versions/NNN_migration.py`` and applied.
    """
    import imp
    from config import config
    from app.infrastructure.mappings import mapping
    from app.infrastructure.mappings.mapping import metadata

    db = SQLAlchemy()
    config = config[os.getenv('FLASK_CONFIG') or 'default']
    metadata.create_all(db.engine)

    v = api.db_version(config.SQLALCHEMY_DATABASE_URI,
                       config.SQLALCHEMY_MIGRATE_REPO)
    migration = config.SQLALCHEMY_MIGRATE_REPO + (
        '/versions/%03d_migration.py' % (v+1))

    # Execute the generated old-model source in an isolated module.
    tmp_module = imp.new_module('old_model')
    old_model = api.create_model(config.SQLALCHEMY_DATABASE_URI,
                                 config.SQLALCHEMY_MIGRATE_REPO)
    exec(old_model, tmp_module.__dict__)

    script = api.make_update_script_for_model(
        config.SQLALCHEMY_DATABASE_URI,
        config.SQLALCHEMY_MIGRATE_REPO,
        tmp_module.meta,
        metadata)
    # Close the file before api.upgrade() runs so the script is fully
    # written to disk (the original left the handle to the GC).
    with open(migration, "wt") as migration_file:
        migration_file.write(script)

    api.upgrade(config.SQLALCHEMY_DATABASE_URI,
                config.SQLALCHEMY_MIGRATE_REPO)
    v = api.db_version(config.SQLALCHEMY_DATABASE_URI,
                       config.SQLALCHEMY_MIGRATE_REPO)
    print('New migration saved as ' + migration)
    print('Current database version: ' + str(v))
def _handle_ns(packageName, path_item):
    """Ensure that named package includes a subpath of path_item (if needed)

    Returns the subpath chosen by the registered namespace handler, or
    ``None`` when no importer/loader is available or nothing needs adding.
    """
    importer = get_importer(path_item)
    if importer is None:
        return None
    loader = importer.find_module(packageName)
    if loader is None:
        return None

    try:
        module = sys.modules[packageName]
    except KeyError:
        module = None

    if module is None:
        # Not imported yet: create and register an empty package module.
        module = imp.new_module(packageName)
        sys.modules[packageName] = module
        module.__path__ = []
        _set_parent_ns(packageName)
    elif not hasattr(module, '__path__'):
        raise TypeError("Not a package:", packageName)

    find_subpath = _find_adapter(_namespace_handlers, importer)
    subpath = find_subpath(importer, path_item, packageName, module)
    if subpath is not None:
        package_path = module.__path__
        package_path.append(subpath)
        loader.load_module(packageName)
        # Reinstate the extended path after the load.
        module.__path__ = package_path
    return subpath
def parse_eb(file_path):
    """Interpret an easyconfig file with ``exec``.

    Interpreting fails if undefined constants are used within the
    easyconfig file; add such constants to ``header``.

    :param file_path: path of the easyconfig file to read.
    :returns: a module named ``easyconfig`` whose attributes are the
        names defined by the header and the file, or an empty dict when
        executing the code raised an exception.
    """
    header = 'SOURCE_TGZ = "%(name)s-%(version)s.tgz"\n'
    header += 'SOURCE_TAR_GZ = "%(name)s-%(version)s.tar.gz"\n'
    code = header

    eb = imp.new_module("easyconfig")
    with open(file_path, "r") as f:
        code += f.read()

    try:
        exec(code, eb.__dict__)
    except Exception as e:  # 'as' form parses on Python 2.6+ and 3
        print("interperting easyconfig error: %s" % e)
        eb = {}
    return eb
def parse_eb(self, file_name, primary):
    """Interpret an easyconfig file with ``exec`` and return it as a module.

    Interpreting fails if the file uses constants that are not defined
    within it; add such constants to ``header``. The header is extended
    with ``self.prolog`` before the file content is appended.

    When ``primary`` is true the full executed text is stored in
    ``self.code`` and the header length in ``self.ptr_head``. Any
    exception during execution prints a message and exits with status 1.
    """
    header = ''.join((
        'SOURCE_TGZ = "%(name)s-%(version)s.tgz"\n',
        'SOURCE_TAR_GZ = "%(name)s-%(version)s.tar.gz"\n',
        self.prolog,
    ))
    eb = imp.new_module("easyconfig")
    with open(file_name, "r") as easyconfig:
        code = header + easyconfig.read()

    try:
        exec(code, eb.__dict__)
    except Exception as err:
        print("interperting easyconfig error: %s" % err)
        sys.exit(1)

    if primary:
        # save original text of source code
        self.code = code
        self.ptr_head = len(header)
    return eb
def import_object_from_string_code(code, object):
    """Import a single object out of source code given as a string.

    The code is executed as a module that is registered in `sys.modules`
    under the SHA256 hex digest of the code.

    Args:
        code (string): Python code to import as module
        object (string): Name of object to extract from imported module

    Raises:
        exceptions.UserError: if executing the code raises, or if the
            module has no attribute named `object`.
    """
    digest = hashlib.sha256(code.encode('UTF-8')).hexdigest()
    user_module = imp.new_module(digest)
    try:
        exec_(code, user_module.__dict__)
    except Exception as e:
        raise exceptions.UserError('User code exception',
                                   exception_message=str(e))
    # Registered only once the code ran without raising.
    sys.modules[digest] = user_module

    try:
        extracted = getattr(user_module, object)
    except AttributeError:
        raise exceptions.UserError("{} not found in code".format(object))
    return extracted
def import_string_code_as_module(code):
    """Run source code given as a string as a module.

    The module is named after, and registered in `sys.modules` under, the
    SHA256 hex digest of the code.

    Args:
        code (string): Python code to import as module

    Returns:
        module: Python module

    Raises:
        exceptions.UserError: if executing the code raises.
    """
    encoded = code.encode('UTF-8')
    module_key = hashlib.sha256(encoded).hexdigest()
    loaded = imp.new_module(module_key)
    try:
        exec_(code, loaded.__dict__)
    except Exception as e:
        raise exceptions.UserError('User code exception',
                                   exception_message=str(e))
    sys.modules[module_key] = loaded
    return loaded
def dbmigrate():
    """Generate the next sqlalchemy-migrate script and upgrade the database.

    Settings are read from ``config.ProductionConfig``. The model produced
    by ``api.create_model`` is executed in a temporary module, an update
    script is generated against ``db.metadata``, written to
    ``<repo>/versions/NNN_migration.py`` and applied.
    """
    app = Flask(__name__)
    # The original bound the result of from_object() to an unused local.
    app.config.from_object('config.ProductionConfig')
    sqlalchemy_migrate_repo = app.config['SQLALCHEMY_MIGRATE_REPO']
    sqlalchemy_database_uri = app.config['SQLALCHEMY_DATABASE_URI']

    v = api.db_version(sqlalchemy_database_uri, sqlalchemy_migrate_repo)
    migration = sqlalchemy_migrate_repo + (
        '/versions/%03d_migration.py' % (v + 1))

    # Execute the generated old-model source in an isolated module.
    tmp_module = imp.new_module('old_model')
    old_model = api.create_model(sqlalchemy_database_uri,
                                 sqlalchemy_migrate_repo)
    exec(old_model, tmp_module.__dict__)

    script = api.make_update_script_for_model(
        sqlalchemy_database_uri, sqlalchemy_migrate_repo,
        tmp_module.meta, db.metadata)
    # Close the file before api.upgrade() runs so the script is fully
    # written to disk (the original left the handle to the GC).
    with open(migration, "wt") as migration_file:
        migration_file.write(script)

    api.upgrade(sqlalchemy_database_uri, sqlalchemy_migrate_repo)
    v = api.db_version(sqlalchemy_database_uri, sqlalchemy_migrate_repo)
    print('New migration saved as ' + migration)
    print('Current database version: ' + str(v))
def __init__(self, mod_name = '__main__', launch_file = None):
    """Set up the execution namespace, launch file and execute-item lock.

    :param mod_name: name of the module to create and register in
        ``sys.modules``; ``None`` reuses the existing ``__main__`` module.
    :param launch_file: stored unchanged on ``self.launch_file``.
    """
    import threading
    ReplBackend.__init__(self)

    if mod_name is None:
        self.exec_mod = sys.modules['__main__']
    elif sys.platform != 'cli':
        created = imp.new_module(mod_name)
        sys.modules[mod_name] = created
        self.exec_mod = created
    else:
        # On the 'cli' platform a Scope object stands in for a module.
        self.exec_mod = Scope()
        self.exec_mod.__name__ = '__main__'

    self.launch_file = launch_file
    self.code_flags = 0
    self.execute_item = None
    self.execute_item_lock = threading.Lock()
    # lock starts acquired (we use it like manual reset event)
    self.execute_item_lock.acquire()
def load_module(self, fullname):
    """Load the source for a module found at `fullname`.

    For most of the module path we only generate placeholders that don't
    actually have code; they'd be the equivalent of a bunch of empty dirs
    with __init__.py's, but ansible doesn't have the init file in the
    path. Real source is loaded only when ``self.basename`` is in neither
    ``ANSIBLE_DIRS`` nor ``PLUGINS``.
    """
    try:
        placeholder = sys.modules[fullname]
    except KeyError:
        placeholder = sys.modules.setdefault(fullname,
                                             imp.new_module(fullname))

    placeholder.__file__ = os.path.join(self.path, fullname)
    placeholder.__loader__ = self
    placeholder.__path__ = [self.path]
    placeholder.__package__ = '.'.join(fullname.split('.')[:-1])

    # We don't want to actually load unless we get to the Python code.
    # This is assuming that the guards in the __init__() function worked
    is_placeholder_only = (self.basename in ANSIBLE_DIRS
                           or self.basename in PLUGINS)
    if is_placeholder_only:
        return placeholder
    return imp.load_source(self.basename, self.path + self.extension)
def __init__(self, mod_name):
    """Remember ``mod_name`` and create an empty module with that name.

    ``self._saved_module`` starts out as an empty list.
    """
    fresh_module = imp.new_module(mod_name)
    self.mod_name = mod_name
    self.module = fresh_module
    self._saved_module = list()
def new_module(self, name):
    """Create and return a new, empty module object called ``name``."""
    created = imp.new_module(name)
    return created
def add_module(self, name):
    """Return the module registered as ``name``, creating it if absent.

    The registry is whatever ``self.modules_dict()`` returns; a missing
    entry is created with ``self.new_module(name)`` and stored there.
    """
    registry = self.modules_dict()
    if name not in registry:
        created = self.new_module(name)
        registry[name] = created
        return created
    return registry[name]

# sys interface
def _handle_ns(packageName, path_item):
    """Ensure that named package includes a subpath of path_item (if needed)

    After the package is loaded, every entry of the previously saved path
    that is missing from the module's current ``__path__`` is appended to
    it. Returns the subpath, or ``None`` when nothing applies.
    """
    importer = get_importer(path_item)
    loader = None if importer is None else importer.find_module(packageName)
    if loader is None:
        return None

    module = sys.modules.get(packageName)
    if module is None:
        # Register a new, empty package module for this name.
        module = imp.new_module(packageName)
        sys.modules[packageName] = module
        module.__path__ = []
        _set_parent_ns(packageName)
    elif not hasattr(module, '__path__'):
        raise TypeError("Not a package:", packageName)

    handler = _find_adapter(_namespace_handlers, importer)
    subpath = handler(importer, path_item, packageName, module)
    if subpath is None:
        return subpath

    saved_path = module.__path__
    saved_path.append(subpath)
    loader.load_module(packageName)
    # Merge saved entries back into whatever __path__ is after the load.
    for entry in saved_path:
        if entry not in module.__path__:
            module.__path__.append(entry)
    return subpath
def from_pyfile(self, filename, silent=False):
    """Update the config values from a Python file.

    The file is executed in a module named ``config`` and the result is
    handed to :meth:`from_object`, as if the file had been imported.

    :param filename: the filename of the config. This can either be an
                     absolute filename or a filename relative to the
                     root path.
    :param silent: set to `True` if you want silent failure for missing
                   files.
    :returns: `True` on success, `False` when the file is missing (or is
              a directory) and `silent` is set.

    .. versionadded:: 0.7
       `silent` parameter.
    """
    filename = os.path.join(self.root_path, filename)
    config_module = imp.new_module('config')
    config_module.__file__ = filename
    ignorable_errnos = (errno.ENOENT, errno.EISDIR)
    try:
        with open(filename) as config_file:
            source = config_file.read()
            exec(compile(source, filename, 'exec'), config_module.__dict__)
    except IOError as e:
        if silent and e.errno in ignorable_errnos:
            return False
        e.strerror = 'Unable to load configuration file (%s)' % e.strerror
        raise
    self.from_object(config_module)
    return True
def run(task, inputs, outputs, task_inputs, task_outputs, **kwargs):
    """Execute ``task['script']`` in a scratch module and collect outputs.

    Every input's ``script_data`` is injected into the module namespace
    under the input's name before the script runs; afterwards, for every
    name in ``task_outputs`` the value of that name in the namespace is
    stored in ``outputs[name]['script_data']``.

    NOTE(review): this block is Python 2 only (``exec`` statement,
    ``except Exception, e``, three-argument ``raise``, ``xrange``,
    ``iteritems``).
    """
    custom = imp.new_module('__girder_worker__')

    # Expose selected keyword arguments to the script under fixed names.
    custom.__dict__['_job_manager'] = kwargs.get('_job_manager')
    custom.__dict__['_tempdir'] = kwargs.get('_tempdir')
    custom.__dict__['_celery_task'] = kwargs.get('_celery_task')

    for name in inputs:
        custom.__dict__[name] = inputs[name]['script_data']

    # The task's own 'write_script' setting wins over the keyword argument.
    if task.get('write_script', kwargs.get('write_script', False)):
        # NOTE(review): tempfile.mktemp() only returns a name and is
        # documented as insecure; the file is not removed in this block.
        debug_path = tempfile.mktemp()
        with open(debug_path, 'wb') as fh:
            fh.write(task['script'])

        # Python 2's exec statement accepts an open file object.
        with open(debug_path, 'r') as fh:
            exec fh in custom.__dict__

    else:
        try:
            exec task['script'] in custom.__dict__
        except Exception, e:
            # Re-raise with the numbered script and the task description
            # in the message, keeping the original traceback.
            trace = sys.exc_info()[2]
            lines = task['script'].split('\n')
            lines = [(str(i+1) + ': ' + lines[i]) for i in xrange(len(lines))]
            error = (
                str(e) + '\nScript:\n' + '\n'.join(lines) +
                '\nTask:\n' + json.dumps(task, indent=4)
            )
            raise Exception(error), None, trace

    # A KeyError here means the script did not define a declared output.
    for name, task_output in task_outputs.iteritems():
        outputs[name]['script_data'] = custom.__dict__[name]
def load_module(self, fullname):
    """Load (or reload) ``fullname`` from the source ``_read_source`` returns.

    Raises ``ImportError`` when no source is found. If executing the
    source fails, a newly created module is removed from ``sys.modules``
    and an existing module gets its previous ``__file__`` back, before the
    exception is re-raised.

    NOTE(review): the nesting of the attribute assignments under ``else``
    was reconstructed from a whitespace-mangled copy — confirm against the
    upstream file.
    """
    # Only if a module is being reloaded and hasn't been scanned recently
    # do we force a refresh of the contents of the .sublime-package. This
    # allows proper code upgrades using Package Control.
    if fullname in imp._RELOADING:
        if self.refreshed < time.time() - 5:
            self._scan_zip()

    source, source_path, mod_file, is_pkg = self._read_source(fullname)

    if source is None:
        raise ImportError("No module named '%s'" % fullname)

    is_new = False
    if fullname in sys.modules:
        mod = sys.modules[fullname]
        # Remembered so it can be restored if execution fails below.
        old_mod_file = mod.__file__
    else:
        is_new = True
        mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
        mod.__name__ = fullname
        mod.__path__ = [self.zippath]
        mod.__loader__ = self

    mod.__file__ = mod_file

    # A package is its own __package__; a module belongs to its parent.
    if is_pkg:
        mod.__package__ = mod.__name__
    else:
        mod.__package__ = fullname.rpartition('.')[0]

    try:
        exec(compile(source, source_path, 'exec'), mod.__dict__)
        return mod

    except:
        # Undo the registration / __file__ change, then re-raise.
        if is_new:
            del sys.modules[fullname]
        else:
            mod.__file__ = old_mod_file
        raise
def load_module(path, encoding=None):
    """
    Load a source file as a python module; results are cached per path.

    While the file executes, its directory is placed first on ``sys.path``.

    :param path: file path
    :type path: string
    :param encoding: passed through to ``Utils.readf``
    :return: Loaded Python module
    :rtype: module
    :raises Errors.WafError: if the file cannot be read
    """
    try:
        return cache_modules[path]
    except KeyError:
        pass

    script_module = imp.new_module(WSCRIPT_FILE)
    try:
        source = Utils.readf(path, m='rU', encoding=encoding)
    except EnvironmentError:
        raise Errors.WafError('Could not read the file %r' % path)

    script_dir = os.path.dirname(path)
    sys.path.insert(0, script_dir)
    try:
        compiled = compile(source, path, 'exec')
        exec(compiled, script_module.__dict__)
    finally:
        # Always undo the sys.path change, even if the script raised.
        sys.path.remove(script_dir)

    cache_modules[path] = script_module
    return script_module
def start(cwd, version, wafdir):
    """Entry point of a tiny build tool driven by a file named "bbit".

    A fake ``wscript`` module is installed as ``Context.g_module`` with
    hard-coded ``configure``/``build`` functions, then the commands found
    in ``sys.argv`` are executed. Exits with status 1 when ``cwd`` has no
    "bbit" file.
    """
    # simple example, the file main.c is hard-coded
    try:
        os.stat(cwd + os.sep + 'bbit')
    except:
        print('call from a folder containing a file named "bbit"')
        sys.exit(1)

    Logs.init_log()
    Context.waf_dir = wafdir
    Context.top_dir = cwd
    Context.run_dir = cwd
    Context.out_dir = os.path.join(cwd, 'build')

    Context.g_module = imp.new_module('wscript')
    Context.g_module.root_path = os.path.join(cwd, 'bbit')
    # Every recurse() call dispatches straight to the fake module.
    Context.Context.recurse = \
        lambda x, y: getattr(Context.g_module, x.cmd or x.fun, Utils.nada)(x)
    Context.g_module.configure = lambda ctx: ctx.load('g++')
    Context.g_module.build = lambda bld: bld.objects(source='main.c')

    Options.OptionsContext().execute()

    # Configure when asked to, or when there is no build directory yet.
    needs_configure = 'configure' in sys.argv
    try:
        os.stat(cwd + os.sep + 'build')
    except:
        needs_configure = True
    if needs_configure:
        Context.create_context('configure').execute()

    for command in ('clean', 'build'):
        if command in sys.argv:
            Context.create_context(command).execute()
def start(cwd, version, wafdir):
    """Entry point of a small build system that uses no script file.

    ``cwd`` serves as top, run and output directory. A fake ``wscript``
    module built from this file's ``configure``, ``build`` and ``options``
    is installed as ``Context.g_module``, then the commands found in
    ``sys.argv`` are executed.
    """
    # this is the entry point of our small build system
    # no script file here
    Logs.init_log()
    Context.waf_dir = wafdir
    Context.out_dir = cwd
    Context.top_dir = cwd
    Context.run_dir = cwd

    Context.g_module = imp.new_module('wscript')
    Context.g_module.root_path = cwd
    Context.Context.recurse = recurse_rep

    Context.g_module.configure = configure
    Context.g_module.build = build
    Context.g_module.options = options
    Context.g_module.top = '.'
    Context.g_module.out = '.'

    Options.OptionsContext().execute()

    # Configure when asked to, or when the 'c4che' entry is missing.
    must_configure = 'configure' in sys.argv
    try:
        os.stat(cwd + os.sep + 'c4che')
    except:
        must_configure = True
    if must_configure:
        Context.create_context('configure').execute()

    for cmd in ('clean', 'build'):
        if cmd in sys.argv:
            Context.create_context(cmd).execute()
def start(cwd, version, wafdir):
    """Entry point of a tiny build tool driven by a file named "cbit".

    A fake ``wscript`` module built from this file's ``options``,
    ``configure`` and ``build`` is installed as ``Context.g_module``, then
    the commands found in ``sys.argv`` are executed. Exits with status 1
    when ``cwd`` has no "cbit" file.
    """
    # simple example, the file main.c is hard-coded
    marker_path = cwd + os.sep + 'cbit'
    try:
        os.stat(marker_path)
    except:
        print('call from a folder containing a file named "cbit"')
        sys.exit(1)

    Logs.init_log()
    Context.waf_dir = wafdir
    Context.top_dir = cwd
    Context.run_dir = cwd
    Context.out_dir = os.path.join(cwd, 'build')
    Context.g_module = imp.new_module('wscript')
    Context.g_module.root_path = os.path.join(cwd, 'cbit')
    Context.Context.recurse = recurse_rep

    # this is a fake module, which looks like a standard wscript file
    Context.g_module.options = options
    Context.g_module.configure = configure
    Context.g_module.build = build

    Options.OptionsContext().execute()

    # Configure when asked to, or when there is no build directory yet.
    run_configure = 'configure' in sys.argv
    try:
        os.stat(cwd + os.sep + 'build')
    except:
        run_configure = True
    if run_configure:
        Context.create_context('configure').execute()

    for step in ('clean', 'build'):
        if step in sys.argv:
            Context.create_context(step).execute()