/**
 * Registers a server in ZooKeeper as an ephemeral {@link PersistentNode} located at
 * {@code ZkConstant.SERVICES_DIR + appKey + "/" + serverInfo}, with the JSON form of the
 * config as node data, and remembers the node in {@code nodeNames} under its app key.
 *
 * <p>Checked exceptions are logged and not propagated; runtime exceptions are rethrown.
 * A node that was started but could not be recorded is closed again so it does not linger
 * untracked.
 *
 * @param registryConfig the server registration to publish; its registry field is set to
 *                       this instance's {@code registry} before anything else happens
 * @throws TRpcRegistryException if a node for the same server is already tracked
 */
@Override
public void registry(RegistryConfig registryConfig) {
    registryConfig.setRegistry(registry);
    PersistentNode node = null;
    boolean tracked = false;
    try {
        String nodeName = registryConfig.getServerInfo().toString();
        Map<String, PersistentNode> appKeyServers =
                nodeNames.computeIfAbsent(registryConfig.getAppKey(), (key) -> new ConcurrentHashMap<>());
        if (appKeyServers.get(nodeName) != null) {
            log.error("server registry exists: " + nodeName);
            throw new TRpcRegistryException("server registry exists for key:" + nodeName);
        }

        String nodeValue = registryConfig.toJsonString();
        node = new PersistentNode(client, CreateMode.EPHEMERAL, true,
                ZkConstant.SERVICES_DIR + registryConfig.getAppKey() + "/" + nodeName,
                nodeValue.getBytes());
        node.start();
        // The wait result used to be dropped silently; surface a timeout so that a null
        // actual path in the log line below can be explained. The node stays tracked so it
        // can still be modified or unregistered later.
        if (!node.waitForInitialCreate(3, TimeUnit.SECONDS)) {
            log.warn("registry to zookeeper not confirmed within 3 seconds, node: " + nodeName);
        }
        appKeyServers.put(nodeName, node);
        tracked = true;

        String actualPath = node.getActualPath();
        log.info("registry to zookeeper, node: " + actualPath + " value: " + nodeValue);
    } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller instead of swallowing it.
        Thread.currentThread().interrupt();
        log.error("registry error", e);
    } catch (Exception e) {
        if (e instanceof RuntimeException) {
            throw (RuntimeException) e;
        } else {
            log.error("registry error", e);
        }
    } finally {
        // A started node that never made it into the map could not be unregistered later.
        if (!tracked && node != null) {
            try {
                node.close();
            } catch (Exception closeError) {
                log.error("registry cleanup error", closeError);
            }
        }
    }
}
/**
 * Updates the data of the node tracked for the given server with the config's JSON form.
 * When no node is tracked for that server yet, a warning is logged and the call is
 * delegated to {@code registry(registryConfig)} instead.
 *
 * <p>Every exception raised along the way is logged as "modify error" and not propagated.
 *
 * @param registryConfig the server registration whose node data should be replaced; its
 *                       registry field is set to this instance's {@code registry} first
 */
@Override
public void modify(RegistryConfig registryConfig) {
    registryConfig.setRegistry(registry);
    try {
        final String serverKey = registryConfig.getServerInfo().toString();
        final PersistentNode tracked = nodeNames
                .computeIfAbsent(registryConfig.getAppKey(), (key) -> new ConcurrentHashMap<>())
                .get(serverKey);

        if (tracked != null) {
            final String payload = registryConfig.toJsonString();
            tracked.setData(payload.getBytes());
            final String zkPath = tracked.getActualPath();
            log.info("zookeeper modify, node: " + zkPath + " value: " + payload);
        } else {
            // Nothing to modify yet: fall back to a fresh registration.
            log.warn("server registry not exists " + serverKey + " try to registry");
            registry(registryConfig);
        }
    } catch (Exception e) {
        log.error("modify error", e);
    }
}
/**
 * Stops tracking the node registered for the given server and closes it.
 *
 * <p>The node is removed from the per-app map <em>before</em> it is closed, so a failing
 * {@code close()} can no longer leave an already-closed node behind in the map. Looking up
 * an unknown app key no longer creates an empty per-app map as a side effect.
 *
 * @param appKey     app key the server was registered under
 * @param serverInfo server whose node should be removed
 * @return {@code true} if a tracked node was found and closed without error; {@code false}
 *         if no node was tracked or an exception occurred (the exception is logged)
 */
@Override
public boolean unRegistry(String appKey, ThriftServerInfo serverInfo) {
    try {
        String nodeName = serverInfo.toString();
        Map<String, PersistentNode> appKeyServers = nodeNames.get(appKey);
        // remove() hands back the tracked node (if any) and untracks it in one step.
        PersistentNode node = appKeyServers == null ? null : appKeyServers.remove(nodeName);
        if (node == null) {
            log.warn("unRegistry " + serverInfo + " node not found");
            return false;
        }
        log.info("unRegistry " + serverInfo + " actualPath: " + node.getActualPath());
        node.close();
        return true;
    } catch (Exception e) {
        log.error("unRegistry error, serverInfo: " + serverInfo, e);
    }
    return false;
}
void advertise( Closer closer, InetSocketAddress endpoint, Map<String, InetSocketAddress> additionalEndpoints) throws AdvertiseException, InterruptedException { byte[] nodeData = serializeAdvertisement(endpoint, additionalEndpoints); PersistentNode persistentNode = new PersistentNode( client, CreateMode.EPHEMERAL_SEQUENTIAL, // TODO(John Sirois): Enable GUID protection once clients are updated to support // its effects on group member node naming. We get nodes like: // 4f5f98c4-8e71-41e3-8c8d-1c9a1f5f5df9-member_000000001 // Clients expect member_ is the prefix and are not prepared for the GUID. false /* GUID protection */, ZKPaths.makePath(groupPath, memberToken), nodeData); persistentNode.start(); closer.register(persistentNode); // NB: This blocks on initial server set node population to emulate legacy // SingletonService.LeaderControl.advertise (Group.join) behavior. Asynchronous // population is an option though, we simply need to remove this wait. if (!persistentNode.waitForInitialCreate(Long.MAX_VALUE, TimeUnit.DAYS)) { throw new AdvertiseException("Timed out waiting for leader advertisement."); } }
/** * Register Host to cluster. * * @param host Host to be part of cluster. */ @Override @Synchronized public void registerHost(Host host) { Preconditions.checkNotNull(host, "host"); Exceptions.checkArgument(!entryMap.containsKey(host), "host", "host is already registered to cluster."); String hostPath = ZKPaths.makePath(getPathPrefix(), host.toString()); PersistentNode node = new PersistentNode(client, CreateMode.EPHEMERAL, false, hostPath, SerializationUtils.serialize(host)); node.start(); //start creation of ephemeral node in background. entryMap.put(host, node); }
/**
 * Takes a host out of the cluster: its node is dropped from {@code entryMap} and then
 * handed to {@code close}.
 *
 * @param host Host to be removed from cluster; must be non-null and currently registered.
 */
@Override
@Synchronized
public void deregisterHost(Host host) {
    Preconditions.checkNotNull(host, "host");

    final PersistentNode registeredNode = entryMap.get(host);
    Preconditions.checkNotNull(registeredNode, "Host is not present in cluster.");

    // Untrack first, then release the node itself.
    entryMap.remove(host);
    close(registeredNode);
}
/**
 * Publishes this server at {@code serverPath} as an ephemeral node whose data is the
 * encoded cluster info.
 *
 * @throws IllegalStateException if a node already exists at {@code serverPath}; the
 *                               message carries that node's current data
 * @throws Exception             if one of the ZooKeeper calls fails
 */
@Override
protected void doStart() throws Exception {
    final CuratorFramework client = zkClient.get();

    final boolean alreadyPresent = client.checkExists().forPath(serverPath) != null;
    if (alreadyPresent) {
        final byte[] existing = client.getData().forPath(serverPath);
        throw new IllegalStateException("server already exist:" + new String(existing));
    }

    // The client is fetched again for the node, as before; then the payload is encoded.
    final CuratorFramework nodeClient = zkClient.get();
    final byte[] clusterInfo = Codec.DEFAULT.encodeAsBytes(getClusterInfo());
    persistentNode = new PersistentNode(nodeClient, CreateMode.EPHEMERAL, false, serverPath, clusterInfo);
    persistentNode.start();
}
protected void doStart() throws Exception { int i = startingId; while (this.id == -1) { String lockPath = ZKPaths.makePath(locksBasePath, String.valueOf(i)); String memberPath = ZKPaths.makePath(membersBasePath, String.valueOf(i)); log.trace("Acquiring mutex for member {} via lock path {}", i, lockPath); InterProcessMutex mutex = new InterProcessMutex(this.client, lockPath); mutex.acquire(); log.debug("Acquired mutex for member {} via lock path {}", i, lockPath); try { Stat stat = client.checkExists().creatingParentContainersIfNeeded().forPath(memberPath); if (stat == null) { log.debug("Claiming container id {} via member path {}", i, memberPath); try { //no peer has this node yet, grab it: pen = new PersistentNode(client, CreateMode.EPHEMERAL, false, memberPath, payload); pen.start(); pen.waitForInitialCreate(30000, TimeUnit.SECONDS); this.id = i; log.info("Claimed container id {} via member path {}", i, memberPath); return; } catch (InterruptedException e) { CloseableUtils.closeQuietly(pen); ThreadUtils.checkInterrupted(e); Throwables.propagate(e); } } } finally { mutex.release(); log.debug("Released mutex for member {} via lock path {}", i, lockPath); } i++; } }
/**
 * Manual test: starts an ephemeral node at {@code "/" + getTestName()} holding the bytes
 * of "123456", then calls {@code waitForAnyKeyToExit()} (presumably blocking until the
 * operator presses a key - confirm against the test base class) and finally closes the node.
 *
 * @throws IOException if closing the node fails
 */
@Test
public void testNode() throws IOException {
    String path = "/" + getTestName();
    PersistentNode persistentNode =
            new PersistentNode(client, CreateMode.EPHEMERAL, false, path, "123456".getBytes());
    persistentNode.start();
    try {
        waitForAnyKeyToExit();
    } finally {
        // Close the node even when the wait fails, so it is not left running.
        persistentNode.close();
    }
}