public CuratorTransactionFinal build(CuratorFramework client, String serviceRootPath) throws Exception { // List of paths that are known to exist, or which are about to be created by the transaction // Includes "known to exist" in order to avoid repeated lookups for the same path Set<String> existingAndPendingCreatePaths = new HashSet<>(); CuratorTransactionFinal transaction = client.inTransaction().check().forPath(serviceRootPath).and(); for (Map.Entry<String, byte[]> entry : pathBytesMap.entrySet()) { String path = entry.getKey(); if (!existingAndPendingCreatePaths.contains(path) && client.checkExists().forPath(path) == null) { // Path does not exist and is not being created: Create value (and any parents as needed). transaction = createParentsOf(client, path, transaction, existingAndPendingCreatePaths) .create().forPath(path, entry.getValue()).and(); existingAndPendingCreatePaths.add(path); } else { // Path exists (or will exist): Update existing value. transaction = transaction.setData().forPath(path, entry.getValue()).and(); } } return transaction; }
public CuratorTransactionFinal build(CuratorFramework client, String serviceRootPath) throws Exception { // List of paths which are about to be deleted by the transaction Set<String> pendingDeletePaths = new HashSet<>(); CuratorTransactionFinal transaction = client.inTransaction().check().forPath(serviceRootPath).and(); for (String path : pathsToClear) { // if present, delete path and any children (unless already being deleted) if (!pendingDeletePaths.contains(path) && client.checkExists().forPath(path) != null) { transaction = deleteChildrenOf(client, path, transaction, pendingDeletePaths) .delete().forPath(path).and(); pendingDeletePaths.add(path); } } return transaction; }
/** * Updates and returns a transaction which can be used to delete the children of the provided path, if any. */ private static CuratorTransactionFinal deleteChildrenOf( CuratorFramework client, String path, CuratorTransactionFinal curatorTransactionFinal, Set<String> pendingDeletePaths) throws Exception { if (pendingDeletePaths.contains(path)) { // Short-circuit: Path and any children are already scheduled for deletion return curatorTransactionFinal; } // For each child: recurse into child (to delete any grandchildren, etc..), THEN delete child itself for (String child : client.getChildren().forPath(path)) { String childPath = PersisterUtils.join(path, child); curatorTransactionFinal = deleteChildrenOf(client, childPath, curatorTransactionFinal, pendingDeletePaths); // RECURSE if (!pendingDeletePaths.contains(childPath)) { // Avoid attempting to delete a path twice in the same transaction, just in case we're told to delete // two nodes where one is the child of the other (or something to that effect) curatorTransactionFinal = curatorTransactionFinal.delete().forPath(childPath).and(); pendingDeletePaths.add(childPath); } } return curatorTransactionFinal; }
/**
 * Verifies that the persist-sharding-info callback queues one create for the host's sharding
 * node and one delete each for the "necessary" and "processing" nodes.
 */
@Test
public void assertPersistShardingInfoTransactionExecutionCallback() throws Exception {
    // Mocked fluent chain: final -> builder -> bridge -> final.
    CuratorTransactionFinal txFinal = mock(CuratorTransactionFinal.class);
    TransactionCreateBuilder createBuilder = mock(TransactionCreateBuilder.class);
    TransactionDeleteBuilder deleteBuilder = mock(TransactionDeleteBuilder.class);
    CuratorTransactionBridge bridge = mock(CuratorTransactionBridge.class);

    // Stub the create of the sharding node.
    when(txFinal.create()).thenReturn(createBuilder);
    when(createBuilder.forPath("/testJob/servers/host0/sharding", "0,1,2".getBytes())).thenReturn(bridge);
    when(bridge.and()).thenReturn(txFinal);

    // Stub the delete of the "necessary" node.
    when(txFinal.delete()).thenReturn(deleteBuilder);
    when(deleteBuilder.forPath("/testJob/leader/sharding/necessary")).thenReturn(bridge);
    when(bridge.and()).thenReturn(txFinal);

    // Stub the delete of the "processing" node.
    when(txFinal.delete()).thenReturn(deleteBuilder);
    when(deleteBuilder.forPath("/testJob/leader/sharding/processing")).thenReturn(bridge);
    when(bridge.and()).thenReturn(txFinal);

    Map<String, List<Integer>> itemsByHost = new HashMap<String, List<Integer>>(1);
    itemsByHost.put("host0", Arrays.asList(0, 1, 2));
    ShardingService.PersistShardingInfoTransactionExecutionCallback callback =
            shardingService.new PersistShardingInfoTransactionExecutionCallback(itemsByHost);

    callback.execute(txFinal);

    verify(txFinal).create();
    verify(createBuilder).forPath("/testJob/servers/host0/sharding", "0,1,2".getBytes());
    verify(txFinal, times(2)).delete();
    verify(deleteBuilder).forPath("/testJob/leader/sharding/necessary");
    verify(deleteBuilder).forPath("/testJob/leader/sharding/processing");
    verify(bridge, times(3)).and();
}
/**
 * Verifies that the persist-sharding-info callback queues one create for the host's sharding
 * node and one delete each for the "necessary" and "processing" nodes.
 */
@Test
public void assertPersistShardingInfoTransactionExecutionCallback() throws Exception {
    // Mocked fluent chain: final -> builder -> bridge -> final.
    CuratorTransactionFinal txFinal = mock(CuratorTransactionFinal.class);
    TransactionCreateBuilder createBuilder = mock(TransactionCreateBuilder.class);
    TransactionDeleteBuilder deleteBuilder = mock(TransactionDeleteBuilder.class);
    CuratorTransactionBridge bridge = mock(CuratorTransactionBridge.class);

    // Stub the create of the sharding node.
    when(txFinal.create()).thenReturn(createBuilder);
    when(createBuilder.forPath("/testJob/servers/host0/sharding", "0,1,2".getBytes())).thenReturn(bridge);
    when(bridge.and()).thenReturn(txFinal);

    // Stub the delete of the "necessary" node.
    when(txFinal.delete()).thenReturn(deleteBuilder);
    when(deleteBuilder.forPath("/testJob/leader/sharding/necessary")).thenReturn(bridge);
    when(bridge.and()).thenReturn(txFinal);

    // Stub the delete of the "processing" node.
    when(txFinal.delete()).thenReturn(deleteBuilder);
    when(deleteBuilder.forPath("/testJob/leader/sharding/processing")).thenReturn(bridge);
    when(bridge.and()).thenReturn(txFinal);

    Map<String, List<Integer>> itemsByHost = new HashMap<>(1);
    itemsByHost.put("host0", Arrays.asList(0, 1, 2));
    ShardingService.PersistShardingInfoTransactionExecutionCallback callback =
            shardingService.new PersistShardingInfoTransactionExecutionCallback(itemsByHost);

    callback.execute(txFinal);

    verify(txFinal).create();
    verify(createBuilder).forPath("/testJob/servers/host0/sharding", "0,1,2".getBytes());
    verify(txFinal, times(2)).delete();
    verify(deleteBuilder).forPath("/testJob/leader/sharding/necessary");
    verify(deleteBuilder).forPath("/testJob/leader/sharding/processing");
    verify(bridge, times(3)).and();
}
private void deleteLogs() throws Exception { final List<String> children = curator.getChildren().forPath(absolutePath(LOG_PATH)); if (children.size() <= maxLogCount) { return; } final long minAllowedTimestamp = System.currentTimeMillis() - minLogAgeMillis; final int targetCount = children.size() - maxLogCount; final List<String> deleted = new ArrayList<>(targetCount); children.sort(Comparator.comparingLong(Long::parseLong)); try { for (int i = 0; i < targetCount; ++i) { final String logPath = absolutePath(LOG_PATH, children.get(i)); final LogMeta meta = Jackson.readValue(curator.getData().forPath(logPath), LogMeta.class); if (meta.timestamp() >= minAllowedTimestamp) { // Do not delete the logs that are not old enough. // We can break the loop here because the 'children' has been sorted by // insertion order (sequence value). break; } final CuratorTransactionFinal tr = curator.inTransaction().delete().forPath(logPath).and(); for (long blockId : meta.blocks()) { String blockPath = absolutePath(LOG_BLOCK_PATH) + '/' + pathFromRevision(blockId); tr.delete().forPath(blockPath).and(); } tr.commit(); deleted.add(children.get(i)); } } finally { logger.info("delete logs: {}", deleted); } }
/**
 * Commits the pending transaction, if there is anything to commit.
 *
 * <p>When {@code transaction} is a {@link CuratorTransactionFinal} it is committed and the result
 * is logged; otherwise only a "nothing to save" message is logged. The work runs through
 * {@code executeOrFailWithError} with the message "Can't commit transaction".
 *
 * @throws DataSourceConnectorException declared by this method; presumably raised by
 *     {@code executeOrFailWithError} on failure — confirm against that helper
 */
@Override
public void commit() throws DataSourceConnectorException {
    executeOrFailWithError(
        () -> {
            if (!(transaction instanceof CuratorTransactionFinal)) {
                log.info("Transaction cancelled: nothing to save");
                return null;
            }
            final CuratorTransactionFinal finalTransaction = (CuratorTransactionFinal) transaction;
            final Collection<CuratorTransactionResult> outcome = finalTransaction.commit();
            log.info("Transaction is committed. Result: {}", outcome);
            return null;
        },
        "Can't commit transaction");
}
/**
 * Replaces a pending task with a launched task.
 *
 * <p>Steps, in order: delete the pending-task node, save a {@code TASK_LAUNCHED} history update
 * whose message names the pending type (plus user and message when present), save the last
 * active task status, then create the task node and the active node in one transaction and
 * update the task cache. A {@link KeeperException.NodeExistsException} from the transaction is
 * logged and not rethrown.
 *
 * @param task the task that has been launched
 * @throws Exception if any step other than the node-exists case fails
 */
private void createTaskAndDeletePendingTaskPrivate(SingularityTask task) throws Exception {
    delete(getPendingPath(task.getTaskRequest().getPendingTask().getPendingTaskId()));

    final long launchedAt = System.currentTimeMillis();

    // Build the history message: "<base>[ by <user>][ (<message>)]".
    String launchMessage = String.format("Task launched because of %s",
        task.getTaskRequest().getPendingTask().getPendingTaskId().getPendingType().name());
    if (task.getTaskRequest().getPendingTask().getUser().isPresent()) {
        launchMessage = String.format("%s by %s", launchMessage,
            task.getTaskRequest().getPendingTask().getUser().get());
    }
    if (task.getTaskRequest().getPendingTask().getMessage().isPresent()) {
        launchMessage = String.format("%s (%s)", launchMessage,
            task.getTaskRequest().getPendingTask().getMessage().get());
    }

    saveTaskHistoryUpdate(new SingularityTaskHistoryUpdate(
        task.getTaskId(), launchedAt, ExtendedTaskState.TASK_LAUNCHED,
        Optional.of(launchMessage), Optional.<String>absent()));
    saveLastActiveTaskStatus(new SingularityTaskStatusHolder(
        task.getTaskId(), Optional.<TaskStatus>absent(), launchedAt, serverId,
        Optional.of(task.getOffer().getSlaveId().getValue())));

    try {
        final String taskPath = getTaskPath(task.getTaskId());

        // Task node and active node are created atomically.
        final CuratorTransactionFinal withTaskNode =
            curator.inTransaction().create().forPath(taskPath, taskTranscoder.toBytes(task)).and();
        final CuratorTransactionFinal withActiveNode =
            withTaskNode.create().forPath(getActivePath(task.getTaskId().getId())).and();
        withActiveNode.commit();

        taskCache.set(taskPath, task);
    } catch (KeeperException.NodeExistsException nee) {
        LOG.error("Task or active path already existed for {}", task.getTaskId());
    }
}
protected void doWriteChangeLogPartitionMapping(Map<TaskName, Integer> mapping) throws Exception { CuratorTransaction transaction = curator.inTransaction(); boolean needTransaction = false; for (Map.Entry<TaskName, Integer> entry : mapping.entrySet()) { Integer partitionNumber = entry.getValue(); TaskName tn = entry.getKey(); String clpnPath = getChangelogPartitionNumberPath(tn); byte[] data = intToBytes(partitionNumber); boolean created = createChangeLogPartitionPathIfNecessary(clpnPath, data); if (!created) {//create would have written with the data, but since we didn't create, we have to set it now: transaction.setData().forPath(clpnPath, data); needTransaction = true; log.debug("Appended changelog partition mapping {}={} to current transaction.", tn, partitionNumber); } } if (needTransaction) { ((CuratorTransactionFinal) transaction).commit(); } log.info("Wrote changelog partition mappings {}", mapping); }
/**
 * Queues creates for every missing parent of {@code path} and returns the resulting transaction.
 * The node at {@code path} itself is not created here.
 *
 * @param client the Curator client used to probe for existing parents
 * @param path the node whose parents should exist
 * @param curatorTransactionFinal the transaction to extend
 * @param existingAndPendingCreatePaths paths known to exist or already queued for creation;
 *     every parent visited here is added to it
 * @return the transaction including any newly queued creates
 * @throws Exception if any Curator call fails
 */
private static CuratorTransactionFinal createParentsOf(
        CuratorFramework client,
        String path,
        CuratorTransactionFinal curatorTransactionFinal,
        Set<String> existingAndPendingCreatePaths) throws Exception {
    CuratorTransactionFinal tx = curatorTransactionFinal;
    for (String ancestor : PersisterUtils.getParentPaths(path)) {
        if (existingAndPendingCreatePaths.contains(ancestor)) {
            // Already known to exist or already queued: nothing to look up or create.
            continue;
        }
        if (client.checkExists().forPath(ancestor) == null) {
            tx = tx.create().forPath(ancestor).and();
        }
        // Record the ancestor whether it already existed or was just queued.
        existingAndPendingCreatePaths.add(ancestor);
    }
    return tx;
}
/**
 * Commits this transaction, preparing it first if that has not happened yet.
 *
 * <p>Every operation returned by {@code operations()} is appended to a new Curator transaction,
 * which is then committed.
 *
 * @throws IllegalStateException wrapping any exception raised while preparing or committing
 */
@Override
public void commit() {
    try {
        if (!prepared) {
            prepare();
        }

        org.apache.curator.framework.api.transaction.CuratorTransaction curatorTransaction =
                curator.framework().inTransaction();
        for (Operation pending : operations()) {
            final CuratorOperation curatorOperation = (CuratorOperation) pending;
            curatorTransaction = curatorOperation.and(curatorTransaction);
        }

        final CuratorTransactionFinal finalTransaction = (CuratorTransactionFinal) curatorTransaction;
        finalTransaction.commit();
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Creates the given paths in one transaction, each with empty data. Paths that already exist
 * are left out of the transaction.
 *
 * @param paths the paths to create
 * @throws RuntimeException wrapping any exception raised while building or committing
 */
public void createAtomically(Path... paths) {
    try {
        CuratorTransaction pending = framework().inTransaction();
        for (Path candidate : paths) {
            if (exists(candidate)) {
                continue;
            }
            pending = pending.create().forPath(candidate.getAbsolute(), new byte[0]).and();
        }

        final CuratorTransactionFinal finalTransaction = (CuratorTransactionFinal) pending;
        finalTransaction.commit();
    } catch (Exception e) {
        throw new RuntimeException("Could not create " + Arrays.toString(paths), e);
    }
}
/**
 * Queues the sharding-info writes on the given transaction: one create per entry of
 * {@code shardingItems}, followed by deletes of the NECESSARY and PROCESSING nodes.
 *
 * @param curatorTransactionFinal the transaction to append the operations to
 * @throws Exception if any Curator call fails
 */
@Override
public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
    for (Entry<String, List<Integer>> hostAndItems : shardingItems.entrySet()) {
        final String shardingPath = jobNodePath.getFullPath(ShardingNode.getShardingNode(hostAndItems.getKey()));
        final byte[] itemsBytes = ItemUtils.toItemsString(hostAndItems.getValue()).getBytes();
        curatorTransactionFinal.create().forPath(shardingPath, itemsBytes).and();
    }

    final String necessaryPath = jobNodePath.getFullPath(ShardingNode.NECESSARY);
    curatorTransactionFinal.delete().forPath(necessaryPath).and();

    final String processingPath = jobNodePath.getFullPath(ShardingNode.PROCESSING);
    curatorTransactionFinal.delete().forPath(processingPath).and();
}
/** * 在事务中执行操作. * * @param callback 执行操作的回调 */ public void executeInTransaction(final TransactionExecutionCallback callback) { try { CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and(); callback.execute(curatorTransactionFinal); curatorTransactionFinal.commit(); //CHECKSTYLE:OFF } catch (final Exception ex) { //CHECKSTYLE:ON RegExceptionHandler.handleException(ex); } }
/**
 * Moves the given transaction ids from limbo to completed in one Curator transaction.
 *
 * <p>For each id, the node under {@code LIMBO_PATH} is deleted and a node under
 * {@code COMPLETED_PATH} is created.
 *
 * @param txIds the ids to mark as completed
 * @throws Exception if building or committing the transaction fails
 */
public void complete(List<Long> txIds) throws Exception {
    CuratorTransaction pending = curatorFramework.inTransaction();
    for (Long txId : txIds) {
        final String limboNode = LIMBO_PATH + "/" + txId;
        final String completedNode = COMPLETED_PATH + "/" + txId;
        pending = pending.delete().forPath(limboNode)
                .and().create().forPath(completedNode)
                .and();
    }
    ((CuratorTransactionFinal) pending).commit();
}
/**
 * Moves the given partition ids from limbo to completed in one Curator transaction.
 *
 * <p>For each id, the node under {@code LIMBO_PATH} is deleted and a node under
 * {@code COMPLETED_PATH} is created.
 *
 * @param partitionIds the ids to mark as completed
 * @throws Exception if building or committing the transaction fails
 */
public void complete(List<String> partitionIds) throws Exception {
    CuratorTransaction pending = curatorFramework.inTransaction();
    for (String partitionId : partitionIds) {
        final String limboNode = LIMBO_PATH + "/" + partitionId;
        final String completedNode = COMPLETED_PATH + "/" + partitionId;
        pending = pending.delete().forPath(limboNode)
                .and().create().forPath(completedNode)
                .and();
    }
    ((CuratorTransactionFinal) pending).commit();
}
private void createTaskAndDeletePendingTaskPrivate(SingularityTask task) throws Exception { // TODO: Should more of the below be done within a transaction? deletePendingTask(task.getTaskRequest().getPendingTask().getPendingTaskId()); final long now = System.currentTimeMillis(); String msg = String.format("Task launched because of %s", task.getTaskRequest().getPendingTask().getPendingTaskId().getPendingType().name()); if (task.getTaskRequest().getPendingTask().getUser().isPresent()) { msg = String.format("%s by %s", msg, task.getTaskRequest().getPendingTask().getUser().get()); } if (task.getTaskRequest().getPendingTask().getMessage().isPresent()) { msg = String.format("%s (%s)", msg, task.getTaskRequest().getPendingTask().getMessage().get()); } saveTaskHistoryUpdate(new SingularityTaskHistoryUpdate(task.getTaskId(), now, ExtendedTaskState.TASK_LAUNCHED, Optional.of(msg), Optional.<String>absent())); saveLastActiveTaskStatus(new SingularityTaskStatusHolder(task.getTaskId(), Optional.absent(), now, serverId, Optional.of(task.getAgentId().getValue()))); try { final String path = getTaskPath(task.getTaskId()); CuratorTransactionFinal transaction = curator.inTransaction().create().forPath(path, taskTranscoder.toBytes(task)).and(); transaction.create().forPath(getActivePath(task.getTaskId().getId())).and().commit(); leaderCache.putActiveTask(task); taskCache.set(path, task); } catch (KeeperException.NodeExistsException nee) { LOG.error("Task or active path already existed for {}", task.getTaskId()); } }
/**
 * Handles a migrate statement: computes the slot migration tasks needed to expand a table onto
 * additional data nodes and records them in ZooKeeper.
 *
 * <p>The statement must provide {@code table} and {@code add} (a comma-separated list of new
 * data nodes). The table must exist in the connection's current schema and use the
 * {@code PartitionByCRC32PreSlot} rule. A task node is created under
 * {@code <zk base>migrate/<table>/<taskID>}; its data and one child node per key of the computed
 * task map are then written in a single transaction. On success an OK packet is written to the
 * connection; on any failure an error message is written instead.
 *
 * @param stmt the raw statement; it is also stored in the task node
 * @param c the connection that issued the statement and receives the response
 */
public static void handle(String stmt, ServerConnection c) {
    Map<String, String> map = parse(stmt);
    String table = map.get("table");
    String add = map.get("add");
    if (table == null) {
        writeErrMessage(c, "table cannot be null");
        return;
    }
    if (add == null) {
        writeErrMessage(c, "add cannot be null");
        return;
    }

    try {
        // Resolve the configuration step by step so that an unknown schema or table produces a
        // meaningful message rather than a NullPointerException reported as "migrate error".
        SchemaConfig schemaConfig = MycatServer.getInstance().getConfig().getSchemas().get(c.getSchema());
        if (schemaConfig == null) {
            writeErrMessage(c, "schema: " + c.getSchema() + " does not exist");
            return;
        }
        TableConfig tableConfig = schemaConfig.getTables().get(table.toUpperCase());
        if (tableConfig == null) {
            writeErrMessage(c, "table: " + table + " does not exist");
            return;
        }
        if (tableConfig.getRule() == null) {
            // A table without a rule cannot be using PartitionByCRC32PreSlot.
            writeErrMessage(c, "table: " + table + " rule is not be PartitionByCRC32PreSlot");
            return;
        }
        AbstractPartitionAlgorithm algorithm = tableConfig.getRule().getRuleAlgorithm();
        if (!(algorithm instanceof PartitionByCRC32PreSlot)) {
            writeErrMessage(c, "table: " + table + " rule is not be PartitionByCRC32PreSlot");
            return;
        }

        // Work on a copy of the range map obtained from the algorithm.
        Map<Integer, List<Range>> integerListMap = ((PartitionByCRC32PreSlot) algorithm).getRangeMap();
        integerListMap = (Map<Integer, List<Range>>) ObjectUtil.copyObject(integerListMap);

        ArrayList<String> oldDataNodes = tableConfig.getDataNodes();
        List<String> newDataNodes = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(add);
        Map<String, List<MigrateTask>> tasks = MigrateUtils.balanceExpand(
                table, integerListMap, oldDataNodes, newDataNodes, PartitionByCRC32PreSlot.DEFAULT_SLOTS_NUM);

        // TODO: the task id needs to be made unique (a millisecond timestamp can collide).
        long taskID = System.currentTimeMillis();
        String taskPath = ZKUtils.getZKBasePath() + "migrate/" + table + "/" + taskID;

        CuratorFramework client = ZKUtils.getConnection();
        // The task node (and parents) is created outside the transaction so that the
        // transaction below can set its data and add children.
        client.create().creatingParentsIfNeeded().forPath(taskPath);

        TaskNode taskNode = new TaskNode();
        taskNode.schema = c.getSchema();
        taskNode.sql = stmt;
        taskNode.end = false;

        CuratorTransactionFinal transactionFinal = client.inTransaction()
                .setData().forPath(taskPath, JSON.toJSONBytes(taskNode)).and();
        for (Map.Entry<String, List<MigrateTask>> entry : tasks.entrySet()) {
            String key = entry.getKey();
            List<MigrateTask> value = entry.getValue();
            String path = taskPath + "/" + key;
            transactionFinal = transactionFinal.create().forPath(path, JSON.toJSONBytes(value)).and();
        }
        transactionFinal.commit();
    } catch (Exception e) {
        LOGGER.error("migrate error", e);
        writeErrMessage(c, "migrate error:" + e);
        return;
    }
    getOkPacket().write(c);
}
/**
 * Creates a test operation of the given mode.
 *
 * @param mode the kind of operation this instance represents
 * @param returnMe the transaction to wrap in a {@code PassthroughBridge}, stored in {@code this.returnMe}
 */
private TestOperation(Mode mode, CuratorTransactionFinal returnMe) { this.mode = mode; this.returnMe = new PassthroughBridge(returnMe); }
/**
 * Creates a test operation in {@code CHECK} mode.
 *
 * @param returnMe the transaction passed on to the superclass constructor
 */
private TestCheck(CuratorTransactionFinal returnMe) { super(TestOperation.Mode.CHECK, returnMe); }
/**
 * Creates a test operation in {@code CREATE} mode.
 *
 * @param returnMe the transaction passed on to the superclass constructor
 */
private TestCreate(CuratorTransactionFinal returnMe) { super(TestOperation.Mode.CREATE, returnMe); }
/**
 * Creates a test operation in {@code SET_DATA} mode.
 *
 * @param returnMe the transaction passed on to the superclass constructor
 */
private TestSetData(CuratorTransactionFinal returnMe) { super(TestOperation.Mode.SET_DATA, returnMe); }
/**
 * Creates a test operation in {@code DELETE} mode.
 *
 * @param returnMe the transaction passed on to the superclass constructor
 */
private TestDelete(CuratorTransactionFinal returnMe) { super(TestOperation.Mode.DELETE, returnMe); }
/**
 * Creates a bridge holding the given transaction.
 *
 * @param transaction the transaction stored in this bridge's {@code transaction} field
 */
public PassthroughBridge(CuratorTransactionFinal transaction) { this.transaction = transaction; }
/**
 * Returns the transaction held in the {@code transaction} field, unchanged.
 *
 * @return the stored transaction
 */
public CuratorTransactionFinal and() { return transaction; }
/**
 * Returns the enclosing {@code MockCuratorTransactionFinal} instance, so that further
 * operations can be chained onto the same mock transaction.
 *
 * @return the enclosing mock transaction
 */
@Override public CuratorTransactionFinal and() { return MockCuratorTransactionFinal.this; }
/**
 * Callback method executed as part of a transaction.
 *
 * @param curatorTransactionFinal the transaction context in which to run operations
 * @throws Exception if an error occurs during processing
 */
void execute(CuratorTransactionFinal curatorTransactionFinal) throws Exception;
/**
 * Builds a Curator transaction using the given client.
 *
 * @param client the Curator client to build the transaction with
 * @param serviceRootPath the service's root path
 *     (NOTE(review): presumably checked by implementations as the first operation — confirm)
 * @return the built transaction
 * @throws Exception if building the transaction fails
 */
public CuratorTransactionFinal build(CuratorFramework client, String serviceRootPath) throws Exception;