/**
 * Builds and starts a Curator client for the ZooKeeper ensemble described by {@code url}.
 *
 * <p>The connect string comes from {@code url.getBackupAddress()}, the retry policy is
 * {@code RetryNTimes(Integer.MAX_VALUE, 1000)} and the connection timeout is 5000 ms. When the
 * URL carries a non-empty authority it is registered as {@code digest} authorization data.
 * Connection state changes reported by Curator are forwarded to {@code stateChanged}.
 *
 * @param url registry URL supplying the address and the optional authority
 * @throws IllegalStateException if building or starting the client fails; the original
 *         exception is kept as the cause
 */
public CuratorZookeeperClient(URL url) {
    super(url);
    try {
        Builder curatorBuilder = CuratorFrameworkFactory.builder()
                .connectString(url.getBackupAddress())
                .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
                .connectionTimeoutMs(5000);

        String authority = url.getAuthority();
        boolean hasAuthority = authority != null && authority.length() > 0;
        if (hasAuthority) {
            curatorBuilder = curatorBuilder.authorization("digest", authority.getBytes());
        }

        client = curatorBuilder.build();
        client.getConnectionStateListenable().addListener((curator, newState) -> {
            // Only LOST, CONNECTED and RECONNECTED are forwarded; any other state is ignored.
            if (newState == ConnectionState.LOST) {
                CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                return;
            }
            if (newState == ConnectionState.CONNECTED) {
                CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                return;
            }
            if (newState == ConnectionState.RECONNECTED) {
                CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
            }
        });
        client.start();
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
/**
 * Returns the shared {@link CuratorFramework}, creating and starting it on first use.
 *
 * <p>The session and connection timeouts are applied only when they have been configured
 * (non-null); otherwise the builder's own values are left untouched.
 *
 * @return the started client held in the {@code client} field
 * @throws Exception declared by the overridden method
 */
@Override
public CuratorFramework getObject() throws Exception {
    if (client != null) {
        return client;
    }

    Builder curatorBuilder = CuratorFrameworkFactory.builder()
            .connectString(connectString)
            .retryPolicy(retryPolicy)
            .canBeReadOnly(canBeReadOnly)
            .defaultData(defaultData);
    if (sessionTimeoutMs != null) {
        curatorBuilder.sessionTimeoutMs(sessionTimeoutMs);
    }
    if (connectionTimeoutMs != null) {
        curatorBuilder.connectionTimeoutMs(connectionTimeoutMs);
    }

    client = curatorBuilder.build();
    client.start();
    log.info("Create zooKeeper instance successfully.");
    return client;
}
/**
 * Creates a Curator client for {@code connectString}, starts it and waits until it is connected.
 *
 * <p>The client uses the {@code xpipe} namespace, a 3000 ms connection timeout, a 5000 ms
 * session timeout, a 3000 ms close wait and {@code RetryNTimes(3, 1000)}. The wait has no
 * timeout of its own.
 *
 * @param connectString ZooKeeper connect string
 * @return a started client on which {@code blockUntilConnected()} has returned
 * @throws InterruptedException if the calling thread is interrupted while waiting; the client
 *         is closed before the exception propagates
 */
private CuratorFramework connectToZk(String connectString) throws InterruptedException {
    Builder builder = CuratorFrameworkFactory.builder();
    builder.connectionTimeoutMs(3000);
    builder.connectString(connectString);
    builder.maxCloseWaitMs(3000);
    builder.namespace("xpipe");
    builder.retryPolicy(new RetryNTimes(3, 1000));
    builder.sessionTimeoutMs(5000);

    CuratorFramework client = builder.build();
    try {
        client.start();
        client.blockUntilConnected();
    } catch (InterruptedException | RuntimeException e) {
        // The caller never receives the client on this path, so release it here
        // instead of leaving a started client behind.
        client.close();
        throw e;
    }
    return client;
}
/**
 * Creates and starts a Curator client for {@code address} using this object's ZooKeeper
 * settings, then waits up to {@code waitForZkConnectedMillis()} for the connection.
 *
 * <p>The client is returned even when the connection is not established within the wait
 * time; that case is logged as a warning so that it does not go unnoticed.
 *
 * @param address ZooKeeper connect string; also used in the client's thread name prefix
 * @return the started client, which may not be connected yet
 * @throws InterruptedException if the calling thread is interrupted while waiting; the client
 *         is closed before the exception propagates
 */
@Override
public CuratorFramework create(String address) throws InterruptedException {
    Builder builder = CuratorFrameworkFactory.builder();
    builder.connectionTimeoutMs(getZkConnectionTimeoutMillis());
    builder.connectString(address);
    builder.maxCloseWaitMs(getZkCloseWaitMillis());
    builder.namespace(getZkNamespace());
    builder.retryPolicy(new RetryNTimes(getZkRetries(), getSleepMsBetweenRetries()));
    builder.sessionTimeoutMs(getZkSessionTimeoutMillis());
    builder.threadFactory(XpipeThreadFactory.create("Xpipe-ZK-" + address, true));

    logger.info("[create]{}, {}", Codec.DEFAULT.encode(this), address);

    CuratorFramework curatorFramework = builder.build();
    try {
        curatorFramework.start();
        boolean connected = curatorFramework.blockUntilConnected(waitForZkConnectedMillis(), TimeUnit.MILLISECONDS);
        if (!connected) {
            // Keep returning the client as before, but make the timeout visible.
            logger.warn("[create][not connected within wait time]{}", address);
        }
    } catch (InterruptedException | RuntimeException e) {
        // The caller never receives the client on this path, so release it here.
        curatorFramework.close();
        throw e;
    }
    return curatorFramework;
}
/**
 * Builds the Curator client for {@code zkAddr} in the {@code pddl} namespace, starts it and
 * blocks until it is connected.
 *
 * <p>The retry policy is {@code ExponentialBackoffRetry(1000, 3, 3000)}.
 *
 * @throws Exception whatever {@code blockUntilConnected()} throws; the failure is logged and
 *         the started client is closed before the exception is rethrown
 */
@Override
public void afterPropertiesSet() throws Exception {
    Builder builder = CuratorFrameworkFactory.builder().connectString(zkAddr)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3, 3000)).namespace("pddl");
    client = builder.build();
    client.start();
    try {
        client.blockUntilConnected();
    } catch (Exception e) {
        logger.error("connect zk fail!zkAddr=" + zkAddr, e);
        // Initialization is being aborted, so do not leave a started client behind.
        client.close();
        throw e;
    }
}
/**
 * Creates and starts a {@link CuratorFramework} for the resolved connection string.
 *
 * <p>When a credential string is resolved it is tokenized with {@code STRING_ARRAY_SEPARATOR};
 * each token must have the form {@code id:base64Password}. The password is Base64-decoded as
 * UTF-8 and {@code id:password} is registered as digest authorization data, also encoded as
 * UTF-8 so that decoding and encoding agree regardless of the platform charset.
 *
 * @return the started client
 * @throws IllegalArgumentException if the connection string cannot be resolved or a credential
 *         token has no {@code :} separated password part
 * @throws Exception declared by the overridden method
 */
@Override
protected CuratorFramework createInstance() throws Exception {
    String connectionString = resolveConnectionString();
    if (connectionString == null) {
        throw new IllegalArgumentException("Cannot resolve zookeeper connection string");
    }

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTime, maxRetries);
    Builder curatorFrameworkBuilder = CuratorFrameworkFactory.builder()
            .connectString(connectionString)
            .retryPolicy(retryPolicy)
            .canBeReadOnly(canReadOnly);

    String credentialString = resolveCredentialString();
    if (credentialString != null) {
        String[] credentials = StringUtils.tokenizeToStringArray(credentialString, STRING_ARRAY_SEPARATOR);
        List<AuthInfo> authList = new ArrayList<AuthInfo>();
        for (String cred : credentials) {
            String[] aclId = cred.split(":");
            if (aclId.length < 2) {
                // The credential value is deliberately left out of the message: it is a secret.
                throw new IllegalArgumentException(
                        "Malformed zookeeper credential, expected <id>:<base64 password>");
            }
            String passwd = new String(Base64.decodeBase64(aclId[1].trim()), "UTF-8");
            authList.add(new AuthInfo(
                    SCHEME_DIGEST,
                    String.format("%s:%s", aclId[0].trim(), passwd).getBytes("UTF-8")));
        }
        if (!authList.isEmpty()) {
            curatorFrameworkBuilder.authorization(authList);
        }
    }

    CuratorFramework client = curatorFrameworkBuilder.build();
    client.start();
    return client;
}
/**
 * With {@code kylin.env.zookeeper-acl-enabled} set to {@code false}, the ACL builder must
 * report that no ACL is needed and must leave the Curator builder with the open ACL and no
 * auth infos.
 */
@Test
public void testAclDisabled() {
    // Arrange: switch ACL support off in the environment config.
    KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
    envConfig.setProperty("kylin.env.zookeeper-acl-enabled", "false");

    // Act: let the ACL builder read the config.
    ZookeeperAclBuilder aclBuilder = new ZookeeperAclBuilder().invoke();

    // Assert: ACL is reported as not needed.
    Assert.assertNotNull(aclBuilder);
    Assert.assertFalse(aclBuilder.isNeedAcl());

    // Assert: the Curator builder keeps the open ACL and carries no auth infos.
    Builder curatorBuilder = aclBuilder.setZKAclBuilder(CuratorFrameworkFactory.builder());
    Assert.assertNotNull(curatorBuilder);
    Assert.assertEquals(ZooDefs.Ids.OPEN_ACL_UNSAFE, curatorBuilder.getAclProvider().getDefaultAcl());
    Assert.assertNull(curatorBuilder.getAuthInfos());
}
/**
 * Creates a started {@link CuratorFramework} that talks to the local server.
 *
 * <p>{@link Builder#connectString(String)} is set to {@code 127.0.0.1} plus the port reported
 * by {@code getCnxnFactory().getLocalPort()}, and {@link Builder#namespace(String)} is set to
 * this object's namespace. The returned client is recorded in {@code curatorClients};
 * presumably that is how it gets closed automatically later, but the closing itself is not
 * part of this method.
 *
 * @param retryPolicy retry policy for the new client
 * @return the started client
 */
public CuratorFramework getClient(RetryPolicy retryPolicy) {
    CuratorFramework startedClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:" + getCnxnFactory().getLocalPort())
            .retryPolicy(retryPolicy)
            .namespace(this.namespace)
            .build();
    startedClient.start();
    curatorClients.add(startedClient);
    return startedClient;
}
/**
 * Initializes the ZooKeeper client.
 *
 * <p>Does nothing when a client already exists. Falls back to {@code ZKConfig.build()} when no
 * configuration has been set. The client is only built here; this method does not start it.
 */
@Override
public void init() {
    if (client != null) {
        return;
    }
    if (zkConfig == null) {
        zkConfig = ZKConfig.build();
    }

    Builder curatorBuilder = CuratorFrameworkFactory
            .builder()
            .connectString(zkConfig.getHosts())
            .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMs(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepMs()))
            .namespace(zkConfig.getNamespace());

    // Timeouts are optional: non-positive values leave the builder's own settings in place.
    int connectionTimeout = zkConfig.getConnectionTimeout();
    if (connectionTimeout > 0) {
        curatorBuilder.connectionTimeoutMs(connectionTimeout);
    }
    int sessionTimeout = zkConfig.getSessionTimeout();
    if (sessionTimeout > 0) {
        curatorBuilder.sessionTimeoutMs(sessionTimeout);
    }

    String digest = zkConfig.getDigest();
    if (digest != null) {
        /*
         * The scheme selects which mechanism is used for permission management. ZooKeeper
         * implements a pluggable ACL design, so the ACL mechanism can be extended by adding schemes.
         * ZooKeeper supports the following schemes by default:
         *
         * world:  the default; effectively everyone has access. It has a single id, "anyone";
         *         world:anyone stands for any user, and nodes open to everybody belong to world:anyone.
         * auth:   stands for users that have already authenticated (in the CLI,
         *         "addauth digest user:pwd" adds an authorized user to the current context). It needs
         *         no id; any authenticated user has permission (ZooKeeper supports authentication via
         *         Kerberos as well as username/password).
         * digest: username:password authentication, the one most commonly used by business systems.
         *         Its id is username:BASE64(SHA1(password)); it requires authenticating with
         *         username:password first.
         * ip:     authentication by IP address. Its id is the client's IP address; a range can be
         *         given, e.g. ip:192.168.1.0/16 matches on the first 16 bits of the address.
         * super:  under this scheme the corresponding id has super privileges and may do anything (cdrwa).
         */
        curatorBuilder.authorization("digest", digest.getBytes(Constants.UTF_8));

        // Every node created through this client gets the creator-only ACL.
        ACLProvider creatorOnlyAcl = new ACLProvider() {

            @Override
            public List<ACL> getDefaultAcl() {
                return ZooDefs.Ids.CREATOR_ALL_ACL;
            }

            @Override
            public List<ACL> getAclForPath(final String path) {
                return ZooDefs.Ids.CREATOR_ALL_ACL;
            }
        };
        curatorBuilder.aclProvider(creatorOnlyAcl);
    }

    client = curatorBuilder.build();
    Logs.RSD.info("init zk client, config={}", zkConfig.toString());
}
@Override public void init() { if (zkConfig.isUseNestedZookeeper()) { NestedZookeeperServers.getInstance().startServerIfNotStarted(zkConfig.getNestedPort(), zkConfig.getNestedDataDir()); } log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists()); Builder builder = CuratorFrameworkFactory.builder() .connectString(zkConfig.getServerLists()) .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds())) .namespace(zkConfig.getNamespace()); if (0 != zkConfig.getSessionTimeoutMilliseconds()) { builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds()); } if (0 != zkConfig.getConnectionTimeoutMilliseconds()) { builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds()); } if (!Strings.isNullOrEmpty(zkConfig.getDigest())) { builder.authorization("digest", zkConfig.getDigest().getBytes(Charset.forName("UTF-8"))) .aclProvider(new ACLProvider() { @Override public List<ACL> getDefaultAcl() { return ZooDefs.Ids.CREATOR_ALL_ACL; } @Override public List<ACL> getAclForPath(final String path) { return ZooDefs.Ids.CREATOR_ALL_ACL; } }); } client = builder.build(); client.start(); try { client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS); if (!client.getZookeeperClient().isConnected()) { throw new KeeperException.OperationTimeoutException(); } if (!Strings.isNullOrEmpty(zkConfig.getLocalPropertiesPath())) { fillData(); } //CHECKSTYLE:OFF } catch (final Exception ex) { //CHECKSTYLE:ON RegExceptionHandler.handleException(ex); } }
/**
 * Initializes the ZooKeeper registry center for elastic config.
 *
 * <p>Optionally starts the nested ZooKeeper server, builds the Curator client from
 * {@code zkConfig}, starts it and waits up to {@code maxSleepTimeMilliseconds * maxRetries}
 * milliseconds for the connection. If the client is still not connected after the wait it is
 * closed and a {@link KeeperException.OperationTimeoutException} is passed to
 * {@code RegisterExceptionHandler}.
 */
@Override
public void init() {
    if (zkConfig.isUseNestedZookeeper()) {
        NestedZookeeperServers.getInstance().startServerIfNotStarted(zkConfig.getNestedPort(), zkConfig.getNestedDataDir());
    }
    log.debug("Elastic config: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
    Builder builder = CuratorFrameworkFactory
            .builder()
            .connectString(zkConfig.getServerLists())
            .retryPolicy(
                    new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig
                            .getMaxSleepTimeMilliseconds())).namespace(zkConfig.getNamespace());
    // A value of 0 means "not configured": keep the builder's own timeout.
    if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
        builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
    }
    if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
        builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
    }
    if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
        // With a digest configured, authenticate with it and give created nodes the creator-only ACL.
        builder.authorization("digest", zkConfig.getDigest().getBytes(Charset.forName("UTF-8"))).aclProvider(
                new ACLProvider() {

                    @Override
                    public List<ACL> getDefaultAcl() {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }

                    @Override
                    public List<ACL> getAclForPath(final String path) {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                });
    }
    client = builder.build();
    client.start();
    try {
        client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(),
                TimeUnit.MILLISECONDS);
        if (!client.getZookeeperClient().isConnected()) {
            // Release the started client before reporting the timeout so it is not leaked.
            client.close();
            throw new KeeperException.OperationTimeoutException();
        }
    } catch (final Exception ex) {
        RegisterExceptionHandler.handleException(ex);
    }
}