我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用configobj.ConfigObjError()。
def Set( self ): if self.package_installed( 'wget' ): print( '\r\n\r\n[WGET]' ) if self.verbose: print '%s Wget is installed, opening %s for writing ' % ( self.date(), self.wgetrc ) try: self.backup_config( self.wgetrc ) config = ConfigObj( self.wgetrc ) config[ 'http_proxy' ] = self.http config[ 'https_proxy' ] = self.http config[ 'ftp_proxy' ] = self.http config[ 'use_proxy' ] = 'on' config.write( open( self.wgetrc, 'w' ) ) if self.verbose: print( '%s Proxy configuration written successfully to %s ' % ( self.date(), self.wgetrc ) ) except ( IOError, ConfigObjError ), e: print( 'Unable to set wget proxy: Error reading wget config in \'%s\' - %s' % ( self.wgetrc, e ) ) os.exit( 1 ) else: print( '%s Wget not installed, skipping' % self.date() ) super( Wget, self ).Set()
def Unset( self ): if self.package_installed( 'wget' ): print( '\r\n\r\n[WGET]' ) if self.verbose: print( '%s Wget is installed, opening %s for writing ' % ( self.date(), self.wgetrc ) ) try: config = ConfigObj( self.wgetrc ) if config.has_key( 'http_proxy' ): del config[ 'http_proxy' ] if config.has_key( 'https_proxy' ): del config[ 'https_proxy' ] if config.has_key( 'ftp_proxy' ): del config[ 'ftp_proxy' ] config[ 'use_proxy' ] = 'off' config.write( open( self.wgetrc, 'w' ) ) if self.verbose: print( '%s Proxy configuration removed successfully from %s ' % ( self.date(), self.wgetrc ) ) except ( IOError, ConfigObjError ), e: print( 'Unable to unset wget proxy: Error reading wget config in \'%s\' - %s' % ( self.wgetrc, e ) ) os.exit( 1 ) else: print( '%s Wget not installed, skipping' % self.date() ) super( Wget, self ).Unset()
def merge(self, config): """ Simply merge the given configurations without validation. Parameters ---------- config : `~configobj.ConfigObj`, dict, str, or list[str] Supplied configurations to be merged. """ if not isinstance(config, ConfigObj): try: config = ConfigObj(config, interpolation=False, encoding="utf-8") except ConfigObjError as e: logger.exception(e) raise ConfigError(e) self._config.merge(config)
def read_config(self, config): """ Read, validate and merge the input config. Parameters ---------- config : str, or list[str] Input config to be validated and merged. This parameter can be the filename of the config file, or a list contains the lines of the configs. """ try: newconfig = ConfigObj(config, interpolation=False, configspec=self._configspec, encoding="utf-8") except ConfigObjError as e: raise ConfigError(e) newconfig = self._validate(newconfig) self.merge(newconfig) logger.info("Loaded additional config")
def parse_config(config_file): if not config_file: return None try: configs = configobj.ConfigObj( config_file, file_error=True, raise_errors=True) except configobj.ConfigObjError as e: print("Error reading config file:") print(e) exit() except IOError as e: print("Error reading config file:") print(e) exit() validate_config(configs) return configs
def __init__(self, filename, mapper=lambda x: x): """ :param str filename: A path to the configuration file. :param callable mapper: A transformation to apply to configuration key names :raises errors.PluginError: If the file does not exist or is not a valid format. """ validate_file_permissions(filename) try: self.confobj = configobj.ConfigObj(filename) except configobj.ConfigObjError as e: logger.debug("Error parsing credentials configuration: %s", e, exc_info=True) raise errors.PluginError("Error parsing credentials configuration: {0}".format(e)) self.mapper = mapper
def _get_config_object(self, alternative_config=None): """Create a L{ConfigObj} consistent with our preferences. @param config_source: Optional readable source to read from instead of the default configuration file. """ config_source = alternative_config or self.get_config_filename() # Setting list_values to False prevents ConfigObj from being "smart" # about lists (it now treats them as strings). See bug #1228301 for # more context. # Setting raise_errors to False causes ConfigObj to batch all parsing # errors into one ConfigObjError raised at the end of the parse instead # of raising the first one and then exiting. This also allows us to # recover the good config values in the error handler below. # Setting write_empty_values to True prevents configObj writes # from writing "" as an empty value, which get_plugins interprets as # '""' which search for a plugin named "". See bug #1241821. try: config_obj = ConfigObj(config_source, list_values=False, raise_errors=False, write_empty_values=True) except ConfigObjError as e: logger = getLogger() logger.warn(str(e)) # Good configuration values are recovered here config_obj = e.config return config_obj
def _validate(self, config): """ Validate the config against the specification using a default validator. The validated config values are returned if success, otherwise, the ``ConfigError`` raised with details. """ validator = Validator() try: # NOTE: # Use the "copy" mode, which will copy both the default values # and all the comments. results = config.validate(validator, preserve_errors=True, copy=True) except ConfigObjError as e: raise ConfigError(e) if results is not True: error_msg = "" for (section_list, key, res) in flatten_errors(config, results): if key is not None: if res is False: msg = 'key "%s" in section "%s" is missing.' msg = msg % (key, ", ".join(section_list)) else: msg = 'key "%s" in section "%s" failed validation: %s' msg = msg % (key, ", ".join(section_list), res) else: msg = 'section "%s" is missing' % ".".join(section_list) error_msg += msg + "\n" raise ConfigError(error_msg) return config
def __init__(self, config_filename, cli_config, update_symlinks=False): """Instantiate a RenewableCert object from an existing lineage. :param str config_filename: the path to the renewal config file that defines this lineage. :param .NamespaceConfig: parsed command line arguments :raises .CertStorageError: if the configuration file's name didn't end in ".conf", or the file is missing or broken. """ self.cli_config = cli_config self.lineagename = lineagename_for_filename(config_filename) # self.configuration should be used to read parameters that # may have been chosen based on default values from the # systemwide renewal configuration; self.configfile should be # used to make and save changes. try: self.configfile = configobj.ConfigObj(config_filename) except configobj.ConfigObjError: raise errors.CertStorageError( "error parsing {0}".format(config_filename)) # TODO: Do we actually use anything from defaults and do we want to # read further defaults from the systemwide renewal configuration # file at this stage? self.configuration = config_with_defaults(self.configfile) if not all(x in self.configuration for x in ALL_FOUR): raise errors.CertStorageError( "renewal config file {0} is missing a required " "file reference".format(self.configfile)) conf_version = self.configuration.get("version") if (conf_version is not None and util.get_strict_version(conf_version) > CURRENT_VERSION): logger.warning( "Attempting to parse the version %s renewal configuration " "file found at %s with version %s of Certbot. This might not " "work.", conf_version, config_filename, certbot.__version__) self.cert = self.configuration["cert"] self.privkey = self.configuration["privkey"] self.chain = self.configuration["chain"] self.fullchain = self.configuration["fullchain"] self.live_dir = os.path.dirname(self.cert) self._fix_symlinks() if update_symlinks: self._update_symlinks() self._check_symlinks()
def __init__(self, config_filename, cli_config): """Instantiate a RenewableCert object from an existing lineage. :param str config_filename: the path to the renewal config file that defines this lineage. :param .RenewerConfiguration: parsed command line arguments :raises .CertStorageError: if the configuration file's name didn't end in ".conf", or the file is missing or broken. """ self.cli_config = cli_config if not config_filename.endswith(".conf"): raise errors.CertStorageError( "renewal config file name must end in .conf") self.lineagename = os.path.basename( config_filename[:-len(".conf")]) # self.configuration should be used to read parameters that # may have been chosen based on default values from the # systemwide renewal configuration; self.configfile should be # used to make and save changes. try: self.configfile = configobj.ConfigObj(config_filename) except configobj.ConfigObjError: raise errors.CertStorageError( "error parsing {0}".format(config_filename)) # TODO: Do we actually use anything from defaults and do we want to # read further defaults from the systemwide renewal configuration # file at this stage? self.configuration = config_with_defaults(self.configfile) if not all(x in self.configuration for x in ALL_FOUR): raise errors.CertStorageError( "renewal config file {0} is missing a required " "file reference".format(self.configfile)) conf_version = self.configuration.get("version") if (conf_version is not None and util.get_strict_version(conf_version) > CURRENT_VERSION): logger.warning( "Attempting to parse the version %s renewal configuration " "file found at %s with version %s of Certbot. This might not " "work.", conf_version, config_filename, certbot.__version__) self.cert = self.configuration["cert"] self.privkey = self.configuration["privkey"] self.chain = self.configuration["chain"] self.fullchain = self.configuration["fullchain"] self.live_dir = os.path.dirname(self.cert) self._fix_symlinks() self._check_symlinks()
def run(self): """ This method is executed when the thread of this handler is started. It triggers the collector modules in their specified time intervals. """ try: if self.__stopEvent.isSet(): self.logger.critical("Environment is not set on running state!") return if self.initScanner() is None: self.logger.error("Error while initializing scanner.") return self.initExporters() self.logger.info("Environment running...") while not self.__stopEvent.isSet(): try: _, interval, name = self.queue.get(True, TIMEOUT) #Priority is only used by queue self.logger.debug("Starting collector module {}.".format(name)) if "config" not in self.config["modules"][name]: self.logger.error("No configuration given for collector {0}.".format(name)) else: configPath = self.config["modules"][name]["config"] configPath = configPath if os.path.isabs(configPath) else os.path.join(self.path, configPath) connectionInfo = ConfigObj(configPath) if connectionInfo == {}: self.logger.warning("Connection information for module {0} empty.".format(name)) connectionInfo["name"] = name #interval -1 means "No restart" if interval != -1: self.timers[name] = Timer(int(interval), self.executeScan, [name]) worker = Worker(partial(self.collectorModules[name], self.graph, connectionInfo ,self.logger), name, partial(self.finishedCallback, name, interval), self.logger) self.workers.append(worker) worker.start() except ConfigObjError: self.logger.error("Can not parse connectionInfo for module {0}: Path: {1}.".format(name, configPath)) except queue.Empty: #Just do nothing. This is a normal case self.logger.debug("No job to handle.") except KeyError as e: self.logger.error("Missing key '{0}' in configuration file for module {1}.".format(e.args[0], name)) except Exception as e: self.logger.debug("{0}: {1}".format(type(e), traceback.format_exc())) self.logger.error("Error while executing scan!") except Exception as e: self.logger.critical("Error in EnvironmentHandler: {}".format(str(e)))