@Override public RecordSet getRecordSet( ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns ) { EthereumSplit ethereumSplit = convertSplit(split); ImmutableList.Builder<EthereumColumnHandle> handleBuilder = ImmutableList.builder(); for (ColumnHandle handle : columns) { EthereumColumnHandle columnHandle = convertColumnHandle(handle); handleBuilder.add(columnHandle); } return new EthereumRecordSet(web3j, handleBuilder.build(), ethereumSplit); }
@Override public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) { List<HDFSColumnHandle> hdfsColumns = columns.stream() .map(col -> (HDFSColumnHandle) col) .collect(Collectors.toList()); HDFSSplit hdfsSplit = checkType(split, HDFSSplit.class, "hdfs split"); Path path = new Path(hdfsSplit.getPath()); Optional<ConnectorPageSource> pageSource = createHDFSPageSource( path, hdfsSplit.getStart(), hdfsSplit.getLen(), hdfsColumns); if (pageSource.isPresent()) { return pageSource.get(); } throw new RuntimeException("Could not find a file reader for split " + hdfsSplit); }
@Override public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) { log.debug("Create table " + tableMetadata.getTable().getTableName()); String tblName = tableMetadata.getTable().getTableName(); String dbName = tableMetadata.getTable().getSchemaName(); List<ColumnMetadata> columns = tableMetadata.getColumns(); List<String> columnName = new LinkedList<>(); List<String> dataType = new LinkedList<>(); for (ColumnMetadata column : columns) { columnName.add(column.getName()); dataType.add(column.getType().getDisplayName()); } String userName = ""; String storageFormatName = ""; metaClient.createRegularTable(dbName, tblName, userName, storageFormatName, columnName, dataType); }
@Override public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) { requireNonNull(prefix, "prefix is null"); ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder(); List<SchemaTableName> tableNames = prefix.getSchemaName() == null ? listTables(session, null) : ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName())); for (SchemaTableName tableName : tableNames) { ConnectorTableMetadata tableMetadata = getTableMetadata(tableName); // table can disappear during listing operation if (tableMetadata != null) { columns.put(tableName, tableMetadata.getColumns()); } } return columns.build(); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle handle, ConnectorSession session, ConnectorTableLayoutHandle layout) { log.info("INFORMATION: AmpoolSplitManager getSplits() called."); AmpoolTableLayoutHandle layoutHandle = (AmpoolTableLayoutHandle) layout; AmpoolTableHandle tableHandle = layoutHandle.getTable(); AmpoolTable table = new AmpoolTable(ampoolClient, tableHandle.getTableName()); // this can happen if table is removed during a query checkState(table.getColumnsMetadata() != null, "Table %s.%s no longer exists", tableHandle.getSchemaName(), tableHandle.getTableName()); List<ConnectorSplit> splits = new ArrayList<>(); // TODO Pass here bucket id splits.add(new AmpoolSplit(connectorId, tableHandle.getSchemaName(), tableHandle.getTableName(),"" ,HostAddress.fromParts("localhost",0))); Collections.shuffle(splits); return new FixedSplitSource(splits); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { KuduTableLayoutHandle layoutHandle = checkType(layout, KuduTableLayoutHandle.class, "layout"); KuduTableHandle tableHandle = layoutHandle.getTable(); KuduClient kuduClient = kuduClientManager.getClient(); List<KuduScanToken> tokens = kuduClientManager.newScanTokenBuilder(kuduClient, tableHandle.getSchemaTableName().getTableName()).build(); TupleDomain<KuduColumnHandle> effectivePredicate = layoutHandle.getConstraint() .transform(handle -> checkType(handle, KuduColumnHandle.class, "columnHandle")); ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder(); for (int i = 0; i < tokens.size(); i++) { // nodeManager.getWorkerNodes() List<HostAddress> hostAddresses = nodeManager.getWorkerNodes().stream() .map(node -> node.getHostAndPort()).collect(Collectors.toList()); ConnectorSplit split = new KuduSplit(hostAddresses, tableHandle.getSchemaTableName(), i, effectivePredicate); builder.add(split); } kuduClientManager.close(kuduClient); return new FixedSplitSource(builder.build()); }
@Override public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) { requireNonNull(prefix, "prefix is null"); KuduClient kuduClient = kuduClientManager.getClient(); ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder(); for (SchemaTableName tableName : listTables(session, prefix)) { KuduTableHandle tableHandle = kuduTables.getTables(kuduClient).get(tableName); if (tableHandle != null) { columns.put(tableName, kuduTables.getColumns(kuduClient, tableHandle)); } } kuduClientManager.close(kuduClient); return columns.build(); }
@Override /** * @ */ public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) { requireNonNull(split, "split is null"); KuduSplit kuduSplit = checkType(split, KuduSplit.class, "split"); ImmutableList.Builder<KuduColumnHandle> handles = ImmutableList.builder(); for (ColumnHandle handle : columns) { handles.add(checkType(handle, KuduColumnHandle.class, "handle")); } return new KuduRecordSet(kuduTable, kuduClientManager, kuduSplit, handles.build()); }
@Override public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column) { if (!allowAddColumn) { throw new PrestoException(PERMISSION_DENIED, "Adding Columns is disabled in this Hive catalog"); } HiveTableHandle handle = checkType(tableHandle, HiveTableHandle.class, "tableHandle"); Optional<Table> tableMetadata = metastore.getTable(handle.getSchemaName(), handle.getTableName()); if (!tableMetadata.isPresent()) { throw new TableNotFoundException(handle.getSchemaTableName()); } Table table = tableMetadata.get(); StorageDescriptor sd = table.getSd(); ImmutableList.Builder<FieldSchema> columns = ImmutableList.builder(); columns.addAll(sd.getCols()); columns.add(new FieldSchema(column.getName(), toHiveType(column.getType()).getHiveTypeName(), column.getComment())); sd.setCols(columns.build()); table.setSd(sd); metastore.alterTable(handle.getSchemaName(), handle.getTableName(), table); }
private void generateProjectMethod(ClassDefinition classDefinition, CallSiteBinder callSiteBinder, CachedInstanceBinder cachedInstanceBinder, String methodName, RowExpression projection) { Parameter session = arg("session", ConnectorSession.class); Parameter cursor = arg("cursor", RecordCursor.class); Parameter output = arg("output", BlockBuilder.class); MethodDefinition method = classDefinition.declareMethod(a(PUBLIC), methodName, type(void.class), session, cursor, output); method.comment("Projection: %s", projection.toString()); Scope scope = method.getScope(); Variable wasNullVariable = scope.declareVariable(type(boolean.class), "wasNull"); BytecodeBlock body = method.getBody() .comment("boolean wasNull = false;") .putVariable(wasNullVariable, false); BytecodeExpressionVisitor visitor = new BytecodeExpressionVisitor(callSiteBinder, cachedInstanceBinder, fieldReferenceCompiler(cursor, wasNullVariable), metadata.getFunctionRegistry()); body.getVariable(output) .comment("evaluate projection: " + projection.toString()) .append(projection.accept(visitor, scope)) .append(generateWrite(callSiteBinder, scope, wasNullVariable, projection.getType())) .ret(); }
@Override public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint) { Builder table = InMemoryRecordSet.builder(QUERY_TABLE); for (QueryInfo queryInfo : queryManager.getAllQueryInfo()) { QueryStats queryStats = queryInfo.getQueryStats(); table.addRow( nodeId, queryInfo.getQueryId().toString(), queryInfo.getState().toString(), queryInfo.getSession().getUser(), queryInfo.getSession().getSource().orElse(null), queryInfo.getQuery(), toMillis(queryStats.getQueuedTime()), toMillis(queryStats.getAnalysisTime()), toMillis(queryStats.getDistributedPlanningTime()), toTimeStamp(queryStats.getCreateTime()), toTimeStamp(queryStats.getExecutionStartTime()), toTimeStamp(queryStats.getLastHeartbeat()), toTimeStamp(queryStats.getEndTime())); } return table.build().cursor(); }
@Override public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) { requireNonNull(prefix, "prefix is null"); if (prefix.getSchemaName() != null && !prefix.getSchemaName().equals(SCHEMA_NAME)) { return ImmutableMap.of(); } ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder(); List<SchemaTableName> tableNames; if (prefix.getTableName() == null) { tableNames = listTables(session, prefix.getSchemaName()); } else { tableNames = ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName())); } for (SchemaTableName tableName : tableNames) { JmxTableHandle tableHandle = getTableHandle(session, tableName); columns.put(tableName, tableHandle.getTableMetadata().getColumns()); } return columns.build(); }
@Override public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession session, SchemaTablePrefix prefix) { ImmutableMap.Builder<SchemaTableName, ConnectorViewDefinition> views = ImmutableMap.builder(); List<SchemaTableName> tableNames; if (prefix.getTableName() != null) { tableNames = ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName())); } else { tableNames = listViews(session, prefix.getSchemaName()); } for (SchemaTableName schemaTableName : tableNames) { Optional<Table> table = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName()); if (table.isPresent() && HiveUtil.isPrestoView(table.get())) { views.put(schemaTableName, new ConnectorViewDefinition( schemaTableName, Optional.ofNullable(table.get().getOwner()), decodeViewData(table.get().getViewOriginalText()))); } } return views.build(); }
@Override public Object getObjectValue(ConnectorSession session, Block block, int position) { if (block.isNull(position)) { return null; } Block arrayBlock = getObject(block, position); List<Object> values = new ArrayList<>(arrayBlock.getPositionCount()); for (int i = 0; i < arrayBlock.getPositionCount(); i++) { values.add(fields.get(i).getType().getObjectValue(session, arrayBlock, i)); } return Collections.unmodifiableList(values); }
@Override public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle tableHandle) { RaptorInsertTableHandle handle = checkType(tableHandle, RaptorInsertTableHandle.class, "tableHandle"); return new RaptorPageSink( pageSorter, storageManager, shardInfoCodec, handle.getTransactionId(), toColumnIds(handle.getColumnHandles()), handle.getColumnTypes(), Optional.empty(), toColumnIds(handle.getSortColumnHandles()), handle.getSortOrders(), maxBufferSize); }
@Override public RedisTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName) { RedisTableDescription table = getDefinedTables().get(schemaTableName); if (table == null) { return null; } // check if keys are supplied in a zset // via the table description doc String keyName = null; if (table.getKey() != null) { keyName = table.getKey().getName(); } return new RedisTableHandle( connectorId, schemaTableName.getSchemaName(), schemaTableName.getTableName(), getDataFormat(table.getKey()), getDataFormat(table.getValue()), keyName); }
@Override public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTableName) { BlackHoleTableHandle oldTableHandle = checkType(tableHandle, BlackHoleTableHandle.class, "tableHandle"); BlackHoleTableHandle newTableHandle = new BlackHoleTableHandle( oldTableHandle.getSchemaName(), newTableName.getTableName(), oldTableHandle.getColumnHandles(), oldTableHandle.getSplitCount(), oldTableHandle.getPagesPerSplit(), oldTableHandle.getRowsPerPage(), oldTableHandle.getFieldsLength() ); tables.remove(oldTableHandle.getTableName()); tables.put(newTableName.getTableName(), newTableHandle); }
private static Object mapKeyToObject(ConnectorSession session, String jsonKey, Type type) { BlockBuilder blockBuilder; if (type instanceof FixedWidthType) { blockBuilder = type.createBlockBuilder(new BlockBuilderStatus(), 1); } else { blockBuilder = type.createBlockBuilder(new BlockBuilderStatus(), 1, jsonKey.length()); } if (type.getJavaType() == boolean.class) { type.writeBoolean(blockBuilder, Boolean.parseBoolean(jsonKey)); } else if (type.getJavaType() == long.class) { type.writeLong(blockBuilder, Long.parseLong(jsonKey)); } else if (type.getJavaType() == double.class) { type.writeDouble(blockBuilder, Double.parseDouble(jsonKey)); } else if (type.getJavaType() == Slice.class) { type.writeSlice(blockBuilder, Slices.utf8Slice(jsonKey)); } return type.getObjectValue(session, blockBuilder.build(), 0); }
@Override public ConnectorPageSource createPageSource( ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) { BlackHoleSplit blackHoleSplit = checkType(split, BlackHoleSplit.class, "BlackHoleSplit"); ImmutableList.Builder<Type> builder = ImmutableList.builder(); for (ColumnHandle column : columns) { builder.add((checkType(column, BlackHoleColumnHandle.class, "BlackHoleColumnHandle")).getColumnType()); } List<Type> types = builder.build(); return new FixedPageSource(Iterables.limit( Iterables.cycle(generateZeroPage(types, blackHoleSplit.getRowsPerPage(), blackHoleSplit.getFieldsLength())), blackHoleSplit.getPagesCount())); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { KinesisTableLayoutHandle kinesislayout = handleResolver.convertLayout(layout); KinesisTableHandle kinesisTableHandle = kinesislayout.getTable(); InternalStreamDescription desc = this.getStreamDescription(kinesisTableHandle.getStreamName()); ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder(); for (Shard shard : desc.getShards()) { KinesisSplit split = new KinesisSplit(connectorId, kinesisTableHandle.getStreamName(), kinesisTableHandle.getMessageDataFormat(), shard.getShardId(), shard.getSequenceNumberRange().getStartingSequenceNumber(), shard.getSequenceNumberRange().getEndingSequenceNumber()); builder.add(split); } return new FixedSplitSource(builder.build()); }
@Test public void testRenameTable() { try { createDummyTable(temporaryRenameTableOld); ConnectorSession session = newSession(); metadata.renameTable(session, getTableHandle(temporaryRenameTableOld), temporaryRenameTableNew); assertNull(metadata.getTableHandle(session, temporaryRenameTableOld)); assertNotNull(metadata.getTableHandle(session, temporaryRenameTableNew)); } finally { dropTable(temporaryRenameTableOld); dropTable(temporaryRenameTableNew); } }
@Override public void createTableWithFiber(ConnectorSession session, ConnectorTableMetadata tableMetadata, String fiberKey, String function, String timeKey) { log.debug("Create table with fiber " + tableMetadata.getTable().getTableName()); // check fiberKey, function and timeKey List<ColumnMetadata> columns = tableMetadata.getColumns(); // List<String> columnNames = columns.stream() // .map(ColumnMetadata::getName) // .collect(Collectors.toList()); List<String> columnName = new LinkedList<>(); List<String> dataType = new LinkedList<>(); for (ColumnMetadata column : columns) { columnName.add(column.getName()); dataType.add(column.getType().getDisplayName()); } String tblName = tableMetadata.getTable().getTableName(); String dbName = tableMetadata.getTable().getSchemaName(); String storageFormatName = ""; String userName = ""; int fiberColIndex = Integer.parseInt(fiberKey); int timstampColIndex = Integer.parseInt(timeKey); // createTable metaClient.createFiberTable(dbName, tblName, userName, storageFormatName, fiberColIndex, function, timstampColIndex, columnName, dataType); }
/** * Returns a table handle for the specified table name, or null if the connector does not contain the table. * * @param session session * @param tableName table name */ @Override public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { Optional<HDFSTableHandle> table = metaDataQuery.getTableHandle(connectorId, tableName.getSchemaName(), tableName.getTableName()); return table.orElse(null); }
/** * Return a list of table layouts that satisfy the given constraint. * <p> * For each layout, connectors must return an "unenforced constraint" representing the part of the constraint summary that isn't guaranteed by the layout. * * @param session session * @param table table * @param constraint constraint * @param desiredColumns desired columns */ @Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { // get table name from ConnectorTableHandle HDFSTableHandle hdfsTable = checkType(table, HDFSTableHandle.class, "table"); SchemaTableName tableName = hdfsTable.getSchemaTableName(); // create HDFSTableLayoutHandle HDFSTableLayoutHandle tableLayout = metaDataQuery.getTableLayout(connectorId, tableName.getSchemaName(), tableName.getTableName()).orElse(null); tableLayout.setPredicates(constraint.getSummary() != null ? Optional.of(constraint.getSummary()) : Optional.empty()); // ConnectorTableLayout layout = new ConnectorTableLayout(HDFSTableLayoutHandle) ConnectorTableLayout layout = getTableLayout(session, tableLayout); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) { // TODO add fiber and timestamp as new LocalProperty into ConnectorTableLayout ? HDFSTableLayoutHandle layoutHandle = checkType(handle, HDFSTableLayoutHandle.class, "tableLayoutHandle"); return new ConnectorTableLayout(layoutHandle); }
/** * Return the metadata for the specified table handle. * * @param session session * @param table table * @throws RuntimeException if table handle is no longer valid */ @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) { HDFSTableHandle hdfsTable = checkType(table, HDFSTableHandle.class, "table"); SchemaTableName tableName = hdfsTable.getSchemaTableName(); return getTableMetadata(tableName); }
/** * List table names, possibly filtered by schema. An empty list is returned if none match. * * @param session session * @param schemaNameOrNull schema name */ @Override public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) { if (schemaNameOrNull == null) { return new ArrayList<>(); } return metaDataQuery.listTables(new SchemaTablePrefix(schemaNameOrNull)); }
/** * Gets all of the columns on the specified table, or an empty map if the columns can not be enumerated. * * @param session session * @param tableHandle table handle * @throws RuntimeException if table handle is no longer valid */ @Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) { HDFSTableHandle table = checkType(tableHandle, HDFSTableHandle.class, "table"); List<HDFSColumnHandle> cols = metaDataQuery.getTableColumnHandle(connectorId, table.getSchemaName(), table.getTableName()) .orElse(new ArrayList<>()); Map<String, ColumnHandle> columnMap = new HashMap<>(); for (HDFSColumnHandle col : cols) { columnMap.putIfAbsent(col.getName(), col); } return columnMap; }
/** * Gets the metadata for all columns that match the specified table prefix. * * @param session session * @param prefix prefix */ @Override public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) { Map<SchemaTableName, List<ColumnMetadata>> tableColumns = new HashMap<>(); List<SchemaTableName> tableNames = metaDataQuery.listTables(prefix); for (SchemaTableName table : tableNames) { List<ColumnMetadata> columnMetadatas = metaDataQuery.getTableColMetadata(connectorId, table.getSchemaName(), table.getTableName()).orElse(new ArrayList<>()); tableColumns.putIfAbsent(table, columnMetadatas); } return tableColumns; }
@Override public RecordSink getRecordSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle connectorTableHandle) { RestInsertTableHandle insertTableHandle = Types.checkType(connectorTableHandle, RestInsertTableHandle.class, "tableHandle"); RestTableHandle tableHandle = insertTableHandle.getTableHandle(); SchemaTableName schemaTableName = tableHandle.getSchemaTableName(); Consumer<List> rowSink = rest.createRowSink(schemaTableName); List<Type> types = rest.getTypes(schemaTableName); return new InMemoryObjectRecordSink(types, rowSink); }
@Override public RecordSet getRecordSet( ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorSplit connectorSplit, List<? extends ColumnHandle> list) { RestConnectorSplit split = Types.checkType(connectorSplit, RestConnectorSplit.class, "split"); // TODO fix below cast List<RestColumnHandle> restColumnHandles = (List<RestColumnHandle>) list; SchemaTableName schemaTableName = split.getTableHandle().getSchemaTableName(); Collection<? extends List<?>> rows = rest.getRows(schemaTableName); ConnectorTableMetadata tableMetadata = rest.getTableMetadata(schemaTableName); List<Integer> columnIndexes = restColumnHandles.stream() .map(column -> { int index = 0; for (ColumnMetadata columnMetadata : tableMetadata.getColumns()) { if (columnMetadata.getName().equalsIgnoreCase(column.getName())) { return index; } index++; } throw new IllegalStateException("Unknown column: " + column.getName()); }) .collect(toList()); Collection<? extends List<?>> mappedRows = rows.stream() .map(row -> columnIndexes.stream() .map(index -> row.get(index)) .collect(toList())) .collect(toList()); List<Type> mappedTypes = restColumnHandles.stream() .map(RestColumnHandle::getType) .collect(toList()); return new InMemoryRecordSet(mappedTypes, mappedRows); }
@Override public ConnectorTableHandle getTableHandle(ConnectorSession connectorSession, SchemaTableName schemaTableName) { if (rest.listTables().contains(schemaTableName)) { return new RestTableHandle(schemaTableName); } return null; }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession connectorSession, ConnectorTableHandle connectorTableHandle, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> optional) { RestTableHandle tableHandle = Types.checkType(connectorTableHandle, RestTableHandle.class, "tableHandle"); return ImmutableList.of( new ConnectorTableLayoutResult( getTableLayout(connectorSession, new RestConnectorTableLayoutHandle(tableHandle)), TupleDomain.all())); }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession connectorSession, ConnectorTableLayoutHandle connectorTableLayoutHandle) { RestConnectorTableLayoutHandle tableLayoutHandle = Types.checkType(connectorTableLayoutHandle, RestConnectorTableLayoutHandle.class, "tableLayoutHandle"); return new ConnectorTableLayout(tableLayoutHandle); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { RestConnectorTableLayoutHandle layoutHandle = Types.checkType(layout, RestConnectorTableLayoutHandle.class, "layout"); List<HostAddress> addresses = nodeManager.getRequiredWorkerNodes().stream() .map(Node::getHostAndPort) .collect(toList()); return new FixedSplitSource(ImmutableList.of( new RestConnectorSplit(layoutHandle.getTableHandle(), addresses))); }
@Override public EthereumTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName) { if (EthereumTable.BLOCK.getName().equals(schemaTableName.getTableName())) { return new EthereumTableHandle(connectorId, DEFAULT_SCHEMA, EthereumTable.BLOCK.getName()); } else if (EthereumTable.TRANSACTION.getName().equals(schemaTableName.getTableName())) { return new EthereumTableHandle(connectorId, DEFAULT_SCHEMA, EthereumTable.TRANSACTION.getName()); } else if (EthereumTable.ERC20.getName().equals(schemaTableName.getTableName())) { return new EthereumTableHandle(connectorId, DEFAULT_SCHEMA, EthereumTable.ERC20.getName()); } else { throw new IllegalArgumentException("Unknown Table Name " + schemaTableName.getTableName()); } }
@Override public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) { return ImmutableList.of(new SchemaTableName(DEFAULT_SCHEMA, EthereumTable.BLOCK.getName()), new SchemaTableName(DEFAULT_SCHEMA, EthereumTable.TRANSACTION.getName()), new SchemaTableName(DEFAULT_SCHEMA, EthereumTable.ERC20.getName())); }
@Override public ColumnMetadata getColumnMetadata( ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle ) { convertTableHandle(tableHandle); return convertColumnHandle(columnHandle).getColumnMetadata(); }
@Override public RecordSet getRecordSet(ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorSplit connectorSplit, List<? extends ColumnHandle> list) { log.info("INFORMATION: AmpoolRecordSetProvider getRecordSet() called."); requireNonNull(connectorSplit, "split is null"); AmpoolSplit ampoolSplit = (AmpoolSplit) connectorSplit; checkArgument(ampoolSplit.getConnectorId().equals(connectorId), "split is not for this connector"); ImmutableList.Builder<AmpoolColumnHandle> handles = ImmutableList.builder(); for (ColumnHandle handle : list) { handles.add((AmpoolColumnHandle) handle); } // TODO: Projections and filters on Ampool side Iterator<Row> iterator; if (ampoolClient.existsFTable(ampoolSplit.getTableName())) iterator = ampoolClient.getFTable(ampoolSplit.getTableName()).getScanner(new Scan()).iterator(); else if (ampoolClient.existsMTable(ampoolSplit.getTableName())) iterator = ampoolClient.getMTable(ampoolSplit.getTableName()).getScanner(new Scan()).iterator(); else iterator = null; return new AmpoolRecordSet(ampoolSplit, handles.build(), iterator); }
@Override public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { requireNonNull(tableName, "tableName is null"); KuduClient kuduClient = kuduClientManager.getClient(); ConnectorTableHandle connectorTableHandle = kuduTables.getTables(kuduClient).get(tableName); kuduClientManager.close(kuduClient); return connectorTableHandle; }