/** * Apply an entry processor to an entry. * * @param <T> * the return type of the processor * @param id * the ID of the entry * @param applyOnBackup * whether to apply the processor on backup entries * @param processor * the processor * @return the result */ @SuppressWarnings("unchecked") static <T> T execute(GlobalID id, boolean applyOnBackup, EntryProcessor<T> processor) { try { return (T) GlobalRuntimeImpl.getRuntime().resilientFinishMap.executeOnKey( id, new AbstractEntryProcessor<GlobalID, ResilientFinishState>( applyOnBackup) { private static final long serialVersionUID = -8787905766218374656L; @Override public T process(Map.Entry<GlobalID, ResilientFinishState> entry) { return processor.process(entry); } }); } catch (final DeadPlaceError | HazelcastInstanceNotActiveException e) { // this place is dead for the world System.exit(42); throw e; } }
@Nonnull @SuppressWarnings("unchecked") public static ProcessorMetaSupplier writeMapP(@Nonnull String name, @Nullable ClientConfig clientConfig) { boolean isLocal = clientConfig == null; return dontParallelize(new HazelcastWriterSupplier<>( serializableConfig(clientConfig), index -> new ArrayMap(), ArrayMap::add, instance -> { IMap map = instance.getMap(name); return buffer -> { try { map.putAll(buffer); } catch (HazelcastInstanceNotActiveException e) { handleInstanceNotActive(instance, e, isLocal); } buffer.clear(); }; }, noopConsumer() )); }
@Nonnull public static ProcessorMetaSupplier writeListP(@Nonnull String name, @Nullable ClientConfig clientConfig) { boolean isLocal = clientConfig == null; return dontParallelize(new HazelcastWriterSupplier<>( serializableConfig(clientConfig), index -> new ArrayList<>(), ArrayList::add, instance -> { IList<Object> list = instance.getList(name); return buffer -> { try { list.addAll(buffer); } catch (HazelcastInstanceNotActiveException e) { handleInstanceNotActive(instance, e, isLocal); } buffer.clear(); }; }, noopConsumer() )); }
@Override protected boolean tryProcess(int ordinal, @Nonnull Object object) throws Exception { checkError(); if (!tryIncrement(numConcurrentOps, 1, MAX_PARALLEL_ASYNC_OPS)) { return false; } try { T item = (T) object; EntryProcessor<K, V> entryProcessor = toEntryProcessorFn.apply(item); K key = toKeyFn.apply(item); map.submitToKey(key, entryProcessor, callback); return true; } catch (HazelcastInstanceNotActiveException e) { handleInstanceNotActive(instance, e, isLocal); return false; } }
private void scanJobs() { if (!shouldStartJobs()) { return; } try { Collection<JobRecord> jobs = jobRepository.getJobRecords(); jobs.forEach(this::startJobIfNotStartedOrCompleted); performCleanup(); } catch (Exception e) { if (e instanceof HazelcastInstanceNotActiveException) { return; } logger.severe("Scanning jobs failed", e); } }
public void stopServer() { LOG.info("Unbinding server from the configured ports"); m_acceptor.close(); LOG.trace("Stopping MQTT protocol processor"); m_processorBootstrapper.shutdown(); m_initialized = false; if (hazelcastInstance != null) { LOG.trace("Stopping embedded Hazelcast instance"); try { hazelcastInstance.shutdown(); } catch (HazelcastInstanceNotActiveException e) { LOG.warn("embedded Hazelcast instance is already shut down."); } } scheduler.shutdown(); LOG.info("Moquette server has been stopped."); }
private static void handleInstanceNotActive( HazelcastInstance instance, HazelcastInstanceNotActiveException e, boolean isLocal ) { if (isLocal) { // if we are writing to a local instance, we can safely ignore this exception // as the job will eventually restart on its own. instance.getLoggingService().getLogger(HazelcastWriters.class).fine( "Ignoring HazelcastInstanceNotActiveException from local cluster as the job will be" + " restarted automatically.", e); return; } throw e; }
static DistributedFunction<HazelcastInstance, DistributedConsumer<ArrayMap>> flushToCache( String name, boolean isLocal ) { return instance -> { ICache cache = instance.getCacheManager().getCache(name); return buffer -> { try { cache.putAll(buffer); } catch (HazelcastInstanceNotActiveException e) { handleInstanceNotActive(instance, e, isLocal); } buffer.clear(); }; }; }
public static boolean isTopologicalFailure(Object t) { return t instanceof TopologyChangedException || t instanceof MemberLeftException || t instanceof TargetNotMemberException || t instanceof CallerNotMemberException || t instanceof HazelcastInstanceNotActiveException; }
private static boolean isInstanceActive(JetInstance instance) { if (instance.getHazelcastInstance() instanceof HazelcastInstanceProxy) { try { ((HazelcastInstanceProxy) instance.getHazelcastInstance()).getOriginal(); return true; } catch (HazelcastInstanceNotActiveException exception) { return false; } } else if (instance.getHazelcastInstance() instanceof HazelcastInstanceImpl) { return getNode(instance.getHazelcastInstance()).getState() == NodeState.ACTIVE; } else { throw new AssertionError("Unsupported HazelcastInstance type"); } }
@Test public void when_nonCoordinatorLeavesDuringExecution_then_jobRestarts() throws Throwable { // Given DAG dag = new DAG().vertex(new Vertex("test", new MockPS(StuckProcessor::new, nodeCount))); // When Job job = instances[0].newJob(dag); StuckProcessor.executionStarted.await(); instances[2].getHazelcastInstance().getLifecycleService().terminate(); StuckProcessor.proceedLatch.countDown(); job.join(); // upon non-coordinator member leave, remaining members restart and complete the job final int count = nodeCount * 2 - 1; assertEquals(count, MockPS.initCount.get()); assertTrueEventually(new AssertTask() { @Override public void run() throws Exception { assertEquals(count, MockPS.completeCount.get()); assertEquals(nodeCount, MockPS.completeErrors.size()); for (int i = 0; i < MockPS.completeErrors.size(); i++) { Throwable error = MockPS.completeErrors.get(i); assertTrue(error instanceof TopologyChangedException || error instanceof HazelcastInstanceNotActiveException); } } }); }
@Override public void clearStore() { try { cache.clear(); } catch (HazelcastInstanceNotActiveException e) { log.debug("Hazelcast instance inactive during cache clearing", e); } }
@Override public void close() throws StorageException { try { cache.destroy(); } catch (HazelcastInstanceNotActiveException e) { log.debug("Hazelcast instance inactive during cache deletion", e); } }
private void failIfNotRunning() { if (!nodeEngine.isRunning()) { throw new HazelcastInstanceNotActiveException(); } }
@Nonnull @SuppressWarnings("unchecked") public static <T, K, V> ProcessorMetaSupplier updateMapP( @Nonnull String name, @Nullable ClientConfig clientConfig, @Nonnull DistributedFunction<T, K> toKeyFn, @Nonnull DistributedBiFunction<V, T, V> updateFn ) { boolean isLocal = clientConfig == null; return dontParallelize(new HazelcastWriterSupplier<>( serializableConfig(clientConfig), index -> new ArrayList<>(), ArrayList::add, instance -> { IMap map = instance.getMap(name); Map<K, T> tmpMap = new HashMap<>(); ApplyFnEntryProcessor<K, V, T> entryProcessor = new ApplyFnEntryProcessor<>(tmpMap, updateFn); return buffer -> { try { if (buffer.isEmpty()) { return; } for (Object object : buffer) { T item = (T) object; K key = toKeyFn.apply(item); // on duplicate key, we'll flush immediately if (tmpMap.containsKey(key)) { map.executeOnKeys(tmpMap.keySet(), entryProcessor); tmpMap.clear(); } tmpMap.put(key, item); } map.executeOnKeys(tmpMap.keySet(), entryProcessor); tmpMap.clear(); } catch (HazelcastInstanceNotActiveException e) { handleInstanceNotActive(instance, e, isLocal); } buffer.clear(); }; }, noopConsumer() )); }
@Override public void shutdown(boolean terminate) { jobExecutionService.reset("shutdown", HazelcastInstanceNotActiveException::new); networking.shutdown(); taskletExecutionService.shutdown(); }
@Test public void when_coordinatorLeavesDuringExecution_then_jobCompletes() throws Throwable { // Given DAG dag = new DAG().vertex(new Vertex("test", new MockPS(StuckProcessor::new, nodeCount))); // When Long jobId = null; try { Job job = instances[0].newJob(dag); Future<Void> future = job.getFuture(); jobId = job.getId(); StuckProcessor.executionStarted.await(); instances[0].getHazelcastInstance().getLifecycleService().terminate(); StuckProcessor.proceedLatch.countDown(); future.get(); fail(); } catch (ExecutionException expected) { assertTrue(expected.getCause() instanceof HazelcastInstanceNotActiveException); } // Then assertNotNull(jobId); final long completedJobId = jobId; JobRepository jobRepository = getJetService(instances[1]).getJobRepository(); assertTrueEventually(new AssertTask() { @Override public void run() throws Exception { JobResult jobResult = jobRepository.getJobResult(completedJobId); assertNotNull(jobResult); assertTrue(jobResult.isSuccessful()); } }); final int count = liteMemberFlags[0] ? (2 * nodeCount) : (2 * nodeCount - 1); assertEquals(count, MockPS.initCount.get()); assertTrueEventually(() -> { assertEquals(count, MockPS.completeCount.get()); assertEquals(nodeCount, MockPS.completeErrors.size()); for (int i = 0; i < MockPS.completeErrors.size(); i++) { Throwable error = MockPS.completeErrors.get(i); assertTrue(error instanceof TopologyChangedException || error instanceof HazelcastInstanceNotActiveException); } }); }