Python importlib 模块,import_module() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用importlib.import_module()

项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def plugins(fetch_handlers=None):
    if not fetch_handlers:
        fetch_handlers = FETCH_HANDLERS
    plugin_list = []
    for handler_name in fetch_handlers:
        package, classname = handler_name.rsplit('.', 1)
        try:
            handler_class = getattr(
                importlib.import_module(package),
                classname)
            plugin_list.append(handler_class())
        except NotImplementedError:
            # Skip missing plugins so that they can be ommitted from
            # installation if desired
            log("FetchHandler {} not found, skipping plugin".format(
                handler_name))
    return plugin_list
项目:redberry    作者:michaelcho    | 项目源码 | 文件源码
def run_migration(self, migration_file, method='upgrade'):

        if RedVersion.already_run(migration_file):
            self.logger.warn("Migration %s has already been run, skipping." % migration_file)
            return

        try:
            module = importlib.import_module('redberry.models.migrations.%s' % migration_file)

            if method == 'upgrade':
                self.logger.info("Running upgrade on %s" % migration_file)
                module.upgrade(self.db)
            else:
                self.logger.info("Running downgrade on %s" % migration_file)
                module.downgrade(self.db)

            RedVersion.store_migration(migration_file)

        except Exception, e:
            self.logger.error("Error running %s" % migration_file)
            self.logger.error(e)
项目:quizbot-2017    作者:pycontw    | 项目源码 | 文件源码
def load_migration_nodes():
    nodes = {None: MigrationRouteHead()}
    for path in _migrations_dir_path.iterdir():
        if not _migration_filename_pattern.match(path.name):
            continue
        module = importlib.import_module(f'quizzler.migrations.{path.stem}')
        nodes[path.stem] = MigrationNode(path.stem, module)

    # Fill linked-list info.
    for name, node in nodes.items():
        if name is None:
            continue
        prev_name = node.module.previous
        prev_node = nodes[prev_name]
        if prev_node.next is not None:
            raise MigrationRouteConflict(
                f'{name} and {prev_node.next.name} '
                f'both specify previous = {prev_name!r}',
            )
        prev_node.next = node
        node.prev = prev_node

    return nodes
项目:spyking-circus    作者:spyking-circus    | 项目源码 | 文件源码
def launch(task, filename, nb_cpu, nb_gpu, use_gpu, output=None, benchmark=None, extension='', sim_same_elec=None):

    from circus.shared.parser import CircusParser
    params = CircusParser(filename)

    if task not in ['filtering', 'benchmarking']:
        params.get_data_file()

    module = importlib.import_module('circus.' + task)

    if task == 'benchmarking':
        module.main(params, nb_cpu, nb_gpu, use_gpu, output, benchmark, sim_same_elec)
    elif task in ['converting', 'merging']:
        module.main(params, nb_cpu, nb_gpu, use_gpu, extension)
    else:
        module.main(params, nb_cpu, nb_gpu, use_gpu)
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def create_from_settings(cls, instance, conf):
        try:
            engine_path, _, cls_name = conf['ENGINE'].rpartition('.')
            db_name = conf['NAME']
            db_options = conf['OPTIONS'] if 'OPTIONS' in conf and conf['OPTIONS'] else dict()

            # FIX for #331. Replace utf8 by utf8mb4 in the mysql driver encoding.
            if conf['ENGINE'] == 'peewee_async.MySQLDatabase' and 'charset' in db_options and db_options['charset'] == 'utf8':
                logging.info('Forcing to use \'utf8mb4\' instead of \'utf8\' for the MySQL charset option! (Fix #331).')
                db_options['charset'] = 'utf8mb4'

            # We will try to load it so we have the validation inside this class.
            engine = getattr(importlib.import_module(engine_path), cls_name)
        except ImportError:
            raise ImproperlyConfigured('Database engine doesn\'t exist!')
        except Exception as e:
            raise ImproperlyConfigured('Database configuration isn\'t complete or engine could\'t be found!')

        return cls(engine, instance, db_name, **db_options)
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def run_migration(self, name, app_label, app_module, save_migration=True):
        """
        Run + apply migration to the actual database.

        :param name: Name of migration.
        :param app_label: App label.
        :param app_module: App module path.
        :param save_migration: Save migration state?
        """
        from .models.migration import Migration
        mod = importlib.import_module('{}.migrations.{}'.format(app_module, name))

        try:
            with self.db.allow_sync():
                mod.upgrade(self.migrator)

                if save_migration:
                    Migration.create(
                        app=app_label,
                        name=name,
                        applied=True
                    )
        except Exception as e:
            logger.warning('Can\'t migrate {}.migrations.{}: {}'.format(app_module, name, str(e)))
            raise
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def init_app(self, app):
        """
        Initiate app, load all signal/callbacks files. (just import, they should register with decorators).

        :param app: App instance
        :type app: pyplanet.apps.AppConfig
        """
        self._current_namespace = app.label

        # Import the signals module.
        try:
            importlib.import_module('{}.signals'.format(app.name))
        except ImportError:
            pass
        self._current_namespace = None

        # Import the callbacks module.
        try:
            importlib.import_module('{}.callbacks'.format(app.name))
        except ImportError:
            pass
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def load(self):
        # Make sure we load the defaults first.
        super().load()

        # Prepare the loading.
        self.module = os.environ.get('PYPLANET_SETTINGS_MODULE')

        if not self.module:
            raise ImproperlyConfigured(
                'Settings module is not defined! Please define PYPLANET_SETTINGS_MODULE in your '
                'environment or start script.'
            )

        # Add the module itself to the configuration.
        self.settings['SETTINGS_MODULE'] = self.module

        # Load the module, put the settings into the local context.
        module = importlib.import_module(self.module)

        for setting in dir(module):
            if setting.isupper():
                self.settings[setting] = getattr(module, setting)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def plugins(fetch_handlers=None):
    if not fetch_handlers:
        fetch_handlers = FETCH_HANDLERS
    plugin_list = []
    for handler_name in fetch_handlers:
        package, classname = handler_name.rsplit('.', 1)
        try:
            handler_class = getattr(
                importlib.import_module(package),
                classname)
            plugin_list.append(handler_class())
        except NotImplementedError:
            # Skip missing plugins so that they can be ommitted from
            # installation if desired
            log("FetchHandler {} not found, skipping plugin".format(
                handler_name))
    return plugin_list
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def plugins(fetch_handlers=None):
    if not fetch_handlers:
        fetch_handlers = FETCH_HANDLERS
    plugin_list = []
    for handler_name in fetch_handlers:
        package, classname = handler_name.rsplit('.', 1)
        try:
            handler_class = getattr(
                importlib.import_module(package),
                classname)
            plugin_list.append(handler_class())
        except NotImplementedError:
            # Skip missing plugins so that they can be ommitted from
            # installation if desired
            log("FetchHandler {} not found, skipping plugin".format(
                handler_name))
    return plugin_list
项目:vspk-ansible    作者:nuagenetworks    | 项目源码 | 文件源码
def _verify_api(self):
        """
        Verifies the API and loads the proper VSPK version
        """
        # Checking auth parameters
        if ('api_password' not in list(self.auth.keys()) or not self.auth['api_password']) and ('api_certificate' not in list(self.auth.keys()) or
                                                                                                'api_key' not in list(self.auth.keys()) or
                                                                                                not self.auth['api_certificate'] or not self.auth['api_key']):
            self.module.fail_json(msg='Missing api_password or api_certificate and api_key parameter in auth')

        self.api_username = self.auth['api_username']
        if 'api_password' in list(self.auth.keys()) and self.auth['api_password']:
            self.api_password = self.auth['api_password']
        if 'api_certificate' in list(self.auth.keys()) and 'api_key' in list(self.auth.keys()) and self.auth['api_certificate'] and self.auth['api_key']:
            self.api_certificate = self.auth['api_certificate']
            self.api_key = self.auth['api_key']
        self.api_enterprise = self.auth['api_enterprise']
        self.api_url = self.auth['api_url']
        self.api_version = self.auth['api_version']

        try:
            global VSPK
            VSPK = importlib.import_module('vspk.{0:s}'.format(self.api_version))
        except ImportError:
            self.module.fail_json(msg='vspk is required for this module, or the API version specified does not exist.')
项目:serverless-wsgi    作者:logandk    | 项目源码 | 文件源码
def serve(cwd, app, port):
    sys.path.insert(0, cwd)

    wsgi_fqn = app.rsplit('.', 1)
    wsgi_fqn_parts = wsgi_fqn[0].rsplit('/', 1)
    if len(wsgi_fqn_parts) == 2:
        sys.path.insert(0, os.path.join(cwd, wsgi_fqn_parts[0]))
    wsgi_module = importlib.import_module(wsgi_fqn_parts[-1])
    wsgi_app = getattr(wsgi_module, wsgi_fqn[1])

    # Attempt to force Flask into debug mode
    try:
        wsgi_app.debug = True
    except:  # noqa: E722
        pass

    os.environ['IS_OFFLINE'] = 'True'

    serving.run_simple(
        'localhost', int(port), wsgi_app,
        use_debugger=True, use_reloader=True, use_evalex=True)
项目:saveScreenShot    作者:guncys-inc    | 项目源码 | 文件源码
def _setup(module, extras):
    """Install common submodules"""

    Qt.__binding__ = module.__name__

    for name in list(_common_members) + extras:
        try:
            # print("Trying %s" % name)
            submodule = importlib.import_module(
                module.__name__ + "." + name)
        except ImportError:
            # print("Failed %s" % name)
            continue

        setattr(Qt, "_" + name, submodule)

        if name not in extras:
            # Store reference to original binding,
            # but don't store speciality modules
            # such as uic or QtUiTools
            setattr(Qt, name, _new_module(name))
项目:aniping    作者:kuruoujou    | 项目源码 | 文件源码
def scan_for_plugins(self):
        """Plugin scanner.

        Scans for plugins in the known plugin directories. Adds them to the
        available plugins dictionary, ready to be loaded.

        Returns:
            The available plugins dictionary.
        """
        _logger.debug("Scanning for plugins.")
        for category,info in CATEGORIES.items():
            _logger.debug("Scanning category {0}".format(category))
            for module in os.listdir(os.path.join(os.path.dirname(__file__),info["directory"])):
                if module == "__init__.py" or module[-3:] != ".py":
                    continue
                _logger.debug("\tFound plugin {0}".format(module[:-3]))
                importlib.import_module("aniping.{0}.{1}".format(info["directory"], module[:-3]))
                self._available_plugins[category].append(module[:-3])
        _logger.debug("All available plugins found.")
        return self._available_plugins
项目:pyseeder    作者:PurpleI2P    | 项目源码 | 文件源码
def upload(filename, config):
    """Upload .su3 file with transports"""
    if "transports" in config and "enabled" in config["transports"]:
        for t in config["transports"]["enabled"].split():
            if t in config:
                tconf = config[t]
            else:
                tconf = None

            try:
                importlib.import_module("pyseeder.transports.{}".format(t)) \
                                                .run(filename, tconf)
            except ImportError:
                raise PyseederException(
                        "{} transport can't be loaded".format(t))
            except TransportException as e:
                log.error("Transport error: {}".format(e))
项目:airflow-admin-tools-plugin    作者:rssanders3    | 项目源码 | 文件源码
def email(self):
        try:
            to_email = request.args.get('to_email')

            path, attr = configuration.get('email', 'EMAIL_BACKEND').rsplit('.', 1)
            logging.info("path: " + str(path))
            logging.info("attr: " + str(attr))

            module = importlib.import_module(path)
            logging.info("module: " + str(module))

            backend = getattr(module, attr)
            backend(to_email, "Test Email", "Test Email", files=None, dryrun=False)
            flash('Email Sent')
        except Exception as e:
            flash('Failed to Send Email: ' + str(e), 'error')

        return redirect("/admin/admintools", code=302)
项目:Hanabi-AI    作者:MeGotsThis    | 项目源码 | 文件源码
def __init__(self,
                 username: str,
                 password: str,
                 botModule: str,
                 botconfig: Mapping,
                 numPlayers: int,
                 variant: Variant,
                 spectators: bool,
                 gameName: str,
                 *args,
                 **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.username: str = username
        self.password: str = password
        module = importlib.import_module(botModule + '.bot')
        self.botCls: Type[Bot] = module.Bot  # type: ignore
        self.botconfig: Mapping = botconfig
        self.numPlayers: int = numPlayers
        self.variant: Variant = variant
        self.spectators: bool = spectators
        self.gameName: str = gameName
        self.conn: socketIO_client.SocketIO
        self.tablePlayers: List[str] = []
        self.readyToStart: bool = False
        self.game: Optional[Game] = None
项目:socketshark    作者:closeio    | 项目源码 | 文件源码
def load_config(config_name):
    config = {}

    # Get config defaults
    for key in dir(config_defaults):
        if key.isupper():
            config[key] = getattr(config_defaults, key)

    # Merge given config with defaults
    obj = importlib.import_module(config_name)
    for key in dir(obj):
        if key in config:
            value = getattr(obj, key)
            if isinstance(config[key], dict):
                config[key].update(value)
            else:
                config[key] = value

    return config
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def plugins(fetch_handlers=None):
    if not fetch_handlers:
        fetch_handlers = FETCH_HANDLERS
    plugin_list = []
    for handler_name in fetch_handlers:
        package, classname = handler_name.rsplit('.', 1)
        try:
            handler_class = getattr(
                importlib.import_module(package),
                classname)
            plugin_list.append(handler_class())
        except NotImplementedError:
            # Skip missing plugins so that they can be ommitted from
            # installation if desired
            log("FetchHandler {} not found, skipping plugin".format(
                handler_name))
    return plugin_list
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def plugins(fetch_handlers=None):
    if not fetch_handlers:
        fetch_handlers = FETCH_HANDLERS
    plugin_list = []
    for handler_name in fetch_handlers:
        package, classname = handler_name.rsplit('.', 1)
        try:
            handler_class = getattr(
                importlib.import_module(package),
                classname)
            plugin_list.append(handler_class())
        except NotImplementedError:
            # Skip missing plugins so that they can be ommitted from
            # installation if desired
            log("FetchHandler {} not found, skipping plugin".format(
                handler_name))
    return plugin_list
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def plugins(fetch_handlers=None):
    if not fetch_handlers:
        fetch_handlers = FETCH_HANDLERS
    plugin_list = []
    for handler_name in fetch_handlers:
        package, classname = handler_name.rsplit('.', 1)
        try:
            handler_class = getattr(
                importlib.import_module(package),
                classname)
            plugin_list.append(handler_class())
        except NotImplementedError:
            # Skip missing plugins so that they can be ommitted from
            # installation if desired
            log("FetchHandler {} not found, skipping plugin".format(
                handler_name))
    return plugin_list
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def plugins(fetch_handlers=None):
    if not fetch_handlers:
        fetch_handlers = FETCH_HANDLERS
    plugin_list = []
    for handler_name in fetch_handlers:
        package, classname = handler_name.rsplit('.', 1)
        try:
            handler_class = getattr(
                importlib.import_module(package),
                classname)
            plugin_list.append(handler_class())
        except NotImplementedError:
            # Skip missing plugins so that they can be ommitted from
            # installation if desired
            log("FetchHandler {} not found, skipping plugin".format(
                handler_name))
    return plugin_list
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def plugins(fetch_handlers=None):
    if not fetch_handlers:
        fetch_handlers = FETCH_HANDLERS
    plugin_list = []
    for handler_name in fetch_handlers:
        package, classname = handler_name.rsplit('.', 1)
        try:
            handler_class = getattr(
                importlib.import_module(package),
                classname)
            plugin_list.append(handler_class())
        except NotImplementedError:
            # Skip missing plugins so that they can be ommitted from
            # installation if desired
            log("FetchHandler {} not found, skipping plugin".format(
                handler_name))
    return plugin_list
项目:onefl-deduper    作者:ufbmi    | 项目源码 | 文件源码
def from_object(self, obj):
        """Updates the values from the given object.  An object can be of one
        of the following two types:
        -   a string: in this case the object with that name will be imported
        -   an actual object reference: that object is used directly
        :param obj: an import name or object
        """
        string_types = (str,)

        if isinstance(obj, string_types):
            # github.com/pallets/werkzeug/blob/master/werkzeug/utils.py+L399
            # obj = import_string(obj)
            obj = importlib.import_module(obj)

        for key in dir(obj):
            if key.isupper():
                self[key] = getattr(obj, key)
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def _setup(module, extras):
    """Install common submodules"""

    Qt.__binding__ = module.__name__

    for name in list(_common_members) + extras:
        try:
            # print("Trying %s" % name)
            submodule = importlib.import_module(
                module.__name__ + "." + name)
        except ImportError:
            # print("Failed %s" % name)
            continue

        setattr(Qt, "_" + name, submodule)

        if name not in extras:
            # Store reference to original binding,
            # but don't store speciality modules
            # such as uic or QtUiTools
            setattr(Qt, name, _new_module(name))
项目:twisted-json-rpc    作者:elston    | 项目源码 | 文件源码
def as_view(path):
    def decorator(func):
        # ..
        path_name, klass_name  = (path.split(':'))
        # ...
        @inlineCallbacks
        def wrapper(router, request, *args, **kwargs):
            # ...
            module = importlib.import_module(path_name)
            Klass = getattr(module,klass_name)
            klass = Klass(router, request,*args, **kwargs)
            # ..
            result = yield defer.maybeDeferred(klass)            
            defer.returnValue(result)
        # ..
        # _conspect_name(wrapper, klass_name)
        _conspect_name(wrapper, func.__name__)        
        _conspect_param(wrapper, func)
        _conspect_param_defaults(wrapper, func)        
        return wrapper
    return decorator
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def plugins(fetch_handlers=None):
    if not fetch_handlers:
        fetch_handlers = FETCH_HANDLERS
    plugin_list = []
    for handler_name in fetch_handlers:
        package, classname = handler_name.rsplit('.', 1)
        try:
            handler_class = getattr(
                importlib.import_module(package),
                classname)
            plugin_list.append(handler_class())
        except NotImplementedError:
            # Skip missing plugins so that they can be ommitted from
            # installation if desired
            log("FetchHandler {} not found, skipping plugin".format(
                handler_name))
    return plugin_list
项目:errcron    作者:attakei    | 项目源码 | 文件源码
def set_action(self, action, *args):
        """Set action and arguments to run in triggered time

        :param action: function path as anction
        :type action: str
        :param args: function arguments
        :type args: list or tuple
        """
        if isinstance(action, (types.FunctionType, types.MethodType)):
            self.action = action
        else:
            action_module = '.'.join(action.split('.')[:-1])
            action_module = importlib.import_module(action_module)
            action = action.split('.')[-1]
            self.action = getattr(action_module, action)
        self.action_args = args
项目:alexafsm    作者:allenai    | 项目源码 | 文件源码
def __init__(self, states: States, request: dict = None, with_graph: bool = False):
        self.states = states
        self.state = states.attributes.state
        state_names, transitions = type(states).get_states_transitions()
        machine_cls = \
            importlib.import_module('transitions.extensions').GraphMachine if with_graph else \
            importlib.import_module('transitions').Machine
        self.machine = machine_cls(
            model=self,
            states=state_names,
            initial=states.attributes.state,
            auto_transitions=False
        )

        for transition in transitions:
            self.machine.add_transition(**transition)
项目:django-cml    作者:ArtemiusUA    | 项目源码 | 文件源码
def _load_project_pipelines(self):
        try:
            pipelines_module_name = settings.CML_PROJECT_PIPELINES
        except AttributeError:
            logger.info('Configure CML_PROJECT_PIPELINES in settings!')
            return
        try:
            pipelines_module = importlib.import_module(pipelines_module_name)
        except ImportError:
            return
        for item_class_name in PROCESSED_ITEMS:
            try:
                pipeline_class = getattr(pipelines_module, '{}Pipeline'.format(item_class_name))
            except AttributeError:
                continue
            self._project_pipelines[item_class_name] = pipeline_class()
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def _get_page_class(self, path, page_cls_name):

        # last module name does not contain '_'
        final_module = self.unify_page_path(path[-1],
                                            preserve_spaces=False)
        page_cls_path = ".".join(path[:-1] + (final_module,))
        page_cls_path = self.unify_page_path(page_cls_path)
        # append 'page' as every page module ends with this keyword
        page_cls_path += "page"

        page_cls_name = page_cls_name or self._get_page_cls_name(final_module)

        module = None
        # return imported class
        for path in self.PAGES_IMPORT_PATH:
            try:
                module = importlib.import_module(path %
                                                 page_cls_path)
                break
            except ImportError:
                pass
        if module is None:
            raise ImportError("Failed to import module: " +
                              (path % page_cls_path))
        return getattr(module, page_cls_name)
项目:zoocore    作者:dsparrow27    | 项目源码 | 文件源码
def _setup(module, extras):
    """Install common submodules"""

    Qt.__binding__ = module.__name__

    for name in list(_common_members) + extras:
        try:
            submodule = importlib.import_module(
                module.__name__ + "." + name)
        except ImportError:
            continue

        setattr(Qt, "_" + name, submodule)

        if name not in extras:
            # Store reference to original binding,
            # but don't store speciality modules
            # such as uic or QtUiTools
            setattr(Qt, name, _new_module(name))
项目:sublime-text-3-packages    作者:nickjj    | 项目源码 | 文件源码
def get_module_version(cls):
        """
        Return the string version of the imported module, without any prefix/suffix.

        This method handles the common case where a module (or one of its parents)
        defines a __version__ string. For other cases, subclasses should override
        this method and return the version string.

        """

        if cls.module:
            module = cls.module

            while True:
                if isinstance(getattr(module, '__version__', None), str):
                    return module.__version__

                if hasattr(module, '__package__'):
                    try:
                        module = importlib.import_module(module.__package__)
                    except ImportError:
                        return None
        else:
            return None
项目:routersploit    作者:reverse-shell    | 项目源码 | 文件源码
def import_exploit(path):
    """ Import exploit module

    :param path: absolute path to exploit e.g. routersploit.modules.exploits.asus.pass_bypass
    :return: exploit module or error
    """
    try:
        module = importlib.import_module(path)
        return getattr(module, 'Exploit')
    except (ImportError, AttributeError, KeyError) as err:
        raise RoutersploitException(
            "Error during loading '{}'\n\n"
            "Error: {}\n\n"
            "It should be valid path to the module. "
            "Use <tab> key multiple times for completion.".format(humanize_path(path), err)
        )
项目:ampersand    作者:natejms    | 项目源码 | 文件源码
def plugin_run(self, name, method, content):
        root = relative(self.root)

        try:
            # Retrieve the _plugin.json file
            plugin = build.get_json(
                root(self.config["plugins"][name], "_plugin.json"))

            # Load and run the module
            if plugin["method"] == method:
                sys.path.append(root(self.config["plugins"][name]))
                module = importlib.import_module(plugin["init"], name)
                content = module.main(content, self)

            return content

        except (KeyError, OSError, TypeError,
                ImportError, AttributeError) as e:
            return content
项目:smc-python    作者:gabstopper    | 项目源码 | 文件源码
def import_submodules(package, recursive=True):
    """
    Import all submodules of a module, recursively,
    including subpackages.

    From http://stackoverflow.com/questions/3365740/how-to-import-all-submodules

    :param package: package (name or actual module)
    :type package: str | module
    :rtype: dict[str, types.ModuleType]
    """
    import importlib
    import pkgutil
    if isinstance(package, str):
        package = importlib.import_module(package)
    results = {}
    for _loader, name, is_pkg in pkgutil.walk_packages(package.__path__):
        full_name = package.__name__ + '.' + name
        results[full_name] = importlib.import_module(full_name)
        if recursive and is_pkg:
            results.update(import_submodules(full_name))
项目:quokka_ng    作者:rochacbruno    | 项目源码 | 文件源码
def get_command(self, ctx, name):
        try:
            if sys.version_info[0] == 2:
                name = name.encode('ascii', 'replace')
            splitted = name.split('_')
            if len(splitted) <= 1:
                return
            module_name, command_name = splitted
            if not all([module_name, command_name]):
                return
            module = '{0}.{1}.commands.{2}'.format(
                self.base_module_name,
                module_name,
                command_name)
            mod = importlib.import_module(module)
        except ImportError:
            return
        return getattr(mod, 'cli', None)
项目:sc-controller    作者:kozec    | 项目源码 | 文件源码
def load_component(self, class_name):
        """
        Loads and adds new component to editor.
        Returns component instance.
        """
        if class_name in self.loaded_components:
            return self.loaded_components[class_name]
        mod = importlib.import_module("scc.gui.ae.%s" % (class_name,))
        for x in mod.__all__:
            cls = getattr(mod, x)
            if isinstance(cls, (type, types.ClassType)) and issubclass(cls, AEComponent):
                if cls is not AEComponent:
                    instance = cls(self.app, self)
                    self.loaded_components[class_name] = instance
                    self.components.append(instance)
                    return instance
项目:ExcelToCode    作者:youlanhai    | 项目源码 | 文件源码
def check_plugin(plugins):
    need_restart = False
    for name in plugins:
        try:
            importlib.import_module(name)
        except ImportError, e:
            print "??????????'%s'??????? ..." % (name, )

            ret = install(name)
            print "-" * 60
            print "????%s? '%s'" % ("??" if ret else "??", name, )

            if not ret: return False

            need_restart = True

    if need_restart: print "???????"
    return True
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def import_exploit(path):
    """ Import exploit module

    :param path: absolute path to exploit e.g. routersploit.modules.exploits.asus.pass_bypass
    :return: exploit module or error
    """
    try:
        module = importlib.import_module(path)
        return getattr(module, 'Exploit')
    except (ImportError, AttributeError, KeyError) as err:
        raise RoutersploitException(
            "Error during loading '{}'\n\n"
            "Error: {}\n\n"
            "It should be valid path to the module. "
            "Use <tab> key multiple times for completion.".format(humanize_path(path), err)
        )
项目:drift    作者:dgnorth    | 项目源码 | 文件源码
def do_execute_cmd(argv):
    valid_commands = get_commands()
    parser = argparse.ArgumentParser(description="")
    parser.add_argument("-v", "--verbose", help="I am verbose!", action="store_true")
    parser.add_argument("-t", "--tier", help="Tier to use (overrides drift_TIER from environment)")
    subparsers = parser.add_subparsers(help="sub-command help")
    for cmd in valid_commands:
        module = importlib.import_module("drift.management.commands." + cmd)
        subparser = subparsers.add_parser(cmd, help="Subcommands for {}".format(cmd))
        if hasattr(module, "get_options"):
            module.get_options(subparser)
        subparser.set_defaults(func=module.run_command)

    args = parser.parse_args(argv)

    if args.tier:
        os.environ["drift_TIER"] = args.tier

    args.func(args)
项目:latplan    作者:guicho271828    | 项目源码 | 文件源码
def puzzle_longest(type='mnist', width=3, height=3):
    global output_dirs
    output_dirs = ["longest"]
    import importlib
    p = importlib.import_module('latplan.puzzles.puzzle_{}'.format(type))
    p.setup()
    ics = [
        # from Reinfield '93
        # [8,0,6,5,4,7,2,3,1], # the second instance with the longest optimal solution 31
        [3,5,6,8,4,7,2,1,0],
        # [8,7,6,0,4,1,2,5,3], # the first instance with the longest optimal solution 31
        [1,8,6,7,4,3,2,5,0],
        # [8,5,6,7,2,3,4,1,0], # the first instance with the most solutions
        [8,7,4,5,6,1,2,3,0],
        # [8,5,4,7,6,3,2,1,0], # the second instance with the most solutions
        [8,7,6,5,2,1,4,3,0],
        # [8,6,7,2,5,4,3,0,1], # the "wrong"? hardest eight-puzzle from
        [7,8,3,6,5,4,1,2,0],
        # [6,4,7,8,5,0,3,2,1], # w01fe.com/blog/2009/01/the-hardest-eight-puzzle-instances-take-31-moves-to-solve/
        [5,8,7,6,1,4,0,2,3],
    ]
    gcs = np.arange(width*height).reshape((1,width*height))
    generate(p, ics, gcs, width, height)
项目:monitorstack    作者:openstack    | 项目源码 | 文件源码
def process_result(results, output_format, **kwargs):
    """Render the output into the proper format."""
    module_name = 'monitorstack.common.formatters'
    method_name = 'write_{}'.format(output_format.replace('-', '_'))
    output_formatter = getattr(
        importlib.import_module(module_name),
        method_name
    )

    # Force the output formatter into a list
    if not isinstance(results, list):  # pragma: no cover
        results = [results]

    exit_code = 0
    for result in results:
        output_formatter(result)
        if result['exit_code'] != 0:
            exit_code = result['exit_code']
    else:
        sys.exit(exit_code)
项目:triage    作者:dssg    | 项目源码 | 文件源码
def _train(self, matrix_store, class_path, parameters):
        """Fit a model to a training set. Works on any modeling class that
        is available in this package's environment and implements .fit

        Args:
            class_path (string) A full classpath to the model class
            parameters (dict) hyperparameters to give to the model constructor

        Returns:
            tuple of (fitted model, list of column names without label)
        """
        module_name, class_name = class_path.rsplit(".", 1)
        module = importlib.import_module(module_name)
        cls = getattr(module, class_name)
        instance = cls(**parameters)
        y = matrix_store.labels()

        return instance.fit(matrix_store.matrix, y), matrix_store.matrix.columns
项目:triage    作者:dssg    | 项目源码 | 文件源码
def run(self, grid_config):
        for classpath, parameter_config in grid_config.items():
            try:
                module_name, class_name = classpath.rsplit(".", 1)
                module = importlib.import_module(module_name)
                cls = getattr(module, class_name)
                for parameters in ParameterGrid(parameter_config):
                    try:
                        cls(**parameters)
                    except Exception as e:
                        raise ValueError(dedent('''Section: grid_config -
                        Unable to instantiate classifier {} with parameters {}, error thrown: {}
                        '''.format(classpath, parameters, e)))
            except Exception as e:
                raise ValueError(dedent('''Section: grid_config -
                Unable to import classifier {}, error thrown: {}
                '''.format(classpath, e)))
项目:rq-scheduler-dashboard    作者:lamflam    | 项目源码 | 文件源码
def make_flask_app(config, username, password, url_prefix):
    """Return Flask app with default configuration and registered blueprint."""
    app = Flask(__name__)

    # Start configuration with our built in defaults.
    app.config.from_object(default_settings)

    # Override with any settings in config file, if given.
    if config:
        app.config.from_object(importlib.import_module(config))

    # Override from a configuration file in the env variable, if present.
    if 'RQ_SCHEDULER_DASHBOARD_SETTINGS' in os.environ:
        app.config.from_envvar('RQ_SCHEDULER_DASHBOARD_SETTINGS')

    # Optionally add basic auth to blueprint and register with app.
    if username:
        add_basic_auth(blueprint, username, password)
    app.register_blueprint(blueprint, url_prefix=url_prefix)

    return app
项目:better-apidoc    作者:goerz    | 项目源码 | 文件源码
def _get_mod_ns(name, fullname, includeprivate):
    """Return the template context of module identified by `fullname` as a
    dict"""
    ns = {  # template variables
        'name': name, 'fullname': fullname, 'members': [], 'functions': [],
        'classes': [], 'exceptions': [], 'subpackages': [], 'submodules': [],
        'all_refs': [], 'members_imports': [], 'members_imports_refs': [],
        'data': []}
    p = 0
    if includeprivate:
        p = 1
    mod = importlib.import_module(fullname)
    ns['members'] = _get_members(mod)[p]
    ns['functions'] = _get_members(mod, typ='function')[p]
    ns['classes'] = _get_members(mod, typ='class')[p]
    ns['exceptions'] = _get_members(mod, typ='exception')[p]
    ns['all_refs'] = _get_members(mod, include_imported=True, in__all__=True, as_refs=True)[p]
    ns['members_imports'] = _get_members(mod, include_imported=True)[p]
    ns['members_imports_refs'] = _get_members(mod, include_imported=True, as_refs=True)[p]
    ns['data'] = _get_members(mod, typ='data')[p]
    return ns
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def _configure_publish(publish=None):
    """Determine and return the default publisher.

    Args:
        publish (str): The current default publisher (may be None).

    Returns:
        str: The new default publisher.
    """
    # Set up publishing defaults.
    logger.info('Where do you want to publish code by default?')
    logger.info('The common valid options are "github" and "local".')
    publish = six.moves.input('Default publisher: ').lower()
    try:
        importlib.import_module('artman.tasks.publish.%s' % publish)
        return publish
    except ImportError:
        logger.error('Invalid publisher.')
        return _configure_publish()
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def _get_publish_tasks(self, publish, **kwargs):
        """Dynamically import publisher tasks based on the selected publisher.

        This will raise ImportError if the publisher does not have a module
        in `pipeline.tasks.publish`.

        Args:
            publish (str): A string of a publisher in pipeline.tasks.publish.*.

        Returns:
            list: A list of Task subclasses defined by the publisher module.
        """
        module = importlib.import_module(
            'artman.tasks.publish.{}'.format(publish),
        )
        return module.TASKS
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def ensure_module(module_name: str):
    """
    Makes sure that a module is importable.

    In case the module cannot be found, print an error and exit.

    Args:
        module_name: name of the module to look for
    """
    try:
        importlib.import_module(module_name)
    except ModuleNotFoundError:
        click.secho(
            f'Module not found: {module_name}\n'
            f'Install it manually with: "pip install {module_name}"\n'
            f'Or install all dependencies with: "pip install -r requirements-dev.txt"',
            fg='red', err=True)
        exit(-1)