我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 configobj.ConfigObj()。
def Set(self):
    """Enable the wget proxy by writing proxy settings to the wgetrc file.

    Backs up the existing config first; terminates the process on
    read/write errors, then chains to the parent class Set().
    """
    if self.package_installed('wget'):
        print('\r\n\r\n[WGET]')
        if self.verbose:
            # FIX: this line used a Python 2 print statement while the rest
            # of the method used print(); unified on the function form.
            print('%s Wget is installed, opening %s for writing ' % (self.date(), self.wgetrc))
        try:
            self.backup_config(self.wgetrc)
            config = ConfigObj(self.wgetrc)
            config['http_proxy'] = self.http
            config['https_proxy'] = self.http
            config['ftp_proxy'] = self.http
            config['use_proxy'] = 'on'
            # Use a context manager so the file handle is closed promptly
            # (the original passed an anonymous open() that was never closed).
            with open(self.wgetrc, 'w') as fh:
                config.write(fh)
            if self.verbose:
                print('%s Proxy configuration written successfully to %s ' % (self.date(), self.wgetrc))
        except (IOError, ConfigObjError) as e:
            # BUG FIX: 'except (...), e' is Python 2-only syntax and os.exit()
            # does not exist; use 'as e' and raise SystemExit instead.
            print('Unable to set wget proxy: Error reading wget config in \'%s\' - %s' % (self.wgetrc, e))
            raise SystemExit(1)
    else:
        print('%s Wget not installed, skipping' % self.date())
    super(Wget, self).Set()
def Unset(self):
    """Disable the wget proxy: remove proxy keys and set use_proxy=off."""
    if self.package_installed('wget'):
        print('\r\n\r\n[WGET]')
        if self.verbose:
            print('%s Wget is installed, opening %s for writing ' % (self.date(), self.wgetrc))
        try:
            config = ConfigObj(self.wgetrc)
            # FIX: dict.has_key() is Python 2-only; use the 'in' operator.
            for key in ('http_proxy', 'https_proxy', 'ftp_proxy'):
                if key in config:
                    del config[key]
            config['use_proxy'] = 'off'
            # Close the output handle deterministically (was leaked before).
            with open(self.wgetrc, 'w') as fh:
                config.write(fh)
            if self.verbose:
                print('%s Proxy configuration removed successfully from %s ' % (self.date(), self.wgetrc))
        except (IOError, ConfigObjError) as e:
            # BUG FIX: 'except (...), e' is Python 2-only syntax and os.exit()
            # does not exist; use 'as e' and raise SystemExit instead.
            print('Unable to unset wget proxy: Error reading wget config in \'%s\' - %s' % (self.wgetrc, e))
            raise SystemExit(1)
    else:
        print('%s Wget not installed, skipping' % self.date())
    super(Wget, self).Unset()
def Set(self):
    """Enable the proxy for KDE/Plasma by editing ~/.kde/.../kioslaverc."""
    desktops = self.desktops()
    if desktops and 'plasma' in desktops:
        print('\n\n[KDE]')
        if self.verbose:
            print('{0} Found Plasma DE installed'.format(self.date()))
        config = os.path.join(os.path.expanduser('~' + str(self.get_sudoer())),
                              '.kde/share/config', 'kioslaverc')
        if os.path.isfile(config):
            config_file = ConfigObj(config)
            section = config_file.get('Proxy Settings')
            if section is None:
                # BUG FIX: .get() returns None when the section is missing,
                # which crashed the assignments below; create it instead.
                config_file['Proxy Settings'] = {}
                section = config_file['Proxy Settings']
            section['ftpProxy'] = 'http://' + self.http
            section['httpProxy'] = 'http://' + self.http
            section['httpsProxy'] = 'http://' + self.http
            section['socksProxy'] = 'http://' + self.http
            section['ProxyType'] = 1
            # Close the output handle deterministically (was leaked before).
            with open(config, 'w') as fh:
                config_file.write(fh)
            if self.verbose:
                print('{0} Proxy successfully set'.format(self.date()))
        else:
            if self.verbose:
                print('{0} Proxy configuration file doesn\'t exist, skipping...'.format(self.date()))
    else:
        if self.verbose:
            print('{0} Plasma DE doesn\'t exist on this system. Skipping'.format(self.date()))
    super(KDE, self).Set()
def Unset(self):
    """Disable the proxy for KDE/Plasma (set ProxyType=0 in kioslaverc)."""
    desktops = self.desktops()
    if desktops and 'plasma' in desktops:
        print('\n\n[KDE]')
        if self.verbose:
            print('{0} Found Plasma DE installed'.format(self.date()))
        config = os.path.join(os.path.expanduser('~' + str(self.get_sudoer())),
                              '.kde/share/config', 'kioslaverc')
        if os.path.isfile(config):
            config_file = ConfigObj(config)
            section = config_file.get('Proxy Settings')
            if section is None:
                # BUG FIX: guard against a missing 'Proxy Settings' section,
                # which previously made the assignment below crash on None.
                config_file['Proxy Settings'] = {}
                section = config_file['Proxy Settings']
            section['ProxyType'] = 0
            # Close the output handle deterministically (was leaked before).
            with open(config, 'w') as fh:
                config_file.write(fh)
            if self.verbose:
                print('{0} Proxy successfully removed'.format(self.date()))
        else:
            if self.verbose:
                print('{0} Proxy configuration file doesn\'t exist, skipping...'.format(self.date()))
    else:
        if self.verbose:
            print('{0} Plasma DE doesn\'t exist on this system. Skipping'.format(self.date()))
    super(KDE, self).Unset()
def test_salt_api_upgrade(self):
    # Upgrading an existing Salt API configuration: the file is rewritten
    # with the new values and the previous contents are preserved in a
    # ``.deepsea.bak`` backup file.
    config_file = "/etc/default/openattic"
    # Seed a legacy configuration in the fake filesystem.
    self.fs.CreateFile(config_file,
                       contents="SALT_API_HOST=mysalt.localhost\n"
                                "SALT_API_PORT=8000\n"
                                "SALT_API_EAUTH=auto\n"
                                "SALT_API_USERNAME=myuser\n"
                                "SALT_API_PASSWORD=mypassword\n")
    openattic.configure_salt_api("salt.localhost", 9000, "admin", "mysharedsecret")
    # The rewritten file carries the new values; note configobj reads
    # scalars back as strings, hence "9000".
    config = configobj.ConfigObj(config_file)
    self.assertEqual(config['SALT_API_HOST'], "salt.localhost")
    self.assertEqual(config['SALT_API_PORT'], "9000")
    self.assertEqual(config['SALT_API_USERNAME'], "admin")
    self.assertEqual(config['SALT_API_EAUTH'], "sharedsecret")
    self.assertEqual(config['SALT_API_SHARED_SECRET'], "mysharedsecret")
    self.fs.RemoveFile(config_file)
    # The backup file holds the original (pre-upgrade) values.
    self.assertTrue(os.path.isfile("{}.deepsea.bak".format(config_file)))
    config = configobj.ConfigObj("{}.deepsea.bak".format(config_file))
    self.assertEqual(config['SALT_API_HOST'], "mysalt.localhost")
    self.assertEqual(config['SALT_API_PORT'], "8000")
    self.assertEqual(config['SALT_API_USERNAME'], "myuser")
    self.assertEqual(config['SALT_API_EAUTH'], "auto")
    self.fs.RemoveFile("{}.deepsea.bak".format(config_file))
def configure_salt_api(hostname, port, username, sharedsecret):
    """Update the Salt API settings in the openATTIC config file.

    A backup copy (``<file>.deepsea.bak``) of the previous on-disk
    configuration is written before the new values are persisted.
    """
    path = _select_config_file_path()
    conf = configobj.ConfigObj(path)
    conf['SALT_API_HOST'] = hostname
    conf['SALT_API_PORT'] = int(port)
    conf['SALT_API_EAUTH'] = 'sharedsecret'
    conf['SALT_API_USERNAME'] = username
    conf['SALT_API_SHARED_SECRET'] = sharedsecret
    # Back up the original file before writing the updated configuration.
    copyfile(path, "{}.deepsea.bak".format(path))
    _write_config_file(path, conf)
def __init__(self, cmdoptions, mainapp):
    """Build the Tk control panel; the main panel is only created after the
    master password dialog succeeds."""
    self.cmdoptions = cmdoptions
    self.mainapp = mainapp
    # Settings are loaded from the command-line supplied config file,
    # validated against the configspec.
    self.panelsettings = ConfigObj(self.cmdoptions.configfile,
                                   configspec=self.cmdoptions.configval,
                                   list_values=False)
    self.root = Tk()
    self.root.config(height=20, width=20)
    self.root.geometry("+200+200")
    # Route window-close through our own handler so we can clean up.
    self.root.protocol("WM_DELETE_WINDOW", self.ClosePanel)
    #self.root.iconify()
    #if self.panelsettings['General']['Master Password'] != "x\x9c\x03\x00\x00\x00\x00\x01":
    #    self.PasswordDialog()
    #print len(self.panelsettings['General']['Master Password'])
    #print zlib.decompress(self.panelsettings['General']['Master Password'])
    passcheck = self.PasswordDialog()  # call the password authentication widget
    # if password match, then create the main panel
    if passcheck == 0:
        self.InitializeMainPanel()
        self.root.mainloop()
    elif passcheck == 1:
        self.ClosePanel()
def _set_up_config(self, domain, custom_archive):
    """Create live/archive directories and a renewal config for *domain*.

    Returns the written ``configobj.ConfigObj`` renewal configuration.
    """
    # TODO: maybe provide NamespaceConfig.make_dirs?
    # TODO: main() should create those dirs, c.f. #902
    os.makedirs(os.path.join(self.config.live_dir, domain))
    renewal_conf = configobj.ConfigObj()
    if custom_archive is None:
        os.makedirs(os.path.join(self.config.default_archive_dir, domain))
    else:
        os.makedirs(custom_archive)
        renewal_conf["archive_dir"] = custom_archive
    for kind in ALL_FOUR:
        renewal_conf[kind] = os.path.join(self.config.live_dir, domain,
                                          kind + ".pem")
    renewal_conf.filename = os.path.join(self.config.renewal_configs_dir,
                                         domain + ".conf")
    renewal_conf.write()
    return renewal_conf
def write(values, path):
    """Write the specified values to a config file.

    :param dict values: A map of values to write.
    :param str path: Where to write the values.
    """
    config = configobj.ConfigObj()
    for key in values:
        config[key] = values[key]
    # BUG FIX: the file used to be created with default permissions and only
    # chmod'ed to 0o600 afterwards, leaving a window where other users could
    # read it. Create it with restrictive permissions from the start.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "wb") as f:
        config.write(outfile=f)
    # Still chmod in case the file pre-existed with a looser mode.
    os.chmod(path, 0o600)
def _pre_setup(self): super(BaseCliTestCase, self)._pre_setup() # Start with default config self._config = ConfigObj({ 'username': 'user', 'access_token': 'access_token', }) self.config = MagicMock(spec_set=self._config) self.config.__getitem__.side_effect = self._config.__getitem__ self.config.__setitem__.side_effect = self._config.__setitem__ self.config.get.side_effect = self._config.get patch('dpm.config.ConfigObj', lambda *a: self.config).start() self.runner = CliRunner() self.isolated_fs = self.runner.isolated_filesystem() self.isolated_fs.__enter__()
def read_config(config_path=None):
    """Read configuration from file, falling back to env or hardcoded
    defaults."""
    # Only complain about a missing file when the user explicitly supplied a
    # path; the default location is allowed not to exist.
    if config_path is not None and not os.path.exists(config_path):
        raise Exception('No config file found at: %s' % config_path)
    config = ConfigObj(config_path if config_path is not None else configfile)
    server = (os.environ.get('DPM_SERVER')
              or config.get('server')
              or DEFAULT_SERVER)
    return {
        'server': server,
        'username': os.environ.get('DPM_USERNAME') or config.get('username'),
        'access_token': os.environ.get('DPM_ACCESS_TOKEN') or config.get('access_token'),
    }
def getconfig(configfile="analysis.conf"):
    """Read the analysis config file and derive commonly used file paths.

    Returns the parsed config and a dict of derived paths.
    """
    import configobj
    config = configobj.ConfigObj(configfile)

    # Common path fragments used by several outputs.
    folder = config['out']
    prefix = folder + "/" + config['target']['name']
    tagged = prefix + '_' + config['file']['tag']

    cmapfile = tagged + '_CountMap.fits'
    modelmap = tagged + '_ModelMap.fits'
    residuals = prefix + '_Residual_Model_cmap.fits'
    images = [residuals, modelmap, cmapfile]
    tsdir = folder + "/TSMap/"
    tsfits = tagged + '_TSMap.npz'  # TS-map filename

    # Dictionary with useful values.
    useful = {
        'folder': folder,
        'cmapfile': cmapfile,
        'modelmap': modelmap,
        'residuals': residuals,
        'images': images,
        'tsdir': tsdir,
        'tsfits': tsfits,
    }
    return config, useful
def __init__(self):
    """Initialise the Pmw-based control panel; the main panel is only shown
    after the password dialog succeeds."""
    self.mainapp = _mainapp['mainapp']
    # Settings come from the command-line supplied config file, validated
    # against the configspec.
    self.panelsettings = ConfigObj(_cmdoptions['cmdoptions'].configfile,
                                   configspec=_cmdoptions['cmdoptions'].configval,
                                   list_values=False)
    self.root = Pmw.initialise(fontScheme='pmw2')
    self.root.withdraw()  # keep the window hidden until authenticated
    # call the password authentication widget
    # if password matches, then create the main panel
    password_correct = self.password_dialog()
    if password_correct:
        self.root.deiconify()
        self.initialize_main_panel()
        self.root.mainloop()
    else:
        self.close()
def __init__(self, userconfig=None):
    """
    Load the bundled default configurations and specifications.
    If the ``userconfig`` provided, the user configurations is also
    loaded, validated, and merged.
    """
    configspec = _get_configspec()
    # Parse the spec itself (_inspec=True) with interpolation disabled so
    # '%' characters in the spec are taken literally.
    self._configspec = ConfigObj(configspec, interpolation=False,
                                 list_values=False, _inspec=True,
                                 encoding="utf-8")
    # Empty config validated against the spec yields the defaults.
    configs_default = ConfigObj(interpolation=False,
                                configspec=self._configspec,
                                encoding="utf-8")
    # Keep a copy of the default configurations
    self._config_default = self._validate(configs_default)
    # NOTE: use ``copy.deepcopy``; see ``self.reset()`` for more details
    self._config = copy.deepcopy(self._config_default)
    if userconfig:
        self.read_userconfig(userconfig)
def merge(self, config):
    """
    Simply merge the given configurations without validation.

    Parameters
    ----------
    config : `~configobj.ConfigObj`, dict, str, or list[str]
        Supplied configurations to be merged.
    """
    if isinstance(config, ConfigObj):
        parsed = config
    else:
        # Parse strings / line lists / dicts into a ConfigObj first.
        try:
            parsed = ConfigObj(config, interpolation=False, encoding="utf-8")
        except ConfigObjError as e:
            logger.exception(e)
            raise ConfigError(e)
    self._config.merge(parsed)
def read_config(self, config):
    """
    Read, validate and merge the input config.

    Parameters
    ----------
    config : str, or list[str]
        Input config to be validated and merged.
        This parameter can be the filename of the config file, or a list
        contains the lines of the configs.
    """
    try:
        parsed = ConfigObj(config, interpolation=False,
                           configspec=self._configspec, encoding="utf-8")
    except ConfigObjError as e:
        raise ConfigError(e)
    self.merge(self._validate(parsed))
    logger.info("Loaded additional config")
def load_config(filename):
    """
    Load, validate and return the configuration file stored at *filename*.
    In case the configuration is invalid, report the error to the user and
    exit with return code 1.

    :param filename: config filename as a string
    :return: a dictionary
    """
    with open(filename, 'r') as f:
        config = configobj.ConfigObj(f, configspec=CONFIG_SPEC.splitlines())
    validator = validate.Validator()
    result = config.validate(validator, preserve_errors=True)
    # FIX: with preserve_errors=True, validate() returns True on success or
    # a (truthy) nested dict on failure -- compare identity, not equality.
    if result is not True:
        report_config_errors(config, result)
        sys.exit(1)
    return config
def __init__(self, builderConfPath, logger):
    """
    Search for a function that solves the given action for an object that
    specifies a certain hypervisor, template or service.

    :param builderConfPath: Path to the builder config for this Environment.
    :type builderConfPath: str
    """
    self.logger = logger
    if os.path.isfile(builderConfPath):
        self.builderConf = ConfigObj(builderConfPath)
    else:
        self.builderConf = None
        self.logger.warning("No builder.conf found.")
    self.functions = self.loadFunctions(BUILD_MODULE)
def read_config_file_with_defaults(cfg):
    """
    Reads a config file with the default config file as fallback

    Params:
    cfg: string, path to cfg file
    Returns:
    ConfigObj of settings
    """
    if not cfg:
        cfg = addSlash(sys.path[0]) + 'settings/config.cfg'
    defaultCfg = scriptPath() + '/settings/defaults.cfg'
    config = configobj.ConfigObj(defaultCfg)
    # FIX: the original re-tested 'if cfg:' here, but cfg is always truthy at
    # this point because it was defaulted above; the guard was dead code.
    try:
        userConfig = configobj.ConfigObj(cfg)
        config.merge(userConfig)
    except configobj.ParseError:
        logMessage("ERROR: Could not parse user config file %s" % cfg)
    except IOError:
        logMessage("Could not open user config file %s. Using only default config file" % cfg)
    return config
def test_zhu2015():
    # Run the Zhu (2015) logistic model against the Loma Prieta test inputs
    # and compare the resulting probability grid with the stored target grid.
    conf_file = os.path.join(upone, 'defaultconfigfiles', 'models',
                             'zhu_2015.ini')
    conf = ConfigObj(conf_file)
    data_path = os.path.join(datadir, 'loma_prieta', 'model_inputs')
    # Rewrite relative layer paths in the config to the test data location.
    conf = correct_config_filepaths(data_path, conf)
    shakefile = os.path.join(datadir, 'loma_prieta', 'grid.xml')
    lm = LM.LogisticModel(shakefile, conf, saveinputs=True)
    maplayers = lm.calculate()
    pgrid = maplayers['model']['grid']
    test_data = pgrid.getData()
    # To change target data:
    # pgrd = GMTGrid(pgrid.getData(), pgrid.getGeoDict())
    # pgrd.save(os.path.join(datadir, 'loma_prieta', 'targets', 'zhu2015.grd'))

    # Load target
    target_file = os.path.join(datadir, 'loma_prieta', 'targets',
                               'zhu2015.grd')
    target_grid = GMTGrid.load(target_file)
    target_data = target_grid.getData()

    # Assert
    np.testing.assert_allclose(target_data, test_data)
def test_zoom():
    # Exercise makemaps.modelMap() with boundaries='zoom' and with an
    # explicit boundaries dictionary; passes as long as no exception raises.
    # boundaries == 'zoom'
    shakefile = os.path.join(datadir, 'loma_prieta', 'grid.xml')
    conf_file = os.path.join(upone, 'defaultconfigfiles', 'models',
                             'zhu_2015.ini')
    conf = ConfigObj(conf_file)
    data_path = os.path.join(datadir, 'loma_prieta', 'model_inputs')
    conf = correct_config_filepaths(data_path, conf)
    lq = LM.LogisticModel(shakefile, conf, saveinputs=True)
    maplayers = lq.calculate()
    makemaps.modelMap(maplayers, boundaries='zoom', zthresh=0.3,
                      savepdf=False, savepng=False)
    # boundaries dictionary
    bounds = {'xmin': -122.54, 'xmax': -120.36, 'ymin': 36.1, 'ymax': 37.0}
    makemaps.modelMap(maplayers, boundaries=bounds, savepdf=False,
                      savepng=False)
def config_reader(config_file, path):
    """
    Read config to dict
    :param config_file: base name of the config file (without ".conf")
    :param path: directory containing the config file
    :return: the parsed ConfigObj
    :raises IOError: if the config file does not exist
    :raises AttributeError: if the config's setup.enable flag is "0"
    """
    from configobj import ConfigObj
    config_file = path + os.sep + config_file + ".conf"
    # BUG FIX: 'raise IOError, (...)' is Python 2-only syntax (a SyntaxError
    # on Python 3); call the exception class instead.
    if not os.path.isfile(config_file):
        raise IOError("Config file (%s) not find,exit!" % config_file)
    config = ConfigObj(config_file, encoding='UTF8')
    if isset(config["setup"]["enable"]) and config["setup"]["enable"] == "0":
        raise AttributeError("config setup enable is off")
    return config
def get_worker_config(worker_name, config_path=None):
    """
    Get worker config
    :param worker_name: str
    :param config_path: str, directory of config files (defaults to ../conf)
    :return: ConfigObj
    """
    try:
        if config_path is None:
            config_path = os.path.dirname(this_file_dir()) + os.sep + "conf"
        config = config_reader(worker_name, config_path)
    # BUG FIX: 'except IOError, ex' and the bare print statements are
    # Python 2-only syntax; use 'as ex' and print() instead.
    except IOError as ex:
        print("[FATAL ERROR]", ex)
        sys.exit(0)
    except AttributeError as ex:
        print("[NOTICE ERROR]", ex)
        sys.exit(0)
    return config
def getDefinitionOBjFromDefinitionFile(definitionFile, create=True):
    """Load (or create) a definition file as a validated ConfigObj.

    ConfigObj objects are created even when the file doesn't exist
    (when *create* is True).

    :param definitionFile: absolute path of the definition file
    :param create: create an empty file when missing; otherwise raise
    :raises ValueError: if *definitionFile* is not an absolute path
    """
    if not os.path.isabs(definitionFile):
        raise ValueError("Absolute path required")
    # FIX: the original wrapped everything in 'try: ... except Exception as
    # e: raise e', which only truncated the traceback; let errors propagate.
    obj = ConfigObj(
        infile=definitionFile,
        raise_errors=True,
        create_empty=create,
        file_error=not create,
        indent_type="\t",
        encoding="UTF-8",
        interpolation=False,
        unrepr=True,
        configspec=defFileSpec
    )
    obj.newlines = "\r\n"
    val = Validator()
    obj.validate(val)
    return obj
def add_to_desktop(self, widget, desktopEntry):
    """Copy *desktopEntry*'s .desktop file onto the user's Desktop folder."""
    try:
        # Determine where the Desktop folder is (could be localized)
        import glob
        import shutil
        import stat
        import subprocess
        from configobj import ConfigObj
        config = ConfigObj(GLib.get_home_dir() + "/.config/user-dirs.dirs")
        desktopDir = GLib.get_home_dir() + "/Desktop"
        tmpdesktopDir = config['XDG_DESKTOP_DIR']
        # XDG_DESKTOP_DIR may contain shell variables like $HOME; let the
        # shell expand them (the value comes from the user's own config).
        tmpdesktopDir = subprocess.getoutput("echo " + tmpdesktopDir)
        if os.path.exists(tmpdesktopDir):
            desktopDir = tmpdesktopDir
        # FIX: replaced os.system("cp ...") / os.system("chmod ..."), which
        # broke on paths needing quoting and was a shell-injection hazard,
        # with shutil.copy and explicit chmod (equivalent of 'chmod a+rx').
        shutil.copy(desktopEntry.desktopFile, desktopDir)
        all_rx = (stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)
        for entry in glob.glob(os.path.join(desktopDir, "*.desktop")):
            os.chmod(entry, os.stat(entry).st_mode | all_rx)
    except Exception as detail:
        print(detail)
def set_up_pref_file():
    """Update or create a config file"""
    import configobj
    try:
        os.makedirs(CONFIG_DIR)
    except FileExistsError:
        pass
    # get the ini file with default settings
    default_config = configobj.ConfigObj(os.path.join(HERE, 'clid/config.ini'))
    try:
        # get user's config file if app is already installed
        user_config = configobj.ConfigObj(USER_CONFIG_FILE, file_error=True)
    except OSError:
        # expand `~/Music` if app is being installed for the first time
        user_config = configobj.ConfigObj(USER_CONFIG_FILE)
        default_config['General']['music_dir'] = os.path.join(HOME, 'Music', '')
    default_config.merge(user_config)
    # BUG FIX: the output handle was opened inline and never closed; use a
    # context manager so it is flushed and closed deterministically.
    with open(USER_CONFIG_FILE, 'wb') as outfile:
        default_config.write(outfile=outfile)
def test_valid_config(self):
    """Test an existing and valid configuration file"""
    from configobj import ConfigObj
    config = Config(base_dir + '/static/config_valid.ini').read()
    self.assertIs(ConfigObj, type(config))
    self.assertTrue('main' in config)
    main_section = config['main']
    # Service list: php enabled, apache absent.
    self.assertTrue('services' in main_section)
    self.assertTrue('php' in main_section['services'])
    self.assertFalse('apache' in main_section['services'])
    # Scalar settings.
    self.assertTrue('project_name' in main_section)
    self.assertEqual('test', main_section['project_name'])
    self.assertTrue('php.version' in main_section)
    self.assertEqual('7.0', main_section['php.version'])
def __new__(cls, file_name='cfg.yaml'):
    """Return a cached, parsed config object for *file_name* (one per file).

    Searches the given path, then sys.path[0] and sys.path[1], and parses
    based on the extension (.yaml / .cfg|.ini|.conf / .json).

    :raises OSError: if the file cannot be located
    :raises ValueError: for unsupported file extensions
    """
    if file_name not in cls.config_object_instance:
        config_file_path = file_name
        if not os.path.exists(file_name):
            config_file_path0 = os.path.join(sys.path[0], file_name)
            config_file_path1 = os.path.join(sys.path[1], file_name)
            if os.path.exists(config_file_path0):
                config_file_path = config_file_path0
            elif os.path.exists(config_file_path1):
                config_file_path = config_file_path1
            else:
                raise OSError('can not find config file !')
        if file_name.endswith('.yaml'):
            # FIX: use safe_load and close the handle -- yaml.load() without
            # a Loader is unsafe on untrusted input (and deprecated in
            # PyYAML >= 5.1), and the file object was previously leaked.
            with open(config_file_path) as f:
                cls.config_object_instance[file_name] = yaml.safe_load(f)
        elif file_name.endswith(('.cfg', '.ini', '.conf')):
            cls.config_object_instance[file_name] = ConfigObj(config_file_path)
        elif file_name.endswith('.json'):
            # FIX: the file handle was leaked here as well.
            with open(config_file_path) as f:
                cls.config_object_instance[file_name] = json.load(f)
        else:
            raise ValueError('Unsupported configuration file type')
    return cls.config_object_instance[file_name]
def get_conf(base_dir):
    '''
    Get the actual configuration, built by merging the repository conf
    into the global user conf.
    '''
    configspec_fname = op.join(THIS_DIR, 'confspec.ini')
    merged_conf = configobj.ConfigObj(configspec=configspec_fname)
    # Repository settings override the global user settings.
    merged_conf.merge(get_user_conf())
    merged_conf.merge(get_repository_conf(base_dir))
    validation_results = merged_conf.validate(validate.Validator(),
                                              preserve_errors=True)
    if validation_results is not True:
        raise exceptions.ConfError(merged_conf, validation_results)
    return merged_conf
def load_config_file(self, path=sys_path+'/configs/config.ini', config_version='cnn_default'):
    """Load CNN settings from a config file into self.configuration_dict.

    :param path: path of the .ini config file
    :param config_version: section of the config file to read
    """
    config = ConfigObj(path)
    section = config[config_version]
    # load config information from file and store in configuarion_dict
    # FIX: the original ended every assignment with a stray trailing comma,
    # wrapping each value in a 1-tuple, and then used min(v) to unwrap the
    # tuple again. Assign the values directly instead.
    local_dict = OrderedDict()
    local_dict['filter_dims'] = list(zip(list(map(int, section['filter_dims_x'])),
                                         list(map(int, section['filter_dims_y']))))
    local_dict['hidden_channels'] = list(map(int, section['hidden_channels']))
    local_dict['pooling_type'] = section['pooling_type']
    local_dict['strides'] = section['strides']
    local_dict['activation_function'] = section['activation_function']
    local_dict['dense_depths'] = list(map(int, section['dense_depths']))
    local_dict['batch_size'] = section['batch_size']
    local_dict['max_iterations'] = section['max_iterations']
    local_dict['chk_iterations'] = section['chk_iterations']
    local_dict['dropout_k_p'] = section['dropout_k_p']
    local_dict['fine_tuning_only'] = section['fine_tuning_only']
    local_dict['step_size'] = section['step_size']
    for k, v in local_dict.items():
        self.configuration_dict[k] = v
    print('Succesfully loaded config file, values are:')
    for k, v in self.configuration_dict.items():
        print(k, v)
def open_file(self):
    """Load a saved case file chosen by the user and populate all tabs."""
    name = QFileDialog.getOpenFileName(self, 'Open File')
    # getOpenFileName returns (path, filter); an empty path means cancelled.
    if not name[0]:
        return
    config_file = ConfigObj(name[0])
    # Selected unit systems are stored as stringified ints.
    self.checked_units.clear()
    for x in config_file["systems"]:
        self.checked_units.add(int(x))
    # Tab 1: general parameters, one (value, unit) pair per entry key.
    for x in parameter_list:
        self.tab1.value[x[0]] = config_file["tab1"]["values"][x[0]]
        self.tab1.unit[x[0]] = config_file["tab1"]["units"][x[0]]
    # Tab 2: fluid properties, separately for oil and water.
    for x in fluids:
        self.tab2.oil.value[x[0]] = config_file["tab2"]["oil"]["values"][x[0]]
        self.tab2.oil.unit[x[0]] = config_file["tab2"]["oil"]["units"][x[0]]
        self.tab2.water.value[x[0]] = config_file["tab2"]["water"]["values"][x[0]]
        self.tab2.water.unit[x[0]] = config_file["tab2"]["water"]["units"][x[0]]
    # Tab 3: stored as a flat key -> value mapping.
    for x in config_file["tab3"]:
        self.tab3.value[x] = config_file["tab3"][x]
    # Tab 5: mesh parameters, (value, unit) pairs like tab 1.
    for x in mesh:
        self.tab5.value[x[0]] = config_file["tab5"]["values"][x[0]]
        self.tab5.unit[x[0]] = config_file["tab5"]["units"][x[0]]
    self.update_parameters()
    # Tab 4 (wells) is handed over wholesale.
    self.update_wells(config_file["tab4"])
def load(self):
    """Read this module's config and, if a phone-class file already exists,
    load it and mark the module as trained."""
    self.target_nodes = self.config.get('target_nodes', '//token')
    self.input_attribute = self.config.get('input_attribute', 'text')
    ## osw: For now, store phone class info to voice, not corpus.
    #self.phone_class_file = os.path.join(self.voice_resources.get_path(c.LANG), self.config['phone_classes'])
    self.phone_class_file = os.path.join(self.get_location(),
                                         self.config['phone_classes'])
    self.trained = False
    if os.path.isfile(self.phone_class_file):
        self.trained = True
        # The phone-class file is itself in ConfigObj (ini-style) format.
        self.phone_classes = ConfigObj(self.phone_class_file,
                                       encoding='utf8')
    #if self.trained == False:
    #    self.phone_classes.clear()
    pass
def __init__(self, config=None, config_section=None, entry_points=None, config_filename=None, **initial_config):
    """Eager / lazy loading of the plugins

    In:
      - ``config`` -- ``ConfigObj`` configuration object
      - ``config_section`` -- if defined, overloads the ``CONFIG_SECTION`` class attribute
      - ``entry_points`` -- if defined, overloads the ``ENTRY_POINT`` class attribute
      - ``config_filename`` -- path of the configuration file
      - ``initial_config`` -- other configuration parameters not read from the configuration file
    """
    # Lazy mode: without a ``config`` object there is nothing to load yet.
    if config is None:
        return
    if not isinstance(config, configobj.ConfigObj):
        raise ValueError("Not a `ConfigObj` instance. Don't you want to call the `from_file()` method instead?")
    self.load_plugins(config, config_section, entry_points, config_filename, **initial_config)
def read_config(self, plugins, config, config_section, config_filename=None, **initial_config):
    """Read and validate all the plugins configurations

    In:
      - ``plugins`` -- the plugins
      - ``config`` -- ``ConfigObj`` configuration object
      - ``config_section`` -- parent section of the plugins in the application configuration file
      - ``config_filename`` -- path of the configuration file
      - ``initial_config`` -- other configuration parameters not read from the configuration file

    Return:
      - the ``ConfigObj`` validated section of the plugins configurations
    """
    if not config_section:
        return {}
    # Merge the configuration specifications of all the plugins
    merged_spec = {p.get_entry_name(): self.get_plugin_spec(p) for p in plugins}
    spec = configobj.ConfigObj({config_section: merged_spec})
    plugins_conf = configobj.ConfigObj(config, configspec=spec,
                                       interpolation='Template')
    plugins_conf.merge(initial_config)
    validate(plugins_conf, config_filename)
    return plugins_conf[config_section]
def parse_config(config_file):
    """Parse and validate *config_file*; exit on read/parse errors.

    :param config_file: path of the config file; falsy values return None
    :return: the validated ConfigObj, or None when no file was given
    """
    if not config_file:
        return None
    try:
        configs = configobj.ConfigObj(
            config_file, file_error=True, raise_errors=True)
    # FIX: the two original except blocks were identical; handle both error
    # kinds together.
    except (configobj.ConfigObjError, IOError) as e:
        print("Error reading config file:")
        print(e)
        # BUG FIX: bare exit() is meant for interactive use and exited with
        # status 0 even on failure; signal the failure explicitly.
        raise SystemExit(1)
    validate_config(configs)
    return configs
def get_plugin_config(plugin_id, config_dir):
    """
    Wrapper around :func:`get_config` to fetch a plugin's configurations.

    .. warning::

        The same :func:`get_config` warning applies for this function.
        Stick with the conf singleton in order to retrieve plugin
        configurations.

    :param plugin_id: The plugin ID.
    :type plugin_id: string
    :param config_dir: The directory where the plugin configuration file is
        found.
    :type config_dir: string
    :return: The loaded configuration object.
    :rtype: `ConfigObj <https://configobj.readthedocs.io/en/latest/>`_
    """
    from eva import conf
    plugin_dir = conf['plugins'][plugin_id]['path']
    # FIX: build paths with os.path.join instead of manual '/' concatenation
    # so trailing slashes in the configured directories are normalized.
    spec_file = os.path.join(plugin_dir, plugin_id + '.conf.spec')
    config_file = os.path.join(config_dir, plugin_id + '.conf')
    return get_config(config_file, spec_file)
def test_configure_salt_api(self):
    # Configuring the Salt API against an initially empty config file writes
    # all values and leaves a .deepsea.bak backup behind.
    config_file = "/etc/sysconfig/openattic"
    self.fs.CreateFile(config_file, contents="")
    openattic.configure_salt_api("salt.localhost", 9000, "admin", "mysharedsecret")
    self.assertTrue(os.path.isfile("{}.deepsea.bak".format(config_file)))
    config = configobj.ConfigObj(config_file)
    self.assertEqual(config['SALT_API_HOST'], "salt.localhost")
    # configobj reads scalars back as strings, hence "9000".
    self.assertEqual(config['SALT_API_PORT'], "9000")
    self.assertEqual(config['SALT_API_USERNAME'], "admin")
    self.assertEqual(config['SALT_API_EAUTH'], "sharedsecret")
    self.assertEqual(config['SALT_API_SHARED_SECRET'], "mysharedsecret")
    self.fs.RemoveFile(config_file)
    self.fs.RemoveFile("{}.deepsea.bak".format(config_file))
def test_configure_grafana(self):
    """configure_grafana() writes GRAFANA_API_HOST and keeps a backup."""
    path = "/etc/sysconfig/openattic"
    self.fs.CreateFile(path, contents="")
    openattic.configure_grafana("grafana.localhost")
    written = configobj.ConfigObj(path)
    self.assertEqual(written['GRAFANA_API_HOST'], "grafana.localhost")
    # Clean the fake filesystem up again.
    self.fs.RemoveFile(path)
    self.fs.RemoveFile("{}.deepsea.bak".format(path))
def configure_grafana(hostname):
    """Update the Grafana host in the openATTIC configuration.

    A ``<file>.deepsea.bak`` backup of the previous on-disk config is kept.
    """
    path = _select_config_file_path()
    conf = configobj.ConfigObj(path)
    conf['GRAFANA_API_HOST'] = hostname
    # Back up the original file before writing the updated configuration.
    copyfile(path, "{}.deepsea.bak".format(path))
    _write_config_file(path, conf)
def CreateConfigPanel(self, section):
    """Open the settings dialog for *section*.

    The settings are re-read from the config file first so the panel shows
    what is on disk, rather than a potentially modified but not yet written
    out ConfigObj.
    """
    # FIX: the explicit 'del(self.panelsettings)' was redundant -- simply
    # rebinding the attribute drops the old ConfigObj.
    self.panelsettings = ConfigObj(self.cmdoptions.configfile,
                                   configspec=self.cmdoptions.configval,
                                   list_values=False)
    self.configpanel = ConfigPanel(self.root, title=section + " Settings",
                                   settings=self.panelsettings,
                                   section=section)
def __init__(self, config, regid=None, arf_dict=None, rmf_dict=None,
             grouping=None, quality=None):
    """
    Arguments:
    * config: a section of the whole config file (`ConfigObj' object)
    * arf_dict / rmf_dict: caches of already-loaded ARF/RMF objects
    * grouping / quality: channel grouping / quality data

    BUG FIX: the original declared ``arf_dict={}`` and ``rmf_dict={}`` --
    mutable default arguments shared across every call; default to None
    and substitute fresh dicts instead (backward compatible).
    """
    arf_dict = {} if arf_dict is None else arf_dict
    rmf_dict = {} if rmf_dict is None else rmf_dict
    self.regid = regid
    self.cross_in_specset = []
    self.cross_in_arf = []
    self.cross_out_arf = []
    # this spectrum to be corrected
    self.spectrumset = SpectrumSet(
        filename=config["spec"], regid=regid,
        outfile=config["outfile"],
        arf=arf_dict[config["arf"]],
        rmf=rmf_dict.get(config.get("rmf"), None),
        bkg=config.get("bkg"))
    # spectra and cross arf from which photons were scattered in
    for k, v in config["cross_in"].items():
        specset = SpectrumSet(
            filename=v["spec"], regid=k,
            arf=arf_dict[config["arf"]],
            rmf=rmf_dict.get(config.get("rmf"), None),
            bkg=v.get("bkg"))
        self.cross_in_specset.append(specset)
        self.cross_in_arf.append(
            arf_dict.get(v["cross_arf"], ARF(v["cross_arf"], regid=k)))
    # regions into which the photons of this spectrum were scattered into
    if "cross_out" in config.sections:
        cross_arf = config["cross_out"].as_list("cross_arf")
        for arffile in cross_arf:
            self.cross_out_arf.append(arf_dict.get(arffile, ARF(arffile)))
    # grouping and quality
    self.grouping = grouping
    self.quality = quality
def start(dynamo_region, dynamo_host, table_root, log_config, verbosity, environment, server_config):
    """
    Starts an APScheduler job to periodically reload HAProxy config as well
    as run the API to register/deregister new services, target groups and
    backends.
    :param dynamo_region: The AWS region of the DynamoDB tables Flyby stores and reads config in
    :param dynamo_host: The hostname and port to use for DynamoDB connections. Useful for local
    testing with moto or DynamoDB Local.
    :param table_root: The root that will be used for table names in DynamoDB. This will be
    prefixed to all tables created.
    :param log_config: Location of python yaml config file.
    :param verbosity: Logging verbosity, defaults to INFO.
    :param environment: "development" runs the Flask dev server; anything else serves via waitress.
    :param server_config: Path of the server config, validated against flyby/configspec.ini.
    :return:
    """
    # NOTE: the original docstring documented a 'fqdn' parameter that is not
    # part of the signature; it has been removed above.
    logging.getLogger().setLevel(level=getattr(logging, verbosity))
    dynamo_manager = DynamoTableManagement()
    config = ConfigObj(infile=server_config, configspec='flyby/configspec.ini', stringify=True)
    res = config.validate(Validator(), preserve_errors=True)
    if res is not True:
        for section, key, msg in flatten_errors(config, res):
            click.echo("{}: {} in {}".format(key, msg, section))
        raise click.ClickException('bad server config')
    # Create the DynamoDB tables if missing, update the DynamoDB read/write capacity if required
    dynamo_manager.update_capacity(dynamo_host, dynamo_region, table_root, logger, config['dynamodb'])
    if log_config:
        with open(log_config, 'r') as conf:
            # FIX: yaml.load() without an explicit Loader is deprecated and
            # unsafe; the logging config only needs plain YAML, so safe_load.
            logging.config.dictConfig(yaml.safe_load(conf))
    scheduler = BackgroundScheduler(timezone=utc)
    scheduler.add_job(update, 'interval', seconds=10, next_run_time=datetime.now(),
                      args=(config['haproxy'],))
    scheduler.start()
    if environment == "development":
        app.run(host='0.0.0.0')
    else:
        serve(app, listen='*:5000')
def __init__(self):
    """Set up splitting limits/markers and load the long-string memory."""
    self.maxWords = 140        # maximum number of words per chunk
    self.nextSignal = r'....'  # continuation marker
    self.spSignal = r'///'     # split marker
    mem_path = self._get_mem_path()
    self.mem = ConfigObj(mem_path)
    # Section holding the long-string memory.
    self.memLS = self.mem['long_str']
def __init__(self):
    """Set up dialogue limits/markers and load the dialogue memory."""
    self.maxLen = 3           # maximum dialogue length
    self.spSignal = '//'      # split marker
    self.spSignal2 = '///\n'  # alternate split marker (with newline)
    mem_path = self._get_mem_path()
    self.mem = ConfigObj(mem_path)
    # Section holding the dialogue memory.
    self.memDia = self.mem['dialogue']
def __init__(self):
    """Load the bread configuration from its system-wide location."""
    config_path = '/etc/bread.cfg'
    self.cfg = ConfigObj(config_path)
def __init__(self, config=None):
    """Store API secrets and set up the rate limiter and WAMP runner.

    :param config: ConfigObj or dict of secrets; defaults to an empty dict.

    BUG FIX: the original signature was ``config: ConfigObj or dict() = {}``
    -- the annotation evaluated to just ``ConfigObj`` and the ``{}`` default
    was a single mutable object shared across instances; default to None and
    substitute a fresh dict instead (backward compatible).
    """
    self.secrets = {} if config is None else config
    self.limiter = RateLimiter(max_calls=self.max_calls, period=self.max_period)
    # WAMP Streaming API
    self.runner = ApplicationRunner(self.ws_uri, self.ws_realm)
def setUp(self):
    """Create a live/archive/renewal directory layout plus a renewal config
    for example.org, and build a RenewableCert backed by it."""
    from certbot import storage
    super(BaseRenewableCertTest, self).setUp()
    # TODO: maybe provide NamespaceConfig.make_dirs?
    # TODO: main() should create those dirs, c.f. #902
    os.makedirs(os.path.join(self.config.config_dir, "live", "example.org"))
    archive_path = os.path.join(self.config.config_dir, "archive", "example.org")
    os.makedirs(archive_path)
    os.makedirs(os.path.join(self.config.config_dir, "renewal"))
    config_file = configobj.ConfigObj()
    for kind in ALL_FOUR:
        kind_path = os.path.join(self.config.config_dir, "live",
                                 "example.org", kind + ".pem")
        config_file[kind] = kind_path
    # 'touch' a README next to the live files.
    with open(os.path.join(self.config.config_dir, "live", "example.org",
                           "README"), 'a'):
        pass
    config_file["archive"] = archive_path
    config_file.filename = os.path.join(self.config.config_dir, "renewal",
                                        "example.org.conf")
    config_file.write()
    self.config_file = config_file
    # We also create a file that isn't a renewal config in the same
    # location to test that logic that reads in all-and-only renewal
    # configs will ignore it and NOT attempt to parse it.
    # FIX: the junk file handle used to be opened and closed manually; a
    # context manager closes it even if the write fails.
    with open(os.path.join(self.config.config_dir, "renewal",
                           "IGNORE.THIS"), "w") as junk:
        junk.write("This file should be ignored!")
    self.defaults = configobj.ConfigObj()
    with mock.patch("certbot.storage.RenewableCert._check_symlinks") as check:
        check.return_value = True
        self.test_rc = storage.RenewableCert(config_file.filename, self.config)
def test_renewal_incomplete_config(self):
    """Test that the RenewableCert constructor will complain if the
    renewal configuration file is missing a required file element."""
    from certbot import storage
    config = configobj.ConfigObj()
    # Provide three of the four required elements; the required privkey is
    # deliberately left out.
    for element, value in (("cert", "imaginary_cert.pem"),
                           ("chain", "imaginary_chain.pem"),
                           ("fullchain", "imaginary_fullchain.pem")):
        config[element] = value
    config.filename = os.path.join(self.config.config_dir,
                                   "imaginary_config.conf")
    config.write()
    self.assertRaises(errors.CertStorageError, storage.RenewableCert,
                      config.filename, self.config)
def config_with_defaults(config=None):
    """Merge supplied config, if provided, on top of builtin defaults."""
    merged = configobj.ConfigObj(constants.RENEWER_DEFAULTS)
    overrides = config if config is not None else configobj.ConfigObj()
    merged.merge(overrides)
    return merged
def _full_archive_path(config_obj, cli_config, lineagename): """Returns the full archive path for a lineagename Uses cli_config to determine archive path if not available from config_obj. :param configobj.ConfigObj config_obj: Renewal conf file contents (can be None) :param configuration.NamespaceConfig cli_config: Main config file :param str lineagename: Certificate name """ if config_obj and "archive_dir" in config_obj: return config_obj["archive_dir"] else: return os.path.join(cli_config.default_archive_dir, lineagename)