/**
 * Runs a {@code source -> hiccup -> sink} DAG and checks the resulting "counts" map.
 * <p>
 * Only the processor supplier created for member 1's address uses {@code GenerateP};
 * every other address gets a no-op processor. The {@code source -> hiccup} edge is
 * distributed and partitioned with a partitioner that always returns the id of a
 * partition owned by member 2.
 *
 * @throws Exception if the job or the assertions fail
 */
@Test
public void testBackpressure() throws Exception {
    DAG dag = new DAG();

    final int member1Port = jet1.getCluster().getLocalMember().getAddress().getPort();
    final Member member2 = jet2.getCluster().getLocalMember();
    final int ptionOwnedByMember2 =
            jet1.getHazelcastInstance().getPartitionService()
                .getPartitions().stream()
                // A partition's owner may still be null; compare from the non-null side
                // so an unassigned partition is skipped instead of throwing an NPE.
                .filter(p -> member2.equals(p.getOwner()))
                .map(Partition::getPartitionId)
                .findAny()
                .orElseThrow(() -> new RuntimeException(
                        "Can't find a partition owned by member " + member2));

    Vertex source = dag.newVertex("source", ProcessorMetaSupplier.of((Address address) ->
            ProcessorSupplier.of(address.getPort() == member1Port ? GenerateP::new : noopP())
    ));
    Vertex hiccup = dag.newVertex("hiccup", HiccupP::new);
    Vertex sink = dag.newVertex("sink", SinkProcessors.writeMapP("counts"));

    dag.edge(between(source, hiccup)
            .distributed().partitioned(wholeItem(), (x, y) -> ptionOwnedByMember2))
       .edge(between(hiccup, sink));

    jet1.newJob(dag).join();
    assertCounts(jet1.getMap("counts"));
}
/**
 * JMX operation that reports which partition holds a document and which member owns it.
 *
 * @param uri the document identifier to look up
 * @return a composite with the keys {@code "partition"} and {@code "owner"}, or
 *         {@code null} when the document manager returns no document for {@code uri}
 * @throws RuntimeException carrying the message of the underlying {@code BagriException}
 */
@ManagedOperation(description="Return Document Location Info")
@ManagedOperationParameters({
    @ManagedOperationParameter(name = "uri", description = "Document identifier")})
public CompositeData getDocumentLocation(String uri) {
    try {
        Properties props = new Properties();
        props.setProperty("bdb.document.headers", String.valueOf(DocumentAccessor.HDR_URI));
        DocumentAccessor doc = docManager.getDocument(uri, props);
        CompositeData result = null;
        if (doc != null) {
            Partition part = hzClient.getPartitionService().getPartition(doc.getUri().hashCode());
            Map<String, Object> location = new HashMap<>(2);
            location.put("partition", part.getPartitionId());
            // The owner can be null while partition ownership is not established yet;
            // String.valueOf reports "null" instead of failing the whole operation with an NPE.
            location.put("owner", String.valueOf(part.getOwner()));
            result = JMXUtils.mapToComposite("document", "Document Location", location);
        }
        logger.debug("getDocumentLocation; returning: {}", result);
        return result;
    } catch (BagriException ex) {
        logger.error("getDocumentLocation.error: {}", ex.getMessage(), ex);
        // NOTE(review): the cause is logged above but deliberately not chained here —
        // presumably so remote JMX clients do not need BagriException on their classpath; confirm.
        throw new RuntimeException(ex.getMessage());
    }
}
/**
 * Opens the "default" map, optionally seeds it, and then hands a console action to
 * {@code readConsoleWhile} that prints the partition id and owner of every key.
 *
 * @param args when the first argument is {@code "master"}, entries
 *             {@code key1..key10 -> 1..10} are written to the map first
 */
@Override
protected void execute(String... args) {
    withHazelcast(hazelcast -> {
        String name = "default";
        IMap<String, Integer> map = hazelcast.getMap(name);

        boolean seedAsMaster = args.length > 0 && "master".equals(args[0]);
        if (seedAsMaster) {
            for (int i = 1; i <= 10; i++) {
                map.put("key" + i, i);
            }
        }

        readConsoleWhile(hazelcast, name, () -> {
            for (String key : map.keySet()) {
                Partition partition = hazelcast.getPartitionService().getPartition(key);
                show("key = %s, partitionId = %d, owner = %s.",
                        key, partition.getPartitionId(), partition.getOwner());
            }
            return null;
        }, map::size);
    });
}
/**
 * Starts one regular member and one lite member, writes through the lite member, and
 * asserts that all 271 partitions share a single owner: the regular member, not the
 * lite member.
 */
@Test
public void liteMemberWithNormalMember() {
    withHazelcast(hasDataHazelcast -> {
        withLiteMember(liteHazelcast -> {
            Map<String, String> map = liteHazelcast.getMap("default");
            map.put("key", "value");
            assertThat(map).containsExactly(MapEntry.entry("key", "value"));

            assertThat(liteHazelcast.getConfig().isLiteMember()).isTrue();
            assertThat(liteHazelcast.getCluster().getMembers()).hasSize(2);

            Set<Partition> partitions = liteHazelcast.getPartitionService().getPartitions();
            assertThat(partitions).hasSize(271);

            // Every partition must report the same owner.
            assertThat(partitions.stream().map(Partition::getOwner).distinct().count())
                    .isEqualTo(1);

            Partition anyPartition = partitions.stream().findAny().get();
            assertThat(anyPartition.getOwner())
                    .isEqualTo(hasDataHazelcast.getCluster().getLocalMember())
                    .isNotEqualTo(liteHazelcast.getCluster().getLocalMember());
        });
    });
}
/**
 * With {@code withHazelcast(2, ...)}, writes one entry to the "default" map and asserts
 * that the instance is not a lite member and that the 271 partitions are spread over
 * exactly two distinct owners.
 */
@Test
public void normalHazelcastCluster() {
    withHazelcast(2, hazelcast -> {
        Map<String, String> map = hazelcast.getMap("default");
        map.put("key", "value");
        assertThat(map).containsExactly(MapEntry.entry("key", "value"));

        assertThat(hazelcast.getConfig().isLiteMember()).isFalse();

        Set<Partition> partitions = hazelcast.getPartitionService().getPartitions();
        assertThat(partitions).hasSize(271);

        // Ownership must be split between both members.
        assertThat(partitions.stream().map(Partition::getOwner).distinct().count())
                .isEqualTo(2);
    });
}
/**
 * Uses the XML-configured data-member and lite-member setups together: writes an entry
 * through the lite member, waits five seconds, and expects the map to be empty again
 * (presumably due to an expiry setting in the XML configuration — confirm). Then checks
 * the cluster size (3), the lite-member flag, and that the 271 partitions have exactly
 * two distinct owners.
 */
@Test
public void preConfigurationedInstances() {
    withHazelcast("hazelcast-datamember.xml", 2, hasDataHazelcast -> {
        withHazelcast("hazelcast-litemember.xml", liteHazelcast -> {
            Map<String, String> map = liteHazelcast.getMap("default");
            map.put("key", "value");
            assertThat(map).containsExactly(MapEntry.entry("key", "value"));

            try {
                TimeUnit.SECONDS.sleep(5L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and carry on with the assertions.
                Thread.currentThread().interrupt();
            }

            assertThat(map).isEmpty();

            assertThat(liteHazelcast.getCluster().getMembers()).hasSize(3);
            assertThat(liteHazelcast.getConfig().isLiteMember()).isTrue();

            Set<Partition> partitions = liteHazelcast.getPartitionService().getPartitions();
            assertThat(partitions).hasSize(271);
            assertThat(partitions.stream().map(Partition::getOwner).distinct().count())
                    .isEqualTo(2);
        });
    });
}
/**
 * Generates keys until one is accepted, then records and returns it.
 * <p>
 * A candidate is rejected when its partition has no key set, when the set already
 * contains the candidate, or when the set has reached {@code maxKeysPerPartition}.
 * The loop has no other exit, so it only terminates once an acceptable key is produced.
 *
 * @return a newly generated key that has been added to its partition's key set
 */
@Override
public final K next() {
    while (true) {
        K candidate = generateKey();
        int partitionId = partitionService.getPartition(candidate).getPartitionId();
        Set<K> bucket = keysPerPartition[partitionId];

        boolean rejected = bucket == null
                || bucket.contains(candidate)
                || bucket.size() == maxKeysPerPartition;
        if (!rejected) {
            bucket.add(candidate);
            return candidate;
        }
    }
}
/**
 * Generates {@code 4 * PARTITION_COUNT} random-locality {@code int} keys and verifies
 * that every partition receives exactly four of them.
 */
@Test
public void testGenerateIntKeys_whenRandom_equalDistributionOverPartitions() {
    final int expectedPerPartition = 4;
    final int totalKeys = expectedPerPartition * PARTITION_COUNT;

    int[] keys = generateIntKeys(totalKeys, KeyLocality.RANDOM, hz);
    assertEquals(totalKeys, keys.length);

    // Tally how many keys land in each partition.
    int[] hits = new int[PARTITION_COUNT];
    for (int idx = 0; idx < keys.length; idx++) {
        Integer key = keys[idx];
        int partitionId = hz.getPartitionService().getPartition(key).getPartitionId();
        hits[partitionId]++;
    }

    for (int idx = 0; idx < hits.length; idx++) {
        assertEquals(expectedPerPartition, hits[idx]);
    }
}
/**
 * Generates {@code 4 * PARTITION_COUNT} random-locality {@code Integer} keys and
 * verifies that every partition receives exactly four of them.
 */
@Test
public void testGenerateIntegerKeys_whenRandom_equalDistributionOverPartitions() {
    final int expectedPerPartition = 4;
    final int totalKeys = expectedPerPartition * PARTITION_COUNT;

    Integer[] keys = generateIntegerKeys(totalKeys, KeyLocality.RANDOM, hz);
    assertEquals(totalKeys, keys.length);

    // Tally how many keys land in each partition.
    int[] hits = new int[PARTITION_COUNT];
    for (int idx = 0; idx < keys.length; idx++) {
        Integer key = keys[idx];
        int partitionId = hz.getPartitionService().getPartition(key).getPartitionId();
        hits[partitionId]++;
    }

    for (int idx = 0; idx < hits.length; idx++) {
        assertEquals(expectedPerPartition, hits[idx]);
    }
}
/**
 * Generates {@code 4 * PARTITION_COUNT} random-locality string keys with the prefix
 * {@code "prefix"} and verifies that each key carries the prefix and that every
 * partition receives exactly four keys.
 */
@Test
public void testGenerateStringKeys_whenRandom_equalDistributionOverPartitions() {
    final int expectedPerPartition = 4;
    final int totalKeys = expectedPerPartition * PARTITION_COUNT;

    String[] keys = generateStringKeys("prefix", totalKeys, KeyLocality.RANDOM, hz);
    assertEquals(totalKeys, keys.length);

    // Tally how many keys land in each partition, checking the prefix as we go.
    int[] hits = new int[PARTITION_COUNT];
    for (int idx = 0; idx < keys.length; idx++) {
        String key = keys[idx];
        int partitionId = hz.getPartitionService().getPartition(key).getPartitionId();
        hits[partitionId]++;
        assertTrue(key.startsWith("prefix"));
    }

    for (int idx = 0; idx < hits.length; idx++) {
        assertEquals(expectedPerPartition, hits[idx]);
    }
}
/**
 * Blocks until every partition of the given instance reports an owner.
 * <p>
 * The elapsed time is checked before each partition and again on every poll while a
 * partition is still unowned, so a partition that never receives an owner ends in a
 * timeout rather than an endless wait.
 *
 * @param hazelcastInstance the instance whose partitions are awaited
 * @throws IllegalStateException if the wait exceeds {@code PARTITION_WARMUP_TIMEOUT_NANOS}
 */
public static void warmupPartitions(HazelcastInstance hazelcastInstance) {
    LOGGER.info("Waiting for partition warmup");

    PartitionService partitionService = hazelcastInstance.getPartitionService();
    long started = System.nanoTime();
    for (Partition partition : partitionService.getPartitions()) {
        while (true) {
            // Checked on every iteration: previously this ran only once per partition,
            // so the polling loop below could spin forever on an unowned partition.
            if (System.nanoTime() - started > PARTITION_WARMUP_TIMEOUT_NANOS) {
                throw new IllegalStateException("Partition warmup timeout. Partitions didn't get an owner in time");
            }
            if (partition.getOwner() != null) {
                break;
            }
            LOGGER.debug("Partition owner is not yet set for partitionId: " + partition.getPartitionId());
            sleepMillisThrowException(PARTITION_WARMUP_SLEEP_INTERVAL_MILLIS);
        }
    }

    LOGGER.info("Partitions are warmed up successfully");
}
/**
 * Builds {@code addrToPartitions}: for each owner address, the list of partition ids
 * that the partition service currently attributes to it.
 *
 * @param context supplies the Jet instance whose partition table is read
 */
@Override
public void init(@Nonnull Context context) {
    addrToPartitions = context.jetInstance()
            .getHazelcastInstance()
            .getPartitionService()
            .getPartitions()
            .stream()
            .collect(groupingBy(
                    partition -> partition.getOwner().getAddress(),
                    mapping(Partition::getPartitionId, toList())));
}
/**
 * Opens the "default" multimap, optionally seeds it, and then hands a console action to
 * {@code readConsoleWhile} that prints the values, partition id and owner of every key.
 *
 * @param args when the first argument is {@code "master"}, each of {@code key1..key10}
 *             receives the values {@code 1..5} first
 */
@Override
protected void execute(String... args) {
    withHazelcast(hazelcast -> {
        String name = "default";
        MultiMap<String, Integer> map = hazelcast.getMultiMap(name);

        boolean seedAsMaster = args.length > 0 && "master".equals(args[0]);
        if (seedAsMaster) {
            for (int i = 1; i <= 10; i++) {
                for (int j = 1; j <= 5; j++) {
                    map.put("key" + i, j);
                }
            }
        }

        readConsoleWhile(hazelcast, name, () -> {
            for (String key : map.keySet()) {
                Partition partition = hazelcast.getPartitionService().getPartition(key);
                show("key = %s, values = %s, partitionId = %d, owner = %s.",
                        key, map.get(key), partition.getPartitionId(), partition.getOwner());
            }
            return null;
        }, map::size);
    });
}
/**
 * Creates a JCache cache named "default", unwraps the Hazelcast instance and cache
 * behind it, optionally seeds the cache, and then hands a console action to
 * {@code readConsoleWhile} that prints the partition id and owner of every cached key.
 * The provider, manager and cache are closed when the method exits.
 *
 * @param args when the first argument is {@code "master"}, entries
 *             {@code key1..key10 -> 1..10} are written to the cache first
 */
@Override
protected void execute(String... args) {
    String name = "default";

    Configuration<String, Integer> configuration =
            new MutableConfiguration<String, Integer>().setTypes(String.class, Integer.class);

    try (CachingProvider cachingProvider = Caching.getCachingProvider();
         CacheManager cacheManager = cachingProvider.getCacheManager();
         Cache<String, Integer> cache = cacheManager.createCache(name, configuration)) {

        HazelcastCacheManager hazelcastCacheManager = cacheManager.unwrap(HazelcastServerCacheManager.class);
        HazelcastInstance hazelcast = hazelcastCacheManager.getHazelcastInstance();
        ICache<String, Integer> hazelcastCache = cache.unwrap(ICache.class);

        boolean seedAsMaster = args.length > 0 && "master".equals(args[0]);
        if (seedAsMaster) {
            for (int i = 1; i <= 10; i++) {
                hazelcastCache.put("key" + i, i);
            }
        }

        readConsoleWhile(hazelcast, name, () -> {
            StreamSupport.stream(hazelcastCache.spliterator(), false)
                    .forEach(entry -> {
                        String cachedKey = entry.getKey();
                        Partition partition = hazelcast.getPartitionService().getPartition(cachedKey);
                        show("key = %s, partitionId = %d, owner = %s.",
                                cachedKey, partition.getPartitionId(), partition.getOwner());
                    });
            return null;
        }, hazelcastCache::size);
    }
}
/**
 * Finds the first {@code long} key, counting upwards from {@code lastKey} (inclusive),
 * whose partition is owned by the local member of the given Hazelcast instance.
 *
 * @param instance Hazelcast instance whose local member must own the key
 * @param lastKey  key at which the search starts
 * @return the first key at or after {@code lastKey} owned by the instance's local member
 */
public static long nextKeyOwnedBy(HazelcastInstance instance, long lastKey) {
    Member self = instance.getCluster().getLocalMember();
    PartitionService partitions = instance.getPartitionService();

    for (long candidate = lastKey; ; candidate++) {
        Member owner = partitions.getPartition(candidate).getOwner();
        if (self.equals(owner)) {
            return candidate;
        }
    }
}
/**
 * Tells whether the partition of a key is owned by the given Hazelcast instance.
 * <p>
 * While the partition has no owner yet, the method polls once per second until one
 * is reported.
 *
 * @param instance the HazelcastInstance the key should belong to
 * @param key      the key to check
 * @return {@code true} if the partition owner equals the instance's local endpoint,
 *         {@code false} otherwise
 */
public static boolean isLocalKey(HazelcastInstance instance, Object key) {
    Partition partition = instance.getPartitionService().getPartition(key);

    Member owner = partition.getOwner();
    while (owner == null) {
        sleepSeconds(1);
        owner = partition.getOwner();
    }
    return owner.equals(instance.getLocalEndpoint());
}
/**
 * Fails when no target partitions were selected, reporting how many partitions each
 * member owns to help diagnose why.
 *
 * @param targetPartitions the partition ids chosen as targets
 * @throws IllegalStateException if {@code targetPartitions} is empty, or if a partition
 *                               without an owner is met while collecting the diagnostics
 */
private void verifyHasPartitions(Set<Integer> targetPartitions) {
    if (!targetPartitions.isEmpty()) {
        return;
    }

    // Collect the per-member ownership counts for the failure message.
    Map<Member, Integer> ownedCounts = new HashMap<Member, Integer>();
    for (Partition candidate : partitionService.getPartitions()) {
        Member owner = candidate.getOwner();
        if (owner == null) {
            throw new IllegalStateException("Owner is null for partition: " + candidate);
        }
        Integer soFar = ownedCounts.get(owner);
        if (soFar == null) {
            ownedCounts.put(owner, 1);
        } else {
            ownedCounts.put(owner, soFar + 1);
        }
    }
    throw new IllegalStateException("No partitions found, partitionsPerMember: " + ownedCounts);
}
/**
 * Builds {@code addrToPartitions} from the given partitions: for each owner address,
 * the list of ids of the partitions it owns.
 *
 * @param partitions the partitions to group by their owner's address
 */
private void initLocal(Set<Partition> partitions) {
    addrToPartitions = partitions
            .stream()
            .collect(groupingBy(
                    partition -> partition.getOwner().getAddress(),
                    mapping(Partition::getPartitionId, toList())));
}
/**
 * Resolves the id of the partition that the partition service assigns to a sequencer name.
 *
 * @param sequencerName the sequencer name used as the partition key
 * @return the id of the partition for {@code sequencerName}
 */
private int partitionId(@Nonnull String sequencerName) {
    return partitionService.getPartition(sequencerName).getPartitionId();
}
private static void run() { HazelcastInstance hzInstance1 = Hazelcast.newHazelcastInstance(); HazelcastInstance hzInstance2 = Hazelcast.newHazelcastInstance(); String hz1PartitionKey = hzInstance1.getPartitionService() .randomPartitionKey(); String queueName = QNAME + "@" + hz1PartitionKey; // who is queue owner? HazelcastInstance ownerInstance; HazelcastInstance secondInstance; Partition partition = hzInstance1.getPartitionService().getPartition(hz1PartitionKey); if (hzInstance1.getCluster().getLocalMember().equals(partition.getOwner())) { ownerInstance = hzInstance1; secondInstance = hzInstance2; } else { ownerInstance = hzInstance2; secondInstance = hzInstance1; } IQueue<Integer> queue = secondInstance.getQueue(queueName); long startTime = System.currentTimeMillis(); int i = 0; while (i++ < 100000) { if (i % 10000 == 0) { logger.info("add " + Integer.toString(i) + "\t" + String.format("%8.3f", (double) (System.currentTimeMillis() - startTime) / i)); } queue.add(i); } startTime = System.currentTimeMillis(); i = 0; while (i++ < 100000) { if (i % 10000 == 0) { logger.info("poll " + Integer.toString(i) + "\t" + String.format("%8.3f", (double) (System.currentTimeMillis() - startTime) / i)); } Integer intVal = queue.poll(); if (intVal == null || intVal.intValue() != i) { logger.info("Error: " + (intVal) + "!=" + i); System.exit(-1); } } }
private static void run() { HazelcastInstance hzInstance1 = Hazelcast.newHazelcastInstance(); HazelcastInstance hzInstance2 = Hazelcast.newHazelcastInstance(); String hz1PartitionKey = hzInstance1.getPartitionService() .randomPartitionKey(); String queueName = QNAME + "@" + hz1PartitionKey; // who is queue owner? HazelcastInstance ownerInstance; HazelcastInstance secondInstance; Partition partition = hzInstance1.getPartitionService().getPartition(hz1PartitionKey); if (hzInstance1.getCluster().getLocalMember().equals(partition.getOwner())) { ownerInstance = hzInstance1; secondInstance = hzInstance2; } else { ownerInstance = hzInstance2; secondInstance = hzInstance1; } IQueue<Integer> queue = ownerInstance.getQueue(queueName); long startTime = System.currentTimeMillis(); int i = 0; while (i++ < 100000) { if (i % 10000 == 0) { logger.info("add " + Integer.toString(i) + "\t" + String.format("%8.3f", (double) (System.currentTimeMillis() - startTime) / i)); } queue.add(i); } startTime = System.currentTimeMillis(); i = 0; while (i++ < 100000) { if (i % 10000 == 0) { logger.info("poll " + Integer.toString(i) + "\t" + String.format("%8.3f", (double) (System.currentTimeMillis() - startTime) / i)); } Integer intVal = queue.poll(); if (intVal == null || intVal.intValue() != i) { logger.info("Error: " + (intVal) + "!=" + i); System.exit(-1); } } }
private static void run() { HazelcastInstance hzInstance1 = Hazelcast.newHazelcastInstance(createConfig("1")); HazelcastInstance hzInstance2 = Hazelcast.newHazelcastInstance(createConfig("2")); String hz1PartitionKey = hzInstance1.getPartitionService() .randomPartitionKey(); String queueName = QNAME + "@" + hz1PartitionKey; // who is queue owner? HazelcastInstance ownerInstance; HazelcastInstance secondInstance; Partition partition = hzInstance1.getPartitionService().getPartition(hz1PartitionKey); if (hzInstance1.getCluster().getLocalMember().equals(partition.getOwner())) { ownerInstance = hzInstance1; secondInstance = hzInstance2; } else { ownerInstance = hzInstance2; secondInstance = hzInstance1; } IQueue<Integer> queue = ownerInstance.getQueue(queueName); long startTime = System.currentTimeMillis(); int i = 0; while (i++ < 100000) { if (i % 10000 == 0) { logger.info("add " + Integer.toString(i) + "\t" + String.format("%8.3f", (double) (System.currentTimeMillis() - startTime) / i)); } queue.add(i); } ownerInstance.shutdown(); queue = secondInstance.getQueue(queueName); startTime = System.currentTimeMillis(); int breakOrder = 0; HashMap<Long, Integer> queueItems = new HashMap<Long, Integer>(); i = 0; while (i++ < 100000) { if (i % 10000 == 0) { logger.info("poll " + Integer.toString(i) + "\t" + String.format("%8.3f", (double) (System.currentTimeMillis() - startTime) / i)); } Integer intVal = queue.poll(); if (intVal != null) { queueItems.put((long) i, intVal); if (intVal.intValue() != i) { breakOrder++; } } } if (breakOrder > 0) { logger.info("Error: mixed order of items - " + breakOrder); } if (queueItems.size() != 100000) { logger.info("Error: missed queue items - " + (100000 - queueItems.size())); } }
private static void run() { HazelcastInstance hzInstance1 = Hazelcast.newHazelcastInstance(); HazelcastInstance hzInstance2 = Hazelcast.newHazelcastInstance(); String hz1PartitionKey = hzInstance1.getPartitionService() .randomPartitionKey(); String queueName = QNAME + "@" + hz1PartitionKey; // who is queue owner? HazelcastInstance ownerInstance; HazelcastInstance secondInstance; Partition partition = hzInstance1.getPartitionService().getPartition(hz1PartitionKey); if (hzInstance1.getCluster().getLocalMember().equals(partition.getOwner())) { ownerInstance = hzInstance1; secondInstance = hzInstance2; } else { ownerInstance = hzInstance2; secondInstance = hzInstance1; } IQueue<Integer> queue = ownerInstance.getQueue(queueName); long startTime = System.currentTimeMillis(); int i = 0; while (i++ < 100000) { if (i % 10000 == 0) { logger.info("add " + Integer.toString(i) + "\t" + String.format("%8.3f", (double) (System.currentTimeMillis() - startTime) / i)); } queue.add(i); } ownerInstance.shutdown(); queue = secondInstance.getQueue(queueName); startTime = System.currentTimeMillis(); i = 0; while (i++ < 100000) { if (i % 10000 == 0) { logger.info("poll " + Integer.toString(i) + "\t" + String.format("%8.3f", (double) (System.currentTimeMillis() - startTime) / i)); } Integer intVal = queue.poll(); if (intVal == null || intVal.intValue() != i) { logger.info("Error: " + (intVal) + "!=" + i); System.exit(-1); } } }
private static void run() { HazelcastInstance hzInstance1 = Hazelcast.newHazelcastInstance(createConfig("1")); HazelcastInstance hzInstance2 = Hazelcast.newHazelcastInstance(createConfig("2")); String hz1PartitionKey = hzInstance1.getPartitionService() .randomPartitionKey(); String queueName = QNAME + "@" + hz1PartitionKey; // who is queue owner? HazelcastInstance ownerInstance; HazelcastInstance secondInstance; Partition partition = hzInstance1.getPartitionService().getPartition(hz1PartitionKey); if (hzInstance1.getCluster().getLocalMember().equals(partition.getOwner())) { ownerInstance = hzInstance1; secondInstance = hzInstance2; } else { ownerInstance = hzInstance2; secondInstance = hzInstance1; } IQueue<Integer> queue = ownerInstance.getQueue(queueName); long startTime = System.currentTimeMillis(); int i = 0; while (i++ < 100000) { if (i % 10000 == 0) { logger.info("add " + Integer.toString(i) + "\t" + String.format("%8.3f", (double) (System.currentTimeMillis() - startTime) / i)); } queue.add(i); } ownerInstance.shutdown(); queue = secondInstance.getQueue(queueName); Integer intVal = queue.poll(); if (intVal != null) { logger.info("Error: Queue should be empty"); System.exit(-1); } }