/**
 * Looks up the {@link WanReplication} cached under {@code name}, building and caching it
 * from the node configuration when it is not cached yet.
 *
 * <p>The cache is read once without locking; on a miss it is read again while holding this
 * instance's monitor, so the endpoints are only built when the second read also misses.
 * Every target cluster in the configuration gets its own {@link WanNoDelayReplication}
 * endpoint, initialised with the target's group name, group password and endpoint addresses.
 *
 * @param name the name of the WAN replication configuration
 * @return the cached or newly created replication, or {@code null} when the node
 *         configuration has no WAN replication configuration with that name
 */
@SuppressWarnings("SynchronizeOnThis")
public WanReplication getWanReplication(String name) {
    WanReplication replication = mapWanReplications.get(name);
    if (replication == null) {
        synchronized (this) {
            replication = mapWanReplications.get(name);
            if (replication == null) {
                WanReplicationConfig config = node.getConfig().getWanReplicationConfig(name);
                if (config == null) {
                    return null;
                }
                List<WanTargetClusterConfig> targetConfigs = config.getTargetClusterConfigs();
                WanReplicationEndpoint[] endpoints = new WanReplicationEndpoint[targetConfigs.size()];
                int slot = 0;
                for (WanTargetClusterConfig targetConfig : targetConfigs) {
                    WanReplicationEndpoint endpoint = new WanNoDelayReplication();
                    String group = targetConfig.getGroupName();
                    String groupPassword = targetConfig.getGroupPassword();
                    // Copy the configured endpoint addresses into a plain array for init().
                    String[] endpointAddresses = new String[targetConfig.getEndpoints().size()];
                    targetConfig.getEndpoints().toArray(endpointAddresses);
                    endpoint.init(node, group, groupPassword, endpointAddresses);
                    endpoints[slot] = endpoint;
                    slot++;
                }
                replication = new WanReplication(name, endpoints);
                mapWanReplications.put(name, replication);
            }
        }
    }
    return replication;
}
private WanReplicationConfig getWanReplication() { // TODO Auto-generated method stub WanReplicationConfig wrConfig = new WanReplicationConfig(); List<WanPublisherConfig> publisherConfig = wrConfig.getWanPublisherConfigs(); wrConfig.setName("my-wan-cluster-batch"); // publisherConfig.setGroupName("london"); // publisherConfig.setClassName("com.hazelcast.enterprise.wan.replication.WanBatchReplication"); // // Map<String, Comparable> props = publisherConfig.getProperties(); // props.put("group.password", "london-pass"); // props.put("snapshot.enabled", false); // props.put("endpoints", "10.3.5.1:5701,10.3.5.2:5701"); return wrConfig; }
/**
 * Checks one-directional WAN replication from the "newyork" group to the "london" group.
 *
 * <p>Only the "newyork" configuration declares a WAN target (127.0.0.1:5702, group "london");
 * both configurations reference "my-wan" with the pass-through merge policy on the "default"
 * map. After 1000 puts on the "newyork" instance the test expects the merge latch registered
 * on the "london" instance to record exactly 1000 operations and both maps to hold all
 * entries with their original values.
 */
@Test
public void testWANClusteringActivePassive() throws Exception {
    final String mapName = "default";
    final String wanName = "my-wan";
    final int entryCount = 1000;

    Config newYorkConfig = new Config();
    Config londonConfig = new Config();

    // Active side: publishes to the london group.
    newYorkConfig.getGroupConfig().setName("newyork");
    WanTargetClusterConfig londonTarget = new WanTargetClusterConfig()
            .addEndpoint("127.0.0.1:5702")
            .setGroupName("london");
    WanReplicationConfig newYorkWan = new WanReplicationConfig()
            .setName(wanName)
            .addTargetClusterConfig(londonTarget);
    newYorkConfig.addWanReplicationConfig(newYorkWan);
    WanReplicationRef newYorkRef = new WanReplicationRef()
            .setName(wanName)
            .setMergePolicy(PassThroughMergePolicy.NAME);
    newYorkConfig.getMapConfig(mapName).setWanReplicationRef(newYorkRef);

    // Passive side: no WAN target of its own, only the map reference.
    londonConfig.getGroupConfig().setName("london");
    WanReplicationRef londonRef = new WanReplicationRef()
            .setName(wanName)
            .setMergePolicy(PassThroughMergePolicy.NAME);
    londonConfig.getMapConfig(mapName).setWanReplicationRef(londonRef);

    // Start order is kept: the newyork instance first, then the london instance.
    HazelcastInstance h10 = Hazelcast.newHazelcastInstance(newYorkConfig);
    HazelcastInstance h20 = Hazelcast.newHazelcastInstance(londonConfig);

    MergeLatch londonLatch = new MergeLatch(entryCount);
    getConcurrentMapManager(h20).addWanMergeListener(londonLatch);

    int key = 0;
    while (key < entryCount) {
        h10.getMap(mapName).put(key, "value" + key);
        key++;
    }

    assertTrue("Latch state: " + londonLatch, londonLatch.await(60, TimeUnit.SECONDS));
    // Presumably gives any late merge operations time to show up before counting.
    Thread.sleep(1000);

    assertEquals(entryCount, londonLatch.totalOperations());
    assertEquals(entryCount, h10.getMap(mapName).size());
    assertEquals(entryCount, h20.getMap(mapName).size());

    int checked = 0;
    while (checked < entryCount) {
        assertEquals("value" + checked, h20.getMap(mapName).get(checked));
        checked++;
    }
}
/**
 * Returns the {@link WanReplication} cached under {@code name}, creating and caching it from
 * the node configuration on first use.
 *
 * <p>The cache is read once without locking and, on a miss, read again while holding this
 * instance's monitor before anything is built. For every configured target cluster an
 * endpoint is created: when the target declares a replication implementation class, that
 * class is loaded through {@code Serializer.loadClass} and instantiated; otherwise a
 * {@link WanNoDelayReplication} is used. Each endpoint is initialised with the target's group
 * name, group password and endpoint addresses.
 *
 * @param name the name of the WAN replication configuration
 * @return the cached or newly created replication, or {@code null} when the node
 *         configuration has no WAN replication configuration with that name
 * @throws IllegalStateException if a configured replication implementation class cannot be
 *         loaded, instantiated or cast to {@link WanReplicationEndpoint}
 */
@SuppressWarnings("SynchronizeOnThis")
public WanReplication getWanReplication(String name) {
    WanReplication wr = mapWanReplications.get(name);
    if (wr != null) {
        return wr;
    }
    synchronized (this) {
        wr = mapWanReplications.get(name);
        if (wr != null) {
            return wr;
        }
        WanReplicationConfig wanReplicationConfig = node.getConfig().getWanReplicationConfig(name);
        if (wanReplicationConfig == null) {
            return null;
        }
        List<WanTargetClusterConfig> targets = wanReplicationConfig.getTargetClusterConfigs();
        WanReplicationEndpoint[] targetClusters = new WanReplicationEndpoint[targets.size()];
        int count = 0;
        for (WanTargetClusterConfig targetClusterConfig : targets) {
            WanReplicationEndpoint target;
            if (targetClusterConfig.getReplicationImpl() != null) {
                try {
                    target = (WanReplicationEndpoint) Serializer
                            .loadClass(targetClusterConfig.getReplicationImpl()).newInstance();
                } catch (Exception e) {
                    logger.log(Level.SEVERE, e.getMessage(), e);
                    // Previously execution continued with a null endpoint and failed on
                    // target.init(...) with a bare NullPointerException; fail with the cause.
                    throw new IllegalStateException("Could not create WAN replication endpoint '"
                            + targetClusterConfig.getReplicationImpl()
                            + "' for WAN replication '" + name + "'", e);
                }
            } else {
                target = new WanNoDelayReplication();
            }
            String groupName = targetClusterConfig.getGroupName();
            String password = targetClusterConfig.getGroupPassword();
            String[] addresses = new String[targetClusterConfig.getEndpoints().size()];
            targetClusterConfig.getEndpoints().toArray(addresses);
            target.init(node, groupName, password, addresses);
            targetClusters[count++] = target;
        }
        wr = new WanReplication(name, targetClusters);
        mapWanReplications.put(name, wr);
        return wr;
    }
}
/**
 * Checks one-directional WAN replication from the "newyork" group to the "london" group.
 *
 * <p>Only the "newyork" configuration declares a WAN target (127.0.0.1:5702, group "london");
 * both configurations reference "my-wan" with the pass-through merge policy on the "default"
 * map. After 1000 puts on the "newyork" instance the test waits for the merge latch on the
 * "london" instance, pauses for five seconds, and then expects exactly 1000 recorded
 * operations and identical map contents on both sides.
 */
@Test
public void testWANClusteringActivePassive() throws Exception {
    final String mapName = "default";
    final String wanName = "my-wan";
    final int entryCount = 1000;

    Config newYorkConfig = new Config();
    Config londonConfig = new Config();

    // Active side: publishes to the london group.
    newYorkConfig.getGroupConfig().setName("newyork");
    WanTargetClusterConfig londonTarget = new WanTargetClusterConfig()
            .addEndpoint("127.0.0.1:5702")
            .setGroupName("london");
    WanReplicationConfig newYorkWan = new WanReplicationConfig()
            .setName(wanName)
            .addTargetClusterConfig(londonTarget);
    newYorkConfig.addWanReplicationConfig(newYorkWan);
    WanReplicationRef newYorkRef = new WanReplicationRef()
            .setName(wanName)
            .setMergePolicy(PassThroughMergePolicy.NAME);
    newYorkConfig.getMapConfig(mapName).setWanReplicationRef(newYorkRef);

    // Passive side: no WAN target of its own, only the map reference.
    londonConfig.getGroupConfig().setName("london");
    WanReplicationRef londonRef = new WanReplicationRef()
            .setName(wanName)
            .setMergePolicy(PassThroughMergePolicy.NAME);
    londonConfig.getMapConfig(mapName).setWanReplicationRef(londonRef);

    // Start order is kept: the newyork instance first, then the london instance.
    HazelcastInstance h10 = Hazelcast.newHazelcastInstance(newYorkConfig);
    HazelcastInstance h20 = Hazelcast.newHazelcastInstance(londonConfig);

    MergeLatch londonLatch = new MergeLatch(entryCount);
    getConcurrentMapManager(h20).addWanMergeListener(londonLatch);

    int key = 0;
    while (key < entryCount) {
        h10.getMap(mapName).put(key, "value" + key);
        key++;
    }

    assertTrue("Latch state: " + londonLatch, londonLatch.await(60, TimeUnit.SECONDS));
    // Presumably gives any late merge operations time to show up before counting.
    Thread.sleep(5000);

    assertEquals(entryCount, londonLatch.totalOperations());
    assertEquals(entryCount, h10.getMap(mapName).size());
    assertEquals(entryCount, h20.getMap(mapName).size());

    int checked = 0;
    while (checked < entryCount) {
        assertEquals("value" + checked, h20.getMap(mapName).get(checked));
        checked++;
    }
}
/**
 * Checks WAN replication between a three-member "newyork" group and a two-member "london"
 * group that each declare the other as WAN target.
 *
 * <p>Phase one puts 2 * 100 entries through the two "london" members and expects the
 * "newyork" latch to record 200 updates while the "london" latch records nothing. Phase two
 * removes 100 entries through two "newyork" members and expects the "london" latch to record
 * 100 removes while the reset "newyork" latch stays at zero. After each phase all five
 * members must report the same map size.
 */
@Test
public void testWANClustering() throws Exception {
    final String mapName = "default";
    final String wanName = "my-wan";

    Config newYorkConfig = new Config();
    Config londonConfig = new Config();

    newYorkConfig.getGroupConfig().setName("newyork");
    WanTargetClusterConfig londonTarget = new WanTargetClusterConfig()
            .addEndpoint("127.0.0.1:5702")
            .setGroupName("london");
    WanReplicationConfig newYorkWan = new WanReplicationConfig()
            .setName(wanName)
            .addTargetClusterConfig(londonTarget);
    newYorkConfig.addWanReplicationConfig(newYorkWan);
    WanReplicationRef newYorkRef = new WanReplicationRef()
            .setName(wanName)
            .setMergePolicy(PassThroughMergePolicy.NAME);
    newYorkConfig.getMapConfig(mapName).setWanReplicationRef(newYorkRef);

    londonConfig.getGroupConfig().setName("london");
    WanTargetClusterConfig newYorkTarget = new WanTargetClusterConfig()
            .addEndpoint("127.0.0.1:5701")
            .setGroupName("newyork");
    WanReplicationConfig londonWan = new WanReplicationConfig()
            .setName(wanName)
            .addTargetClusterConfig(newYorkTarget);
    londonConfig.addWanReplicationConfig(londonWan);
    WanReplicationRef londonRef = new WanReplicationRef()
            .setName(wanName)
            .setMergePolicy(PassThroughMergePolicy.NAME);
    londonConfig.getMapConfig(mapName).setWanReplicationRef(londonRef);

    // Start order is kept exactly; the configured endpoints presumably rely on it.
    HazelcastInstance h1 = Hazelcast.newHazelcastInstance(newYorkConfig);
    HazelcastInstance h2 = Hazelcast.newHazelcastInstance(londonConfig);
    HazelcastInstance h12 = Hazelcast.newHazelcastInstance(newYorkConfig);
    HazelcastInstance h13 = Hazelcast.newHazelcastInstance(newYorkConfig);
    HazelcastInstance h22 = Hazelcast.newHazelcastInstance(londonConfig);

    final int size = 100;
    final int doubled = 2 * size;
    MergeLatch newYorkLatch = new MergeLatch(doubled);
    MergeLatch londonLatch = new MergeLatch(size);
    for (HazelcastInstance member : new HazelcastInstance[]{h1, h12, h13}) {
        getConcurrentMapManager(member).addWanMergeListener(newYorkLatch);
    }
    for (HazelcastInstance member : new HazelcastInstance[]{h2, h22}) {
        getConcurrentMapManager(member).addWanMergeListener(londonLatch);
    }

    // Phase 1: populate through the london members.
    int key = 0;
    while (key < size) {
        h2.getMap(mapName).put(key, "value" + key);
        h22.getMap(mapName).put(size + key, "value" + (size + key));
        key++;
    }

    assertTrue("Latch state: " + newYorkLatch, newYorkLatch.await(60, TimeUnit.SECONDS));
    Thread.sleep(1000);
    assertEquals(0, londonLatch.totalOperations());
    assertEquals(doubled, newYorkLatch.getUpdateCount());
    assertEquals(doubled, newYorkLatch.totalOperations());
    for (HazelcastInstance member : new HazelcastInstance[]{h2, h1, h12, h13, h22}) {
        assertEquals(doubled, member.getMap(mapName).size());
    }

    // Phase 2: remove through the newyork members.
    newYorkLatch.reset();
    int removed = 0;
    while (removed < size / 2) {
        h1.getMap(mapName).remove(removed);
        h13.getMap(mapName).remove(size + removed);
        removed++;
    }

    assertTrue("Latch state: " + londonLatch, londonLatch.await(60, TimeUnit.SECONDS));
    Thread.sleep(1000);
    assertEquals(size, londonLatch.getRemoveCount());
    assertEquals(size, londonLatch.totalOperations());
    assertEquals(0, newYorkLatch.totalOperations());
    for (HazelcastInstance member : new HazelcastInstance[]{h1, h2, h12, h13, h22}) {
        assertEquals(size, member.getMap(mapName).size());
    }
}
/**
 * Checks bidirectional WAN replication between a three-member "newyork" group (WAN target
 * 127.0.0.1:5703, group "london") and a two-member "london" group (WAN target
 * 127.0.0.1:5701, group "newyork").
 *
 * <p>Three phases are run: 1000 puts through a "newyork" member, 1000 further puts through a
 * "london" member, and finally 500 removes on each side. After every phase the test checks
 * the operation counts of both merge latches and that all five members report the same map
 * size.
 */
@Test
public void testWANClustering2() throws Exception {
    final String mapName = "default";
    final String wanName = "my-wan";

    Config newYorkConfig = new Config();
    Config londonConfig = new Config();

    newYorkConfig.getGroupConfig().setName("newyork");
    WanTargetClusterConfig londonTarget = new WanTargetClusterConfig()
            .addEndpoint("127.0.0.1:5703")
            .setGroupName("london");
    WanReplicationConfig newYorkWan = new WanReplicationConfig()
            .setName(wanName)
            .addTargetClusterConfig(londonTarget);
    newYorkConfig.addWanReplicationConfig(newYorkWan);
    WanReplicationRef newYorkRef = new WanReplicationRef()
            .setName(wanName)
            .setMergePolicy(PassThroughMergePolicy.NAME);
    newYorkConfig.getMapConfig(mapName).setWanReplicationRef(newYorkRef);

    londonConfig.getGroupConfig().setName("london");
    WanTargetClusterConfig newYorkTarget = new WanTargetClusterConfig()
            .addEndpoint("127.0.0.1:5701")
            .setGroupName("newyork");
    WanReplicationConfig londonWan = new WanReplicationConfig()
            .setName(wanName)
            .addTargetClusterConfig(newYorkTarget);
    londonConfig.addWanReplicationConfig(londonWan);
    WanReplicationRef londonRef = new WanReplicationRef()
            .setName(wanName)
            .setMergePolicy(PassThroughMergePolicy.NAME);
    londonConfig.getMapConfig(mapName).setWanReplicationRef(londonRef);

    // Start order is kept exactly; the configured endpoints presumably rely on it.
    HazelcastInstance h10 = Hazelcast.newHazelcastInstance(newYorkConfig);
    HazelcastInstance h11 = Hazelcast.newHazelcastInstance(newYorkConfig);
    final int size = 1000;
    final int half = size / 2;
    final int doubled = 2 * size;
    HazelcastInstance h20 = Hazelcast.newHazelcastInstance(londonConfig);
    HazelcastInstance h21 = Hazelcast.newHazelcastInstance(londonConfig);
    HazelcastInstance h12 = Hazelcast.newHazelcastInstance(newYorkConfig);

    MergeLatch newYorkLatch = new MergeLatch(size);
    for (HazelcastInstance member : new HazelcastInstance[]{h10, h11, h12}) {
        getConcurrentMapManager(member).addWanMergeListener(newYorkLatch);
    }
    MergeLatch londonLatch = new MergeLatch(size);
    for (HazelcastInstance member : new HazelcastInstance[]{h20, h21}) {
        getConcurrentMapManager(member).addWanMergeListener(londonLatch);
    }

    // Phase 1: newyork -> london.
    int key = 0;
    while (key < size) {
        h11.getMap(mapName).put(key, "value" + key);
        key++;
    }
    assertTrue("Latch state: " + londonLatch, londonLatch.await(60, TimeUnit.SECONDS));
    Thread.sleep(1000);
    assertEquals(size, londonLatch.totalOperations());
    assertEquals(0, newYorkLatch.totalOperations());
    for (HazelcastInstance member : new HazelcastInstance[]{h10, h20, h12, h11, h21}) {
        assertEquals(size, member.getMap(mapName).size());
    }

    // Phase 2: london -> newyork.
    londonLatch.reset();
    int offset = 0;
    while (offset < size) {
        h21.getMap(mapName).put(size + offset, "value" + (size + offset));
        offset++;
    }
    assertTrue("Latch state: " + newYorkLatch, newYorkLatch.await(60, TimeUnit.SECONDS));
    Thread.sleep(1000);
    assertEquals(size, newYorkLatch.totalOperations());
    assertEquals(0, londonLatch.totalOperations());
    for (HazelcastInstance member : new HazelcastInstance[]{h10, h20, h12, h11, h21}) {
        assertEquals(doubled, member.getMap(mapName).size());
    }

    // Phase 3: removes on both sides at once.
    newYorkLatch.reset(half);
    londonLatch.reset(half);
    int removed = 0;
    while (removed < half) {
        h10.getMap(mapName).remove(removed);
        h21.getMap(mapName).remove(size + removed);
        removed++;
    }
    assertTrue("Latch state: " + newYorkLatch, newYorkLatch.await(60, TimeUnit.SECONDS));
    assertTrue("Latch state: " + londonLatch, londonLatch.await(60, TimeUnit.SECONDS));
    Thread.sleep(1000);
    assertEquals(half, newYorkLatch.totalOperations());
    assertEquals(half, londonLatch.totalOperations());
    for (HazelcastInstance member : new HazelcastInstance[]{h10, h20, h12, h11, h21}) {
        assertEquals(size, member.getMap(mapName).size());
    }
}
/**
 * Checks WAN replication between a three-member "newyork" group and a two-member "london"
 * group that each declare the other as WAN target.
 *
 * <p>Phase one puts 2 * 100 entries through the two "london" members and expects the
 * "newyork" latch to record 200 updates while the "london" latch records nothing. Phase two
 * removes 100 entries through two "newyork" members and expects the "london" latch to record
 * 100 removes while the reset "newyork" latch stays at zero. Each phase pauses five seconds
 * after its latch opens, then requires all five members to report the same map size.
 */
@Test
public void testWANClustering() throws Exception {
    final String mapName = "default";
    final String wanName = "my-wan";

    Config newYorkConfig = new Config();
    Config londonConfig = new Config();

    newYorkConfig.getGroupConfig().setName("newyork");
    WanTargetClusterConfig londonTarget = new WanTargetClusterConfig()
            .addEndpoint("127.0.0.1:5702")
            .setGroupName("london");
    WanReplicationConfig newYorkWan = new WanReplicationConfig()
            .setName(wanName)
            .addTargetClusterConfig(londonTarget);
    newYorkConfig.addWanReplicationConfig(newYorkWan);
    WanReplicationRef newYorkRef = new WanReplicationRef()
            .setName(wanName)
            .setMergePolicy(PassThroughMergePolicy.NAME);
    newYorkConfig.getMapConfig(mapName).setWanReplicationRef(newYorkRef);

    londonConfig.getGroupConfig().setName("london");
    WanTargetClusterConfig newYorkTarget = new WanTargetClusterConfig()
            .addEndpoint("127.0.0.1:5701")
            .setGroupName("newyork");
    WanReplicationConfig londonWan = new WanReplicationConfig()
            .setName(wanName)
            .addTargetClusterConfig(newYorkTarget);
    londonConfig.addWanReplicationConfig(londonWan);
    WanReplicationRef londonRef = new WanReplicationRef()
            .setName(wanName)
            .setMergePolicy(PassThroughMergePolicy.NAME);
    londonConfig.getMapConfig(mapName).setWanReplicationRef(londonRef);

    // Start order is kept exactly; the configured endpoints presumably rely on it.
    HazelcastInstance h1 = Hazelcast.newHazelcastInstance(newYorkConfig);
    HazelcastInstance h2 = Hazelcast.newHazelcastInstance(londonConfig);
    HazelcastInstance h12 = Hazelcast.newHazelcastInstance(newYorkConfig);
    HazelcastInstance h13 = Hazelcast.newHazelcastInstance(newYorkConfig);
    HazelcastInstance h22 = Hazelcast.newHazelcastInstance(londonConfig);

    final int size = 100;
    final int doubled = 2 * size;
    MergeLatch newYorkLatch = new MergeLatch(doubled);
    MergeLatch londonLatch = new MergeLatch(size);
    for (HazelcastInstance member : new HazelcastInstance[]{h1, h12, h13}) {
        getConcurrentMapManager(member).addWanMergeListener(newYorkLatch);
    }
    for (HazelcastInstance member : new HazelcastInstance[]{h2, h22}) {
        getConcurrentMapManager(member).addWanMergeListener(londonLatch);
    }

    // Phase 1: populate through the london members.
    int key = 0;
    while (key < size) {
        h2.getMap(mapName).put(key, "value" + key);
        h22.getMap(mapName).put(size + key, "value" + (size + key));
        key++;
    }

    assertTrue("Latch state 1: " + newYorkLatch, newYorkLatch.await(60, TimeUnit.SECONDS));
    Thread.sleep(5000);
    assertEquals(0, londonLatch.totalOperations());
    assertEquals(doubled, newYorkLatch.getUpdateCount());
    assertEquals(doubled, newYorkLatch.totalOperations());
    for (HazelcastInstance member : new HazelcastInstance[]{h2, h1, h12, h13, h22}) {
        assertEquals(doubled, member.getMap(mapName).size());
    }

    // Phase 2: remove through the newyork members.
    newYorkLatch.reset();
    int removed = 0;
    while (removed < size / 2) {
        h1.getMap(mapName).remove(removed);
        h13.getMap(mapName).remove(size + removed);
        removed++;
    }

    assertTrue("Latch state 2: " + londonLatch, londonLatch.await(60, TimeUnit.SECONDS));
    Thread.sleep(5000);
    assertEquals(size, londonLatch.getRemoveCount());
    assertEquals(size, londonLatch.totalOperations());
    assertEquals(0, newYorkLatch.totalOperations());
    for (HazelcastInstance member : new HazelcastInstance[]{h1, h2, h12, h13, h22}) {
        assertEquals(size, member.getMap(mapName).size());
    }
}
/**
 * Checks bidirectional WAN replication between a three-member "newyork" group (WAN target
 * 127.0.0.1:5703, group "london") and a two-member "london" group (WAN target
 * 127.0.0.1:5701, group "newyork").
 *
 * <p>Three phases are run: 1000 puts through a "newyork" member, 1000 further puts through a
 * "london" member, and finally 500 removes on each side. Each phase waits for the relevant
 * merge latch, pauses five seconds, then checks the operation counts of both latches and
 * that all five members report the same map size.
 */
@Test
public void testWANClustering2() throws Exception {
    final String mapName = "default";
    final String wanName = "my-wan";

    Config newYorkConfig = new Config();
    Config londonConfig = new Config();

    newYorkConfig.getGroupConfig().setName("newyork");
    WanTargetClusterConfig londonTarget = new WanTargetClusterConfig()
            .addEndpoint("127.0.0.1:5703")
            .setGroupName("london");
    WanReplicationConfig newYorkWan = new WanReplicationConfig()
            .setName(wanName)
            .addTargetClusterConfig(londonTarget);
    newYorkConfig.addWanReplicationConfig(newYorkWan);
    WanReplicationRef newYorkRef = new WanReplicationRef()
            .setName(wanName)
            .setMergePolicy(PassThroughMergePolicy.NAME);
    newYorkConfig.getMapConfig(mapName).setWanReplicationRef(newYorkRef);

    londonConfig.getGroupConfig().setName("london");
    WanTargetClusterConfig newYorkTarget = new WanTargetClusterConfig()
            .addEndpoint("127.0.0.1:5701")
            .setGroupName("newyork");
    WanReplicationConfig londonWan = new WanReplicationConfig()
            .setName(wanName)
            .addTargetClusterConfig(newYorkTarget);
    londonConfig.addWanReplicationConfig(londonWan);
    WanReplicationRef londonRef = new WanReplicationRef()
            .setName(wanName)
            .setMergePolicy(PassThroughMergePolicy.NAME);
    londonConfig.getMapConfig(mapName).setWanReplicationRef(londonRef);

    // Start order is kept exactly; the configured endpoints presumably rely on it.
    HazelcastInstance h10 = Hazelcast.newHazelcastInstance(newYorkConfig);
    HazelcastInstance h11 = Hazelcast.newHazelcastInstance(newYorkConfig);
    final int size = 1000;
    final int half = size / 2;
    final int doubled = 2 * size;
    HazelcastInstance h20 = Hazelcast.newHazelcastInstance(londonConfig);
    HazelcastInstance h21 = Hazelcast.newHazelcastInstance(londonConfig);
    HazelcastInstance h12 = Hazelcast.newHazelcastInstance(newYorkConfig);

    MergeLatch newYorkLatch = new MergeLatch(size);
    for (HazelcastInstance member : new HazelcastInstance[]{h10, h11, h12}) {
        getConcurrentMapManager(member).addWanMergeListener(newYorkLatch);
    }
    MergeLatch londonLatch = new MergeLatch(size);
    for (HazelcastInstance member : new HazelcastInstance[]{h20, h21}) {
        getConcurrentMapManager(member).addWanMergeListener(londonLatch);
    }

    // Phase 1: newyork -> london.
    int key = 0;
    while (key < size) {
        h11.getMap(mapName).put(key, "value" + key);
        key++;
    }
    assertTrue("Latch state: " + londonLatch, londonLatch.await(60, TimeUnit.SECONDS));
    Thread.sleep(5000);
    assertEquals(size, londonLatch.totalOperations());
    assertEquals(0, newYorkLatch.totalOperations());
    for (HazelcastInstance member : new HazelcastInstance[]{h10, h20, h12, h11, h21}) {
        assertEquals(size, member.getMap(mapName).size());
    }

    // Phase 2: london -> newyork.
    londonLatch.reset();
    int offset = 0;
    while (offset < size) {
        h21.getMap(mapName).put(size + offset, "value" + (size + offset));
        offset++;
    }
    assertTrue("Latch state: " + newYorkLatch, newYorkLatch.await(60, TimeUnit.SECONDS));
    Thread.sleep(5000);
    assertEquals(size, newYorkLatch.totalOperations());
    assertEquals(0, londonLatch.totalOperations());
    for (HazelcastInstance member : new HazelcastInstance[]{h10, h20, h12, h11, h21}) {
        assertEquals(doubled, member.getMap(mapName).size());
    }

    // Phase 3: removes on both sides at once.
    newYorkLatch.reset(half);
    londonLatch.reset(half);
    int removed = 0;
    while (removed < half) {
        h10.getMap(mapName).remove(removed);
        h21.getMap(mapName).remove(size + removed);
        removed++;
    }
    assertTrue("Latch state: " + newYorkLatch, newYorkLatch.await(60, TimeUnit.SECONDS));
    assertTrue("Latch state: " + londonLatch, londonLatch.await(60, TimeUnit.SECONDS));
    Thread.sleep(5000);
    assertEquals(half, newYorkLatch.totalOperations());
    assertEquals(half, londonLatch.totalOperations());
    for (HazelcastInstance member : new HazelcastInstance[]{h10, h20, h12, h11, h21}) {
        assertEquals(size, member.getMap(mapName).size());
    }
}