private Feeder createFeeder(String riverType, String riverName, RiverSettings riverSettings) { JDBCFeeder feeder = null; try { Map<String, Object> spec = (Map<String, Object>) riverSettings.settings().get("jdbc"); Map<String, String> loadedSettings = new JsonSettingsLoader().load(jsonBuilder().map(spec).string()); Settings mySettings = settingsBuilder().put(loadedSettings).build(); String strategy = XContentMapValues.nodeStringValue(spec.get("strategy"), "simple"); RiverFlow riverFlow = RiverServiceLoader.findRiverFlow(strategy); logger.debug("found river flow class {} for strategy {}", riverFlow.getClass().getName(), strategy); feeder = riverFlow.getFeeder(); logger.debug("spec = {} settings = {}", spec, mySettings.getAsMap()); feeder.setName(riverName) .setType(riverType) .setSpec(spec).setSettings(mySettings); } catch (IOException e) { logger.error(e.getMessage(), e); } return feeder; }
@Override public Feeder<T, R, P> readFrom(Reader reader) { try { Map<String, Object> spec = XContentFactory.xContent(XContentType.JSON).createParser(reader).mapOrderedAndClose(); Map<String, String> loadedSettings = new JsonSettingsLoader().load(jsonBuilder().map(spec).string()); Settings settings = settingsBuilder().put(loadedSettings).build(); setSpec(spec); setSettings(settings); } catch (IOException e) { logger.error(e.getMessage(), e); } return this; }
protected Map<String,String> initializeElasticSearchIndexSettings(String injectedConfig) { String defaultConfig = injectedConfig; if (org.apache.commons.lang.StringUtils.isEmpty(injectedConfig)) { try { StringWriter writer = new StringWriter(); IOUtils.copy(getClass().getResourceAsStream(this.defaultIndexSettingsResource), writer, "UTF-8"); defaultConfig = writer.toString(); } catch (Exception ex) { getLog().error("Failed to load indexSettings config from ["+ this.defaultIndexSettingsResource + "] for index builder [" + getName() + "]", ex); } } JsonSettingsLoader loader = new JsonSettingsLoader(); Map<String, String> mergedConfig = null; try { mergedConfig = loader.load(defaultConfig); } catch (IOException e) { getLog().error("Problem loading indexSettings for index builder [" + getName() + "]", e); } // Set these here so we don't have to do this string concatenation // and comparison every time through the upcoming 'for' loop final boolean IS_DEFAULT = SearchIndexBuilder.DEFAULT_INDEX_BUILDER_NAME.equals(getName()); final String DEFAULT_INDEX = ElasticSearchConstants.CONFIG_PROPERTY_PREFIX + "index."; final String LOCAL_INDEX = String.format("%s%s%s.",ElasticSearchConstants.CONFIG_PROPERTY_PREFIX,"index_",getName(),"."); // load anything set into the ServerConfigurationService that starts with "elasticsearch.index." this will // override anything set in the indexSettings config for (ServerConfigurationService.ConfigItem configItem : serverConfigurationService.getConfigData().getItems()) { String propertyName = configItem.getName(); if (IS_DEFAULT && (propertyName.startsWith(DEFAULT_INDEX))) { propertyName = propertyName.replaceFirst(DEFAULT_INDEX, "index."); mergedConfig.put(propertyName, (String) configItem.getValue()); } else if (propertyName.startsWith(LOCAL_INDEX)) { propertyName = propertyName.replaceFirst(LOCAL_INDEX, "index."); mergedConfig.put(propertyName, (String) configItem.getValue()); } } if (getLog().isDebugEnabled()) { for (Map.Entry<String,String> entry : mergedConfig.entrySet()) { getLog().debug("Index property '" + entry.getKey() + "' set to: " + entry.getValue() + "' for index builder '" + getName() + "'"); } } return mergedConfig; }
public static Settings decodeSettings(String encodedSettings) throws IOException { Map<String, String> loaded = new JsonSettingsLoader().load(encodedSettings); return Settings.builder().put(loaded).build(); }