@Override
public ConnectorHandleResolver getHandleResolver()
{
    return new ConnectorHandleResolver()
    {
        @Override
        public Class<? extends ConnectorTableHandle> getTableHandleClass()
        {
            return RestTableHandle.class;
        }

        @Override
        public Class<? extends ColumnHandle> getColumnHandleClass()
        {
            return RestColumnHandle.class;
        }

        @Override
        public Class<? extends ConnectorSplit> getSplitClass()
        {
            return RestConnectorSplit.class;
        }

        @Override
        public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
        {
            return RestConnectorTableLayoutHandle.class;
        }

        @Override
        public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
        {
            return RestTransactionHandle.class;
        }

        @Override
        public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
        {
            return RestInsertTableHandle.class;
        }
    };
}

public void addConnectorName(String name, ConnectorHandleResolver resolver)
{
    requireNonNull(name, "name is null");
    requireNonNull(resolver, "resolver is null");
    ConnectorHandleResolver existingResolver = handleResolvers.putIfAbsent(name, resolver);
    checkState(existingResolver == null || existingResolver.equals(resolver),
            "Connector '%s' is already assigned to resolver: %s", name, existingResolver);
}

private <T> String getId(T handle, Function<ConnectorHandleResolver, Class<? extends T>> getter)
{
    for (Entry<String, ConnectorHandleResolver> entry : handleResolvers.entrySet()) {
        try {
            if (getter.apply(entry.getValue()).isInstance(handle)) {
                return entry.getKey();
            }
        }
        catch (UnsupportedOperationException ignored) {
        }
    }
    throw new IllegalArgumentException("No connector for handle: " + handle);
}

@Inject
public KafkaConnector(
        ConnectorMetadata metadata,
        ConnectorSplitManager splitManager,
        ConnectorRecordSetProvider recordSetProvider,
        ConnectorHandleResolver handleResolver)
{
    construct(metadata, splitManager, recordSetProvider, handleResolver);
}

private void construct(
        ConnectorMetadata metadata,
        ConnectorSplitManager splitManager,
        ConnectorRecordSetProvider recordSetProvider,
        ConnectorHandleResolver handleResolver)
{
    this.metadata = checkNotNull(metadata, "metadata is null");
    this.splitManager = checkNotNull(splitManager, "splitManager is null");
    this.recordSetProvider = checkNotNull(recordSetProvider, "recordSetProvider is null");
    this.handleResolver = checkNotNull(handleResolver, "handleResolver is null");
}

@Override
public Connector create(final String connectorId, Map<String, String> requiredConfig)
{
    checkNotNull(requiredConfig, "requiredConfig is null");
    checkNotNull(optionalConfig, "optionalConfig is null");
    try {
        // A plugin is not required to use Guice; it is just very convenient
        // Bootstrap app = new Bootstrap(new JsonModule(), new ExampleModule(connectorId));
        //
        // Injector injector = app.strictConfig().doNotInitializeLogging()
        //         .setRequiredConfigurationProperties(requiredConfig)
        //         .setOptionalConfigurationProperties(optionalConfig)
        //         .initialize();

        ClassToInstanceMap<Object> services = ImmutableClassToInstanceMap.builder()
                .put(ConnectorMetadata.class, new CloudataConnectorMetadata(connectorId, store))
                .put(ConnectorSplitManager.class, new CloudataSplitManager(nodeManager, connectorId))
                .put(ConnectorRecordSetProvider.class, new CloudataConnectorRecordSetProvider())
                .put(ConnectorHandleResolver.class, new CloudataConnectorHandleResolver())
                .build();

        CloudataConnector connector = new CloudataConnector(store, services);
        connectors.put(connectorId, connector);
        return connector;
    }
    catch (Exception e) {
        throw Throwables.propagate(e);
    }
}

@Override public ConnectorHandleResolver getHandleResolver() { return new HDFSHandleResolver(); }
@Override public ConnectorHandleResolver getHandleResolver() { return new EthereumHandleResolver(); }
public ConnectorHandleResolver getHandleResolver()
{
    log.info("INFORMATION: AmpoolConnectorFactory getHandleResolver() called.");
    return new AmpoolHandleResolver();
}
@Override public ConnectorHandleResolver getHandleResolver() { return new KuduHandleResolver(); }
@Override public ConnectorHandleResolver getHandleResolver() { return new KafkaHandleResolver(); }
@Override public ConnectorHandleResolver getHandleResolver() { return new ExampleHandleResolver(); }
@Override public ConnectorHandleResolver getHandleResolver() { return new RaptorHandleResolver(); }
@Override public ConnectorHandleResolver getHandleResolver() { return new RedisHandleResolver(); }
@Override public ConnectorHandleResolver getHandleResolver() { return new BlackHoleHandleResolver(); }
@Override public ConnectorHandleResolver getHandleResolver() { return new TpchIndexHandleResolver(); }
@Override public ConnectorHandleResolver getHandleResolver() { return new JmxHandleResolver(); }
@Override public ConnectorHandleResolver getHandleResolver() { return new JdbcHandleResolver(); }
@Override public ConnectorHandleResolver getHandleResolver() { return new LegacyTransactionHandleResolver(connectorFactory.getHandleResolver()); }
public LegacyTransactionHandleResolver(ConnectorHandleResolver handleResolver) { this.handleResolver = requireNonNull(handleResolver, "handleResolver is null"); }
@Override public ConnectorHandleResolver getHandleResolver() { return new GlobalSystemHandleResolver(); }
public String getId(ConnectorTableHandle tableHandle) { return getId(tableHandle, ConnectorHandleResolver::getTableHandleClass); }
public String getId(ConnectorTableLayoutHandle handle) { return getId(handle, ConnectorHandleResolver::getTableLayoutHandleClass); }
public String getId(ColumnHandle columnHandle) { return getId(columnHandle, ConnectorHandleResolver::getColumnHandleClass); }
public String getId(ConnectorSplit split) { return getId(split, ConnectorHandleResolver::getSplitClass); }
public String getId(ConnectorIndexHandle indexHandle) { return getId(indexHandle, ConnectorHandleResolver::getIndexHandleClass); }
public String getId(ConnectorOutputTableHandle outputHandle) { return getId(outputHandle, ConnectorHandleResolver::getOutputTableHandleClass); }
public String getId(ConnectorInsertTableHandle insertHandle) { return getId(insertHandle, ConnectorHandleResolver::getInsertTableHandleClass); }
public String getId(ConnectorTransactionHandle transactionHandle) { return getId(transactionHandle, ConnectorHandleResolver::getTransactionHandleClass); }
public ConnectorHandleResolver resolverFor(String id)
{
    ConnectorHandleResolver resolver = handleResolvers.get(id);
    checkArgument(resolver != null, "No handle resolver for connector: %s", id);
    return resolver;
}
@Override public ConnectorHandleResolver getHandleResolver() { return new TpchHandleResolver(); }
@Override public ConnectorHandleResolver getHandleResolver() { return new CassandraHandleResolver(); }
@Override public ConnectorHandleResolver getHandleResolver() { return new HiveHandleResolver(); }
@Override public ConnectorHandleResolver getHandleResolver() { return _handleResolver; }
@Override public ConnectorHandleResolver getHandleResolver() { return new KinesisHandleResolver(connectorName); }
@Override
public Connector create(String connectorId, Map<String, String> config)
{
    checkNotNull(config, "config is null");
    try {
        KafkaClientModule kafkaClientModule = new KafkaClientModule(connectorId);
        Bootstrap app = new Bootstrap(
                new NodeModule(),
                new JsonModule(),
                kafkaClientModule);

        Injector injector = app.strictConfig().doNotInitializeLogging()
                .setRequiredConfigurationProperties(config)
                .quiet()
                .requireExplicitBindings(false)
                .setOptionalConfigurationProperties(optionalConfig)
                .initialize();

        KafkaClientConfig clientConfig = KafkaClientConfig.INSTANCE;
        KafkaPluginConfig pluginConfig = KafkaPluginConfig.INSTANCE;
        KafkaConnectorId kafkaConnectorId = KafkaConnectorId.INSTANCE;

        KafkaHiveClient hiveClient = new KafkaHiveClient(kafkaConnectorId, clientConfig, pluginConfig);
        KafkaMetadata kafkaMetadata = new KafkaMetadata(hiveClient, kafkaConnectorId);
        KafkaSplitManager kafkaSplitManager = new KafkaSplitManager(hiveClient, kafkaConnectorId, clientConfig);
        KafkaRecordSetProvider kafkaRecordSetProvider = new KafkaRecordSetProvider(kafkaConnectorId);
        KafkaHandleResolver kafkaHandleResolver = new KafkaHandleResolver(kafkaConnectorId);

        ConnectorMetadata connMetadata = new ClassLoaderSafeConnectorMetadata(kafkaMetadata, classLoader);
        ConnectorSplitManager connSplitManager = new ClassLoaderSafeConnectorSplitManager(kafkaSplitManager, classLoader);
        ConnectorRecordSetProvider connRecordSetProvider = new ClassLoaderSafeConnectorRecordSetProvider(kafkaRecordSetProvider, classLoader);
        ConnectorHandleResolver connHandleResolver = new ClassLoaderSafeConnectorHandleResolver(kafkaHandleResolver, classLoader);

        return new KafkaConnector(connMetadata, connSplitManager, connRecordSetProvider, connHandleResolver);

        // return injector.getInstance(KafkaConnector.class);

        // KafkaMetadata kafkaMetadata = injector.getInstance(KafkaMetadata.class);
        // KafkaSplitManager kafkaSplitManager = injector.getInstance(KafkaSplitManager.class);
        // KafkaRecordSetProvider kafkaRecordSetProvider = injector.getInstance(KafkaRecordSetProvider.class);
        // KafkaHandleResolver kafkaHandleResolver = injector.getInstance(KafkaHandleResolver.class);
        // return new KafkaConnector(kafkaMetadata, kafkaSplitManager,
        //         kafkaRecordSetProvider, kafkaHandleResolver);

        // return new KafkaConnector(
        //         new ClassLoaderSafeConnectorMetadata(kafkaMetadata, classLoader),
        //         new ClassLoaderSafeConnectorSplitManager(kafkaSplitManager, classLoader),
        //         new ClassLoaderSafeConnectorRecordSetProvider(kafkaRecordSetProvider, classLoader),
        //         new ClassLoaderSafeConnectorHandleResolver(kafkaHandleResolver, classLoader));
    }
    catch (Exception e) {
        e.printStackTrace();
        throw Throwables.propagate(e);
    }
}

@Override public ConnectorHandleResolver getHandleResolver() { return handleResolver; }
ConnectorHandleResolver getHandleResolver();