private static LocalQueryRunner createLocalQueryRunner()
{
    Session defaultSession = testSessionBuilder()
            .setCatalog("tpch")
            .setSchema(TINY_SCHEMA_NAME)
            .build();

    LocalQueryRunner localQueryRunner = new LocalQueryRunner(defaultSession);

    // add tpch
    InMemoryNodeManager nodeManager = localQueryRunner.getNodeManager();
    localQueryRunner.createCatalog("tpch", new TpchConnectorFactory(nodeManager, 1), ImmutableMap.<String, String>of());

    // register the plugin's types, parametric types, and functions
    HyperLogLogPlugin plugin = new HyperLogLogPlugin();
    for (Type type : plugin.getTypes()) {
        localQueryRunner.getTypeManager().addType(type);
    }
    for (ParametricType parametricType : plugin.getParametricTypes()) {
        localQueryRunner.getTypeManager().addParametricType(parametricType);
    }
    localQueryRunner.getMetadata().addFunctions(extractFunctions(plugin.getFunctions()));

    return localQueryRunner;
}
public static LocalQueryRunner createLocalQueryRunner(boolean hashingEnabled)
{
    SessionBuilder sessionBuilder = testSessionBuilder()
            .setCatalog("tpch")
            .setSchema(TINY_SCHEMA_NAME);

    if (hashingEnabled) {
        sessionBuilder.setSystemProperties(ImmutableMap.of("optimizer.optimize_hash_generation", "true"));
    }

    Session session = sessionBuilder.build();
    LocalQueryRunner localQueryRunner = queryRunnerWithInitialTransaction(session);

    // add tpch
    InMemoryNodeManager nodeManager = localQueryRunner.getNodeManager();
    localQueryRunner.createCatalog("tpch", new TpchConnectorFactory(nodeManager, 1), ImmutableMap.<String, String>of());

    return localQueryRunner;
}
public static LocalQueryRunner createLocalQueryRunner()
{
    Session session = testSessionBuilder()
            .setCatalog("raptor")
            .setSchema("benchmark")
            .build();

    LocalQueryRunner localQueryRunner = new LocalQueryRunner(session);

    // add tpch
    InMemoryNodeManager nodeManager = localQueryRunner.getNodeManager();
    localQueryRunner.createCatalog("tpch", new TpchConnectorFactory(nodeManager, 1), ImmutableMap.<String, String>of());

    // add raptor
    ConnectorFactory raptorConnectorFactory = createRaptorConnectorFactory(TPCH_CACHE_DIR, nodeManager);
    localQueryRunner.createCatalog("raptor", raptorConnectorFactory, ImmutableMap.of());

    if (!localQueryRunner.tableExists(session, "orders")) {
        localQueryRunner.execute("CREATE TABLE orders AS SELECT * FROM tpch.sf1.orders");
    }
    if (!localQueryRunner.tableExists(session, "lineitem")) {
        localQueryRunner.execute("CREATE TABLE lineitem AS SELECT * FROM tpch.sf1.lineitem");
    }
    return localQueryRunner;
}
@Test
public void testAssignRandomNodeWhenBackupAvailable()
        throws InterruptedException, URISyntaxException
{
    InMemoryNodeManager nodeManager = new InMemoryNodeManager();
    RaptorConnectorId connectorId = new RaptorConnectorId("raptor");
    NodeSupplier nodeSupplier = new RaptorNodeSupplier(nodeManager, connectorId);
    PrestoNode node = new PrestoNode(UUID.randomUUID().toString(), new URI("http://127.0.0.1/"), NodeVersion.UNKNOWN);
    nodeManager.addNode(connectorId.toString(), node);
    RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(connectorId, nodeSupplier, shardManager, true);

    // remove all shard-to-node assignments; with backup available, a random node should be assigned
    deleteShardNodes();

    ConnectorTableLayoutResult layout = getOnlyElement(metadata.getTableLayouts(SESSION, tableHandle, Constraint.alwaysTrue(), Optional.empty()));
    ConnectorSplitSource partitionSplit = getSplits(raptorSplitManagerWithBackup, layout);
    List<ConnectorSplit> batch = getFutureValue(partitionSplit.getNextBatch(1), PrestoException.class);
    assertEquals(getOnlyElement(getOnlyElement(batch).getAddresses()), node.getHostAndPort());
}
@BeforeMethod
public void setupDatabase()
        throws Exception
{
    TypeRegistry typeRegistry = new TypeRegistry();
    dbi = new DBI("jdbc:h2:mem:test" + System.nanoTime());
    dbi.registerMapper(new TableColumn.Mapper(typeRegistry));
    dummyHandle = dbi.open();
    createTablesWithRetry(dbi);

    RaptorConnectorId connectorId = new RaptorConnectorId("raptor");
    InMemoryNodeManager nodeManager = new InMemoryNodeManager();
    nodeManager.addCurrentNodeDatasource(connectorId.toString());
    NodeSupplier nodeSupplier = new RaptorNodeSupplier(nodeManager, connectorId);

    shardManager = new DatabaseShardManager(dbi, nodeSupplier);
    metadata = new RaptorMetadata(connectorId.toString(), dbi, shardManager, SHARD_INFO_CODEC, SHARD_DELTA_CODEC);
}
@Test
public void testNoNodes()
        throws Exception
{
    try {
        NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
        // empty node manager: scheduling should fail with NO_NODES_AVAILABLE
        InMemoryNodeManager nodeManager = new InMemoryNodeManager();
        NodeScheduler nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, new NodeSchedulerConfig().setIncludeCoordinator(false), nodeTaskMap);

        StageExecutionPlan plan = createPlan(createFixedSplitSource(20, TestingSplit::createRemoteSplit));
        SqlStageExecution stage = createSqlStageExecution(plan, nodeTaskMap);

        SourcePartitionedScheduler scheduler = new SourcePartitionedScheduler(
                stage,
                plan.getDataSource().get(),
                new SplitPlacementPolicy(nodeScheduler.createNodeSelector("test"), stage::getAllTasks),
                2);

        scheduler.schedule();

        fail("expected PrestoException");
    }
    catch (PrestoException e) {
        assertEquals(e.getErrorCode(), NO_NODES_AVAILABLE.toErrorCode());
    }
}
public static ShardRecoveryManager createShardRecoveryManager(
        StorageService storageService,
        Optional<BackupStore> backupStore,
        ShardManager shardManager)
{
    return new ShardRecoveryManager(
            storageService,
            backupStore,
            new InMemoryNodeManager(),
            shardManager,
            new Duration(5, MINUTES),
            10);
}
public static OrcStorageManager createOrcStorageManager(IDBI dbi, File temporary, int maxShardRows)
        throws IOException
{
    File directory = new File(temporary, "data");
    StorageService storageService = new FileStorageService(directory);
    storageService.start();

    File backupDirectory = new File(temporary, "backup");
    FileBackupStore fileBackupStore = new FileBackupStore(backupDirectory);
    fileBackupStore.start();
    Optional<BackupStore> backupStore = Optional.of(fileBackupStore);

    ShardManager shardManager = createShardManager(dbi);
    ShardRecoveryManager recoveryManager = new ShardRecoveryManager(
            storageService,
            backupStore,
            new InMemoryNodeManager(),
            shardManager,
            MISSING_SHARD_DISCOVERY,
            10);
    return createOrcStorageManager(
            storageService,
            backupStore,
            recoveryManager,
            new InMemoryShardRecorder(),
            maxShardRows,
            MAX_FILE_SIZE);
}
@Test
public void testPlugin()
        throws Exception
{
    RaptorPlugin plugin = loadPlugin(RaptorPlugin.class);

    plugin.setNodeManager(new InMemoryNodeManager());
    TypeRegistry typeRegistry = new TypeRegistry();
    plugin.setTypeManager(typeRegistry);
    plugin.setBlockEncodingSerde(new BlockEncodingManager(typeRegistry));
    plugin.setPageSorter(new PagesIndexPageSorter());

    List<ConnectorFactory> factories = plugin.getServices(ConnectorFactory.class);
    ConnectorFactory factory = getOnlyElement(factories);
    assertInstanceOf(factory, RaptorConnectorFactory.class);

    File tmpDir = Files.createTempDir();
    try {
        Map<String, String> config = ImmutableMap.<String, String>builder()
                .put("metadata.db.type", "h2")
                .put("metadata.db.filename", tmpDir.getAbsolutePath())
                .put("storage.data-directory", tmpDir.getAbsolutePath())
                .build();

        factory.create("test", config);
    }
    finally {
        FileUtils.deleteRecursively(tmpDir);
    }
}
@Test
public void testTransactionWorkflow()
        throws Exception
{
    try (IdleCheckExecutor executor = new IdleCheckExecutor()) {
        TransactionManager transactionManager = TransactionManager.create(new TransactionManagerConfig(), executor.getExecutor(), finishingExecutor);
        Connector c1 = new TpchConnectorFactory(new InMemoryNodeManager()).create("c1", ImmutableMap.of());
        transactionManager.addConnector("c1", c1);

        TransactionId transactionId = transactionManager.beginTransaction(false);

        Assert.assertEquals(transactionManager.getAllTransactionInfos().size(), 1);
        TransactionInfo transactionInfo = transactionManager.getTransactionInfo(transactionId);
        Assert.assertFalse(transactionInfo.isAutoCommitContext());
        Assert.assertTrue(transactionInfo.getConnectorIds().isEmpty());
        Assert.assertFalse(transactionInfo.getWrittenConnectorId().isPresent());

        ConnectorMetadata metadata = transactionManager.getMetadata(transactionId, "c1");
        metadata.listSchemaNames(TEST_SESSION.toConnectorSession("c1"));
        transactionInfo = transactionManager.getTransactionInfo(transactionId);
        Assert.assertEquals(transactionInfo.getConnectorIds(), ImmutableList.of("c1"));
        Assert.assertFalse(transactionInfo.getWrittenConnectorId().isPresent());

        transactionManager.asyncCommit(transactionId).join();

        Assert.assertTrue(transactionManager.getAllTransactionInfos().isEmpty());
    }
}
@Test
public void testAbortedTransactionWorkflow()
        throws Exception
{
    try (IdleCheckExecutor executor = new IdleCheckExecutor()) {
        TransactionManager transactionManager = TransactionManager.create(new TransactionManagerConfig(), executor.getExecutor(), finishingExecutor);
        Connector c1 = new TpchConnectorFactory(new InMemoryNodeManager()).create("c1", ImmutableMap.of());
        transactionManager.addConnector("c1", c1);

        TransactionId transactionId = transactionManager.beginTransaction(false);

        Assert.assertEquals(transactionManager.getAllTransactionInfos().size(), 1);
        TransactionInfo transactionInfo = transactionManager.getTransactionInfo(transactionId);
        Assert.assertFalse(transactionInfo.isAutoCommitContext());
        Assert.assertTrue(transactionInfo.getConnectorIds().isEmpty());
        Assert.assertFalse(transactionInfo.getWrittenConnectorId().isPresent());

        ConnectorMetadata metadata = transactionManager.getMetadata(transactionId, "c1");
        metadata.listSchemaNames(TEST_SESSION.toConnectorSession("c1"));
        transactionInfo = transactionManager.getTransactionInfo(transactionId);
        Assert.assertEquals(transactionInfo.getConnectorIds(), ImmutableList.of("c1"));
        Assert.assertFalse(transactionInfo.getWrittenConnectorId().isPresent());

        transactionManager.asyncAbort(transactionId).join();

        Assert.assertTrue(transactionManager.getAllTransactionInfos().isEmpty());
    }
}
@Setup
public void setup()
        throws NoSuchMethodException, IllegalAccessException
{
    TestingTransactionHandle transactionHandle = TestingTransactionHandle.create("foo");

    finalizerService.start();
    NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);

    ImmutableList.Builder<Node> nodeBuilder = ImmutableList.builder();
    for (int i = 0; i < NODES; i++) {
        nodeBuilder.add(new PrestoNode("node" + i, URI.create("http://" + addressForHost(i).getHostText()), NodeVersion.UNKNOWN));
    }
    List<Node> nodes = nodeBuilder.build();

    MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(Executors.newCachedThreadPool(daemonThreadsNamed("remoteTaskExecutor-%s")));
    for (int i = 0; i < nodes.size(); i++) {
        Node node = nodes.get(i);
        ImmutableList.Builder<Split> initialSplits = ImmutableList.builder();
        for (int j = 0; j < MAX_SPLITS_PER_NODE + MAX_PENDING_SPLITS_PER_TASK_PER_NODE; j++) {
            initialSplits.add(new Split("foo", transactionHandle, new TestSplitRemote(i)));
        }
        TaskId taskId = new TaskId(new StageId("test", "1"), String.valueOf(i));
        MockRemoteTaskFactory.MockRemoteTask remoteTask = remoteTaskFactory.createTableScanTask(taskId, node, initialSplits.build(), nodeTaskMap.createPartitionedSplitCountTracker(node, taskId));
        nodeTaskMap.addTask(node, remoteTask);
        taskMap.put(node, remoteTask);
    }

    for (int i = 0; i < SPLITS; i++) {
        splits.add(new Split("foo", transactionHandle, new TestSplitRemote(ThreadLocalRandom.current().nextInt(DATA_NODES))));
    }

    InMemoryNodeManager nodeManager = new InMemoryNodeManager();
    nodeManager.addNode("foo", nodes);
    NodeScheduler nodeScheduler = new NodeScheduler(getNetworkTopology(), nodeManager, getNodeSchedulerConfig(), nodeTaskMap);
    nodeSelector = nodeScheduler.createNodeSelector("foo");
}
@BeforeMethod
public void setUp()
        throws Exception
{
    finalizerService = new FinalizerService();
    nodeTaskMap = new NodeTaskMap(finalizerService);
    nodeManager = new InMemoryNodeManager();

    ImmutableList.Builder<Node> nodeBuilder = ImmutableList.builder();
    nodeBuilder.add(new PrestoNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN));
    nodeBuilder.add(new PrestoNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN));
    nodeBuilder.add(new PrestoNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN));
    ImmutableList<Node> nodes = nodeBuilder.build();
    nodeManager.addNode("foo", nodes);

    NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig()
            .setMaxSplitsPerNode(20)
            .setIncludeCoordinator(false)
            .setMaxPendingSplitsPerNodePerTask(10);
    NodeScheduler nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, nodeSchedulerConfig, nodeTaskMap);

    // contents of taskMap indicate the node-task map for the current stage
    taskMap = new HashMap<>();
    nodeSelector = nodeScheduler.createNodeSelector("foo");
    remoteTaskExecutor = Executors.newCachedThreadPool(daemonThreadsNamed("remoteTaskExecutor-%s"));

    finalizerService.start();
}
public static LocalQueryRunner createLocalQueryRunner(File tempDir)
{
    Session session = testSessionBuilder()
            .setCatalog("hive")
            .setSchema("tpch")
            .build();

    LocalQueryRunner localQueryRunner = new LocalQueryRunner(session);

    // add tpch
    InMemoryNodeManager nodeManager = localQueryRunner.getNodeManager();
    localQueryRunner.createCatalog("tpch", new TpchConnectorFactory(nodeManager, 1), ImmutableMap.<String, String>of());

    // add hive
    File hiveDir = new File(tempDir, "hive_data");
    InMemoryHiveMetastore metastore = new InMemoryHiveMetastore(hiveDir);
    File tpchDataDir = new File(hiveDir, "tpch");
    metastore.createDatabase(new Database("tpch", null, tpchDataDir.toURI().toString(), null));

    HiveConnectorFactory hiveConnectorFactory = new HiveConnectorFactory(
            "hive",
            ImmutableMap.of("node.environment", "test"),
            HiveBenchmarkQueryRunner.class.getClassLoader(),
            metastore,
            new TypeRegistry(),
            new GroupByHashPageIndexerFactory());

    Map<String, String> hiveCatalogConfig = ImmutableMap.<String, String>builder()
            .put("hive.metastore.uri", "thrift://none.invalid:0")
            .put("hive.max-split-size", "10GB")
            .build();

    localQueryRunner.createCatalog("hive", hiveConnectorFactory, hiveCatalogConfig);

    localQueryRunner.execute("CREATE TABLE orders AS SELECT * FROM tpch.sf1.orders");
    localQueryRunner.execute("CREATE TABLE lineitem AS SELECT * FROM tpch.sf1.lineitem");
    return localQueryRunner;
}
public SqlEngine(StructuredStore store, ExecutorService executor)
{
    this.store = store;
    this.executor = executor;

    MetadataManager metadataManager = new MetadataManager();
    SplitManager splitManager = new SplitManager(Sets.<ConnectorSplitManager>newHashSet());
    this.dataStreamManager = new DataStreamManager();
    HandleResolver handleResolver = new HandleResolver();
    Map<String, ConnectorFactory> connectorFactories = Maps.newHashMap();
    Map<String, Connector> globalConnectors = Maps.newHashMap();

    RecordSinkManager recordSinkManager = new RecordSinkManager();
    Map<String, ConnectorOutputHandleResolver> handleIdResolvers = Maps.newHashMap();
    OutputTableHandleResolver outputTableHandleResolver = new OutputTableHandleResolver(handleIdResolvers);

    this.connectorManager = new ConnectorManager(metadataManager, splitManager, dataStreamManager, recordSinkManager, handleResolver, outputTableHandleResolver, connectorFactories, globalConnectors);

    // NodeManager nodeManager = new InMemoryNodeManager();
    PlanOptimizersFactory planOptimizersFactory = new PlanOptimizersFactory(metadataManager, splitManager);
    List<PlanOptimizer> planOptimizers = planOptimizersFactory.get();

    this.metadataManager = metadataManager;
    this.planOptimizers = planOptimizers;
    this.periodicImportManager = new StubPeriodicImportManager();
    this.storageManager = new StubStorageManager();

    NodeManager nodeManager = new InMemoryNodeManager();
    CloudataConnectorFactory cloudataConnectorFactory = new CloudataConnectorFactory(nodeManager, Maps.<String, String>newHashMap(), store);
    connectorManager.addConnectorFactory(cloudataConnectorFactory);

    connectorManager.createConnection(catalogName, CloudataConnectorFactory.PROVIDER_ID, Maps.<String, String>newHashMap());
    this.cloudataConnector = cloudataConnectorFactory.get(catalogName);
}
private Plan parse(String sql)
{
    InMemoryNodeManager nodeManager = new InMemoryNodeManager();
    MetadataManager metadata = buildMetadata();
    StorageManager storageManager = new MockStorageManager();
    PeriodicImportManager periodicImportManager = new StubPeriodicImportManager();
    SplitManager splitManager = buildSplitManager(nodeManager);
    List<PlanOptimizer> planOptimizers = buildPlanOptimizers(metadata, splitManager);

    Statement statement = SqlParser.createStatement(sql);
    // System.out.println("Statement: " + statement);

    Session session = buildSession();
    QueryExplainer queryExplainer = new QueryExplainer(session, planOptimizers, metadata, periodicImportManager, storageManager);

    // analyze query
    Analyzer analyzer = new Analyzer(session, metadata, Optional.of(queryExplainer));
    Analysis analysis = analyzer.analyze(statement);
    // System.out.println("analysis: " + analysis);

    PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();

    // plan query
    LogicalPlanner logicalPlanner = new LogicalPlanner(session, planOptimizers, idAllocator, metadata, periodicImportManager, storageManager);
    Plan plan = logicalPlanner.plan(analysis);

    return plan;
}
@BeforeMethod
public void setup()
        throws Exception
{
    TypeRegistry typeRegistry = new TypeRegistry();
    DBI dbi = new DBI("jdbc:h2:mem:test" + System.nanoTime());
    dbi.registerMapper(new TableColumn.Mapper(typeRegistry));
    dummyHandle = dbi.open();
    temporary = createTempDir();
    shardManager = createShardManager(dbi);

    InMemoryNodeManager nodeManager = new InMemoryNodeManager();
    RaptorNodeSupplier nodeSupplier = new RaptorNodeSupplier(nodeManager, new RaptorConnectorId("raptor"));

    String nodeName = UUID.randomUUID().toString();
    nodeManager.addNode("raptor", new PrestoNode(nodeName, new URI("http://127.0.0.1/"), NodeVersion.UNKNOWN));

    RaptorConnectorId connectorId = new RaptorConnectorId("raptor");
    metadata = new RaptorMetadata(connectorId.toString(), dbi, shardManager, SHARD_INFO_CODEC, SHARD_DELTA_CODEC);

    metadata.createTable(SESSION, TEST_TABLE);
    tableHandle = metadata.getTableHandle(SESSION, TEST_TABLE.getTable());

    List<ShardInfo> shards = ImmutableList.<ShardInfo>builder()
            .add(shardInfo(UUID.randomUUID(), nodeName))
            .add(shardInfo(UUID.randomUUID(), nodeName))
            .add(shardInfo(UUID.randomUUID(), nodeName))
            .add(shardInfo(UUID.randomUUID(), nodeName))
            .build();

    tableId = checkType(tableHandle, RaptorTableHandle.class, "tableHandle").getTableId();

    List<ColumnInfo> columns = metadata.getColumnHandles(SESSION, tableHandle).values().stream()
            .map(handle -> checkType(handle, RaptorColumnHandle.class, "columnHandle"))
            .map(ColumnInfo::fromHandle)
            .collect(toList());

    long transactionId = shardManager.beginTransaction();
    shardManager.commitShards(transactionId, tableId, columns, shards, Optional.empty());

    raptorSplitManager = new RaptorSplitManager(connectorId, nodeSupplier, shardManager, false);
}
public InMemoryNodeManager getNodeManager()
{
    return nodeManager;
}
@Test
public void testFailedTransactionWorkflow()
        throws Exception
{
    try (IdleCheckExecutor executor = new IdleCheckExecutor()) {
        TransactionManager transactionManager = TransactionManager.create(new TransactionManagerConfig(), executor.getExecutor(), finishingExecutor);
        Connector c1 = new TpchConnectorFactory(new InMemoryNodeManager()).create("c1", ImmutableMap.of());
        transactionManager.addConnector("c1", c1);

        TransactionId transactionId = transactionManager.beginTransaction(false);

        Assert.assertEquals(transactionManager.getAllTransactionInfos().size(), 1);
        TransactionInfo transactionInfo = transactionManager.getTransactionInfo(transactionId);
        Assert.assertFalse(transactionInfo.isAutoCommitContext());
        Assert.assertTrue(transactionInfo.getConnectorIds().isEmpty());
        Assert.assertFalse(transactionInfo.getWrittenConnectorId().isPresent());

        ConnectorMetadata metadata = transactionManager.getMetadata(transactionId, "c1");
        metadata.listSchemaNames(TEST_SESSION.toConnectorSession("c1"));
        transactionInfo = transactionManager.getTransactionInfo(transactionId);
        Assert.assertEquals(transactionInfo.getConnectorIds(), ImmutableList.of("c1"));
        Assert.assertFalse(transactionInfo.getWrittenConnectorId().isPresent());

        transactionManager.fail(transactionId);
        Assert.assertEquals(transactionManager.getAllTransactionInfos().size(), 1);

        try {
            transactionManager.getMetadata(transactionId, "c1");
            Assert.fail();
        }
        catch (PrestoException e) {
            Assert.assertEquals(e.getErrorCode(), TRANSACTION_ALREADY_ABORTED.toErrorCode());
        }
        Assert.assertEquals(transactionManager.getAllTransactionInfos().size(), 1);

        transactionManager.asyncAbort(transactionId).join();

        Assert.assertTrue(transactionManager.getAllTransactionInfos().isEmpty());
    }
}
@Test
public void testBlocking()
        throws Exception
{
    Session session = TEST_SESSION
            .withSystemProperty("task_default_concurrency", "1");

    LocalQueryRunner localQueryRunner = queryRunnerWithInitialTransaction(session);

    // add tpch
    InMemoryNodeManager nodeManager = localQueryRunner.getNodeManager();
    localQueryRunner.createCatalog("tpch", new TpchConnectorFactory(nodeManager, 1), ImmutableMap.<String, String>of());

    // reserve all the memory in the pool
    MemoryPool pool = new MemoryPool(new MemoryPoolId("test"), new DataSize(10, MEGABYTE));
    QueryId fakeQueryId = new QueryId("fake");
    assertTrue(pool.tryReserve(fakeQueryId, TEN_MEGABYTES));
    MemoryPool systemPool = new MemoryPool(new MemoryPoolId("testSystem"), new DataSize(10, MEGABYTE));

    QueryContext queryContext = new QueryContext(new QueryId("query"), new DataSize(10, MEGABYTE), pool, systemPool, localQueryRunner.getExecutor());

    LocalQueryRunner.MaterializedOutputFactory outputFactory = new LocalQueryRunner.MaterializedOutputFactory();
    TaskContext taskContext = createTaskContext(queryContext, localQueryRunner.getExecutor(), session, new DataSize(0, BYTE));
    Driver driver = Iterables.getOnlyElement(localQueryRunner.createDrivers("SELECT COUNT(*), clerk FROM orders GROUP BY clerk", outputFactory, taskContext));

    // run driver, until it blocks
    while (!driver.isFinished()) {
        if (!driver.process().isDone()) {
            break;
        }
    }

    // driver should be blocked waiting for memory
    assertFalse(driver.isFinished());
    assertTrue(pool.getFreeBytes() <= 0);

    pool.free(fakeQueryId, TEN_MEGABYTES);
    do {
        // driver should not block
        assertTrue(driver.process().isDone());
    }
    while (!driver.isFinished());
}
@Test
public void testBalancedSplitAssignment()
        throws Exception
{
    // use private node manager so we can add a node later
    InMemoryNodeManager nodeManager = new InMemoryNodeManager();
    nodeManager.addNode(CONNECTOR_ID,
            new PrestoNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN),
            new PrestoNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN),
            new PrestoNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN));
    NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);

    // Schedule 15 splits - there are 3 nodes, each node should get 5 splits
    StageExecutionPlan firstPlan = createPlan(createFixedSplitSource(15, TestingSplit::createRemoteSplit));
    SqlStageExecution firstStage = createSqlStageExecution(firstPlan, nodeTaskMap);
    SourcePartitionedScheduler firstScheduler = getSourcePartitionedScheduler(firstPlan, firstStage, nodeManager, nodeTaskMap, 200);

    ScheduleResult scheduleResult = firstScheduler.schedule();
    assertTrue(scheduleResult.isFinished());
    assertTrue(scheduleResult.getBlocked().isDone());
    assertEquals(scheduleResult.getNewTasks().size(), 3);
    assertEquals(firstStage.getAllTasks().size(), 3);
    for (RemoteTask remoteTask : firstStage.getAllTasks()) {
        assertEquals(remoteTask.getPartitionedSplitCount(), 5);
    }

    // Add new node
    Node additionalNode = new PrestoNode("other4", URI.create("http://127.0.0.1:14"), NodeVersion.UNKNOWN);
    nodeManager.addNode(CONNECTOR_ID, additionalNode);

    // Schedule 5 splits in another query. Since the new node does not have any splits, all 5 splits are assigned to the new node
    StageExecutionPlan secondPlan = createPlan(createFixedSplitSource(5, TestingSplit::createRemoteSplit));
    SqlStageExecution secondStage = createSqlStageExecution(secondPlan, nodeTaskMap);
    SourcePartitionedScheduler secondScheduler = getSourcePartitionedScheduler(secondPlan, secondStage, nodeManager, nodeTaskMap, 200);

    scheduleResult = secondScheduler.schedule();
    assertTrue(scheduleResult.isFinished());
    assertTrue(scheduleResult.getBlocked().isDone());
    assertEquals(scheduleResult.getNewTasks().size(), 1);
    assertEquals(secondStage.getAllTasks().size(), 1);
    RemoteTask task = secondStage.getAllTasks().get(0);
    assertEquals(task.getPartitionedSplitCount(), 5);

    firstStage.abort();
    secondStage.abort();
}