/** * The asynchronous version of getData. * * @see #getData(String, Watcher, Stat) */ public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, clientPath, serverPath, ctx, wcb); }
/** * The asynchronous version of getConfig. * * @see #getConfig(Watcher, Stat) */ public void getConfig(Watcher watcher, DataCallback cb, Object ctx) { final String configZnode = ZooDefs.CONFIG_NODE; // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, configZnode); } RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(configZnode); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, configZnode, configZnode, ctx, wcb); }
/** * The Asynchronous version of getData. The request doesn't actually until * the asynchronous callback is called. * * @see #getData(String, Watcher, Stat) */ public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, clientPath, serverPath, ctx, wcb); }
/**
 * Removes the node registered for the given worker under
 * {@code ZKDistributedJobEngine.NODES_PATH}.
 *
 * <p>The node is first read asynchronously (without setting a watch). When
 * that read reports {@code OK}, the node is deleted using the version from
 * the returned {@link Stat}. Any other result code is ignored, and a failed
 * delete is logged rather than propagated.
 *
 * @param worker name of the worker whose node should be removed
 */
private void removeWorkerNodeInJobs(String worker) {
    final String workerPath = ZKDistributedJobEngine.NODES_PATH + "/" + worker;
    zookeeper.getData(workerPath, false, new DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            if (rc != KeeperException.Code.OK.intValue()) {
                // The read did not succeed, so there is no version to delete against.
                return;
            }
            try {
                zookeeper.delete(workerPath, stat.getVersion());
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Do not swallow the interrupt: restore the flag so the
                    // thread running this callback can still observe it.
                    Thread.currentThread().interrupt();
                }
                logger.warn("exception delete node", e);
            }
        }
    }, null);
}
/**
 * Asynchronously reads the given worker's node and, once the read completes,
 * dispatches on the result code to decide how the job notification proceeds.
 *
 * <p>The callback looks up an {@code Operation} for the result code. If one
 * exists it is applied with the worker name, the job, this scheduler and a
 * context map holding the node data, path and stat. Otherwise a warning is
 * logged.
 *
 * @param workerName name of the worker whose node is read
 * @param jobInfo    the job the worker should execute
 */
private void notifyWorkerToExecuteJob(final String workerName, final IJavaCCJob jobInfo) {
    final String workerPath = ZKDistributedJobEngine.NODES_PATH + "/" + workerName;

    final DataCallback onWorkerData = new DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            if (logger.isDebugEnabled()) {
                logger.debug("notifing worker [" + workerName + "] to execute job ["
                        + jobInfo.getId() + "],stat =" + stat + "\n rc=" + rc + "\n path=" + path);
            }

            final Operation operation = Operation.get(rc);
            if (operation == null) {
                // No handler is registered for this result code.
                logger.warn("Fail notify worker '" + workerName + "' to execute job '"
                        + jobInfo.getId() + "' errorCode = " + rc);
                return;
            }

            // Hand the raw read result to the operation through a context map.
            final Map<String, Object> context = new HashMap<String, Object>();
            context.put(DATA, data);
            context.put(PATH, path);
            context.put(STAT2, stat);
            operation.apply(workerName, jobInfo, LeaderJobScheduler.this, context);
        }
    };

    zookeeper.getData(workerPath, watcher, onWorkerData, null);
}
/**
 * The asynchronous version of reconfig. Builds a reconfig request from the
 * given arguments and queues it on the client connection; the outcome is
 * reported through the supplied callback.
 *
 * @param joiningServers comma-separated servers, passed through to the request
 * @param leavingServers comma-separated servers, passed through to the request
 * @param newMembers     comma-separated servers, passed through to the request
 * @param fromConfig     passed through to the request unchanged
 * @param cb             callback queued together with the request
 * @param ctx            caller-supplied context object queued alongside the callback
 * @see #reconfigure
 */
public void reconfigure(String joiningServers, String leavingServers, String newMembers,
        long fromConfig, DataCallback cb, Object ctx) {
    final ReconfigRequest reconfigRequest =
            new ReconfigRequest(joiningServers, leavingServers, newMembers, fromConfig);

    final RequestHeader header = new RequestHeader();
    header.setType(ZooDefs.OpCode.reconfig);

    // The configuration node serves as both client and server path, and no
    // watch registration accompanies the request.
    final String configPath = ZooDefs.CONFIG_NODE;
    cnxn.queuePacket(header, new ReplyHeader(), reconfigRequest, new GetDataResponse(), cb,
            configPath, configPath, ctx, null);
}
/**
 * Convenience overload of the asynchronous reconfig that accepts lists of
 * servers. Each list is joined with {@code ","} and the call is forwarded to
 * the String-based overload.
 *
 * @param joiningServers servers to join into one comma-separated string
 * @param leavingServers servers to join into one comma-separated string
 * @param newMembers     servers to join into one comma-separated string
 * @param fromConfig     forwarded unchanged
 * @param cb             forwarded unchanged
 * @param ctx            forwarded unchanged
 * @see #reconfigure
 */
public void reconfigure(List<String> joiningServers, List<String> leavingServers,
        List<String> newMembers, long fromConfig, DataCallback cb, Object ctx) {
    final String joining = StringUtils.joinStrings(joiningServers, ",");
    final String leaving = StringUtils.joinStrings(leavingServers, ",");
    final String members = StringUtils.joinStrings(newMembers, ",");
    reconfigure(joining, leaving, members, fromConfig, cb, ctx);
}
/**
 * Asynchronous read of the node stored under {@code path}, performed on
 * this object's executor.
 *
 * <p>The callback receives, in order of precedence: the programmed failure
 * code when a failure is due, {@code CONNECTIONLOSS} when this instance is
 * stopped, {@code NONODE} when the tree holds no entry for the path, or
 * {@code OK} together with the stored bytes and a {@link Stat} carrying the
 * stored version.
 *
 * @param path  path to look up in the tree
 * @param watch not used by this implementation
 * @param cb    callback that receives the result
 * @param ctx   context object handed back to the callback unchanged
 */
@Override
public void getData(final String path, boolean watch, final DataCallback cb, final Object ctx) {
    executor.execute(() -> {
        checkReadOpDelay();
        if (getProgrammedFailStatus()) {
            cb.processResult(failReturnCode.intValue(), path, ctx, null, null);
            return;
        }
        if (stopped) {
            // Use the Code enum rather than the deprecated int constants.
            cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
            return;
        }

        // Only the tree lookup happens under the lock; the callback runs after release.
        Pair<byte[], Integer> value;
        mutex.lock();
        try {
            value = tree.get(path);
        } finally {
            mutex.unlock();
        }

        if (value == null) {
            cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
        } else {
            Stat stat = new Stat();
            stat.setVersion(value.second);
            cb.processResult(KeeperException.Code.OK.intValue(), path, ctx, value.first, stat);
        }
    });
}
/**
 * Asynchronous read of the node stored under {@code path}, performed on
 * this object's executor, optionally registering a watcher for the path.
 *
 * <p>The outcome is decided while holding the mutex, and the callback is
 * invoked only after the mutex has been released. The callback receives, in
 * order of precedence: the programmed failure code when a failure is due,
 * {@code CONNECTIONLOSS} when this instance is stopped, {@code NONODE} when
 * the tree holds no entry for the path, or {@code 0} together with the
 * stored bytes and a {@link Stat} carrying the stored version. The watcher
 * is recorded only in that last, successful case.
 *
 * @param path    path to look up in the tree
 * @param watcher watcher to record for the path on success, or {@code null}
 * @param cb      callback that receives the result
 * @param ctx     context object handed back to the callback unchanged
 */
@Override
public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object ctx) {
    executor.execute(() -> {
        checkReadOpDelay();

        int rc;
        byte[] data = null;
        Stat stat = null;

        mutex.lock();
        try {
            if (getProgrammedFailStatus()) {
                rc = failReturnCode.intValue();
            } else if (stopped) {
                rc = KeeperException.Code.CONNECTIONLOSS.intValue();
            } else {
                Pair<byte[], Integer> value = tree.get(path);
                if (value == null) {
                    rc = KeeperException.Code.NONODE.intValue();
                } else {
                    if (watcher != null) {
                        watchers.put(path, watcher);
                    }
                    stat = new Stat();
                    stat.setVersion(value.second);
                    data = value.first;
                    rc = 0;
                }
            }
        } finally {
            // Release on every path, including when a call above throws.
            mutex.unlock();
        }

        // Invoke the callback outside the lock, as before.
        cb.processResult(rc, path, ctx, data, stat);
    });
}
/**
 * The asynchronous version of reconfig. Builds a reconfig request from the
 * given arguments and queues it on the client connection; the outcome is
 * reported through the supplied callback.
 *
 * @param joiningServers comma-separated servers, passed through to the request
 * @param leavingServers comma-separated servers, passed through to the request
 * @param newMembers     comma-separated servers, passed through to the request
 * @param fromConfig     passed through to the request unchanged
 * @param cb             callback queued together with the request
 * @param ctx            caller-supplied context object queued alongside the callback
 * @see #reconfig
 */
public void reconfig(String joiningServers, String leavingServers, String newMembers,
        long fromConfig, DataCallback cb, Object ctx) {
    final ReconfigRequest reconfigRequest =
            new ReconfigRequest(joiningServers, leavingServers, newMembers, fromConfig);

    final RequestHeader header = new RequestHeader();
    header.setType(ZooDefs.OpCode.reconfig);

    // The configuration node serves as both client and server path, and no
    // watch registration accompanies the request.
    final String configPath = ZooDefs.CONFIG_NODE;
    cnxn.queuePacket(header, new ReplyHeader(), reconfigRequest, new GetDataResponse(), cb,
            configPath, configPath, ctx, null);
}
/**
 * Convenience overload of the asynchronous reconfig that accepts lists of
 * servers. Each list is joined with {@code ","} and the call is forwarded to
 * the String-based overload.
 *
 * @param joiningServers servers to join into one comma-separated string
 * @param leavingServers servers to join into one comma-separated string
 * @param newMembers     servers to join into one comma-separated string
 * @param fromConfig     forwarded unchanged
 * @param cb             forwarded unchanged
 * @param ctx            forwarded unchanged
 * @see #reconfig
 */
public void reconfig(List<String> joiningServers, List<String> leavingServers,
        List<String> newMembers, long fromConfig, DataCallback cb, Object ctx) {
    final String joining = StringUtils.joinStrings(joiningServers, ",");
    final String leaving = StringUtils.joinStrings(leavingServers, ",");
    final String members = StringUtils.joinStrings(newMembers, ",");
    reconfig(joining, leaving, members, fromConfig, cb, ctx);
}
/**
 * Installs the watch by issuing an asynchronous {@code getData} on this
 * object's node, passing this object as the second argument (presumably the
 * watcher). The result of the read is deliberately discarded.
 */
@Override
protected void installWatch() {
    // The read exists only to set the watch, so its result is ignored.
    final DataCallback discardResult = new DataCallback() {
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            // intentionally empty
        }
    };
    connection.getData(getNode(), this, discardResult, null);
}
/**
 * Asynchronous getData taking a boolean watch flag. Delegates to the
 * watcher-based overload, passing the watch manager's default watcher when
 * {@code watch} is true and {@code null} otherwise.
 *
 * @param path  forwarded unchanged
 * @param watch whether the default watcher should be passed on
 * @param cb    forwarded unchanged
 * @param ctx   forwarded unchanged
 * @see #getData(String, boolean, Stat)
 */
public void getData(String path, boolean watch, DataCallback cb, Object ctx) {
    Watcher watcher = null;
    if (watch) {
        watcher = watchManager.defaultWatcher;
    }
    getData(path, watcher, cb, ctx);
}
/**
 * Asynchronous getConfig taking a boolean watch flag. Delegates to the
 * watcher-based overload, passing the watch manager's default watcher when
 * {@code watch} is true and {@code null} otherwise.
 *
 * @param watch whether the default watcher should be passed on
 * @param cb    forwarded unchanged
 * @param ctx   forwarded unchanged
 * @see #getConfig(Watcher, DataCallback, Object)
 */
public void getConfig(boolean watch, DataCallback cb, Object ctx) {
    Watcher watcher = null;
    if (watch) {
        watcher = watchManager.defaultWatcher;
    }
    getConfig(watcher, cb, ctx);
}
/**
 * The asynchronous version of getData, taking a boolean watch flag. The
 * call is forwarded to the watcher-based overload: the watch manager's
 * default watcher is passed when {@code watch} is true, {@code null} when
 * it is false.
 *
 * @param path  forwarded unchanged
 * @param watch whether the default watcher should be passed on
 * @param cb    forwarded unchanged
 * @param ctx   forwarded unchanged
 * @see #getData(String, boolean, Stat)
 */
public void getData(String path, boolean watch, DataCallback cb, Object ctx) {
    final Watcher selectedWatcher;
    if (watch) {
        selectedWatcher = watchManager.defaultWatcher;
    } else {
        selectedWatcher = null;
    }
    getData(path, selectedWatcher, cb, ctx);
}
/**
 * Supplies the callback, and the context object that is handed to it, for
 * the config/reconfig command.
 *
 * @param callback The async callback to use.
 * @param ctx An object that will be passed to the callback.
 * @return this, so that further calls can be chained
 */
T usingDataCallback(DataCallback callback, Object ctx);