Java 类com.hazelcast.config.WanReplicationConfig 实例源码

项目:hazelcast-archive    文件:WanReplicationService.java   
@SuppressWarnings("SynchronizeOnThis")
public WanReplication getWanReplication(String name) {
    WanReplication wr = mapWanReplications.get(name);
    if (wr != null) return wr;
    synchronized (this) {
        wr = mapWanReplications.get(name);
        if (wr != null) return wr;
        WanReplicationConfig wanReplicationConfig = node.getConfig().getWanReplicationConfig(name);
        if (wanReplicationConfig == null) return null;
        List<WanTargetClusterConfig> targets = wanReplicationConfig.getTargetClusterConfigs();
        WanReplicationEndpoint[] targetClusters = new WanReplicationEndpoint[targets.size()];
        int count = 0;
        for (WanTargetClusterConfig targetClusterConfig : targets) {
            WanReplicationEndpoint target = new WanNoDelayReplication();
            String groupName = targetClusterConfig.getGroupName();
            String password = targetClusterConfig.getGroupPassword();
            String[] addresses = new String[targetClusterConfig.getEndpoints().size()];
            targetClusterConfig.getEndpoints().toArray(addresses);
            target.init(node, groupName, password, addresses);
            targetClusters[count++] = target;
        }
        wr = new WanReplication(name, targetClusters);
        mapWanReplications.put(name, wr);
        return wr;
    }
}
项目:eet.osslite.cz    文件:HazelcastConfiguration.java   
private WanReplicationConfig getWanReplication() {
    // TODO Auto-generated method stub
    WanReplicationConfig wrConfig = new WanReplicationConfig();
    List<WanPublisherConfig> publisherConfig = wrConfig.getWanPublisherConfigs();

    wrConfig.setName("my-wan-cluster-batch");
    // publisherConfig.setGroupName("london");
    // publisherConfig.setClassName("com.hazelcast.enterprise.wan.replication.WanBatchReplication");
    //
    // Map<String, Comparable> props = publisherConfig.getProperties();
    // props.put("group.password", "london-pass");
    // props.put("snapshot.enabled", false);
    // props.put("endpoints", "10.3.5.1:5701,10.3.5.2:5701");
    return wrConfig;
}
项目:hazelcast-archive    文件:WanReplicationTest.java   
@Test
public void testWANClusteringActivePassive() throws Exception {
    Config c1 = new Config();
    Config c2 = new Config();
    c1.getGroupConfig().setName("newyork");
    c1.addWanReplicationConfig(new WanReplicationConfig()
            .setName("my-wan")
            .addTargetClusterConfig(new WanTargetClusterConfig()
                    .addEndpoint("127.0.0.1:5702").setGroupName("london")));
    c1.getMapConfig("default").setWanReplicationRef(new WanReplicationRef()
            .setName("my-wan")
            .setMergePolicy(PassThroughMergePolicy.NAME));
    c2.getGroupConfig().setName("london");
    c2.getMapConfig("default").setWanReplicationRef(new WanReplicationRef()
            .setName("my-wan")
            .setMergePolicy(PassThroughMergePolicy.NAME));
    HazelcastInstance h10 = Hazelcast.newHazelcastInstance(c1);
    HazelcastInstance h20 = Hazelcast.newHazelcastInstance(c2);
    int size = 1000;
    MergeLatch mergeLatch2 = new MergeLatch(size);
    getConcurrentMapManager(h20).addWanMergeListener(mergeLatch2);
    for (int i = 0; i < size; i++) {
        h10.getMap("default").put(i, "value" + i);
    }
    assertTrue("Latch state: " + mergeLatch2, mergeLatch2.await(60, TimeUnit.SECONDS));
    Thread.sleep(1000);
    assertEquals(size, mergeLatch2.totalOperations());
    assertEquals(size, h10.getMap("default").size());
    assertEquals(size, h20.getMap("default").size());
    for (int i = 0; i < size; i++) {
        assertEquals("value" + i, h20.getMap("default").get(i));
    }
}
项目:health-and-care-developer-network    文件:WanReplicationService.java   
@SuppressWarnings("SynchronizeOnThis")
public WanReplication getWanReplication(String name) {
    WanReplication wr = mapWanReplications.get(name);
    if (wr != null) return wr;
    synchronized (this) {
        wr = mapWanReplications.get(name);
        if (wr != null) return wr;
        WanReplicationConfig wanReplicationConfig = node.getConfig().getWanReplicationConfig(name);
        if (wanReplicationConfig == null) return null;
        List<WanTargetClusterConfig> targets = wanReplicationConfig.getTargetClusterConfigs();
        WanReplicationEndpoint[] targetClusters = new WanReplicationEndpoint[targets.size()];
        int count = 0;
        for (WanTargetClusterConfig targetClusterConfig : targets) {
            WanReplicationEndpoint target = null;
            if( targetClusterConfig.getReplicationImpl() != null) {
                try {
                    target = (WanReplicationEndpoint) Serializer.loadClass(targetClusterConfig.getReplicationImpl()).newInstance();
                } catch (Exception e) {
                    logger.log(Level.SEVERE, e.getMessage(), e);
                }
            }
            else {
                target = new WanNoDelayReplication();
            }
            String groupName = targetClusterConfig.getGroupName();
            String password = targetClusterConfig.getGroupPassword();
            String[] addresses = new String[targetClusterConfig.getEndpoints().size()];
            targetClusterConfig.getEndpoints().toArray(addresses);
            target.init(node, groupName, password, addresses);
            targetClusters[count++] = target;
        }
        wr = new WanReplication(name, targetClusters);
        mapWanReplications.put(name, wr);
        return wr;
    }
}
项目:health-and-care-developer-network    文件:WanReplicationTest.java   
@Test
public void testWANClusteringActivePassive() throws Exception {
    Config c1 = new Config();
    Config c2 = new Config();
    c1.getGroupConfig().setName("newyork");
    c1.addWanReplicationConfig(new WanReplicationConfig()
            .setName("my-wan")
            .addTargetClusterConfig(new WanTargetClusterConfig()
                    .addEndpoint("127.0.0.1:5702").setGroupName("london")));
    c1.getMapConfig("default").setWanReplicationRef(new WanReplicationRef()
            .setName("my-wan")
            .setMergePolicy(PassThroughMergePolicy.NAME));
    c2.getGroupConfig().setName("london");
    c2.getMapConfig("default").setWanReplicationRef(new WanReplicationRef()
            .setName("my-wan")
            .setMergePolicy(PassThroughMergePolicy.NAME));
    HazelcastInstance h10 = Hazelcast.newHazelcastInstance(c1);
    HazelcastInstance h20 = Hazelcast.newHazelcastInstance(c2);
    int size = 1000;
    MergeLatch mergeLatch2 = new MergeLatch(size);
    getConcurrentMapManager(h20).addWanMergeListener(mergeLatch2);
    for (int i = 0; i < size; i++) {
        h10.getMap("default").put(i, "value" + i);
    }
    assertTrue("Latch state: " + mergeLatch2, mergeLatch2.await(60, TimeUnit.SECONDS));
    Thread.sleep(5000);
    assertEquals(size, mergeLatch2.totalOperations());
    assertEquals(size, h10.getMap("default").size());
    assertEquals(size, h20.getMap("default").size());
    for (int i = 0; i < size; i++) {
        assertEquals("value" + i, h20.getMap("default").get(i));
    }
}
项目:hazelcast-archive    文件:WanReplicationTest.java   
@Test
public void testWANClustering() throws Exception {
    Config c1 = new Config();
    Config c2 = new Config();
    c1.getGroupConfig().setName("newyork");
    c1.addWanReplicationConfig(new WanReplicationConfig()
            .setName("my-wan")
            .addTargetClusterConfig(new WanTargetClusterConfig()
                    .addEndpoint("127.0.0.1:5702").setGroupName("london")));
    c1.getMapConfig("default").setWanReplicationRef(new WanReplicationRef()
            .setName("my-wan")
            .setMergePolicy(PassThroughMergePolicy.NAME));
    c2.getGroupConfig().setName("london");
    c2.addWanReplicationConfig(new WanReplicationConfig()
            .setName("my-wan")
            .addTargetClusterConfig(new WanTargetClusterConfig()
                    .addEndpoint("127.0.0.1:5701").setGroupName("newyork")));
    c2.getMapConfig("default").setWanReplicationRef(new WanReplicationRef()
            .setName("my-wan")
            .setMergePolicy(PassThroughMergePolicy.NAME));
    HazelcastInstance h1 = Hazelcast.newHazelcastInstance(c1);
    HazelcastInstance h2 = Hazelcast.newHazelcastInstance(c2);
    HazelcastInstance h12 = Hazelcast.newHazelcastInstance(c1);
    HazelcastInstance h13 = Hazelcast.newHazelcastInstance(c1);
    HazelcastInstance h22 = Hazelcast.newHazelcastInstance(c2);
    int size = 100;
    MergeLatch mergeLatch1 = new MergeLatch(2 * size);
    MergeLatch mergeLatch2 = new MergeLatch(size);
    getConcurrentMapManager(h1).addWanMergeListener(mergeLatch1);
    getConcurrentMapManager(h12).addWanMergeListener(mergeLatch1);
    getConcurrentMapManager(h13).addWanMergeListener(mergeLatch1);
    getConcurrentMapManager(h2).addWanMergeListener(mergeLatch2);
    getConcurrentMapManager(h22).addWanMergeListener(mergeLatch2);
    for (int i = 0; i < size; i++) {
        h2.getMap("default").put(i, "value" + i);
        h22.getMap("default").put(size + i, "value" + (size + i));
    }
    assertTrue("Latch state: " + mergeLatch1, mergeLatch1.await(60, TimeUnit.SECONDS));
    Thread.sleep(1000);
    assertEquals(0, mergeLatch2.totalOperations());
    assertEquals(2 * size, mergeLatch1.getUpdateCount());
    assertEquals(2 * size, mergeLatch1.totalOperations());
    assertEquals(2 * size, h2.getMap("default").size());
    assertEquals(2 * size, h1.getMap("default").size());
    assertEquals(2 * size, h12.getMap("default").size());
    assertEquals(2 * size, h13.getMap("default").size());
    assertEquals(2 * size, h22.getMap("default").size());
    mergeLatch1.reset();
    for (int i = 0; i < size / 2; i++) {
        h1.getMap("default").remove(i);
        h13.getMap("default").remove(size + i);
    }
    assertTrue("Latch state: " + mergeLatch2, mergeLatch2.await(60, TimeUnit.SECONDS));
    Thread.sleep(1000);
    assertEquals(size, mergeLatch2.getRemoveCount());
    assertEquals(size, mergeLatch2.totalOperations());
    assertEquals(0, mergeLatch1.totalOperations());
    assertEquals(size, h1.getMap("default").size());
    assertEquals(size, h2.getMap("default").size());
    assertEquals(size, h12.getMap("default").size());
    assertEquals(size, h13.getMap("default").size());
    assertEquals(size, h22.getMap("default").size());
}
项目:hazelcast-archive    文件:WanReplicationTest.java   
@Test
public void testWANClustering2() throws Exception {
    Config c1 = new Config();
    Config c2 = new Config();
    c1.getGroupConfig().setName("newyork");
    c1.addWanReplicationConfig(new WanReplicationConfig()
            .setName("my-wan")
            .addTargetClusterConfig(new WanTargetClusterConfig()
                    .addEndpoint("127.0.0.1:5703").setGroupName("london")));
    c1.getMapConfig("default").setWanReplicationRef(new WanReplicationRef()
            .setName("my-wan")
            .setMergePolicy(PassThroughMergePolicy.NAME));
    c2.getGroupConfig().setName("london");
    c2.addWanReplicationConfig(new WanReplicationConfig()
            .setName("my-wan")
            .addTargetClusterConfig(new WanTargetClusterConfig()
                    .addEndpoint("127.0.0.1:5701").setGroupName("newyork")));
    c2.getMapConfig("default").setWanReplicationRef(new WanReplicationRef()
            .setName("my-wan")
            .setMergePolicy(PassThroughMergePolicy.NAME));
    HazelcastInstance h10 = Hazelcast.newHazelcastInstance(c1);
    HazelcastInstance h11 = Hazelcast.newHazelcastInstance(c1);
    int size = 1000;
    HazelcastInstance h20 = Hazelcast.newHazelcastInstance(c2);
    HazelcastInstance h21 = Hazelcast.newHazelcastInstance(c2);
    HazelcastInstance h12 = Hazelcast.newHazelcastInstance(c1);
    MergeLatch mergeLatch1 = new MergeLatch(size);
    getConcurrentMapManager(h10).addWanMergeListener(mergeLatch1);
    getConcurrentMapManager(h11).addWanMergeListener(mergeLatch1);
    getConcurrentMapManager(h12).addWanMergeListener(mergeLatch1);
    MergeLatch mergeLatch2 = new MergeLatch(size);
    getConcurrentMapManager(h20).addWanMergeListener(mergeLatch2);
    getConcurrentMapManager(h21).addWanMergeListener(mergeLatch2);
    for (int i = 0; i < size; i++) {
        h11.getMap("default").put(i, "value" + i);
    }
    assertTrue("Latch state: " + mergeLatch2, mergeLatch2.await(60, TimeUnit.SECONDS));
    Thread.sleep(1000);
    assertEquals(size, mergeLatch2.totalOperations());
    assertEquals(0, mergeLatch1.totalOperations());
    assertEquals(size, h10.getMap("default").size());
    assertEquals(size, h20.getMap("default").size());
    assertEquals(size, h12.getMap("default").size());
    assertEquals(size, h11.getMap("default").size());
    assertEquals(size, h21.getMap("default").size());
    mergeLatch2.reset();
    for (int i = 0; i < size; i++) {
        h21.getMap("default").put(size + i, "value" + (size + i));
    }
    assertTrue("Latch state: " + mergeLatch1, mergeLatch1.await(60, TimeUnit.SECONDS));
    Thread.sleep(1000);
    assertEquals(size, mergeLatch1.totalOperations());
    assertEquals(0, mergeLatch2.totalOperations());
    assertEquals(2 * size, h10.getMap("default").size());
    assertEquals(2 * size, h20.getMap("default").size());
    assertEquals(2 * size, h12.getMap("default").size());
    assertEquals(2 * size, h11.getMap("default").size());
    assertEquals(2 * size, h21.getMap("default").size());
    mergeLatch1.reset(size / 2);
    mergeLatch2.reset(size / 2);
    for (int i = 0; i < size / 2; i++) {
        h10.getMap("default").remove(i);
        h21.getMap("default").remove(size + i);
    }
    assertTrue("Latch state: " + mergeLatch1, mergeLatch1.await(60, TimeUnit.SECONDS));
    assertTrue("Latch state: " + mergeLatch2, mergeLatch2.await(60, TimeUnit.SECONDS));
    Thread.sleep(1000);
    assertEquals(size / 2, mergeLatch1.totalOperations());
    assertEquals(size / 2, mergeLatch2.totalOperations());
    assertEquals(size, h10.getMap("default").size());
    assertEquals(size, h20.getMap("default").size());
    assertEquals(size, h12.getMap("default").size());
    assertEquals(size, h11.getMap("default").size());
    assertEquals(size, h21.getMap("default").size());
}
项目:health-and-care-developer-network    文件:WanReplicationTest.java   
@Test
public void testWANClustering() throws Exception {
    Config c1 = new Config();
    Config c2 = new Config();
    c1.getGroupConfig().setName("newyork");
    c1.addWanReplicationConfig(new WanReplicationConfig()
            .setName("my-wan")
            .addTargetClusterConfig(new WanTargetClusterConfig()
                    .addEndpoint("127.0.0.1:5702").setGroupName("london")));
    c1.getMapConfig("default").setWanReplicationRef(new WanReplicationRef()
            .setName("my-wan")
            .setMergePolicy(PassThroughMergePolicy.NAME));
    c2.getGroupConfig().setName("london");
    c2.addWanReplicationConfig(new WanReplicationConfig()
            .setName("my-wan")
            .addTargetClusterConfig(new WanTargetClusterConfig()
                    .addEndpoint("127.0.0.1:5701").setGroupName("newyork")));
    c2.getMapConfig("default").setWanReplicationRef(new WanReplicationRef()
            .setName("my-wan")
            .setMergePolicy(PassThroughMergePolicy.NAME));
    HazelcastInstance h1 = Hazelcast.newHazelcastInstance(c1);
    HazelcastInstance h2 = Hazelcast.newHazelcastInstance(c2);
    HazelcastInstance h12 = Hazelcast.newHazelcastInstance(c1);
    HazelcastInstance h13 = Hazelcast.newHazelcastInstance(c1);
    HazelcastInstance h22 = Hazelcast.newHazelcastInstance(c2);
    int size = 100;
    MergeLatch mergeLatch1 = new MergeLatch(2 * size);
    MergeLatch mergeLatch2 = new MergeLatch(size);
    getConcurrentMapManager(h1).addWanMergeListener(mergeLatch1);
    getConcurrentMapManager(h12).addWanMergeListener(mergeLatch1);
    getConcurrentMapManager(h13).addWanMergeListener(mergeLatch1);
    getConcurrentMapManager(h2).addWanMergeListener(mergeLatch2);
    getConcurrentMapManager(h22).addWanMergeListener(mergeLatch2);
    for (int i = 0; i < size; i++) {
        h2.getMap("default").put(i, "value" + i);
        h22.getMap("default").put(size + i, "value" + (size + i));
    }
    assertTrue("Latch state 1: " + mergeLatch1, mergeLatch1.await(60, TimeUnit.SECONDS));
    Thread.sleep(5000);
    assertEquals(0, mergeLatch2.totalOperations());
    assertEquals(2 * size, mergeLatch1.getUpdateCount());
    assertEquals(2 * size, mergeLatch1.totalOperations());
    assertEquals(2 * size, h2.getMap("default").size());
    assertEquals(2 * size, h1.getMap("default").size());
    assertEquals(2 * size, h12.getMap("default").size());
    assertEquals(2 * size, h13.getMap("default").size());
    assertEquals(2 * size, h22.getMap("default").size());
    mergeLatch1.reset();
    for (int i = 0; i < size / 2; i++) {
        h1.getMap("default").remove(i);
        h13.getMap("default").remove(size + i);
    }
    assertTrue("Latch state 2: " + mergeLatch2, mergeLatch2.await(60, TimeUnit.SECONDS));
    Thread.sleep(5000);
    assertEquals(size, mergeLatch2.getRemoveCount());
    assertEquals(size, mergeLatch2.totalOperations());
    assertEquals(0, mergeLatch1.totalOperations());
    assertEquals(size, h1.getMap("default").size());
    assertEquals(size, h2.getMap("default").size());
    assertEquals(size, h12.getMap("default").size());
    assertEquals(size, h13.getMap("default").size());
    assertEquals(size, h22.getMap("default").size());
}
项目:health-and-care-developer-network    文件:WanReplicationTest.java   
@Test
public void testWANClustering2() throws Exception {
    Config c1 = new Config();
    Config c2 = new Config();
    c1.getGroupConfig().setName("newyork");
    c1.addWanReplicationConfig(new WanReplicationConfig()
            .setName("my-wan")
            .addTargetClusterConfig(new WanTargetClusterConfig()
                    .addEndpoint("127.0.0.1:5703").setGroupName("london")));
    c1.getMapConfig("default").setWanReplicationRef(new WanReplicationRef()
            .setName("my-wan")
            .setMergePolicy(PassThroughMergePolicy.NAME));
    c2.getGroupConfig().setName("london");
    c2.addWanReplicationConfig(new WanReplicationConfig()
            .setName("my-wan")
            .addTargetClusterConfig(new WanTargetClusterConfig()
                    .addEndpoint("127.0.0.1:5701").setGroupName("newyork")));
    c2.getMapConfig("default").setWanReplicationRef(new WanReplicationRef()
            .setName("my-wan")
            .setMergePolicy(PassThroughMergePolicy.NAME));
    HazelcastInstance h10 = Hazelcast.newHazelcastInstance(c1);
    HazelcastInstance h11 = Hazelcast.newHazelcastInstance(c1);
    int size = 1000;
    HazelcastInstance h20 = Hazelcast.newHazelcastInstance(c2);
    HazelcastInstance h21 = Hazelcast.newHazelcastInstance(c2);
    HazelcastInstance h12 = Hazelcast.newHazelcastInstance(c1);
    MergeLatch mergeLatch1 = new MergeLatch(size);
    getConcurrentMapManager(h10).addWanMergeListener(mergeLatch1);
    getConcurrentMapManager(h11).addWanMergeListener(mergeLatch1);
    getConcurrentMapManager(h12).addWanMergeListener(mergeLatch1);
    MergeLatch mergeLatch2 = new MergeLatch(size);
    getConcurrentMapManager(h20).addWanMergeListener(mergeLatch2);
    getConcurrentMapManager(h21).addWanMergeListener(mergeLatch2);
    for (int i = 0; i < size; i++) {
        h11.getMap("default").put(i, "value" + i);
    }
    assertTrue("Latch state: " + mergeLatch2, mergeLatch2.await(60, TimeUnit.SECONDS));
    Thread.sleep(5000);
    assertEquals(size, mergeLatch2.totalOperations());
    assertEquals(0, mergeLatch1.totalOperations());
    assertEquals(size, h10.getMap("default").size());
    assertEquals(size, h20.getMap("default").size());
    assertEquals(size, h12.getMap("default").size());
    assertEquals(size, h11.getMap("default").size());
    assertEquals(size, h21.getMap("default").size());
    mergeLatch2.reset();
    for (int i = 0; i < size; i++) {
        h21.getMap("default").put(size + i, "value" + (size + i));
    }
    assertTrue("Latch state: " + mergeLatch1, mergeLatch1.await(60, TimeUnit.SECONDS));
    Thread.sleep(5000);
    assertEquals(size, mergeLatch1.totalOperations());
    assertEquals(0, mergeLatch2.totalOperations());
    assertEquals(2 * size, h10.getMap("default").size());
    assertEquals(2 * size, h20.getMap("default").size());
    assertEquals(2 * size, h12.getMap("default").size());
    assertEquals(2 * size, h11.getMap("default").size());
    assertEquals(2 * size, h21.getMap("default").size());
    mergeLatch1.reset(size / 2);
    mergeLatch2.reset(size / 2);
    for (int i = 0; i < size / 2; i++) {
        h10.getMap("default").remove(i);
        h21.getMap("default").remove(size + i);
    }
    assertTrue("Latch state: " + mergeLatch1, mergeLatch1.await(60, TimeUnit.SECONDS));
    assertTrue("Latch state: " + mergeLatch2, mergeLatch2.await(60, TimeUnit.SECONDS));
    Thread.sleep(5000);
    assertEquals(size / 2, mergeLatch1.totalOperations());
    assertEquals(size / 2, mergeLatch2.totalOperations());
    assertEquals(size, h10.getMap("default").size());
    assertEquals(size, h20.getMap("default").size());
    assertEquals(size, h12.getMap("default").size());
    assertEquals(size, h11.getMap("default").size());
    assertEquals(size, h21.getMap("default").size());
}