@Test public void testSanity() throws InterruptedException { List<ConnectorTableLayoutResult> layouts = metadata.getTableLayouts(SESSION, tableHandle, Constraint.alwaysTrue(), Optional.empty()); assertEquals(layouts.size(), 1); ConnectorTableLayoutResult layout = getOnlyElement(layouts); assertInstanceOf(layout.getTableLayout().getHandle(), RaptorTableLayoutHandle.class); ConnectorSplitSource splitSource = getSplits(raptorSplitManager, layout); int splitCount = 0; while (!splitSource.isFinished()) { splitCount += getFutureValue(splitSource.getNextBatch(1000)).size(); } assertEquals(splitCount, 4); }
@Test public void testAssignRandomNodeWhenBackupAvailable() throws InterruptedException, URISyntaxException { InMemoryNodeManager nodeManager = new InMemoryNodeManager(); RaptorConnectorId connectorId = new RaptorConnectorId("raptor"); NodeSupplier nodeSupplier = new RaptorNodeSupplier(nodeManager, connectorId); PrestoNode node = new PrestoNode(UUID.randomUUID().toString(), new URI("http://127.0.0.1/"), NodeVersion.UNKNOWN); nodeManager.addNode(connectorId.toString(), node); RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(connectorId, nodeSupplier, shardManager, true); deleteShardNodes(); ConnectorTableLayoutResult layout = getOnlyElement(metadata.getTableLayouts(SESSION, tableHandle, Constraint.alwaysTrue(), Optional.empty())); ConnectorSplitSource partitionSplit = getSplits(raptorSplitManagerWithBackup, layout); List<ConnectorSplit> batch = getFutureValue(partitionSplit.getNextBatch(1), PrestoException.class); assertEquals(getOnlyElement(getOnlyElement(batch).getAddresses()), node.getHostAndPort()); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts( ConnectorSession session, ConnectorTableHandle handle, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { requireNonNull(handle, "handle is null"); checkArgument(handle instanceof BlackHoleTableHandle); BlackHoleTableHandle blackHoleHandle = (BlackHoleTableHandle) handle; BlackHoleTableLayoutHandle layoutHandle = new BlackHoleTableLayoutHandle( blackHoleHandle.getSplitCount(), blackHoleHandle.getPagesPerSplit(), blackHoleHandle.getRowsPerPage(), blackHoleHandle.getFieldsLength()); return ImmutableList.of(new ConnectorTableLayoutResult(getTableLayout(session, layoutHandle), TupleDomain.all())); }
@Override public List<TableLayoutResult> getLayouts(Session session, TableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { if (constraint.getSummary().isNone()) { return ImmutableList.of(); } TupleDomain<ColumnHandle> summary = constraint.getSummary(); String connectorId = table.getConnectorId(); ConnectorTableHandle connectorTable = table.getConnectorHandle(); Predicate<Map<ColumnHandle, NullableValue>> predicate = constraint.predicate(); ConnectorEntry entry = getConnectorMetadata(connectorId); ConnectorMetadata metadata = entry.getMetadata(session); ConnectorTransactionHandle transaction = entry.getTransactionHandle(session); ConnectorSession connectorSession = session.toConnectorSession(entry.getCatalog()); List<ConnectorTableLayoutResult> layouts = metadata.getTableLayouts(connectorSession, connectorTable, new Constraint<>(summary, predicate::test), desiredColumns); return layouts.stream() .map(layout -> new TableLayoutResult(fromConnectorLayout(connectorId, transaction, layout.getTableLayout()), layout.getUnenforcedConstraint())) .collect(toImmutableList()); }
@Test public void testGetRecordsS3() throws Exception { ConnectorTableHandle table = getTableHandle(tableS3); List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(SESSION, table).values()); Map<String, Integer> columnIndex = indexColumns(columnHandles); List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(SESSION, table, new Constraint<>(TupleDomain.all(), bindings -> true), Optional.empty()); HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) getOnlyElement(tableLayoutResults).getTableLayout().getHandle(); assertEquals(layoutHandle.getPartitions().get().size(), 1); ConnectorSplitSource splitSource = splitManager.getSplits(SESSION, layoutHandle); long sum = 0; for (ConnectorSplit split : getAllSplits(splitSource)) { try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(SESSION, split, columnHandles)) { MaterializedResult result = materializeSourceDataStream(SESSION, pageSource, getTypes(columnHandles)); for (MaterializedRow row : result) { sum += (Long) row.getField(columnIndex.get("t_bigint")); } } } assertEquals(sum, 78300); }
@Test public void testGetPartitionSplitsTableOfflinePartition() throws Exception { ConnectorSession session = newSession(); ConnectorTableHandle tableHandle = getTableHandle(tableOfflinePartition); assertNotNull(tableHandle); ColumnHandle dsColumn = metadata.getColumnHandles(session, tableHandle).get("ds"); assertNotNull(dsColumn); Domain domain = Domain.singleValue(VARCHAR, utf8Slice("2012-12-30")); TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(ImmutableMap.of(dsColumn, domain)); List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, tableHandle, new Constraint<>(tupleDomain, bindings -> true), Optional.empty()); try { getSplitCount(splitManager.getSplits(session, getOnlyElement(tableLayoutResults).getTableLayout().getHandle())); fail("Expected PartitionOfflineException"); } catch (PartitionOfflineException e) { assertEquals(e.getTableName(), tableOfflinePartition); assertEquals(e.getPartition(), "ds=2012-12-30"); } }
@Test public void testPartitionSchemaNonCanonical() throws Exception { ConnectorSession session = newSession(); ConnectorTableHandle table = getTableHandle(tablePartitionSchemaChangeNonCanonical); ColumnHandle column = metadata.getColumnHandles(session, table).get("t_boolean"); assertNotNull(column); List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, table, new Constraint<>(TupleDomain.fromFixedValues(ImmutableMap.of(column, NullableValue.of(BOOLEAN, false))), bindings -> true), Optional.empty()); ConnectorTableLayoutHandle layoutHandle = getOnlyElement(tableLayoutResults).getTableLayout().getHandle(); assertEquals(getAllPartitions(layoutHandle).size(), 1); assertEquals(getPartitionId(getAllPartitions(layoutHandle).get(0)), "t_boolean=0"); ConnectorSplitSource splitSource = splitManager.getSplits(session, layoutHandle); ConnectorSplit split = getOnlyElement(getAllSplits(splitSource)); ImmutableList<ColumnHandle> columnHandles = ImmutableList.of(column); try (ConnectorPageSource ignored = pageSourceProvider.createPageSource(session, split, columnHandles)) { // TODO coercion of non-canonical values should be supported fail("expected exception"); } catch (PrestoException e) { assertEquals(e.getErrorCode(), HIVE_INVALID_PARTITION_VALUE.toErrorCode()); } }
private List<HivePartition> getPartitions(String tableName) { Session session = getSession(); Metadata metadata = ((DistributedQueryRunner) queryRunner).getCoordinator().getMetadata(); return transaction(queryRunner.getTransactionManager()) .readOnly() .execute(session, transactionSession -> { Optional<TableHandle> tableHandle = metadata.getTableHandle(transactionSession, new QualifiedObjectName(HIVE_CATALOG, TPCH_SCHEMA, tableName)); assertTrue(tableHandle.isPresent()); List<TableLayoutResult> layouts = metadata.getLayouts(transactionSession, tableHandle.get(), Constraint.alwaysTrue(), Optional.empty()); TableLayout layout = Iterables.getOnlyElement(layouts).getLayout(); return ((HiveTableLayoutHandle) layout.getHandle().getConnectorHandle()).getPartitions().get(); }); }
/** * Return a list of table layouts that satisfy the given constraint. * <p> * For each layout, connectors must return an "unenforced constraint" representing the part of the constraint summary that isn't guaranteed by the layout. * * @param session session * @param table table * @param constraint constraint * @param desiredColumns desired columns */ @Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { // get table name from ConnectorTableHandle HDFSTableHandle hdfsTable = checkType(table, HDFSTableHandle.class, "table"); SchemaTableName tableName = hdfsTable.getSchemaTableName(); // create HDFSTableLayoutHandle HDFSTableLayoutHandle tableLayout = metaDataQuery.getTableLayout(connectorId, tableName.getSchemaName(), tableName.getTableName()).orElse(null); tableLayout.setPredicates(constraint.getSummary() != null ? Optional.of(constraint.getSummary()) : Optional.empty()); // ConnectorTableLayout layout = new ConnectorTableLayout(HDFSTableLayoutHandle) ConnectorTableLayout layout = getTableLayout(session, tableLayout); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession connectorSession, ConnectorTableHandle connectorTableHandle, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> optional) { RestTableHandle tableHandle = Types.checkType(connectorTableHandle, RestTableHandle.class, "tableHandle"); return ImmutableList.of( new ConnectorTableLayoutResult( getTableLayout(connectorSession, new RestConnectorTableLayoutHandle(tableHandle)), TupleDomain.all())); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { KuduTableHandle tableHandle = checkType(table, KuduTableHandle.class, "tableHandle"); ConnectorTableLayout layout = new ConnectorTableLayout(new KuduTableLayoutHandle(tableHandle, constraint.getSummary())); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { KafkaTableHandle handle = convertTableHandle(table); ConnectorTableLayout layout = new ConnectorTableLayout(new KafkaTableLayoutHandle(handle)); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { ExampleTableHandle tableHandle = checkType(table, ExampleTableHandle.class, "table"); ConnectorTableLayout layout = new ConnectorTableLayout(new ExampleTableLayoutHandle(tableHandle)); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { RaptorTableHandle handle = checkType(table, RaptorTableHandle.class, "table"); ConnectorTableLayout layout = new ConnectorTableLayout(new RaptorTableLayoutHandle(handle, constraint.getSummary())); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }
@Test(expectedExceptions = PrestoException.class, expectedExceptionsMessageRegExp = "No host for shard .* found: \\[\\]") public void testNoHostForShard() throws InterruptedException { deleteShardNodes(); ConnectorTableLayoutResult layout = getOnlyElement(metadata.getTableLayouts(SESSION, tableHandle, Constraint.alwaysTrue(), Optional.empty())); ConnectorSplitSource splitSource = getSplits(raptorSplitManager, layout); getFutureValue(splitSource.getNextBatch(1000)); }
@Test(expectedExceptions = PrestoException.class, expectedExceptionsMessageRegExp = "No nodes available to run query") public void testNoNodes() throws InterruptedException, URISyntaxException { deleteShardNodes(); RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(new RaptorConnectorId("fbraptor"), ImmutableSet::of, shardManager, true); ConnectorTableLayoutResult layout = getOnlyElement(metadata.getTableLayouts(SESSION, tableHandle, Constraint.alwaysTrue(), Optional.empty())); ConnectorSplitSource splitSource = getSplits(raptorSplitManagerWithBackup, layout); getFutureValue(splitSource.getNextBatch(1000), PrestoException.class); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts( ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { RedisTableHandle tableHandle = convertTableHandle(table); ConnectorTableLayout layout = new ConnectorTableLayout(new RedisTableLayoutHandle(tableHandle)); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) { RedisTableLayoutHandle layout = convertLayout(handle); // tables in this connector have a single layout return getTableLayouts(session, layout.getTable(), Constraint.<ColumnHandle>alwaysTrue(), Optional.empty()) .get(0) .getTableLayout(); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { JmxTableHandle handle = checkType(table, JmxTableHandle.class, "table"); ConnectorTableLayout layout = new ConnectorTableLayout(new JmxTableLayoutHandle(handle, constraint.getSummary())); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { JdbcTableHandle tableHandle = checkType(table, JdbcTableHandle.class, "table"); ConnectorTableLayout layout = new ConnectorTableLayout(new JdbcTableLayoutHandle(tableHandle, constraint.getSummary())); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { InformationSchemaTableHandle handle = checkType(table, InformationSchemaTableHandle.class, "table"); ConnectorTableLayout layout = new ConnectorTableLayout(new InformationSchemaTableLayoutHandle(handle, constraint.getSummary())); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { SystemTableHandle tableHandle = checkType(table, SystemTableHandle.class, "table"); ConnectorTableLayout layout = new ConnectorTableLayout(new SystemTableLayoutHandle(tableHandle.getConnectorId(), tableHandle, constraint.getSummary())); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { TpchTableHandle tableHandle = checkType(table, TpchTableHandle.class, "table"); Optional<Set<ColumnHandle>> partitioningColumns = Optional.empty(); List<LocalProperty<ColumnHandle>> localProperties = ImmutableList.of(); Map<String, ColumnHandle> columns = getColumnHandles(session, tableHandle); if (tableHandle.getTableName().equals(TpchTable.ORDERS.getTableName())) { partitioningColumns = Optional.of(ImmutableSet.of(columns.get(OrderColumn.ORDER_KEY.getColumnName()))); localProperties = ImmutableList.of(new SortingProperty<>(columns.get(OrderColumn.ORDER_KEY.getColumnName()), SortOrder.ASC_NULLS_FIRST)); } else if (tableHandle.getTableName().equals(TpchTable.LINE_ITEM.getTableName())) { partitioningColumns = Optional.of(ImmutableSet.of(columns.get(LineItemColumn.ORDER_KEY.getColumnName()))); localProperties = ImmutableList.of( new SortingProperty<>(columns.get(LineItemColumn.ORDER_KEY.getColumnName()), SortOrder.ASC_NULLS_FIRST), new SortingProperty<>(columns.get(LineItemColumn.LINE_NUMBER.getColumnName()), SortOrder.ASC_NULLS_FIRST)); } ConnectorTableLayout layout = new ConnectorTableLayout( new TpchTableLayoutHandle(tableHandle), Optional.<List<ColumnHandle>>empty(), TupleDomain.<ColumnHandle>all(), // TODO: return well-known properties (e.g., orderkey > 0, etc) partitioningColumns, Optional.empty(), localProperties); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) { TpchTableLayoutHandle layout = checkType(handle, TpchTableLayoutHandle.class, "layout"); // tables in this connector have a single layout return getTableLayouts(session, layout.getTable(), Constraint.<ColumnHandle>alwaysTrue(), Optional.empty()) .get(0) .getTableLayout(); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts( ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { return delegate.getTableLayouts(session, table, constraint, desiredColumns); } }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { CassandraTableHandle handle = checkType(table, CassandraTableHandle.class, "table"); CassandraPartitionResult result = partitionManager.getPartitions(handle, constraint.getSummary()); ConnectorTableLayout layout = getTableLayout(session, new CassandraTableLayoutHandle(handle, result.getPartitions())); return ImmutableList.of(new ConnectorTableLayoutResult(layout, result.getUnenforcedConstraint())); }
private List<HivePartition> getOrComputePartitions(HiveTableLayoutHandle layoutHandle, ConnectorSession session, ConnectorTableHandle tableHandle) { if (layoutHandle.getPartitions().isPresent()) { return layoutHandle.getPartitions().get(); } else { TupleDomain<ColumnHandle> promisedPredicate = layoutHandle.getPromisedPredicate(); Predicate<Map<ColumnHandle, NullableValue>> predicate = convertToPredicate(promisedPredicate); List<ConnectorTableLayoutResult> tableLayoutResults = getTableLayouts(session, tableHandle, new Constraint<>(promisedPredicate, predicate), Optional.empty()); return checkType(Iterables.getOnlyElement(tableLayoutResults).getTableLayout().getHandle(), HiveTableLayoutHandle.class, "tableLayoutHandle").getPartitions().get(); } }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { HiveTableHandle handle = checkType(tableHandle, HiveTableHandle.class, "tableHandle"); HivePartitionResult hivePartitionResult = partitionManager.getPartitions(session, metastore, tableHandle, constraint.getSummary()); return ImmutableList.of(new ConnectorTableLayoutResult( getTableLayout(session, new HiveTableLayoutHandle(handle.getClientId(), hivePartitionResult.getPartitions(), hivePartitionResult.getEnforcedConstraint())), hivePartitionResult.getUnenforcedConstraint())); }
@Test public void testGetPartitions() throws Exception { ConnectorTableHandle tableHandle = getTableHandle(tablePartitionFormat); List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(newSession(), tableHandle, new Constraint<>(TupleDomain.all(), bindings -> true), Optional.empty()); assertExpectedTableLayout(getOnlyElement(tableLayoutResults).getTableLayout(), tableLayout); }
@Test public void testGetPartitionsWithBindings() throws Exception { ConnectorTableHandle tableHandle = getTableHandle(tablePartitionFormat); List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(newSession(), tableHandle, new Constraint<>(TupleDomain.withColumnDomains(ImmutableMap.of(intColumn, Domain.singleValue(BIGINT, 5L))), bindings -> true), Optional.empty()); assertExpectedTableLayout(getOnlyElement(tableLayoutResults).getTableLayout(), tableLayout); }
@Test public void testGetPartitionNames() throws Exception { ConnectorTableHandle tableHandle = getTableHandle(tablePartitionFormat); List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(newSession(), tableHandle, new Constraint<>(TupleDomain.all(), bindings -> true), Optional.empty()); assertExpectedTableLayout(getOnlyElement(tableLayoutResults).getTableLayout(), tableLayout); }
@Test public void testGetPartitionNamesUnpartitioned() throws Exception { ConnectorTableHandle tableHandle = getTableHandle(tableUnpartitioned); List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(newSession(), tableHandle, new Constraint<>(TupleDomain.all(), bindings -> true), Optional.empty()); assertEquals(getAllPartitions(getOnlyElement(tableLayoutResults).getTableLayout().getHandle()).size(), 1); assertExpectedTableLayout(getOnlyElement(tableLayoutResults).getTableLayout(), unpartitionedTableLayout); }
@Test public void testGetPartitionSplitsBatch() throws Exception { ConnectorSession session = newSession(); ConnectorTableHandle tableHandle = getTableHandle(tablePartitionFormat); List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, tableHandle, new Constraint<>(TupleDomain.all(), bindings -> true), Optional.empty()); ConnectorSplitSource splitSource = splitManager.getSplits(session, getOnlyElement(tableLayoutResults).getTableLayout().getHandle()); assertEquals(getSplitCount(splitSource), partitionCount); }
@Test public void testGetPartitionSplitsBatchUnpartitioned() throws Exception { ConnectorSession session = newSession(); ConnectorTableHandle tableHandle = getTableHandle(tableUnpartitioned); List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, tableHandle, new Constraint<>(TupleDomain.all(), bindings -> true), Optional.empty()); ConnectorSplitSource splitSource = splitManager.getSplits(session, getOnlyElement(tableLayoutResults).getTableLayout().getHandle()); assertEquals(getSplitCount(splitSource), 1); }
@Test public void testGetPartitionTableOffline() throws Exception { ConnectorTableHandle tableHandle = getTableHandle(tableOffline); try { metadata.getTableLayouts(newSession(), tableHandle, new Constraint<>(TupleDomain.all(), bindings -> true), Optional.empty()); fail("expected TableOfflineException"); } catch (TableOfflineException e) { assertEquals(e.getTableName(), tableOffline); } }
private MaterializedResult readTable( ConnectorTableHandle tableHandle, List<ColumnHandle> columnHandles, ConnectorSession session, TupleDomain<ColumnHandle> tupleDomain, OptionalInt expectedSplitCount, Optional<HiveStorageFormat> expectedStorageFormat) throws Exception { List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, tableHandle, new Constraint<>(tupleDomain, bindings -> true), Optional.empty()); ConnectorTableLayoutHandle layoutHandle = getOnlyElement(tableLayoutResults).getTableLayout().getHandle(); List<ConnectorSplit> splits = getAllSplits(splitManager.getSplits(session, layoutHandle)); if (expectedSplitCount.isPresent()) { assertEquals(splits.size(), expectedSplitCount.getAsInt()); } ImmutableList.Builder<MaterializedRow> allRows = ImmutableList.builder(); for (ConnectorSplit split : splits) { try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(session, split, columnHandles)) { if (expectedStorageFormat.isPresent()) { assertPageSourceType(pageSource, expectedStorageFormat.get()); } MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles)); allRows.addAll(result.getMaterializedRows()); } } return new MaterializedResult(allRows.build(), getTypes(columnHandles)); }
private List<ConnectorSplit> getAllSplits(ConnectorTableHandle tableHandle, TupleDomain<ColumnHandle> tupleDomain) throws InterruptedException { ConnectorSession session = newSession(); List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, tableHandle, new Constraint<>(tupleDomain, bindings -> true), Optional.empty()); ConnectorTableLayoutHandle layoutHandle = getOnlyElement(tableLayoutResults).getTableLayout().getHandle(); return getAllSplits(splitManager.getSplits(session, layoutHandle)); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { SpreadsheetTableHandle tableHandle = (SpreadsheetTableHandle) table; SpreadsheetTableLayoutHandle baseTableLayoutHandle = createTableLayoutHandle(tableHandle); ConnectorTableLayout layout = new ConnectorTableLayout(baseTableLayoutHandle); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }
@Override public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) { SpreadsheetTableLayoutHandle layout = (SpreadsheetTableLayoutHandle) handle; List<ConnectorTableLayoutResult> tableLayouts = getTableLayouts(session, layout.getTable(), Constraint.<ColumnHandle>alwaysTrue(), Optional.empty()); ConnectorTableLayoutResult connectorTableLayoutResult = tableLayouts.get(0); return connectorTableLayoutResult.getTableLayout(); }
@Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) { BaseTableHandle tableHandle = (BaseTableHandle) table; BaseTableLayoutHandle baseTableLayoutHandle = createTableLayoutHandle(tableHandle); ConnectorTableLayout layout = new ConnectorTableLayout(baseTableLayoutHandle); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); }