我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用imp.load_source()。
def get_processor(aid): """ Returns the processor module for a given achievement. Args: aid: the achievement id Returns: The processor module """ try: path = get_achievement(aid=aid, show_disabled=True)["processor"] base_path = api.config.get_settings()["achievements"]["processor_base_path"] return imp.load_source(path[:-3], join(base_path, path)) except FileNotFoundError: raise InternalException("Achievement processor is offline.")
def cdnImport(uri, name): import imp from resources.lib.modules import client path = os.path.join(dataPath, 'py' + name) path = path.decode('utf-8') deleteDir(os.path.join(path, ''), force=True) makeFile(dataPath) ; makeFile(path) r = client.request(uri) p = os.path.join(path, name + '.py') f = openFile(p, 'w') ; f.write(r) ; f.close() m = imp.load_source(name, p) deleteDir(os.path.join(path, ''), force=True) return m
def get_factory_from_template(maintype): path = os.path.join(BASE_DIR, 'templates', maintype, FACTORY_FILENAME) if (python_version_gte(3, 5)): # Python 3.5 code in this block import importlib.util spec = importlib.util.spec_from_file_location( "{}.factory".format(maintype), path) foo = importlib.util.module_from_spec(spec) spec.loader.exec_module(foo) return foo elif (python_version_gte(3, 0)): from importlib.machinery import SourceFileLoader foo = SourceFileLoader( "{}.factory".format(maintype), path).load_module() return foo else: # Python 2 code in this block import imp foo = imp.load_source("{}.factory".format(maintype), path) return foo
def load_plugin(self, file_name): # XAPI plugins run in a py24 environment and may be not compatible with # py34's syntax. In order to prevent unit test scanning the source file # under py34 environment, the plugins will be imported with this # function at run time. plugin_path = self._get_plugin_path() # add plugin path into search path. if plugin_path not in sys.path: sys.path.append(plugin_path) # be sure not to create c files next to the plugins sys.dont_write_bytecode = True name = file_name.split('.')[0] path = os.path.join(plugin_path, file_name) return imp.load_source(name, path)
def __checkUpload(self,msg): if msg.text()=="Yes": name = QtGui.QFileDialog.getOpenFileName(filter=".py (*.py)") file_name = name[0] if len(file_name) > 0: if hasattr(self,"i10"): self.algorithmArgTree.removeTopLevelItem(self.i10) delattr(self,"i10") delattr(self,"myFilter") self.myFilter = imp.load_source("filter_design",file_name) self.i10 = pg.TreeWidgetItem([""]) myFilterWgtt = QtGui.QPushButton() myFilterWgtt.setText("myFilter") myFilterWgtt.setFixedWidth(60) myFilterWgtt.setStyleSheet("background-color: rgb(190,190,190);border-radius: 6px;") # rpWgtt.clicked.connect(self.__setrp) self.i10.setWidget(1,myFilterWgtt) self.algorithmArgTree.addTopLevelItem(self.i10) elif msg.text()=="Cancel": pass # This class is used to draw multiple lines for the AnalogFilter class in a memory-efficient way
def test_exit_cleanly(self): # Make sure handler fun is called on clean interpreter exit. ret = pyrun(textwrap.dedent( """ import os, imp mod = imp.load_source("mod", r"{}") def foo(): with open(r"{}", "ab") as f: f.write(b"1") mod.register_exit_fun(foo) """.format(os.path.abspath(__file__), TESTFN) )) self.assertEqual(ret, 0) with open(TESTFN, "rb") as f: self.assertEqual(f.read(), b"1")
def test_exception(self): # Make sure handler fun is called on exception. ret = pyrun(textwrap.dedent( """ import os, imp, sys mod = imp.load_source("mod", r"{}") def foo(): with open(r"{}", "ab") as f: f.write(b"1") mod.register_exit_fun(foo) sys.stderr = os.devnull 1 / 0 """.format(os.path.abspath(__file__), TESTFN) )) self.assertEqual(ret, 1) with open(TESTFN, "rb") as f: self.assertEqual(f.read(), b"1")
def test_kinterrupt(self): # Simulates CTRL + C and make sure the exit function is called. ret = pyrun(textwrap.dedent( """ import os, imp, sys mod = imp.load_source("mod", r"{}") def foo(): with open(r"{}", "ab") as f: f.write(b"1") mod.register_exit_fun(foo) sys.stderr = os.devnull raise KeyboardInterrupt """.format(os.path.abspath(__file__), TESTFN) )) self.assertEqual(ret, 1) with open(TESTFN, "rb") as f: self.assertEqual(f.read(), b"1")
def test_called_once(self): # Make sure the registered fun is called once. ret = pyrun(textwrap.dedent( """ import os, imp mod = imp.load_source("mod", r"{}") def foo(): with open(r"{}", "ab") as f: f.write(b"1") mod.register_exit_fun(foo) """.format(os.path.abspath(__file__), TESTFN) )) self.assertEqual(ret, 0) with open(TESTFN, "rb") as f: self.assertEqual(f.read(), b"1")
def test_cascade(self): # Register 2 functions and make sure the last registered # function is executed first. ret = pyrun(textwrap.dedent( """ import functools, os, imp mod = imp.load_source("mod", r"{}") def foo(s): with open(r"{}", "ab") as f: f.write(s) mod.register_exit_fun(functools.partial(foo, b'1')) mod.register_exit_fun(functools.partial(foo, b'2')) """.format(os.path.abspath(__file__), TESTFN) )) self.assertEqual(ret, 0) with open(TESTFN, "rb") as f: self.assertEqual(f.read(), b"21")
def test_all_exit_sigs_with_sig(self): # Same as above but the process is terminated by a signal # instead of exiting cleanly. for sig in TEST_SIGNALS: ret = pyrun(textwrap.dedent( """ import functools, os, signal, imp mod = imp.load_source("mod", r"{modname}") def foo(s): with open(r"{testfn}", "ab") as f: f.write(s) signal.signal({sig}, functools.partial(foo, b'0')) mod.register_exit_fun(functools.partial(foo, b'1')) mod.register_exit_fun(functools.partial(foo, b'2')) os.kill(os.getpid(), {sig}) """.format(modname=os.path.abspath(__file__), testfn=TESTFN, sig=sig) )) self.assertEqual(ret, sig) with open(TESTFN, "rb") as f: self.assertEqual(f.read(), b"210") safe_remove(TESTFN)
def test_as_deco(self): ret = pyrun(textwrap.dedent( """ import imp mod = imp.load_source("mod", r"{}") @mod.register_exit_fun def foo(): with open(r"{}", "ab") as f: f.write(b"1") """.format(os.path.abspath(__file__), TESTFN) )) self.assertEqual(ret, 0) with open(TESTFN, "rb") as f: self.assertEqual(f.read(), b"1")
def load_node(node, file_path): path_var = "node.%s"%node main_module = imp.load_source(path_var, file_path) for c in main_module.__dict__.values(): try: if issubclass(c, Node) and c.__module__ is main_module.__name__: nodes[node] = c if c.char and isinstance(c.char, str): c.char = bytearray([ord(c.char[-1].encode()) | (0x80 * (len(c.char) == 2))]) elif isinstance(c.char, bytes): c.char = bytearray(c.char) c.ignore_dot = True elif c.char == "": c.char = b"" return c except TypeError: pass
def importModule(modulePath, name=""): if isDottedPath(modulePath) or not os.path.exists(modulePath): try: return __import__(modulePath, fromlist="dummy") except ImportError: logger.debug("failed to load module->%s" % modulePath, exc_info=True) try: if os.path.exists(modulePath): if not name: name = os.path.splitext(os.path.basename(modulePath))[0] if name in sys.modules: return sys.modules[name] if os.path.isdir(modulePath): modulePath = os.path.join(modulePath, "__init__.py") if not os.path.exists(modulePath): raise ValueError("Cannot find modulepath: {}".format(modulePath)) return imp.load_source(name, os.path.realpath(modulePath)) except ImportError: logger.error("Failed to load module {}".format(modulePath)) raise
def load_module(self, name, stuff): file, filename, info = stuff (suff, mode, type) = info try: if type == BUILTIN_MODULE: return self.hooks.init_builtin(name) if type == FROZEN_MODULE: return self.hooks.init_frozen(name) if type == C_EXTENSION: m = self.hooks.load_dynamic(name, filename, file) elif type == PY_SOURCE: m = self.hooks.load_source(name, filename, file) elif type == PY_COMPILED: m = self.hooks.load_compiled(name, filename, file) elif type == PKG_DIRECTORY: m = self.hooks.load_package(name, filename, file) else: raise ImportError, "Unrecognized module type (%r) for %s" % \ (type, name) finally: if file: file.close() m.__file__ = filename return m
def get_generator(pid): """ Gets a handle on a problem generator module. Args: pid: the problem pid Returns: The loaded module """ generator_path = get_generator_path(pid) if not path.isfile(generator_path): raise InternalException("Could not find {}.".format(generator_path)) return imp.load_source(generator_path[:-3], generator_path)
def get_logger(logger_name): app_conf = imp.load_source('app_conf', os.getenv('EAGLE_HOME', '..') + '/eagle_cfg.py') _logger = logging.getLogger(logger_name) file_formatter = Formatter( '%(levelname)s | %(asctime)s | %(name)s | %(message)s | %(pathname)s:%(lineno)d' ) time_rotating_handler = TimedRotatingFileHandler(\ '{0}/{1}.log'.format(app_conf.LOG_PATH, logger_name), when="midnight", encoding='utf-8') time_rotating_handler.suffix = "%Y-%m-%d" time_rotating_handler.setFormatter(file_formatter) stream_handler = StreamHandler(stream=sys.stdout) echo_formatter = Formatter('[%(levelname)s][%(name)s][in %(filename)s:%(lineno)d] %(message)s') stream_handler.setFormatter(echo_formatter) _logger.addHandler(time_rotating_handler) _logger.addHandler(stream_handler) _logger.setLevel(logging.DEBUG) return _logger
def __new__(cls, *args, **kwargs): if cls._defined is None: script_files = glob.glob('{}{}*.py'.format(SCRIPT_PATH, os.path.sep)) for i in script_files: try: s = imp.load_source('script', os.path.join(SCRIPT_PATH, i)) script_class = getattr(s, 'Script')() if cls.check(script_class): cls.scripts.append(script_class) print_success('Load script {} successfully.'.format(i)) except: print_warning('Load script {} failed, ignored'.format(i)) if os.getenv('DEBUG_SCRIPT'): traceback.print_exc() # self.scripts = filter(self._check_followed, self.scripts) # self.scripts = filter(self._check_bangumi, self.scripts) cls._defined = super(ScriptRunner, cls).__new__(cls, *args, **kwargs) return cls._defined
def get_version_info(): # Adding the git rev number needs to be done inside # write_version_py(), otherwise the import of pyfunt.version messes # up the build under Python 3. FULLVERSION = VERSION if os.path.exists('.git'): GIT_REVISION = git_version() elif os.path.exists('pyfunt/version.py'): # must be a source distribution, use existing version file # load it as a separate module to not load pyfunt/__init__.py import imp version = imp.load_source('pyfunt.version', 'pyfunt/version.py') GIT_REVISION = version.git_revision else: GIT_REVISION = "Unknown" if not ISRELEASED: FULLVERSION += '.dev0+' + GIT_REVISION[:7] return FULLVERSION, GIT_REVISION
def _runscript(self, filename): # The script has to run in __main__ namespace (clear it) import __main__ import imp filename = os.path.abspath(filename) __main__.__dict__.clear() __main__.__dict__.update({"__name__" : "__main__", "__file__" : filename, "__builtins__": __builtins__, "imp" : imp, # need for run }) # avoid stopping before we reach the main script self._wait_for_mainpyfile = 1 self.mainpyfile = self.canonic(filename) self._user_requested_quit = 0 if sys.version_info>(3,0): statement = 'imp.load_source("__main__", "%s")' % filename else: statement = 'execfile(%r)' % filename self.startup() self.run(statement)
def installXl(self, args = None): fd = open(self.config['template']) lvmDev = imp.load_source('xlConfig', '', fd).disk[0].split(',')[0][4:] fd.close() templateArgs = args.split(' ') if args is not None else [] xl = subprocess.Popen( ['/usr/share/xen/templates/xen-%s' % (self.template), '--hostname=%s' % (self.name), '--dev=%s' % (lvmDev)] + templateArgs, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE ) stdOut, stdErr = xl.communicate() if xl.returncode == 0: if not os.path.exists('/var/lib/xen/%s' % (self.name)): os.mkdir('/var/lib/xen/%s' % (self.name), 0750) copyfile(self.config['template'], self.config['file']) self.state = self.getXlState() return True else: return False
def load(): """Load the active experiment.""" if os.getcwd() not in sys.path: sys.path.append(os.getcwd()) try: exp = imp.load_source('dallinger_experiment', "dallinger_experiment.py") classes = inspect.getmembers(exp, inspect.isclass) exps = [c for c in classes if (c[1].__bases__[0].__name__ in "Experiment")] this_experiment = exps[0][0] mod = __import__('dallinger_experiment', fromlist=[this_experiment]) return getattr(mod, this_experiment) except ImportError: logger.error('Could not import experiment.') raise
def register_extra_parameters(self): extra_parameters = None cwd = os.getcwd() sys.path.append(cwd) path_index = len(sys.path) - 1 try: from dallinger_experiment import extra_parameters except ImportError: try: exp = imp.load_source('dallinger_experiment', "dallinger_experiment.py") extra_parameters = getattr(exp, 'extra_parameters', None) except IOError: pass if extra_parameters is None: try: # We may be in the original source directory, try experiment.py exp = imp.load_source('dallinger_experiment', "experiment.py") extra_parameters = getattr(exp, 'extra_parameters', None) except IOError: pass if extra_parameters is not None and getattr(extra_parameters, 'loaded', None) is None: extra_parameters() extra_parameters.loaded = True # Remove path element we added sys.path.pop(path_index)
def _load_templates(self): """ Loads provider template from TEMPLATE_DIR :rtype: dict: :return: All templates available in TEMPLATE_DIR """ template_base = os.path.dirname(os.path.realpath(__file__)) + TEMPLATE_DIR modules = glob.glob(os.path.join(template_base, '*.py')) template_files = [os.path.basename(f)[:-3] for f in modules if not f.endswith('__init__.py')] templates = {} for template in template_files: infos = imp.load_source(template, os.path.join(template_base, template + '.py')) templates[infos.TEMPLATE.pop('email')] = infos.TEMPLATE return templates
def load_url_map(path, package, log=None): url_map = [] our_dir = path[0] for dirpath, dirnames, filenames in os.walk(our_dir): for fname in filenames: root, ext = os.path.splitext(fname) if ext != '.py' or root == '__init__': continue class_path = os.path.join(dirpath, fname) handle_class = imp.load_source(fname, class_path) _url_map = getattr(handle_class, 'url_map', {}) if _url_map: for _url, _handler in _url_map.items(): url_map.append((_url, getattr(handle_class, _handler))) log.info('url map:\n'+'\n'.join([ '%20s\t%s' % (_url_map[0], _url_map[1])\ for _url_map in url_map])) return url_map
def get_model_and_optimizer(result_dir, modelfn, opt, opt_kwargs, net_kwargs, gpu): model_fn = os.path.basename(modelfn) model_name = model_fn.split('.')[0] module = imp.load_source(model_name, modelfn) net = getattr(module, model_name) # Copy model definition and this train script to the result dir dst = '%s/%s' % (result_dir, model_fn) if not os.path.exists(dst): shutil.copy(modelfn, dst) dst = '%s/%s' % (result_dir, os.path.basename(__file__)) if not os.path.exists(dst): shutil.copy(__file__, dst) # Create model model = net(**net_kwargs) if gpu >= 0: model.to_gpu(gpu) # Create optimizer optimizer = optimizers.__dict__[opt](**opt_kwargs) optimizer.setup(model) return model, optimizer
def get_model_and_optimizer(result_dir, modelfn, opt, opt_kwargs, net_kwargs, gpu): model_fn = os.path.basename(modelfn) model_name = model_fn.split('.')[0] module = imp.load_source(model_name, modelfn) Net = getattr(module, model_name) dst = '%s/%s' % (result_dir, model_fn) if not os.path.exists(dst): shutil.copy(modelfn, dst) dst = '%s/%s' % (result_dir, os.path.basename(__file__)) if not os.path.exists(dst): shutil.copy(__file__, dst) # prepare model model = Net(**net_kwargs) if gpu >= 0: model.to_gpu() optimizer = optimizers.__dict__[opt](**opt_kwargs) optimizer.setup(model) return model, optimizer
def _load_module(self, dirpath, filename): mod_name = filename.split('.')[0] mod_dispname = '/'.join(re.split('/modules/', dirpath)[-1].split('/') + [mod_name]) mod_loadname = mod_dispname.replace('/', '_') mod_loadpath = os.path.join(dirpath, filename) mod_file = open(mod_loadpath) try: # import the module into memory mod = imp.load_source(mod_loadname, mod_loadpath, mod_file) __import__(mod_loadname) # add the module to the framework's loaded modules self._loaded_modules[mod_dispname] = sys.modules[mod_loadname].Module(mod_dispname) return True except ImportError as e: # notify the user of missing dependencies self.error('Module \'%s\' disabled. Dependency required: \'%s\'' % (mod_dispname, e.message[16:])) except: # notify the user of errors self.print_exception() self.error('Module \'%s\' disabled.' % (mod_dispname)) # remove the module from the framework's loaded modules self._loaded_modules.pop(mod_dispname, None) return False
def load_label_configuration(self, filename): # load in a label specification, provided by the user try: self.configuration = imp.load_source('label_config',filename) except IOError: self.logger.critical('failed to open label configuration file %s' % filename) raise except: self.logger.critical('error loading label configuration from %s' % filename) raise # perform some sanity checks on it # # make sure 'labels' is defined try: assert self.configuration.labels except AssertionError: logger.critical('loaded label configuration file %s, but it did not define "labels" !' % filename)
def test_bake_with_console_script_cli(cookies): context = {'command_line_interface': 'click'} result = cookies.bake(extra_context=context) project_path, project_slug, project_dir = project_info(result) module_path = os.path.join(project_dir, 'cli.py') module_name = '.'.join([project_slug, 'cli']) if sys.version_info >= (3, 5): spec = importlib.util.spec_from_file_location(module_name, module_path) cli = importlib.util.module_from_spec(spec) spec.loader.exec_module(cli) elif sys.version_info >= (3, 3): file_loader = importlib.machinery.SourceFileLoader cli = file_loader(module_name, module_path).load_module() else: cli = imp.load_source(module_name, module_path) runner = CliRunner() noarg_result = runner.invoke(cli.main) assert noarg_result.exit_code == 0 noarg_output = ' '.join(['Replace this message by putting your code into', project_slug]) assert noarg_output in noarg_result.output help_result = runner.invoke(cli.main, ['--help']) assert help_result.exit_code == 0 assert 'Show this message' in help_result.output
def write(self, revision, sourcefile): """ Finalize the __version__ = major.minor.micro.{revision} tag. Overwrite C{sourcefile} in place by substituting the {revision} macro. @param revision: revision number to write to the source file. @type revision: int @param sourcefile: python source file with a __version__ tag, typically "csb/__init__.py" @type sourcefile: str @return: sourcefile.__version__ """ content = open(sourcefile).readlines() with open(sourcefile, 'w') as src: for line in content: if line.startswith('__version__'): src.write(line.format(revision=revision)) else: src.write(line) self._delcache(sourcefile) return imp.load_source('____source', sourcefile).__version__
def check_config(use_config=None): if use_config: with lcd(codedir): local('cp config.{}.py config.py'.format(use_config)) try: config = imp.load_source('config', os.path.join(codedir, 'config.py')) except IOError: error('config.py not found. Did you create it by copying config.example.py?') try: config_example = imp.load_source('config_example', os.path.join(codedir, 'config.example.py')) except IOError: error('config.example.py not found. Did you remove it?') if config.signing_key == config_example.signing_key: error('You need to change the signing key to your own unique text.') if config.s3_bucket == config_example.s3_bucket: error('You need to change the s3 bucket name to a bucket you control.') puts('Your config.py appears to be set up.') return config
def caffe_to_tensorflow_session(caffe_def_path, caffemodel_path, inputs, graph_name='Graph', conversion_out_dir_path=None, use_padding_same=False): """Create a TensorFlow Session from a Caffe model.""" try: # noinspection PyUnresolvedReferences from caffeflow import convert except ImportError: raise Exception("caffeflow package needs to be installed to freeze Caffe models. Check out the README file.") with (dummy_context_mgr(conversion_out_dir_path) or util.TemporaryDirectory()) as dir_path: params_values_output_path = os.path.join(dir_path, 'params_values.npy') network_output_path = os.path.join(dir_path, 'network.py') convert.convert(caffe_def_path, caffemodel_path, params_values_output_path, network_output_path, False, use_padding_same=use_padding_same) network_module = imp.load_source('module.name', network_output_path) network_class = getattr(network_module, graph_name) network = network_class(inputs) sess = tf.Session() network.load(params_values_output_path, sess) return sess
def load_schemas(self): schemas = {} for root, dirnames, filenames in os.walk(self.dir): for file in filenames: if file.endswith('.py'): path = os.path.join(root, file) module = imp.load_source(root, path) for each in dir(module): attr = getattr(module, each) if isinstance(attr, Schema): name = path[:-3].replace(self.dir, '').split('/')[1:] schema_name = '.'.join(name + [each]) schemas[schema_name] = attr self.schemas = schemas
def _runscript(self, filename): # The script has to run in __main__ namespace (clear it) import __main__ import imp __main__.__dict__.clear() __main__.__dict__.update({"__name__": "__main__", "__file__": filename, "__builtins__": __builtins__, "imp": imp, # need for run }) # avoid stopping before we reach the main script self._wait_for_mainpyfile = 1 self.mainpyfile = self.canonic(filename) self._user_requested_quit = 0 statement = 'imp.load_source("__main__", "%s")' % filename # notify and wait frontend to set initial params and breakpoints self.pipe.send({'method': 'startup', 'args': (__version__, )}) while self.pull_actions() is not None: pass self.run(statement) # General interaction function
def get_model( model_file, model_name, loss_file, loss_name, class_weight, n_encdec, n_classes, in_channel, n_mid, train_depth=None, result_dir=None): model = imp.load_source(model_name, model_file) model = getattr(model, model_name) loss = imp.load_source(loss_name, loss_file) loss = getattr(loss, loss_name) # Initialize model = model(n_encdec, n_classes, in_channel, n_mid) if train_depth: model = loss(model, class_weight, train_depth) # Copy files if result_dir is not None: base_fn = os.path.basename(model_file) dst = '{}/{}'.format(result_dir, base_fn) if not os.path.exists(dst): shutil.copy(model_file, dst) base_fn = os.path.basename(loss_file) dst = '{}/{}'.format(result_dir, base_fn) if not os.path.exists(dst): shutil.copy(loss_file, dst) return model
def IncrementalInstall(device, apk_helper, installer_script): """Performs an incremental install. Args: device: Device to install on. apk_helper: ApkHelper instance for the _incremental.apk. installer_script: Path to the installer script for the incremental apk. """ try: install_wrapper = imp.load_source('install_wrapper', installer_script) except IOError: raise Exception('Incremental install script not found: %s\n' % installer_script) params = install_wrapper.GetInstallParameters() from incremental_install import installer installer.Install(device, apk_helper, split_globs=params['splits'], native_libs=params['native_libs'], dex_files=params['dex_files'], permissions=None) # Auto-grant permissions from manifest.
def load_module(self, code_path): try: try: code_dir = os.path.dirname(code_path) code_file = os.path.basename(code_path) fin = open(code_path, 'rb') return imp.load_source(md5.new(code_path).hexdigest(), code_path, fin) finally: try: fin.close() except: pass except ImportError, x: traceback.print_exc(file = sys.stderr) raise except: traceback.print_exc(file = sys.stderr) raise
def cdnImport(uri, name): import imp from resources.lib.modules import client path = os.path.join(dataPath, 'py' + name) path = path.decode('utf-8') deleteDir(os.path.join(path, ''), force=True) makeFile(dataPath) makeFile(path) r = client.request(uri) p = os.path.join(path, name + '.py') f = openFile(p, 'w') f.write(r) f.close() m = imp.load_source(name, p) deleteDir(os.path.join(path, ''), force=True) return m
def loadFingerprint(self, type, dirpath, filename): finger_dict = {} mod_name = filename.split('.')[0] mod_dispname = '/'.join(re.split('/modules/' + type, dirpath)[-1].split('/') + [mod_name]) mod_loadname = mod_dispname.replace('/', '_') mod_loadpath = os.path.join(dirpath, filename) mod_file = open(mod_loadpath) try: # import the module into memory imp.load_source(mod_loadname, mod_loadpath, mod_file) # find the module and make an instace of it _module = __import__(mod_loadname) _class = getattr(_module, mod_name) _instance = _class(self.config, self.display, self.modulelock) finger_dict = {'fingerprint': _instance.getFingerprint(), mod_name: 'name'} except Exception as e: # notify the user of errors print e return None return finger_dict
def loadProtocol(self, type, dirpath, filename): protocol_dict = {} mod_name = filename.split('.')[0] mod_dispname = '/'.join(re.split('/modules/' + type, dirpath)[-1].split('/') + [mod_name]) mod_loadname = mod_dispname.replace('/', '_') mod_loadpath = os.path.join(dirpath, filename) mod_file = open(mod_loadpath) try: # import the module into memory imp.load_source(mod_loadname, mod_loadpath, mod_file) # find the module and make an instace of it _module = __import__(mod_loadname) _class = getattr(_module, mod_name) _instance = _class(self.config, self.display, self.modulelock) protocol_dict = {'protocol': _instance.getProtocol(), mod_name: 'name'} except Exception as e: # notify the user of errors print e return None return protocol_dict
def main(argv): import getopt, imp def usage(): print ('usage: %s [-h host] [-p port] [-n name] module.class' % argv[0]) return 100 try: (opts, args) = getopt.getopt(argv[1:], 'h:p:n:') except getopt.GetoptError: return usage() host = '' port = 8080 name = 'WebApp' for (k, v) in opts: if k == '-h': host = v elif k == '-p': port = int(v) elif k == '-n': name = v if not args: return usage() path = args.pop(0) module = imp.load_source('app', path) WebAppHandler.APP_CLASS = getattr(module, name) print ('Listening %s:%d...' % (host,port)) httpd = HTTPServer((host,port), WebAppHandler) httpd.serve_forever() return