/**
 * Creates the argument holder by copying every supplied value into the field of the same name.
 * No validation or defensive copying is performed here.
 */
public ExhibitorArguments(
        int connectionTimeOutMs,
        int logWindowSizeLines,
        int configCheckMs,
        String extraHeadingText,
        String thisJVMHostname,
        boolean allowNodeMutations,
        JQueryStyle jQueryStyle,
        int restPort,
        String restPath,
        String restScheme,
        Runnable shutdownProc,
        LogDirection logDirection,
        ACLProvider aclProvider,
        ServoRegistration servoRegistration,
        String preferencesPath,
        RemoteConnectionConfiguration remoteConnectionConfiguration,
        HttpsConfiguration httpsConfiguration)
{
    // Primitive settings.
    this.connectionTimeOutMs = connectionTimeOutMs;
    this.logWindowSizeLines = logWindowSizeLines;
    this.configCheckMs = configCheckMs;
    this.restPort = restPort;
    this.allowNodeMutations = allowNodeMutations;

    // String settings.
    this.extraHeadingText = extraHeadingText;
    this.thisJVMHostname = thisJVMHostname;
    this.restPath = restPath;
    this.restScheme = restScheme;
    this.preferencesPath = preferencesPath;

    // Object-valued settings and collaborators.
    this.jQueryStyle = jQueryStyle;
    this.shutdownProc = shutdownProc;
    this.logDirection = logDirection;
    this.aclProvider = aclProvider;
    this.servoRegistration = servoRegistration;
    this.remoteConnectionConfiguration = remoteConnectionConfiguration;
    this.httpsConfiguration = httpsConfiguration;
}
/**
 * Applies the ACL provider and, when available, the authorization info derived from the
 * given ZooKeeper properties to the Curator builder.
 *
 * <p>Authorization is only registered on the builder when an ACL provider is present.
 *
 * @param zookeeperProperties source of the ACL and auth configuration
 * @param builder             the Curator builder to configure
 */
@VisibleForTesting
void enhanceBuilderWithSecurityParameters(HAConfiguration.ZookeeperProperties zookeeperProperties,
                                          CuratorFrameworkFactory.Builder builder) {
    final ACLProvider provider = getAclProvider(zookeeperProperties);

    // The auth string is parsed up front, even if it ends up unused because no ACL provider exists.
    final AuthInfo parsedAuth = zookeeperProperties.hasAuth()
            ? AtlasZookeeperSecurityProperties.parseAuth(zookeeperProperties.getAuth())
            : null;

    if (provider == null) {
        return;
    }

    LOG.info("Setting up acl provider.");
    builder.aclProvider(provider);

    if (parsedAuth == null) {
        return;
    }

    final byte[] authBytes = parsedAuth.getAuth();
    final String scheme = parsedAuth.getScheme();
    final String loggableId = getIdForLogging(scheme, new String(authBytes, Charsets.UTF_8));
    LOG.info("Setting up auth provider with scheme: {} and id: {}", scheme, loggableId);
    builder.authorization(scheme, authBytes);
}
/** * 创建Zookeeper连接 * * @throws InterruptedException * @throws UnsupportedEncodingException */ @PostConstruct public void connect() throws InterruptedException, UnsupportedEncodingException { client = CuratorFrameworkFactory.builder() .connectString(connectString) .retryPolicy(new ExponentialBackoffRetry(DEFAULT_BASE_SLEEP_TIME_MS, DEFAULT_MAX_RETRIES)) .namespace(DEFAULT_NAMESPACE) .authorization(DEFAULT_ACL_SCHEME, DEFAULT_ACL_AUTH.getBytes(DEFAULT_CHARSET)) // 设置权限 .aclProvider(new ACLProvider() { // 设置ACL规则 @Override public List<ACL> getDefaultAcl() { return DEFAULT_ACL_LIST; } @Override public List<ACL> getAclForPath(String path) { return DEFAULT_ACL_LIST; } }) .build(); if (CuratorFrameworkState.STARTED != client.getState()) { client.start(); } while (!client.blockUntilConnected(MAX_WAIT_SECONDS, TimeUnit.SECONDS)) { log.info("can not connect to zookeeper , retry again!!"); } }
/**
 * Builds an {@link ACLProvider} that answers with {@code ZooDefs.Ids.CREATOR_ALL_ACL}
 * both as the default ACL and for every path.
 *
 * @return a new provider instance
 */
private ACLProvider aclProvider() {
    final ACLProvider creatorAllProvider = new ACLProvider() {
        @Override
        public List<ACL> getAclForPath(String path) {
            // The path is not consulted; every node gets the default ACL.
            return getDefaultAcl();
        }

        @Override
        public List<ACL> getDefaultAcl() {
            return ZooDefs.Ids.CREATOR_ALL_ACL;
        }
    };
    return creatorAllProvider;
}
/**
 * Creates an {@link ACLProvider} whose default ACL and per-path ACL are both
 * {@code ZooDefs.Ids.CREATOR_ALL_ACL}.
 *
 * @return a new provider instance
 */
public ACLProvider aclProvider() {
    final ACLProvider provider = new ACLProvider() {
        @Override
        public List<ACL> getDefaultAcl() {
            return ZooDefs.Ids.CREATOR_ALL_ACL;
        }

        @Override
        public List<ACL> getAclForPath(String path) {
            // Same answer regardless of the path.
            return getDefaultAcl();
        }
    };
    return provider;
}
/**
 * Bean factory method for an {@link ACLProvider} that returns
 * {@code ZooDefs.Ids.CREATOR_ALL_ACL} for the default ACL and for any path.
 *
 * @return a new provider instance
 */
@Bean
public ACLProvider aclProvider() {
    final ACLProvider creatorOnly = new ACLProvider() {
        @Override
        public List<ACL> getAclForPath(String path) {
            // The path argument does not influence the result.
            return getDefaultAcl();
        }

        @Override
        public List<ACL> getDefaultAcl() {
            return ZooDefs.Ids.CREATOR_ALL_ACL;
        }
    };
    return creatorOnly;
}
/**
 * Builds and starts the Curator client from {@code zkConfig}, then waits for the connection.
 *
 * <p>Session and connection timeouts are applied only when non-zero. Digest authorization
 * and a creator-all ACL provider are applied only when a non-empty digest is configured.
 * If the client is not connected within {@code maxSleepTime * maxRetries} milliseconds it is
 * closed and an {@code OperationTimeoutException} is raised; every exception from the wait
 * is passed to {@code RegExceptionHandler}.
 */
@Override
public void init() {
    logger.debug("init zookeeper registry, connect to servers : {}", zkConfig.getServerLists());

    final ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(
            zkConfig.getBaseSleepTimeMilliseconds(),
            zkConfig.getMaxRetries(),
            zkConfig.getMaxSleepTimeMilliseconds());
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
            .connectString(zkConfig.getServerLists())
            .retryPolicy(retryPolicy)
            .namespace(zkConfig.getNamespace());

    final int sessionTimeoutMillis = zkConfig.getSessionTimeoutMilliseconds();
    if (sessionTimeoutMillis != 0) {
        builder.sessionTimeoutMs(sessionTimeoutMillis);
    }

    final int connectionTimeoutMillis = zkConfig.getConnectionTimeoutMilliseconds();
    if (connectionTimeoutMillis != 0) {
        builder.connectionTimeoutMs(connectionTimeoutMillis);
    }

    final String digest = zkConfig.getDigest();
    final boolean digestConfigured = digest != null && !digest.isEmpty();
    if (digestConfigured) {
        final ACLProvider creatorAllProvider = new ACLProvider() {
            @Override
            public List<ACL> getDefaultAcl() {
                return ZooDefs.Ids.CREATOR_ALL_ACL;
            }

            @Override
            public List<ACL> getAclForPath(final String path) {
                return ZooDefs.Ids.CREATOR_ALL_ACL;
            }
        };
        builder.authorization("digest", digest.getBytes(StandardCharsets.UTF_8))
                .aclProvider(creatorAllProvider);
    }

    client = builder.build();
    client.start();

    try {
        final int maxWaitMillis = zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries();
        final boolean connected = client.blockUntilConnected(maxWaitMillis, TimeUnit.MILLISECONDS);
        if (!connected) {
            client.close();
            throw new KeeperException.OperationTimeoutException();
        }
    } catch (final Exception ex) {
        RegExceptionHandler.handleException(ex);
    }
}
/**
 * Verifies that {@code SingleACLProvider} returns the ACL it was constructed with, both as
 * the default ACL and for arbitrary paths.
 */
@Test
public void testSingleACLProvider() {
    final ImmutableList<ACL> expected = ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL;
    final ACLProvider underTest = new CuratorServiceDiscoveryModule.SingleACLProvider(expected);

    assertEquals(expected, underTest.getDefaultAcl());

    final String[] samplePaths = {"/random/path/1", "/random/path/2"};
    for (String samplePath : samplePaths) {
        assertEquals(expected, underTest.getAclForPath(samplePath));
    }
}
/**
 * Returns a new {@link CuratorPersister} instance using the provided settings, using reasonable
 * defaults where custom values were not specified.
 *
 * <p>When both a username and a password are set, the client is configured with digest
 * authorization and an ACL made of {@code CREATOR_ALL_ACL} plus {@code READ_ACL_UNSAFE}.
 *
 * @throws IllegalArgumentException if exactly one of username and password is empty
 */
public CuratorPersister build() {
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
            .connectString(connectionString)
            .retryPolicy(retryPolicy);

    final boolean usernameMissing = username.isEmpty();
    final boolean passwordMissing = password.isEmpty();
    if (usernameMissing != passwordMissing) {
        throw new IllegalArgumentException(
                "username and password must both be provided, or both must be empty.");
    }

    if (!usernameMissing) {
        // Creator gets full access; everyone else may read.
        final List<ACL> acls = new ArrayList<>(ZooDefs.Ids.CREATOR_ALL_ACL);
        acls.addAll(ZooDefs.Ids.READ_ACL_UNSAFE);

        final byte[] credentials = (username + ":" + password).getBytes(StandardCharsets.UTF_8);
        final ACLProvider fixedAclProvider = new ACLProvider() {
            @Override
            public List<ACL> getDefaultAcl() {
                return acls;
            }

            @Override
            public List<ACL> getAclForPath(String path) {
                return acls;
            }
        };
        builder.authorization("digest", credentials).aclProvider(fixedAclProvider);
    }

    final CuratorFramework client = builder.build();
    final CuratorPersister persister = new CuratorPersister(serviceName, client);
    CuratorUtils.initServiceName(persister, serviceName);
    return persister;
}
/**
 * Builds (but does not start) a Curator client for the test server that uses the given
 * ACL provider and a retry-once policy.
 *
 * @param aclProvider the ACL provider to install on the client
 * @return the built client
 */
private CuratorFramework createClient(ACLProvider aclProvider) {
    final String connectString = server.getConnectString();
    final RetryOneTime retryOnce = new RetryOneTime(1);

    final CuratorFramework builtClient = CuratorFrameworkFactory.builder()
            .aclProvider(aclProvider)
            .connectString(connectString)
            .retryPolicy(retryOnce)
            .build();
    return builtClient;
}
/**
 * Builds and starts a Curator client in namespace {@code "ns"} against the test server,
 * using the given ACL provider and an exponential-backoff retry policy.
 *
 * @param provider the ACL provider to install on the client
 * @return the started client
 * @throws Exception declared by the original signature
 */
private CuratorFramework createClient(ACLProvider provider) throws Exception {
    final RetryPolicy backoff = new ExponentialBackoffRetry(1000, 3);

    final CuratorFramework startedClient = CuratorFrameworkFactory.builder()
            .namespace("ns")
            .connectString(server.getConnectString())
            .retryPolicy(backoff)
            .aclProvider(provider)
            .build();

    startedClient.start();
    return startedClient;
}
/**
 * Builds a {@link CuratorFramework} instance using the given connectString.
 *
 * <p>The returned client is built but not started. When both the username and password
 * system properties are set, digest authorization (UTF-8 encoded {@code user:password}) and a
 * creator-all ACL provider are added to the client.
 *
 * @param connectString connection string to connect to zookeeper
 * @param logger        {@link Logger} instance of the calling class
 * @return Newly created CuratorFramework instance.
 * @throws NumberFormatException if a configured system property is not a valid integer
 */
public static CuratorFramework newCuratorFrameworkClient(String connectString, Logger logger) {
    // NOTE(review): all four settings below are read from the same ZK_CONNECTION_TIMEOUT
    // property and differ only in their default values. This looks like a copy-paste slip
    // (session timeout, retry wait and retry count presumably have their own property keys),
    // but the correct keys are not visible from here, so the lookups are left unchanged.
    int connectionTimeoutMs =
            Integer.parseInt(System.getProperty(Constants.Properties.ZK_CONNECTION_TIMEOUT, "15000"));
    int sessionTimeoutMs =
            Integer.parseInt(System.getProperty(Constants.Properties.ZK_CONNECTION_TIMEOUT, "60000"));
    int retryInitialWaitMs =
            Integer.parseInt(System.getProperty(Constants.Properties.ZK_CONNECTION_TIMEOUT, "1000"));
    int maxRetryCount =
            Integer.parseInt(System.getProperty(Constants.Properties.ZK_CONNECTION_TIMEOUT, "3"));

    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
            .namespace(NAMESPACE)
            .connectString(connectString)
            .retryPolicy(new ExponentialBackoffRetry(retryInitialWaitMs, maxRetryCount))
            .connectionTimeoutMs(connectionTimeoutMs)
            .sessionTimeoutMs(sessionTimeoutMs);

    /*
     * If authorization information is available, it will be added to the client. NOTE: This auth info is
     * for access control, therefore no authentication will happen when the client is being started. The
     * info will only be required whenever a client is accessing an already created ZNode. For another client
     * of another node to make use of a ZNode created by this node, it should also provide the same auth info.
     */
    // Read each property once so the null check and the value used cannot diverge.
    final String username = System.getProperty(Constants.Properties.ZK_USERNAME);
    final String password = System.getProperty(Constants.Properties.ZK_PASSWORD);
    if (username != null && password != null) {
        String authenticationString = username + ":" + password;
        // Encode explicitly as UTF-8: the no-arg getBytes() depends on the platform default charset,
        // which would make the digest credentials differ between JVMs.
        builder.authorization("digest",
                        authenticationString.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                .aclProvider(new ACLProvider() {
                    @Override
                    public List<ACL> getDefaultAcl() {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }

                    @Override
                    public List<ACL> getAclForPath(String path) {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                });
    }

    CuratorFramework client = builder.build();
    logger.debug("CuratorFramework client built successfully with connectString: {}, sessionTimeout: {} and connectionTimeout: {}",
            connectString, sessionTimeoutMs, connectionTimeoutMs);
    return client;
}
/** * Creates a curator built using the given zookeeper connection string and timeout */ public static CuratorFramework newCurator(String zookeepers, int timeout, String secret) { final ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 10); if (secret.isEmpty()) { return CuratorFrameworkFactory.newClient(zookeepers, timeout, timeout, retry); } else { return CuratorFrameworkFactory.builder().connectString(zookeepers) .connectionTimeoutMs(timeout).sessionTimeoutMs(timeout).retryPolicy(retry) .authorization("digest", ("fluo:" + secret).getBytes(StandardCharsets.UTF_8)) .aclProvider(new ACLProvider() { @Override public List<ACL> getDefaultAcl() { return CREATOR_ALL_ACL; } @Override public List<ACL> getAclForPath(String path) { switch (path) { case ZookeeperPath.ORACLE_GC_TIMESTAMP: // The garbage collection iterator running in Accumulo tservers needs to read this // value w/o authenticating. return PUBLICLY_READABLE_ACL; default: return CREATOR_ALL_ACL; } } }).build(); } }
@Override public void init() { log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists()); CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(zkConfig.getServerLists()) .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds())) .namespace(zkConfig.getNamespace()); if (0 != zkConfig.getSessionTimeoutMilliseconds()) { builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds()); } if (0 != zkConfig.getConnectionTimeoutMilliseconds()) { builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds()); } if (!Strings.isNullOrEmpty(zkConfig.getDigest())) { builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8)) .aclProvider(new ACLProvider() { @Override public List<ACL> getDefaultAcl() { return ZooDefs.Ids.CREATOR_ALL_ACL; } @Override public List<ACL> getAclForPath(final String path) { return ZooDefs.Ids.CREATOR_ALL_ACL; } }); } client = builder.build(); client.start(); try { if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) { client.close(); throw new KeeperException.OperationTimeoutException(); } //CHECKSTYLE:OFF } catch (final Exception ex) { //CHECKSTYLE:ON RegExceptionHandler.handleException(ex); } }
@Provides @Singleton CuratorFramework provideCuratorFramework( ShutdownRegistry shutdownRegistry, @ServiceDiscoveryBindings.ZooKeeper Iterable<InetSocketAddress> zooKeeperCluster, ACLProvider aclProvider) { String connectString = FluentIterable.from(zooKeeperCluster) .transform(InetSocketAddressHelper::toString) .join(Joiner.on(',')); if (zooKeeperConfig.getChrootPath().isPresent()) { connectString = connectString + zooKeeperConfig.getChrootPath().get(); } // This emulates the default BackoffHelper configuration used by the legacy commons/zookeeper // stack. BackoffHelper is unbounded, this dies after around 5 minutes using the 10 retries. // NB: BoundedExponentialBackoffRetry caps max retries at 29 if you send it a larger value. RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry( Amount.of(1, Time.SECONDS).as(Time.MILLISECONDS), Amount.of(1, Time.MINUTES).as(Time.MILLISECONDS), 10); CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .dontUseContainerParents() // Container nodes are only available in ZK 3.5+. .connectString(connectString) .canBeReadOnly(false) // We must be able to write to perform leader election. .sessionTimeoutMs(zooKeeperConfig.getSessionTimeout().as(Time.MILLISECONDS)) .retryPolicy(retryPolicy) .aclProvider(aclProvider); if (zooKeeperConfig.getCredentials().isPresent()) { Credentials credentials = zooKeeperConfig.getCredentials().get(); builder.authorization(credentials.scheme(), credentials.authToken()); } CuratorFramework curatorFramework = builder.build(); // TODO(John Sirois): It would be nice to use a Service to control the lifecycle here, but other // services (org.apache.aurora.scheduler.http.JettyServerModule.RedirectMonitor) rely on this // service being started 1st which is not deterministic as things stand. Find a way to leverage // the Service system for services with Service dependencies. curatorFramework.start(); shutdownRegistry.addAction(curatorFramework::close); return curatorFramework; }
/**
 * Provides the singleton {@link ACLProvider}: a {@code SingleACLProvider} wrapping the
 * injected ZooKeeper ACL list.
 *
 * @param acl the ACL list bound for ZooKeeper
 * @return the provider wrapping {@code acl}
 */
@Provides
@Singleton
ACLProvider provideACLProvider(@ServiceDiscoveryBindings.ZooKeeper List<ACL> acl) {
  final ACLProvider singleAclProvider = new SingleACLProvider(acl);
  return singleAclProvider;
}
/**
 * Initialization: builds the Curator client from {@code zkConfig} unless a client already
 * exists. The client is built here but not started.
 */
@Override
public void init() {
    if (client != null) {
        return;
    }
    if (zkConfig == null) {
        zkConfig = ZKConfig.build();
    }

    final ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(
            zkConfig.getBaseSleepTimeMs(),
            zkConfig.getMaxRetries(),
            zkConfig.getMaxSleepMs());
    Builder builder = CuratorFrameworkFactory
            .builder()
            .connectString(zkConfig.getHosts())
            .retryPolicy(retryPolicy)
            .namespace(zkConfig.getNamespace());

    // Timeouts are only applied when explicitly configured with a positive value.
    final int connectionTimeout = zkConfig.getConnectionTimeout();
    if (connectionTimeout > 0) {
        builder.connectionTimeoutMs(connectionTimeout);
    }
    final int sessionTimeout = zkConfig.getSessionTimeout();
    if (sessionTimeout > 0) {
        builder.sessionTimeoutMs(sessionTimeout);
    }

    if (zkConfig.getDigest() != null) {
        /*
         * The scheme selects which mechanism is used for permission management. ZooKeeper implements a
         * pluggable ACL mechanism, which can be extended by adding schemes.
         * ZooKeeper supports the following schemes by default:
         *
         * world:  the default; effectively accessible to everyone. It has a single id, "anyone";
         *         world:anyone stands for anybody, and nodes open to everyone belong to world:anyone.
         * auth:   stands for users that are already authenticated (in the CLI, "addauth digest user:pwd"
         *         adds an authorized user to the current context). It needs no id; any authenticated
         *         user has permission (ZooKeeper supports authentication through Kerberos as well as
         *         username/password).
         * digest: username:password authentication, the most common choice in business systems. Its id
         *         is username:BASE64(SHA1(password)), and it requires authenticating with
         *         username:password first.
         * ip:     authentication by IP address; the id is the client's IP address. A range can be given,
         *         e.g. ip:192.168.1.0/16 matches on the first 16 bits of the address.
         * super:  with this scheme the id has super privileges and may do anything (cdrwa).
         */
        final ACLProvider creatorAllProvider = new ACLProvider() {
            @Override
            public List<ACL> getDefaultAcl() {
                return ZooDefs.Ids.CREATOR_ALL_ACL;
            }

            @Override
            public List<ACL> getAclForPath(final String path) {
                return ZooDefs.Ids.CREATOR_ALL_ACL;
            }
        };
        builder.authorization("digest", zkConfig.getDigest().getBytes(Constants.UTF_8));
        builder.aclProvider(creatorAllProvider);
    }

    client = builder.build();
    Logs.RSD.info("init zk client, config={}", zkConfig.toString());
}
@Override public void init() { if (zkConfig.isUseNestedZookeeper()) { NestedZookeeperServers.getInstance().startServerIfNotStarted(zkConfig.getNestedPort(), zkConfig.getNestedDataDir()); } log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists()); Builder builder = CuratorFrameworkFactory.builder() .connectString(zkConfig.getServerLists()) .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds())) .namespace(zkConfig.getNamespace()); if (0 != zkConfig.getSessionTimeoutMilliseconds()) { builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds()); } if (0 != zkConfig.getConnectionTimeoutMilliseconds()) { builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds()); } if (!Strings.isNullOrEmpty(zkConfig.getDigest())) { builder.authorization("digest", zkConfig.getDigest().getBytes(Charset.forName("UTF-8"))) .aclProvider(new ACLProvider() { @Override public List<ACL> getDefaultAcl() { return ZooDefs.Ids.CREATOR_ALL_ACL; } @Override public List<ACL> getAclForPath(final String path) { return ZooDefs.Ids.CREATOR_ALL_ACL; } }); } client = builder.build(); client.start(); try { client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS); if (!client.getZookeeperClient().isConnected()) { throw new KeeperException.OperationTimeoutException(); } if (!Strings.isNullOrEmpty(zkConfig.getLocalPropertiesPath())) { fillData(); } //CHECKSTYLE:OFF } catch (final Exception ex) { //CHECKSTYLE:ON RegExceptionHandler.handleException(ex); } }
/**
 * Initializes the registry center: optionally starts the nested ZooKeeper server, then
 * builds and starts the Curator client from {@code zkConfig} and waits for the connection.
 *
 * <p>If the client is not connected after waiting {@code maxSleepTime * maxRetries}
 * milliseconds an {@code OperationTimeoutException} is raised; any exception from that phase
 * is passed to {@code RegisterExceptionHandler}.
 */
@Override
public void init() {
    if (zkConfig.isUseNestedZookeeper()) {
        NestedZookeeperServers.getInstance()
                .startServerIfNotStarted(zkConfig.getNestedPort(), zkConfig.getNestedDataDir());
    }
    log.debug("Elastic config: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());

    final ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(
            zkConfig.getBaseSleepTimeMilliseconds(),
            zkConfig.getMaxRetries(),
            zkConfig.getMaxSleepTimeMilliseconds());
    Builder builder = CuratorFrameworkFactory.builder()
            .connectString(zkConfig.getServerLists())
            .retryPolicy(retryPolicy)
            .namespace(zkConfig.getNamespace());

    final int sessionTimeoutMillis = zkConfig.getSessionTimeoutMilliseconds();
    if (sessionTimeoutMillis != 0) {
        builder.sessionTimeoutMs(sessionTimeoutMillis);
    }

    final int connectionTimeoutMillis = zkConfig.getConnectionTimeoutMilliseconds();
    if (connectionTimeoutMillis != 0) {
        builder.connectionTimeoutMs(connectionTimeoutMillis);
    }

    final String digest = zkConfig.getDigest();
    if (!Strings.isNullOrEmpty(digest)) {
        final ACLProvider creatorAllProvider = new ACLProvider() {
            @Override
            public List<ACL> getDefaultAcl() {
                return ZooDefs.Ids.CREATOR_ALL_ACL;
            }

            @Override
            public List<ACL> getAclForPath(final String path) {
                return ZooDefs.Ids.CREATOR_ALL_ACL;
            }
        };
        builder.authorization("digest", digest.getBytes(Charset.forName("UTF-8")))
                .aclProvider(creatorAllProvider);
    }

    client = builder.build();
    client.start();

    try {
        final int maxWaitMillis = zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries();
        client.blockUntilConnected(maxWaitMillis, TimeUnit.MILLISECONDS);

        // The wait result is ignored; the connection state is checked on the underlying client instead.
        final boolean connected = client.getZookeeperClient().isConnected();
        if (!connected) {
            throw new KeeperException.OperationTimeoutException();
        }
    } catch (final Exception ex) {
        RegisterExceptionHandler.handleException(ex);
    }
}
/**
 * Builds an {@link ACLProvider} that always answers with a single ACL assembled from the
 * given id, scheme and permission list.
 *
 * @param cli       passed to {@link ExhibitorCreatorExit} when a permission verb is unknown
 * @param aclId     ACL id; {@code null} is treated as the empty string
 * @param aclScheme ACL scheme; {@code null} is treated as the empty string
 * @param aclPerms  comma-separated verbs (read, write, create, delete, admin; case-insensitive);
 *                  when {@code notNullOrEmpty} rejects it, all permissions are granted
 * @return a provider returning a singleton list with the resulting ACL
 * @throws ExhibitorCreatorExit if {@code aclPerms} contains an unknown verb
 */
private ACLProvider getAclProvider(ExhibitorCLI cli, String aclId, String aclScheme, String aclPerms) throws ExhibitorCreatorExit
{
    int perms = ZooDefs.Perms.ALL;
    if ( notNullOrEmpty(aclPerms) )
    {
        perms = 0;
        for ( String rawVerb : aclPerms.split(",") )
        {
            final String verb = rawVerb.trim();
            final int permBit;
            if ( verb.equalsIgnoreCase("read") )
            {
                permBit = ZooDefs.Perms.READ;
            }
            else if ( verb.equalsIgnoreCase("write") )
            {
                permBit = ZooDefs.Perms.WRITE;
            }
            else if ( verb.equalsIgnoreCase("create") )
            {
                permBit = ZooDefs.Perms.CREATE;
            }
            else if ( verb.equalsIgnoreCase("delete") )
            {
                permBit = ZooDefs.Perms.DELETE;
            }
            else if ( verb.equalsIgnoreCase("admin") )
            {
                permBit = ZooDefs.Perms.ADMIN;
            }
            else
            {
                log.error("Unknown ACL perm value: " + verb);
                throw new ExhibitorCreatorExit(cli);
            }
            perms |= permBit;
        }
    }

    final String effectiveScheme = (aclScheme != null) ? aclScheme : "";
    final String effectiveId = (aclId != null) ? aclId : "";
    final ACL acl = new ACL(perms, new Id(effectiveScheme, effectiveId));

    return new ACLProvider()
    {
        @Override
        public List<ACL> getDefaultAcl()
        {
            return Collections.singletonList(acl);
        }

        @Override
        public List<ACL> getAclForPath(String path)
        {
            // The path is not consulted; a fresh singleton list is returned for every call.
            return Collections.singletonList(acl);
        }
    };
}
/**
 * Builds a Curator client from {@code curatorClientTemplate()} with the given ACL provider
 * applied.
 *
 * @param aclProvider the ACL provider to install
 * @return the built client
 */
private CuratorFramework buildCuratorClient(ACLProvider aclProvider) {
    final CuratorFramework curatorClient = curatorClientTemplate()
            .aclProvider(aclProvider)
            .build();
    return curatorClient;
}
/**
 * Returns the ACL provider held by this object.
 *
 * @return the current value of the {@code aclProvider} field
 */
public ACLProvider getAclProvider() {
    return this.aclProvider;
}
/**
 * Creates an instance with the given ACL provider and no explicit ACL list.
 *
 * @param aclProvider the ACL provider to hold
 */
ACLing(ACLProvider aclProvider)
{
    // Delegates with a null ACL list.
    this(aclProvider, null);
}
/**
 * Creates an instance with the given ACL provider and ACL list, with
 * {@code applyToParents} set to {@code false}.
 *
 * @param aclProvider the ACL provider to hold
 * @param aclList     explicit ACL list; may be {@code null}
 */
ACLing(ACLProvider aclProvider, List<ACL> aclList)
{
    // Delegates with applyToParents disabled.
    this(aclProvider, aclList, false);
}
/**
 * Creates an instance from all three settings.
 *
 * @param aclProvider    the ACL provider to hold
 * @param aclList        explicit ACL list; an immutable copy is stored, or {@code null} when
 *                       {@code null} is passed
 * @param applyToParents stored as-is in the {@code applyToParents} field
 */
ACLing(ACLProvider aclProvider, List<ACL> aclList, boolean applyToParents)
{
    this.aclProvider = aclProvider;
    this.applyToParents = applyToParents;

    if ( aclList == null )
    {
        this.aclList = null;
    }
    else
    {
        // Snapshot the caller's list so later changes to it are not visible here.
        this.aclList = ImmutableList.copyOf(aclList);
    }
}
@Test public void testErrorListener() throws Exception { //The first call to the ACL provider will return a reasonable //value. The second will throw an error. This is because the ACL //provider is accessed prior to the backgrounding call. final AtomicBoolean aclProviderCalled = new AtomicBoolean(false); ACLProvider badAclProvider = new ACLProvider() { @Override public List<ACL> getDefaultAcl() { if(aclProviderCalled.getAndSet(true)) { throw new UnsupportedOperationException(); } else { return new ArrayList<>(); } } @Override public List<ACL> getAclForPath(String path) { if(aclProviderCalled.getAndSet(true)) { throw new UnsupportedOperationException(); } else { return new ArrayList<>(); } } }; CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(server.getConnectString()) .retryPolicy(new RetryOneTime(1)) .aclProvider(badAclProvider) .build(); try { client.start(); final CountDownLatch errorLatch = new CountDownLatch(1); UnhandledErrorListener listener = new UnhandledErrorListener() { @Override public void unhandledError(String message, Throwable e) { if ( e instanceof UnsupportedOperationException ) { errorLatch.countDown(); } } }; client.create().inBackground().withUnhandledErrorListener(listener).forPath("/foo"); Assert.assertTrue(new Timing().awaitLatch(errorLatch)); } finally { CloseableUtils.closeQuietly(client); } }
@Test public void testErrorListener() throws Exception { //The first call to the ACL provider will return a reasonable //value. The second will throw an error. This is because the ACL //provider is accessed prior to the backgrounding call. final AtomicBoolean aclProviderCalled = new AtomicBoolean(false); ACLProvider badAclProvider = new ACLProvider() { @Override public List<ACL> getDefaultAcl() { if(aclProviderCalled.getAndSet(true)) { throw new UnsupportedOperationException(); } else { return new ArrayList<>(); } } @Override public List<ACL> getAclForPath(String path) { if(aclProviderCalled.getAndSet(true)) { throw new UnsupportedOperationException(); } else { return new ArrayList<>(); } } }; CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(server.getConnectString()) .retryPolicy(new RetryOneTime(1)) .aclProvider(badAclProvider) .build(); try { client.start(); AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); final CountDownLatch errorLatch = new CountDownLatch(1); UnhandledErrorListener listener = (message, e) -> { if ( e instanceof UnsupportedOperationException ) { errorLatch.countDown(); } }; async.with(listener).create().forPath("/foo"); Assert.assertTrue(new Timing().awaitLatch(errorLatch)); } finally { CloseableUtils.closeQuietly(client); } }
/**
 * Verifies that a {@link PersistentEphemeralNode} created under an ACL without WRITE
 * permission can be created, but that a later {@code setData} does not reach the server and
 * the node reports an auth failure.
 */
@Test
public void testNoWritePermission() throws Exception
{
    // ACL granting READ, CREATE and DELETE to anyone - deliberately without WRITE.
    final ACLProvider aclProvider = new ACLProvider()
    {
        final ACL acl = new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE, ZooDefs.Ids.ANYONE_ID_UNSAFE);
        final List<ACL> aclList = Collections.singletonList(acl);

        @Override
        public List<ACL> getDefaultAcl()
        {
            return aclList;
        }

        @Override
        public List<ACL> getAclForPath(String path)
        {
            return aclList;
        }
    };

    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
    CuratorFramework client = builder
        .connectString(server.getConnectString())
        .aclProvider(aclProvider)
        .retryPolicy(new RetryOneTime(1))
        .build();

    PersistentEphemeralNode node = null;
    try
    {
        client.start();

        node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
        node.start();

        assertTrue(node.waitForInitialCreate(timing.seconds(), TimeUnit.SECONDS), "Node not created");
        assertNodeExists(client, PATH);
        assertFalse(node.isAuthFailure(), "AuthFailure when creating node.");

        // Explicit charset so the payload does not depend on the platform default.
        byte[] NEW_DATA = "NEW_DATA".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        node.setData(NEW_DATA);
        timing.sleepABit();

        byte[] read_data = client.getData().forPath(PATH);
        // Compare array contents explicitly. An object-based assertNotEquals on two distinct
        // byte[] instances can pass on reference inequality alone (as JUnit 5 does), which would
        // make this check unable to detect a write that went through.
        assertFalse(java.util.Arrays.equals(read_data, NEW_DATA), "Data matches - write went through.");
        assertTrue(node.isAuthFailure(), "AuthFailure response not received.");
    }
    finally
    {
        CloseableUtils.closeQuietly(node);
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * Binds the given ACL provider service by forwarding it to {@code aclProvider.bind}.
 *
 * @param service the ACL provider service to bind
 */
void bindAclProvider(ACLProvider service) {
    aclProvider.bind(service);
}
/**
 * Unbinds the given ACL provider service by forwarding it to {@code aclProvider.unbind}.
 *
 * @param service the ACL provider service to unbind
 */
void unbindAclProvider(ACLProvider service) {
    aclProvider.unbind(service);
}
/**
 * Optional injection point: installs the {@code @Curator}-qualified ACL provider on the
 * underlying builder.
 *
 * @param aclProvider the ACL provider to apply to the builder
 */
@Inject(optional = true)
public void setACLProvider(@Curator final ACLProvider aclProvider) {
    builder.aclProvider(aclProvider);
}