@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle handle, ConnectorSession session, ConnectorTableLayoutHandle layout) { log.info("INFORMATION: AmpoolSplitManager getSplits() called."); AmpoolTableLayoutHandle layoutHandle = (AmpoolTableLayoutHandle) layout; AmpoolTableHandle tableHandle = layoutHandle.getTable(); AmpoolTable table = new AmpoolTable(ampoolClient, tableHandle.getTableName()); // this can happen if table is removed during a query checkState(table.getColumnsMetadata() != null, "Table %s.%s no longer exists", tableHandle.getSchemaName(), tableHandle.getTableName()); List<ConnectorSplit> splits = new ArrayList<>(); // TODO Pass here bucket id splits.add(new AmpoolSplit(connectorId, tableHandle.getSchemaName(), tableHandle.getTableName(),"" ,HostAddress.fromParts("localhost",0))); Collections.shuffle(splits); return new FixedSplitSource(splits); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { KuduTableLayoutHandle layoutHandle = checkType(layout, KuduTableLayoutHandle.class, "layout"); KuduTableHandle tableHandle = layoutHandle.getTable(); KuduClient kuduClient = kuduClientManager.getClient(); List<KuduScanToken> tokens = kuduClientManager.newScanTokenBuilder(kuduClient, tableHandle.getSchemaTableName().getTableName()).build(); TupleDomain<KuduColumnHandle> effectivePredicate = layoutHandle.getConstraint() .transform(handle -> checkType(handle, KuduColumnHandle.class, "columnHandle")); ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder(); for (int i = 0; i < tokens.size(); i++) { // nodeManager.getWorkerNodes() List<HostAddress> hostAddresses = nodeManager.getWorkerNodes().stream() .map(node -> node.getHostAndPort()).collect(Collectors.toList()); ConnectorSplit split = new KuduSplit(hostAddresses, tableHandle.getSchemaTableName(), i, effectivePredicate); builder.add(split); } kuduClientManager.close(kuduClient); return new FixedSplitSource(builder.build()); }
@Override public ConnectorSplitSource getSplits(ConnectorSession session, ConnectorTableLayoutHandle layout) { ExampleTableLayoutHandle layoutHandle = checkType(layout, ExampleTableLayoutHandle.class, "layout"); ExampleTableHandle tableHandle = layoutHandle.getTable(); ExampleTable table = exampleClient.getTable(tableHandle.getSchemaName(), tableHandle.getTableName()); // this can happen if table is removed during a query checkState(table != null, "Table %s.%s no longer exists", tableHandle.getSchemaName(), tableHandle.getTableName()); List<ConnectorSplit> splits = new ArrayList<>(); for (URI uri : table.getSources()) { splits.add(new ExampleSplit(connectorId, tableHandle.getSchemaName(), tableHandle.getTableName(), uri)); } Collections.shuffle(splits); return new FixedSplitSource(connectorId, splits); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layoutHandle) { BlackHoleTableLayoutHandle layout = checkType( layoutHandle, BlackHoleTableLayoutHandle.class, "BlackHoleTableLayoutHandle"); ImmutableList.Builder<BlackHoleSplit> builder = ImmutableList.<BlackHoleSplit>builder(); for (int i = 0; i < layout.getSplitCount(); i++) { builder.add( new BlackHoleSplit( layout.getPagesPerSplit(), layout.getRowsPerPage(), layout.getFieldsLength())); } return new FixedSplitSource("blackhole", builder.build()); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { JmxTableLayoutHandle jmxLayout = checkType(layout, JmxTableLayoutHandle.class, "layout"); JmxTableHandle tableHandle = jmxLayout.getTable(); TupleDomain<ColumnHandle> predicate = jmxLayout.getConstraint(); //TODO is there a better way to get the node column? JmxColumnHandle nodeColumnHandle = tableHandle.getColumns().get(0); List<ConnectorSplit> splits = nodeManager.getNodes(ACTIVE) .stream() .filter(node -> { NullableValue value = NullableValue.of(VARCHAR, utf8Slice(node.getNodeIdentifier())); return predicate.overlaps(fromFixedValues(ImmutableMap.of(nodeColumnHandle, value))); }) .map(node -> new JmxSplit(tableHandle, ImmutableList.of(node.getHostAndPort()))) .collect(toList()); return new FixedSplitSource(connectorId, splits); }
@Test public void testPredicatePushdown() throws Exception { for (Node node : nodes) { String nodeIdentifier = node.getNodeIdentifier(); TupleDomain<ColumnHandle> nodeTupleDomain = TupleDomain.fromFixedValues(ImmutableMap.of(columnHandle, NullableValue.of(VARCHAR, utf8Slice(nodeIdentifier)))); ConnectorTableLayoutHandle layout = new JmxTableLayoutHandle(tableHandle, nodeTupleDomain); ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, layout); List<ConnectorSplit> allSplits = getAllSplits(splitSource); assertEquals(allSplits.size(), 1); assertEquals(allSplits.get(0).getAddresses().size(), 1); assertEquals(allSplits.get(0).getAddresses().get(0).getHostText(), nodeIdentifier); } }
@Test public void testNoPredicate() throws Exception { ConnectorTableLayoutHandle layout = new JmxTableLayoutHandle(tableHandle, TupleDomain.all()); ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, layout); List<ConnectorSplit> allSplits = getAllSplits(splitSource); assertEquals(allSplits.size(), nodes.size()); Set<String> actualNodes = nodes.stream().map(Node::getNodeIdentifier).collect(toSet()); Set<String> expectedNodes = new HashSet<>(); for (ConnectorSplit split : allSplits) { List<HostAddress> addresses = split.getAddresses(); assertEquals(addresses.size(), 1); expectedNodes.add(addresses.get(0).getHostText()); } assertEquals(actualNodes, expectedNodes); }
@Test public void testRecordSetProvider() throws Exception { for (SchemaTableName schemaTableName : metadata.listTables(SESSION, "jmx")) { JmxTableHandle tableHandle = metadata.getTableHandle(SESSION, schemaTableName); List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(SESSION, tableHandle).values()); ConnectorTableLayoutHandle layout = new JmxTableLayoutHandle(tableHandle, TupleDomain.all()); ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, layout); List<ConnectorSplit> allSplits = getAllSplits(splitSource); assertEquals(allSplits.size(), nodes.size()); ConnectorSplit split = allSplits.get(0); RecordSet recordSet = recordSetProvider.getRecordSet(JmxTransactionHandle.INSTANCE, SESSION, split, columnHandles); try (RecordCursor cursor = recordSet.cursor()) { while (cursor.advanceNextPosition()) { for (int i = 0; i < recordSet.getColumnTypes().size(); i++) { cursor.isNull(i); } } } } }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { InformationSchemaTableLayoutHandle handle = checkType(layout, InformationSchemaTableLayoutHandle.class, "layout"); Map<ColumnHandle, NullableValue> bindings = extractFixedValues(handle.getConstraint()).orElse(ImmutableMap.of()); List<HostAddress> localAddress = ImmutableList.of(nodeManager.getCurrentNode().getHostAndPort()); Map<String, NullableValue> filters = bindings.entrySet().stream().collect(toMap( entry -> checkType(entry.getKey(), InformationSchemaColumnHandle.class, "column").getColumnName(), Entry::getValue)); ConnectorSplit split = new InformationSchemaSplit(handle.getTable(), filters, localAddress); return new FixedSplitSource(null, ImmutableList.of(split)); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { TpchTableHandle tableHandle = checkType(layout, TpchTableLayoutHandle.class, "layout").getTable(); Set<Node> nodes = nodeManager.getActiveDatasourceNodes(connectorId); checkState(!nodes.isEmpty(), "No TPCH nodes available"); int totalParts = nodes.size() * splitsPerNode; int partNumber = 0; // Split the data using split and skew by the number of nodes available. ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder(); for (Node node : nodes) { for (int i = 0; i < splitsPerNode; i++) { splits.add(new TpchSplit(tableHandle, partNumber, totalParts, ImmutableList.of(node.getHostAndPort()))); partNumber++; } } return new FixedSplitSource(connectorId, splits.build()); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { CassandraTableLayoutHandle layoutHandle = checkType(layout, CassandraTableLayoutHandle.class, "layout"); CassandraTableHandle cassandraTableHandle = layoutHandle.getTable(); List<CassandraPartition> partitions = layoutHandle.getPartitions().get(); requireNonNull(partitions, "partitions is null"); if (partitions.isEmpty()) { return new FixedSplitSource(connectorId, ImmutableList.<ConnectorSplit>of()); } // if this is an unpartitioned table, split into equal ranges if (partitions.size() == 1) { CassandraPartition cassandraPartition = partitions.get(0); if (cassandraPartition.isUnpartitioned() || cassandraPartition.isIndexedColumnPredicatePushdown()) { CassandraTable table = schemaProvider.getTable(cassandraTableHandle); List<ConnectorSplit> splits = getSplitsByTokenRange(table, cassandraPartition.getPartitionId()); return new FixedSplitSource(connectorId, splits); } } return new FixedSplitSource(connectorId, getSplitsForPartitions(cassandraTableHandle, partitions)); }
@Test public void testPartitionSchemaNonCanonical() throws Exception { ConnectorSession session = newSession(); ConnectorTableHandle table = getTableHandle(tablePartitionSchemaChangeNonCanonical); ColumnHandle column = metadata.getColumnHandles(session, table).get("t_boolean"); assertNotNull(column); List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, table, new Constraint<>(TupleDomain.fromFixedValues(ImmutableMap.of(column, NullableValue.of(BOOLEAN, false))), bindings -> true), Optional.empty()); ConnectorTableLayoutHandle layoutHandle = getOnlyElement(tableLayoutResults).getTableLayout().getHandle(); assertEquals(getAllPartitions(layoutHandle).size(), 1); assertEquals(getPartitionId(getAllPartitions(layoutHandle).get(0)), "t_boolean=0"); ConnectorSplitSource splitSource = splitManager.getSplits(session, layoutHandle); ConnectorSplit split = getOnlyElement(getAllSplits(splitSource)); ImmutableList<ColumnHandle> columnHandles = ImmutableList.of(column); try (ConnectorPageSource ignored = pageSourceProvider.createPageSource(session, split, columnHandles)) { // TODO coercion of non-canonical values should be supported fail("expected exception"); } catch (PrestoException e) { assertEquals(e.getErrorCode(), HIVE_INVALID_PARTITION_VALUE.toErrorCode()); } }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { KinesisTableLayoutHandle kinesislayout = handleResolver.convertLayout(layout); KinesisTableHandle kinesisTableHandle = kinesislayout.getTable(); InternalStreamDescription desc = this.getStreamDescription(kinesisTableHandle.getStreamName()); ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder(); for (Shard shard : desc.getShards()) { KinesisSplit split = new KinesisSplit(connectorId, kinesisTableHandle.getStreamName(), kinesisTableHandle.getMessageDataFormat(), shard.getShardId(), shard.getSequenceNumberRange().getStartingSequenceNumber(), shard.getSequenceNumberRange().getEndingSequenceNumber()); builder.add(split); } return new FixedSplitSource(builder.build()); }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) { // TODO add fiber and timestamp as new LocalProperty into ConnectorTableLayout ? HDFSTableLayoutHandle layoutHandle = checkType(handle, HDFSTableLayoutHandle.class, "tableLayoutHandle"); return new ConnectorTableLayout(layoutHandle); }
@Override public ConnectorHandleResolver getHandleResolver() { return new ConnectorHandleResolver() { public Class<? extends ConnectorTableHandle> getTableHandleClass() { return RestTableHandle.class; } public Class<? extends ColumnHandle> getColumnHandleClass() { return RestColumnHandle.class; } public Class<? extends ConnectorSplit> getSplitClass() { return RestConnectorSplit.class; } public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass() { return RestConnectorTableLayoutHandle.class; } @Override public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass() { return RestTransactionHandle.class; } @Override public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass() { return RestInsertTableHandle.class; } }; }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession connectorSession, ConnectorTableLayoutHandle connectorTableLayoutHandle) { RestConnectorTableLayoutHandle tableLayoutHandle = Types.checkType(connectorTableLayoutHandle, RestConnectorTableLayoutHandle.class, "tableLayoutHandle"); return new ConnectorTableLayout(tableLayoutHandle); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { RestConnectorTableLayoutHandle layoutHandle = Types.checkType(layout, RestConnectorTableLayoutHandle.class, "layout"); List<HostAddress> addresses = nodeManager.getRequiredWorkerNodes().stream() .map(Node::getHostAndPort) .collect(toList()); return new FixedSplitSource(ImmutableList.of( new RestConnectorSplit(layoutHandle.getTableHandle(), addresses))); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { RaptorTableLayoutHandle handle = checkType(layout, RaptorTableLayoutHandle.class, "layout"); RaptorTableHandle table = handle.getTable(); TupleDomain<RaptorColumnHandle> effectivePredicate = toRaptorTupleDomain(handle.getConstraint()); return new RaptorSplitSource(table.getTableId(), effectivePredicate, table.getTransactionId()); }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) { RedisTableLayoutHandle layout = convertLayout(handle); // tables in this connector have a single layout return getTableLayouts(session, layout.getTable(), Constraint.<ColumnHandle>alwaysTrue(), Optional.empty()) .get(0) .getTableLayout(); }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) { return new ConnectorTableLayout( handle, Optional.empty(), TupleDomain.none(), Optional.empty(), Optional.empty(), ImmutableList.of()); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { SystemTableLayoutHandle layoutHandle = checkType(layout, SystemTableLayoutHandle.class, "layout"); SystemTableHandle tableHandle = layoutHandle.getTable(); TupleDomain<ColumnHandle> constraint = layoutHandle.getConstraint(); SystemTable systemTable = tables.get(tableHandle.getSchemaTableName()); Distribution tableDistributionMode = systemTable.getDistribution(); if (tableDistributionMode == SINGLE_COORDINATOR) { HostAddress address = nodeManager.getCurrentNode().getHostAndPort(); ConnectorSplit split = new SystemSplit(tableHandle.getConnectorId(), tableHandle, address, constraint); return new FixedSplitSource(GlobalSystemConnector.NAME, ImmutableList.of(split)); } ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder(); ImmutableSet.Builder<Node> nodes = ImmutableSet.builder(); if (tableDistributionMode == ALL_COORDINATORS) { nodes.addAll(nodeManager.getCoordinators()); } else if (tableDistributionMode == ALL_NODES) { nodes.addAll(nodeManager.getNodes(ACTIVE)); } Set<Node> nodeSet = nodes.build(); for (Node node : nodeSet) { splits.add(new SystemSplit(tableHandle.getConnectorId(), tableHandle, node.getHostAndPort(), constraint)); } return new FixedSplitSource(GlobalSystemConnector.NAME, splits.build()); }
@Inject public TableLayoutHandleJacksonModule(HandleResolver handleResolver) { super(ConnectorTableLayoutHandle.class, handleResolver::getId, handleResolver::getTableLayoutHandleClass); }
@JsonCreator public TableLayoutHandle( @JsonProperty("connectorId") String connectorId, @JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle, @JsonProperty("connectorHandle") ConnectorTableLayoutHandle layout) { requireNonNull(connectorId, "connectorId is null"); requireNonNull(transactionHandle, "transactionHandle is null"); requireNonNull(layout, "layout is null"); this.connectorId = connectorId; this.transactionHandle = transactionHandle; this.layout = layout; }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) { TpchTableLayoutHandle layout = checkType(handle, TpchTableLayoutHandle.class, "layout"); // tables in this connector have a single layout return getTableLayouts(session, layout.getTable(), Constraint.<ColumnHandle>alwaysTrue(), Optional.empty()) .get(0) .getTableLayout(); }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { return delegate.getTableLayout(session, handle); } }
@Override public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHandle tableHandle, ConnectorTableLayoutHandle tableLayoutHandle) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { return delegate.supportsMetadataDelete(session, tableHandle, tableLayoutHandle); } }
@Override public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandle tableHandle, ConnectorTableLayoutHandle tableLayoutHandle) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { return delegate.metadataDelete(session, tableHandle, tableLayoutHandle); } }
@Override public ConnectorSplitSource getSplits(ConnectorSession session, ConnectorTableLayoutHandle layout) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { return delegate.getSplits(session, layout); } }
@Override public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandle tableHandle, ConnectorTableLayoutHandle tableLayoutHandle) { HiveTableHandle handle = checkType(tableHandle, HiveTableHandle.class, "tableHandle"); HiveTableLayoutHandle layoutHandle = checkType(tableLayoutHandle, HiveTableLayoutHandle.class, "tableLayoutHandle"); for (HivePartition hivePartition : getOrComputePartitions(layoutHandle, session, tableHandle)) { metastore.dropPartitionByName(handle.getSchemaName(), handle.getTableName(), hivePartition.getPartitionId()); } // it is too expensive to determine the exact number of deleted rows return OptionalLong.empty(); }
@Override public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHandle tableHandle, ConnectorTableLayoutHandle tableLayoutHandle) { HiveTableLayoutHandle layoutHandle = checkType(tableLayoutHandle, HiveTableLayoutHandle.class, "tableLayoutHandle"); // return true if none of the partitions is <UNPARTITIONED> return layoutHandle.getPartitions().get().stream() .noneMatch(partition -> HivePartition.UNPARTITIONED_ID.equals(partition.getPartitionId())); }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle layoutHandle) { HiveTableLayoutHandle hiveLayoutHandle = checkType(layoutHandle, HiveTableLayoutHandle.class, "layoutHandle"); List<TupleDomain<ColumnHandle>> partitionDomains = hiveLayoutHandle.getPartitions().get().stream() .map(HivePartition::getTupleDomain) .collect(toList()); TupleDomain<ColumnHandle> predicate = TupleDomain.none(); if (!partitionDomains.isEmpty()) { predicate = TupleDomain.columnWiseUnion(partitionDomains); } return new ConnectorTableLayout(hiveLayoutHandle, Optional.empty(), predicate, Optional.empty(), Optional.of(partitionDomains), ImmutableList.of()); }
protected void assertExpectedTableLayoutHandle(ConnectorTableLayoutHandle actualTableLayoutHandle, ConnectorTableLayoutHandle expectedTableLayoutHandle) { assertInstanceOf(actualTableLayoutHandle, HiveTableLayoutHandle.class); assertInstanceOf(expectedTableLayoutHandle, HiveTableLayoutHandle.class); HiveTableLayoutHandle actual = (HiveTableLayoutHandle) actualTableLayoutHandle; HiveTableLayoutHandle expected = (HiveTableLayoutHandle) expectedTableLayoutHandle; assertEquals(actual.getClientId(), expected.getClientId()); assertExpectedPartitions(actual.getPartitions().get(), expected.getPartitions().get()); }
private MaterializedResult readTable( ConnectorTableHandle tableHandle, List<ColumnHandle> columnHandles, ConnectorSession session, TupleDomain<ColumnHandle> tupleDomain, OptionalInt expectedSplitCount, Optional<HiveStorageFormat> expectedStorageFormat) throws Exception { List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, tableHandle, new Constraint<>(tupleDomain, bindings -> true), Optional.empty()); ConnectorTableLayoutHandle layoutHandle = getOnlyElement(tableLayoutResults).getTableLayout().getHandle(); List<ConnectorSplit> splits = getAllSplits(splitManager.getSplits(session, layoutHandle)); if (expectedSplitCount.isPresent()) { assertEquals(splits.size(), expectedSplitCount.getAsInt()); } ImmutableList.Builder<MaterializedRow> allRows = ImmutableList.builder(); for (ConnectorSplit split : splits) { try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(session, split, columnHandles)) { if (expectedStorageFormat.isPresent()) { assertPageSourceType(pageSource, expectedStorageFormat.get()); } MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles)); allRows.addAll(result.getMaterializedRows()); } } return new MaterializedResult(allRows.build(), getTypes(columnHandles)); }
private List<ConnectorSplit> getAllSplits(ConnectorTableHandle tableHandle, TupleDomain<ColumnHandle> tupleDomain) throws InterruptedException { ConnectorSession session = newSession(); List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, tableHandle, new Constraint<>(tupleDomain, bindings -> true), Optional.empty()); ConnectorTableLayoutHandle layoutHandle = getOnlyElement(tableLayoutResults).getTableLayout().getHandle(); return getAllSplits(splitManager.getSplits(session, layoutHandle)); }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) { SpreadsheetTableLayoutHandle layout = (SpreadsheetTableLayoutHandle) handle; List<ConnectorTableLayoutResult> tableLayouts = getTableLayouts(session, layout.getTable(), Constraint.<ColumnHandle>alwaysTrue(), Optional.empty()); ConnectorTableLayoutResult connectorTableLayoutResult = tableLayouts.get(0); return connectorTableLayoutResult.getTableLayout(); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { SpreadsheetTableLayoutHandle layoutHandle = (SpreadsheetTableLayoutHandle) layout; SpreadsheetTableHandle spreadsheetTableHandle = layoutHandle.getTable(); SpreadsheetSplit spreadsheetSplit = new SpreadsheetSplit(spreadsheetTableHandle); return new FixedSplitSource(ImmutableList.of(spreadsheetSplit)); }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) { BaseTableLayoutHandle layout = (BaseTableLayoutHandle) handle; List<ConnectorTableLayoutResult> tableLayouts = getTableLayouts(session, layout.getTable(), Constraint.<ColumnHandle>alwaysTrue(), Optional.empty()); ConnectorTableLayoutResult connectorTableLayoutResult = tableLayouts.get(0); return connectorTableLayoutResult.getTableLayout(); }
KinesisTableLayoutHandle convertLayout(ConnectorTableLayoutHandle layout) { requireNonNull(layout, "layout is null"); checkArgument(layout instanceof KinesisTableLayoutHandle, "layout is not an instance of KinesisTableLayoutHandle"); KinesisTableLayoutHandle kinesisLayout = (KinesisTableLayoutHandle) layout; checkArgument(kinesisLayout.getConnectorId().equals(connectorId), "split is not for this connector"); return kinesisLayout; }
@Override public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass() { return HDFSTableLayoutHandle.class; }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) { return new ConnectorTableLayout(handle); }