我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用importlib.import_module()。
def plugins(fetch_handlers=None): if not fetch_handlers: fetch_handlers = FETCH_HANDLERS plugin_list = [] for handler_name in fetch_handlers: package, classname = handler_name.rsplit('.', 1) try: handler_class = getattr( importlib.import_module(package), classname) plugin_list.append(handler_class()) except NotImplementedError: # Skip missing plugins so that they can be ommitted from # installation if desired log("FetchHandler {} not found, skipping plugin".format( handler_name)) return plugin_list
def run_migration(self, migration_file, method='upgrade'): if RedVersion.already_run(migration_file): self.logger.warn("Migration %s has already been run, skipping." % migration_file) return try: module = importlib.import_module('redberry.models.migrations.%s' % migration_file) if method == 'upgrade': self.logger.info("Running upgrade on %s" % migration_file) module.upgrade(self.db) else: self.logger.info("Running downgrade on %s" % migration_file) module.downgrade(self.db) RedVersion.store_migration(migration_file) except Exception, e: self.logger.error("Error running %s" % migration_file) self.logger.error(e)
def load_migration_nodes(): nodes = {None: MigrationRouteHead()} for path in _migrations_dir_path.iterdir(): if not _migration_filename_pattern.match(path.name): continue module = importlib.import_module(f'quizzler.migrations.{path.stem}') nodes[path.stem] = MigrationNode(path.stem, module) # Fill linked-list info. for name, node in nodes.items(): if name is None: continue prev_name = node.module.previous prev_node = nodes[prev_name] if prev_node.next is not None: raise MigrationRouteConflict( f'{name} and {prev_node.next.name} ' f'both specify previous = {prev_name!r}', ) prev_node.next = node node.prev = prev_node return nodes
def launch(task, filename, nb_cpu, nb_gpu, use_gpu, output=None, benchmark=None, extension='', sim_same_elec=None): from circus.shared.parser import CircusParser params = CircusParser(filename) if task not in ['filtering', 'benchmarking']: params.get_data_file() module = importlib.import_module('circus.' + task) if task == 'benchmarking': module.main(params, nb_cpu, nb_gpu, use_gpu, output, benchmark, sim_same_elec) elif task in ['converting', 'merging']: module.main(params, nb_cpu, nb_gpu, use_gpu, extension) else: module.main(params, nb_cpu, nb_gpu, use_gpu)
def create_from_settings(cls, instance, conf): try: engine_path, _, cls_name = conf['ENGINE'].rpartition('.') db_name = conf['NAME'] db_options = conf['OPTIONS'] if 'OPTIONS' in conf and conf['OPTIONS'] else dict() # FIX for #331. Replace utf8 by utf8mb4 in the mysql driver encoding. if conf['ENGINE'] == 'peewee_async.MySQLDatabase' and 'charset' in db_options and db_options['charset'] == 'utf8': logging.info('Forcing to use \'utf8mb4\' instead of \'utf8\' for the MySQL charset option! (Fix #331).') db_options['charset'] = 'utf8mb4' # We will try to load it so we have the validation inside this class. engine = getattr(importlib.import_module(engine_path), cls_name) except ImportError: raise ImproperlyConfigured('Database engine doesn\'t exist!') except Exception as e: raise ImproperlyConfigured('Database configuration isn\'t complete or engine could\'t be found!') return cls(engine, instance, db_name, **db_options)
def run_migration(self, name, app_label, app_module, save_migration=True): """ Run + apply migration to the actual database. :param name: Name of migration. :param app_label: App label. :param app_module: App module path. :param save_migration: Save migration state? """ from .models.migration import Migration mod = importlib.import_module('{}.migrations.{}'.format(app_module, name)) try: with self.db.allow_sync(): mod.upgrade(self.migrator) if save_migration: Migration.create( app=app_label, name=name, applied=True ) except Exception as e: logger.warning('Can\'t migrate {}.migrations.{}: {}'.format(app_module, name, str(e))) raise
def init_app(self, app): """ Initiate app, load all signal/callbacks files. (just import, they should register with decorators). :param app: App instance :type app: pyplanet.apps.AppConfig """ self._current_namespace = app.label # Import the signals module. try: importlib.import_module('{}.signals'.format(app.name)) except ImportError: pass self._current_namespace = None # Import the callbacks module. try: importlib.import_module('{}.callbacks'.format(app.name)) except ImportError: pass
def load(self): # Make sure we load the defaults first. super().load() # Prepare the loading. self.module = os.environ.get('PYPLANET_SETTINGS_MODULE') if not self.module: raise ImproperlyConfigured( 'Settings module is not defined! Please define PYPLANET_SETTINGS_MODULE in your ' 'environment or start script.' ) # Add the module itself to the configuration. self.settings['SETTINGS_MODULE'] = self.module # Load the module, put the settings into the local context. module = importlib.import_module(self.module) for setting in dir(module): if setting.isupper(): self.settings[setting] = getattr(module, setting)
def _verify_api(self): """ Verifies the API and loads the proper VSPK version """ # Checking auth parameters if ('api_password' not in list(self.auth.keys()) or not self.auth['api_password']) and ('api_certificate' not in list(self.auth.keys()) or 'api_key' not in list(self.auth.keys()) or not self.auth['api_certificate'] or not self.auth['api_key']): self.module.fail_json(msg='Missing api_password or api_certificate and api_key parameter in auth') self.api_username = self.auth['api_username'] if 'api_password' in list(self.auth.keys()) and self.auth['api_password']: self.api_password = self.auth['api_password'] if 'api_certificate' in list(self.auth.keys()) and 'api_key' in list(self.auth.keys()) and self.auth['api_certificate'] and self.auth['api_key']: self.api_certificate = self.auth['api_certificate'] self.api_key = self.auth['api_key'] self.api_enterprise = self.auth['api_enterprise'] self.api_url = self.auth['api_url'] self.api_version = self.auth['api_version'] try: global VSPK VSPK = importlib.import_module('vspk.{0:s}'.format(self.api_version)) except ImportError: self.module.fail_json(msg='vspk is required for this module, or the API version specified does not exist.')
def serve(cwd, app, port): sys.path.insert(0, cwd) wsgi_fqn = app.rsplit('.', 1) wsgi_fqn_parts = wsgi_fqn[0].rsplit('/', 1) if len(wsgi_fqn_parts) == 2: sys.path.insert(0, os.path.join(cwd, wsgi_fqn_parts[0])) wsgi_module = importlib.import_module(wsgi_fqn_parts[-1]) wsgi_app = getattr(wsgi_module, wsgi_fqn[1]) # Attempt to force Flask into debug mode try: wsgi_app.debug = True except: # noqa: E722 pass os.environ['IS_OFFLINE'] = 'True' serving.run_simple( 'localhost', int(port), wsgi_app, use_debugger=True, use_reloader=True, use_evalex=True)
def _setup(module, extras): """Install common submodules""" Qt.__binding__ = module.__name__ for name in list(_common_members) + extras: try: # print("Trying %s" % name) submodule = importlib.import_module( module.__name__ + "." + name) except ImportError: # print("Failed %s" % name) continue setattr(Qt, "_" + name, submodule) if name not in extras: # Store reference to original binding, # but don't store speciality modules # such as uic or QtUiTools setattr(Qt, name, _new_module(name))
def scan_for_plugins(self): """Plugin scanner. Scans for plugins in the known plugin directories. Adds them to the available plugins dictionary, ready to be loaded. Returns: The available plugins dictionary. """ _logger.debug("Scanning for plugins.") for category,info in CATEGORIES.items(): _logger.debug("Scanning category {0}".format(category)) for module in os.listdir(os.path.join(os.path.dirname(__file__),info["directory"])): if module == "__init__.py" or module[-3:] != ".py": continue _logger.debug("\tFound plugin {0}".format(module[:-3])) importlib.import_module("aniping.{0}.{1}".format(info["directory"], module[:-3])) self._available_plugins[category].append(module[:-3]) _logger.debug("All available plugins found.") return self._available_plugins
def upload(filename, config): """Upload .su3 file with transports""" if "transports" in config and "enabled" in config["transports"]: for t in config["transports"]["enabled"].split(): if t in config: tconf = config[t] else: tconf = None try: importlib.import_module("pyseeder.transports.{}".format(t)) \ .run(filename, tconf) except ImportError: raise PyseederException( "{} transport can't be loaded".format(t)) except TransportException as e: log.error("Transport error: {}".format(e))
def email(self): try: to_email = request.args.get('to_email') path, attr = configuration.get('email', 'EMAIL_BACKEND').rsplit('.', 1) logging.info("path: " + str(path)) logging.info("attr: " + str(attr)) module = importlib.import_module(path) logging.info("module: " + str(module)) backend = getattr(module, attr) backend(to_email, "Test Email", "Test Email", files=None, dryrun=False) flash('Email Sent') except Exception as e: flash('Failed to Send Email: ' + str(e), 'error') return redirect("/admin/admintools", code=302)
def __init__(self, username: str, password: str, botModule: str, botconfig: Mapping, numPlayers: int, variant: Variant, spectators: bool, gameName: str, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.username: str = username self.password: str = password module = importlib.import_module(botModule + '.bot') self.botCls: Type[Bot] = module.Bot # type: ignore self.botconfig: Mapping = botconfig self.numPlayers: int = numPlayers self.variant: Variant = variant self.spectators: bool = spectators self.gameName: str = gameName self.conn: socketIO_client.SocketIO self.tablePlayers: List[str] = [] self.readyToStart: bool = False self.game: Optional[Game] = None
def load_config(config_name): config = {} # Get config defaults for key in dir(config_defaults): if key.isupper(): config[key] = getattr(config_defaults, key) # Merge given config with defaults obj = importlib.import_module(config_name) for key in dir(obj): if key in config: value = getattr(obj, key) if isinstance(config[key], dict): config[key].update(value) else: config[key] = value return config
def from_object(self, obj): """Updates the values from the given object. An object can be of one of the following two types: - a string: in this case the object with that name will be imported - an actual object reference: that object is used directly :param obj: an import name or object """ string_types = (str,) if isinstance(obj, string_types): # github.com/pallets/werkzeug/blob/master/werkzeug/utils.py+L399 # obj = import_string(obj) obj = importlib.import_module(obj) for key in dir(obj): if key.isupper(): self[key] = getattr(obj, key)
def as_view(path): def decorator(func): # .. path_name, klass_name = (path.split(':')) # ... @inlineCallbacks def wrapper(router, request, *args, **kwargs): # ... module = importlib.import_module(path_name) Klass = getattr(module,klass_name) klass = Klass(router, request,*args, **kwargs) # .. result = yield defer.maybeDeferred(klass) defer.returnValue(result) # .. # _conspect_name(wrapper, klass_name) _conspect_name(wrapper, func.__name__) _conspect_param(wrapper, func) _conspect_param_defaults(wrapper, func) return wrapper return decorator
def set_action(self, action, *args): """Set action and arguments to run in triggered time :param action: function path as anction :type action: str :param args: function arguments :type args: list or tuple """ if isinstance(action, (types.FunctionType, types.MethodType)): self.action = action else: action_module = '.'.join(action.split('.')[:-1]) action_module = importlib.import_module(action_module) action = action.split('.')[-1] self.action = getattr(action_module, action) self.action_args = args
def __init__(self, states: States, request: dict = None, with_graph: bool = False): self.states = states self.state = states.attributes.state state_names, transitions = type(states).get_states_transitions() machine_cls = \ importlib.import_module('transitions.extensions').GraphMachine if with_graph else \ importlib.import_module('transitions').Machine self.machine = machine_cls( model=self, states=state_names, initial=states.attributes.state, auto_transitions=False ) for transition in transitions: self.machine.add_transition(**transition)
def _load_project_pipelines(self): try: pipelines_module_name = settings.CML_PROJECT_PIPELINES except AttributeError: logger.info('Configure CML_PROJECT_PIPELINES in settings!') return try: pipelines_module = importlib.import_module(pipelines_module_name) except ImportError: return for item_class_name in PROCESSED_ITEMS: try: pipeline_class = getattr(pipelines_module, '{}Pipeline'.format(item_class_name)) except AttributeError: continue self._project_pipelines[item_class_name] = pipeline_class()
def _get_page_class(self, path, page_cls_name): # last module name does not contain '_' final_module = self.unify_page_path(path[-1], preserve_spaces=False) page_cls_path = ".".join(path[:-1] + (final_module,)) page_cls_path = self.unify_page_path(page_cls_path) # append 'page' as every page module ends with this keyword page_cls_path += "page" page_cls_name = page_cls_name or self._get_page_cls_name(final_module) module = None # return imported class for path in self.PAGES_IMPORT_PATH: try: module = importlib.import_module(path % page_cls_path) break except ImportError: pass if module is None: raise ImportError("Failed to import module: " + (path % page_cls_path)) return getattr(module, page_cls_name)
def _setup(module, extras): """Install common submodules""" Qt.__binding__ = module.__name__ for name in list(_common_members) + extras: try: submodule = importlib.import_module( module.__name__ + "." + name) except ImportError: continue setattr(Qt, "_" + name, submodule) if name not in extras: # Store reference to original binding, # but don't store speciality modules # such as uic or QtUiTools setattr(Qt, name, _new_module(name))
def get_module_version(cls): """ Return the string version of the imported module, without any prefix/suffix. This method handles the common case where a module (or one of its parents) defines a __version__ string. For other cases, subclasses should override this method and return the version string. """ if cls.module: module = cls.module while True: if isinstance(getattr(module, '__version__', None), str): return module.__version__ if hasattr(module, '__package__'): try: module = importlib.import_module(module.__package__) except ImportError: return None else: return None
def import_exploit(path): """ Import exploit module :param path: absolute path to exploit e.g. routersploit.modules.exploits.asus.pass_bypass :return: exploit module or error """ try: module = importlib.import_module(path) return getattr(module, 'Exploit') except (ImportError, AttributeError, KeyError) as err: raise RoutersploitException( "Error during loading '{}'\n\n" "Error: {}\n\n" "It should be valid path to the module. " "Use <tab> key multiple times for completion.".format(humanize_path(path), err) )
def plugin_run(self, name, method, content): root = relative(self.root) try: # Retrieve the _plugin.json file plugin = build.get_json( root(self.config["plugins"][name], "_plugin.json")) # Load and run the module if plugin["method"] == method: sys.path.append(root(self.config["plugins"][name])) module = importlib.import_module(plugin["init"], name) content = module.main(content, self) return content except (KeyError, OSError, TypeError, ImportError, AttributeError) as e: return content
def import_submodules(package, recursive=True): """ Import all submodules of a module, recursively, including subpackages. From http://stackoverflow.com/questions/3365740/how-to-import-all-submodules :param package: package (name or actual module) :type package: str | module :rtype: dict[str, types.ModuleType] """ import importlib import pkgutil if isinstance(package, str): package = importlib.import_module(package) results = {} for _loader, name, is_pkg in pkgutil.walk_packages(package.__path__): full_name = package.__name__ + '.' + name results[full_name] = importlib.import_module(full_name) if recursive and is_pkg: results.update(import_submodules(full_name))
def get_command(self, ctx, name): try: if sys.version_info[0] == 2: name = name.encode('ascii', 'replace') splitted = name.split('_') if len(splitted) <= 1: return module_name, command_name = splitted if not all([module_name, command_name]): return module = '{0}.{1}.commands.{2}'.format( self.base_module_name, module_name, command_name) mod = importlib.import_module(module) except ImportError: return return getattr(mod, 'cli', None)
def load_component(self, class_name): """ Loads and adds new component to editor. Returns component instance. """ if class_name in self.loaded_components: return self.loaded_components[class_name] mod = importlib.import_module("scc.gui.ae.%s" % (class_name,)) for x in mod.__all__: cls = getattr(mod, x) if isinstance(cls, (type, types.ClassType)) and issubclass(cls, AEComponent): if cls is not AEComponent: instance = cls(self.app, self) self.loaded_components[class_name] = instance self.components.append(instance) return instance
def check_plugin(plugins): need_restart = False for name in plugins: try: importlib.import_module(name) except ImportError, e: print "??????????'%s'??????? ..." % (name, ) ret = install(name) print "-" * 60 print "????%s? '%s'" % ("??" if ret else "??", name, ) if not ret: return False need_restart = True if need_restart: print "???????" return True
def do_execute_cmd(argv): valid_commands = get_commands() parser = argparse.ArgumentParser(description="") parser.add_argument("-v", "--verbose", help="I am verbose!", action="store_true") parser.add_argument("-t", "--tier", help="Tier to use (overrides drift_TIER from environment)") subparsers = parser.add_subparsers(help="sub-command help") for cmd in valid_commands: module = importlib.import_module("drift.management.commands." + cmd) subparser = subparsers.add_parser(cmd, help="Subcommands for {}".format(cmd)) if hasattr(module, "get_options"): module.get_options(subparser) subparser.set_defaults(func=module.run_command) args = parser.parse_args(argv) if args.tier: os.environ["drift_TIER"] = args.tier args.func(args)
def puzzle_longest(type='mnist', width=3, height=3): global output_dirs output_dirs = ["longest"] import importlib p = importlib.import_module('latplan.puzzles.puzzle_{}'.format(type)) p.setup() ics = [ # from Reinfield '93 # [8,0,6,5,4,7,2,3,1], # the second instance with the longest optimal solution 31 [3,5,6,8,4,7,2,1,0], # [8,7,6,0,4,1,2,5,3], # the first instance with the longest optimal solution 31 [1,8,6,7,4,3,2,5,0], # [8,5,6,7,2,3,4,1,0], # the first instance with the most solutions [8,7,4,5,6,1,2,3,0], # [8,5,4,7,6,3,2,1,0], # the second instance with the most solutions [8,7,6,5,2,1,4,3,0], # [8,6,7,2,5,4,3,0,1], # the "wrong"? hardest eight-puzzle from [7,8,3,6,5,4,1,2,0], # [6,4,7,8,5,0,3,2,1], # w01fe.com/blog/2009/01/the-hardest-eight-puzzle-instances-take-31-moves-to-solve/ [5,8,7,6,1,4,0,2,3], ] gcs = np.arange(width*height).reshape((1,width*height)) generate(p, ics, gcs, width, height)
def process_result(results, output_format, **kwargs): """Render the output into the proper format.""" module_name = 'monitorstack.common.formatters' method_name = 'write_{}'.format(output_format.replace('-', '_')) output_formatter = getattr( importlib.import_module(module_name), method_name ) # Force the output formatter into a list if not isinstance(results, list): # pragma: no cover results = [results] exit_code = 0 for result in results: output_formatter(result) if result['exit_code'] != 0: exit_code = result['exit_code'] else: sys.exit(exit_code)
def _train(self, matrix_store, class_path, parameters): """Fit a model to a training set. Works on any modeling class that is available in this package's environment and implements .fit Args: class_path (string) A full classpath to the model class parameters (dict) hyperparameters to give to the model constructor Returns: tuple of (fitted model, list of column names without label) """ module_name, class_name = class_path.rsplit(".", 1) module = importlib.import_module(module_name) cls = getattr(module, class_name) instance = cls(**parameters) y = matrix_store.labels() return instance.fit(matrix_store.matrix, y), matrix_store.matrix.columns
def run(self, grid_config): for classpath, parameter_config in grid_config.items(): try: module_name, class_name = classpath.rsplit(".", 1) module = importlib.import_module(module_name) cls = getattr(module, class_name) for parameters in ParameterGrid(parameter_config): try: cls(**parameters) except Exception as e: raise ValueError(dedent('''Section: grid_config - Unable to instantiate classifier {} with parameters {}, error thrown: {} '''.format(classpath, parameters, e))) except Exception as e: raise ValueError(dedent('''Section: grid_config - Unable to import classifier {}, error thrown: {} '''.format(classpath, e)))
def make_flask_app(config, username, password, url_prefix): """Return Flask app with default configuration and registered blueprint.""" app = Flask(__name__) # Start configuration with our built in defaults. app.config.from_object(default_settings) # Override with any settings in config file, if given. if config: app.config.from_object(importlib.import_module(config)) # Override from a configuration file in the env variable, if present. if 'RQ_SCHEDULER_DASHBOARD_SETTINGS' in os.environ: app.config.from_envvar('RQ_SCHEDULER_DASHBOARD_SETTINGS') # Optionally add basic auth to blueprint and register with app. if username: add_basic_auth(blueprint, username, password) app.register_blueprint(blueprint, url_prefix=url_prefix) return app
def _get_mod_ns(name, fullname, includeprivate): """Return the template context of module identified by `fullname` as a dict""" ns = { # template variables 'name': name, 'fullname': fullname, 'members': [], 'functions': [], 'classes': [], 'exceptions': [], 'subpackages': [], 'submodules': [], 'all_refs': [], 'members_imports': [], 'members_imports_refs': [], 'data': []} p = 0 if includeprivate: p = 1 mod = importlib.import_module(fullname) ns['members'] = _get_members(mod)[p] ns['functions'] = _get_members(mod, typ='function')[p] ns['classes'] = _get_members(mod, typ='class')[p] ns['exceptions'] = _get_members(mod, typ='exception')[p] ns['all_refs'] = _get_members(mod, include_imported=True, in__all__=True, as_refs=True)[p] ns['members_imports'] = _get_members(mod, include_imported=True)[p] ns['members_imports_refs'] = _get_members(mod, include_imported=True, as_refs=True)[p] ns['data'] = _get_members(mod, typ='data')[p] return ns
def _configure_publish(publish=None): """Determine and return the default publisher. Args: publish (str): The current default publisher (may be None). Returns: str: The new default publisher. """ # Set up publishing defaults. logger.info('Where do you want to publish code by default?') logger.info('The common valid options are "github" and "local".') publish = six.moves.input('Default publisher: ').lower() try: importlib.import_module('artman.tasks.publish.%s' % publish) return publish except ImportError: logger.error('Invalid publisher.') return _configure_publish()
def _get_publish_tasks(self, publish, **kwargs): """Dynamically import publisher tasks based on the selected publisher. This will raise ImportError if the publisher does not have a module in `pipeline.tasks.publish`. Args: publish (str): A string of a publisher in pipeline.tasks.publish.*. Returns: list: A list of Task subclasses defined by the publisher module. """ module = importlib.import_module( 'artman.tasks.publish.{}'.format(publish), ) return module.TASKS
def ensure_module(module_name: str): """ Makes sure that a module is importable. In case the module cannot be found, print an error and exit. Args: module_name: name of the module to look for """ try: importlib.import_module(module_name) except ModuleNotFoundError: click.secho( f'Module not found: {module_name}\n' f'Install it manually with: "pip install {module_name}"\n' f'Or install all dependencies with: "pip install -r requirements-dev.txt"', fg='red', err=True) exit(-1)