@Override public void onReceive(Object message) { if (message instanceof DeadLetter) { Object innerMessage = ((DeadLetter) message).message(); logger.warn(".onReceive: encountered dead letter: {}", innerMessage); if (innerMessage instanceof RegisterWorker){ logger.error("worker could not register to supervisor, shutting down"); BatchApplication.exit(); } } }
@Override public Receive createReceive() { return ReceiveBuilder.create().match(DeadLetter.class, msg -> { System.out.println(msg); }).matchAny(msg -> { System.out.println(msg); }).build(); }
@SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testDataChangedWithNoSender() { new JavaTestKit(getSystem()) { { final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class); final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class); final Props props = DataChangeListener.props(mockListener, TEST_PATH); final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender"); getSystem().eventStream().subscribe(getRef(), DeadLetter.class); subject.tell(new DataChanged(mockChangeEvent), ActorRef.noSender()); // Make sure no DataChangedReply is sent to DeadLetters. while (true) { DeadLetter deadLetter; try { deadLetter = expectMsgClass(duration("1 seconds"), DeadLetter.class); } catch (AssertionError e) { // Timed out - got no DeadLetter - this is good break; } // We may get DeadLetters for other messages we don't care // about. Assert.assertFalse("Unexpected DataChangedReply", deadLetter.message() instanceof DataChangedReply); } } }; }
@Test public void testDataChangedWithNoSender() { new JavaTestKit(getSystem()) { { final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class); final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate); final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class); final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithNoSender"); getSystem().eventStream().subscribe(getRef(), DeadLetter.class); subject.tell(new DataTreeChanged(mockCandidates), ActorRef.noSender()); // Make sure no DataChangedReply is sent to DeadLetters. while (true) { DeadLetter deadLetter; try { deadLetter = expectMsgClass(duration("1 seconds"), DeadLetter.class); } catch (AssertionError e) { // Timed out - got no DeadLetter - this is good break; } // We may get DeadLetters for other messages we don't care // about. Assert.assertFalse("Unexpected DataTreeChangedReply", deadLetter.message() instanceof DataTreeChangedReply); } } }; }
@Test public void shouldSendMsgToDeadLetterWhenQueueIsFull() throws InterruptedException { final JavaTestKit mockReceiver = new JavaTestKit(actorSystem); actorSystem.eventStream().subscribe(mockReceiver.getRef(), DeadLetter.class); final FiniteDuration twentySeconds = new FiniteDuration(20, TimeUnit.SECONDS); ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(config.getMailBoxName()), "pingpongactor"); actorSystem.mailboxes().settings(); lock.lock(); try { //queue capacity = 10 //need to send 12 messages; 1 message is dequeued and actor waits on lock, //2nd to 11th messages are put on the queue //12th message is sent to dead letter. for (int i = 0; i < 12; i++) { pingPongActor.tell("ping", mockReceiver.getRef()); } mockReceiver.expectMsgClass(twentySeconds, DeadLetter.class); } finally { lock.unlock(); } mockReceiver.receiveN(11, twentySeconds); }
public DeadLetterActor() { receive(ReceiveBuilder.match(DeadLetter.class, msg -> { LOGGER.warn("Following message fail to be distributed from {} to {} : {}", msg.sender().path(), msg.recipient().path(), msg.message().toString()); }).build()); }
@Override public void onReceive(Object message) throws Exception { if (message instanceof DeadLetter) { DeadLetter deadLetter = (DeadLetter) message; writeDeadLetterToFile(deadLetter); } else { String className = message != null ? message.getClass().getName() : "NULL"; String value = message != null ? message.toString() : "NULL"; log.error(String.format("Message sent to DeadLetter but it is not one (Class: %s, Value: %s)", className, value)); } }
/** * Write the dead letters to the file system * * @param deadLetter * a dead letter */ private void writeDeadLetterToFile(DeadLetter deadLetter) { String fileName = String.format(OUTPUT_FILE_NAME_TEMPLATE, deadLetter.recipient().path().name(), new Date(), ".log"); BufferedWriter bWriter = null; try { bWriter = new BufferedWriter(new FileWriter(new File(getDeadLetterFileSystem(), fileName))); // Serialize the message as XML ByteArrayOutputStream baOut = new ByteArrayOutputStream(); XMLEncoder encoder = new XMLEncoder(baOut); if (deadLetter.message() != null) { encoder.writeObject(deadLetter.message()); } encoder.flush(); encoder.close(); // Write it to a file // First line is actor path bWriter.write(deadLetter.recipient().path().toString()); bWriter.newLine(); // Then write the message bWriter.write(baOut.toString()); bWriter.flush(); } catch (Exception e) { log.error("Error while writing the dead letter info the dead letters folder", e); } finally { IOUtils.closeQuietly(bWriter); } }
/** * Initialize the dead letter actor (which is actually this class) * * @throws ActorSystemPluginException */ private void initializeTheDeadLetterActor(final File deadLetterFileSystem, final File reprocessedDeadLetters) throws ActorSystemPluginException { final ActorRef actor = getActorSystem().actorOf(Props.create(new DeadLetterManager.DeadLetterCreator(deadLetterFileSystem, reprocessedDeadLetters))); getActorSystem().eventStream().subscribe(actor, DeadLetter.class); }