@Override
public int compare(InputLogEvent o1, InputLogEvent o2) {
    if (o1.getTimestamp() == null) {
        if (o2.getTimestamp() == null) {
            return 0;
        } else {
            // null - long
            return -1;
        }
    } else if (o2.getTimestamp() == null) {
        // long - null
        return 1;
    } else {
        return o1.getTimestamp().compareTo(o2.getTimestamp());
    }
}
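This null-safe ordering matters because PutLogEvents rejects a batch whose events are not in chronological order. On Java 8+ the same ordering can be expressed declaratively; a minimal sketch, where the constant name TIMESTAMP_COMPARATOR is an assumption and not part of the original code:

import java.util.Comparator;
import com.amazonaws.services.logs.model.InputLogEvent;

// Equivalent ordering: null timestamps sort first, otherwise natural Long order.
static final Comparator<InputLogEvent> TIMESTAMP_COMPARATOR =
        Comparator.comparing(InputLogEvent::getTimestamp,
                Comparator.nullsFirst(Comparator.naturalOrder()));

Either form can be handed to List.sort(...) on a drained batch just before the request is built.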
private void flush(boolean all) {
    try {
        long lostCount = this.lostCount.getAndSet(0);
        if (lostCount > 0) {
            getAwsLogsAppender().addWarn(lostCount + " events lost");
        }
        if (!queue.isEmpty()) {
            do {
                Collection<InputLogEvent> batch = drainBatchFromQueue();
                getAwsLogsAppender().getAwsLogsStub().logEvents(batch);
            } while (queue.size() >= maxBatchLogEvents || (all && !queue.isEmpty()));
        }
    } catch (Exception e) {
        getAwsLogsAppender().addError("Unable to flush events to AWS", e);
    }
}
private void flush() {
    int drained;
    final List<InputLogEvent> logEvents = new ArrayList<>(AWS_DRAIN_LIMIT);
    do {
        drained = queue.drainTo(logEvents, AWS_DRAIN_LIMIT);
        if (logEvents.isEmpty()) {
            break;
        }
        // PutLogEvents requires chronological order within a batch
        logEvents.sort(Comparator.comparing(InputLogEvent::getTimestamp));
        // never report a timestamp earlier than one already sent
        if (lastReportedTimestamp > 0) {
            for (final InputLogEvent event : logEvents) {
                if (event.getTimestamp() < lastReportedTimestamp) {
                    event.setTimestamp(lastReportedTimestamp);
                }
            }
        }
        lastReportedTimestamp = logEvents.get(logEvents.size() - 1).getTimestamp();
        final PutLogEventsRequest putLogEventsRequest =
                new PutLogEventsRequest(logGroupName, logStreamName, logEvents);
        putLogEventsRequest.setSequenceToken(sequenceTokenCache);
        try {
            final PutLogEventsResult putLogEventsResult =
                    awsLogsClient.putLogEvents(putLogEventsRequest);
            sequenceTokenCache = putLogEventsResult.getNextSequenceToken();
        } catch (final DataAlreadyAcceptedException daae) {
            // the batch was already delivered; resync the token and move on
            sequenceTokenCache = daae.getExpectedSequenceToken();
        } catch (final InvalidSequenceTokenException iste) {
            // the token drifted (e.g. another writer); recover it for the next batch
            sequenceTokenCache = iste.getExpectedSequenceToken();
        } catch (final Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        logEvents.clear();
    } while (drained >= AWS_DRAIN_LIMIT);
}
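The two catch blocks are what keep the writer self-healing: both DataAlreadyAcceptedException and InvalidSequenceTokenException carry the token CloudWatch expected, so the next flush retries with a valid one. What the method does not show is how sequenceTokenCache gets its initial value; a common approach is to look it up when the stream is opened. A sketch under that assumption (the helper name is hypothetical, the SDK v1 calls are real):

import com.amazonaws.services.logs.AWSLogs;
import com.amazonaws.services.logs.model.DescribeLogStreamsRequest;
import com.amazonaws.services.logs.model.LogStream;

// Hypothetical helper: look up the current upload sequence token for a stream.
static String fetchSequenceToken(AWSLogs client, String group, String stream) {
    DescribeLogStreamsRequest request = new DescribeLogStreamsRequest()
            .withLogGroupName(group)
            .withLogStreamNamePrefix(stream);
    for (LogStream candidate : client.describeLogStreams(request).getLogStreams()) {
        if (candidate.getLogStreamName().equals(stream)) {
            return candidate.getUploadSequenceToken();
        }
    }
    return null; // a null token is accepted for a stream that has never been written to
}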
@Override
public void append(final LogEvent logEvent) {
    final LogEvent event = LoggingUtils.prepareLogEvent(logEvent);
    final InputLogEvent awsLogEvent = new InputLogEvent();
    final long timestamp = event.getTimeMillis();
    final String message = new String(getLayout().toByteArray(event));
    awsLogEvent.setTimestamp(timestamp);
    awsLogEvent.setMessage(message);
    if (!queue.offer(awsLogEvent)) {
        // event dropped: remember that the queue is saturated
        queueFull = true;
    } else if (queueFull) {
        // queue accepted an event again, clear the flag
        queueFull = false;
    }
}
@Override
protected void append(final ILoggingEvent event) {
    synchronized (lockObject) {
        eventQueue.add(new InputLogEvent()
                .withTimestamp(event.getTimeStamp())
                .withMessage(layout.doLayout(event)));
    }
}
private Collection<InputLogEvent> getBatch() {
    synchronized (lockObject) {
        final Collection<InputLogEvent> result = new ArrayList<>(eventQueue);
        eventQueue.clear();
        return result;
    }
}
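Copy-and-clear under the same lock as append keeps the critical section short and hands the caller a snapshot it can serialize without blocking producers. A sketch of how a periodic flusher might consume it (the scheduler wiring and the putLogEvents delegate are assumptions, not shown in the original):

import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

void startFlusher() {
    // Hypothetical wiring: drain the queue on a fixed cadence and ship each batch.
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(() -> {
        Collection<InputLogEvent> batch = getBatch();
        if (!batch.isEmpty()) {
            putLogEvents(batch); // hypothetical delegate that builds and sends the request
        }
    }, 1L, 1L, TimeUnit.SECONDS);
}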
InputLogEvent asInputLogEvent(ILoggingEvent event) {
    InputLogEvent inputLogEvent = new InputLogEvent()
            .withTimestamp(event.getTimeStamp())
            .withMessage(awsLogsAppender.getLayout().doLayout(event));
    if (eventSize(inputLogEvent) > MAX_EVENT_SIZE) {
        awsLogsAppender.addWarn(
                String.format("Log message exceeded CloudWatch Logs' limit of %d bytes", MAX_EVENT_SIZE));
        trimMessage(inputLogEvent, MAX_EVENT_SIZE);
    }
    return inputLogEvent;
}
private static void trimMessage(InputLogEvent event, int eventSize) {
    int trimmedMessageSize = eventSize - EVENT_SIZE_PADDING - ELLIPSIS.getBytes(EVENT_SIZE_CHARSET).length;
    byte[] message = event.getMessage().getBytes(EVENT_SIZE_CHARSET);
    String unsafeTrimmed = new String(message, 0, trimmedMessageSize + 1, EVENT_SIZE_CHARSET);
    // The last character might be a chopped UTF-8 character
    String trimmed = unsafeTrimmed.substring(0, unsafeTrimmed.length() - 1);
    event.setMessage(trimmed + ELLIPSIS);
}
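A quick boundary check of the arithmetic: with the usual values (UTF-8 sizing, 26 bytes of padding, a three-byte "..." ellipsis, 262,144-byte event limit), a message one byte over budget is cut back so the padded size lands exactly on the limit. A self-contained sketch, assuming trimMessage and the constants above are in scope; it is a demo, not part of the original tests:

import java.nio.charset.StandardCharsets;
import com.amazonaws.services.logs.model.InputLogEvent;

public static void main(String[] args) {
    int maxEventSize = 262144;                 // CloudWatch per-event limit
    int payload = maxEventSize - 26;           // room left for the message itself
    StringBuilder sb = new StringBuilder(payload + 1);
    for (int i = 0; i <= payload; i++) {
        sb.append('a');                        // one byte per char in UTF-8
    }
    InputLogEvent event = new InputLogEvent().withMessage(sb.toString());
    trimMessage(event, maxEventSize);          // method above
    int size = event.getMessage().getBytes(StandardCharsets.UTF_8).length + 26;
    System.out.println(size == maxEventSize);                // true: trimmed exactly to the limit
    System.out.println(event.getMessage().endsWith("..."));  // true
}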
AsyncWorker(AwsLogsAppender awsLogsAppender) {
    super(awsLogsAppender);
    maxBatchLogEvents = awsLogsAppender.getMaxBatchLogEvents();
    discardThreshold = (int) Math.ceil(maxBatchLogEvents * 1.5);
    running = new AtomicBoolean(false);
    queue = new ArrayBlockingQueue<>(maxBatchLogEvents * 2);
    lostCount = new AtomicLong(0);
}
private Collection<InputLogEvent> drainBatchFromQueue() {
    Deque<InputLogEvent> batch = new ArrayDeque<>(maxBatchLogEvents);
    queue.drainTo(batch, maxBatchLogEvents);
    int batchSize = batchSize(batch);
    while (batchSize > MAX_BATCH_SIZE) {
        // batch exceeds the PutLogEvents payload limit: push events back onto the queue
        InputLogEvent removed = batch.removeLast();
        batchSize -= eventSize(removed);
        if (!queue.offer(removed)) {
            getAwsLogsAppender().addWarn("Failed requeueing message from oversized batch");
        }
    }
    return batch;
}
private static int batchSize(Collection<InputLogEvent> batch) {
    int size = 0;
    for (InputLogEvent event : batch) {
        size += eventSize(event);
    }
    return size;
}
private void verifyLogRequests(List<CloudWatchLogger.LogRequest> logRequests, EventTag eventTag) {
    List<String> unpackedLines = new ArrayList<>();
    for (CloudWatchLogger.LogRequest logRequest : logRequests) {
        for (InputLogEvent logEvent : logRequest.getLogEvents()) {
            unpackedLines.add(logEvent.getMessage());
        }
    }
    assertThat(unpackedLines, equalTo(eventTag.getEvents()));
}
private void sendEvent(String message) {
    List<InputLogEvent> logEvents = new LinkedList<>();
    logEvents.add(new InputLogEvent().withTimestamp(new Date().getTime()).withMessage(message));
    PutLogEventsRequest putLogEventsRequest = new PutLogEventsRequest(logGroupName, logStreamName, logEvents);
    putLogEventsRequest.setSequenceToken(lastSequenceToken);
    PutLogEventsResult putLogEventsResult = awsLogsClient.putLogEvents(putLogEventsRequest);
    lastSequenceToken = putLogEventsResult.getNextSequenceToken();
}
@Test(timeout = 5000)
public void testBasic() throws InterruptedException {
    CloudWatchAppender appender = new CloudWatchAppender();
    AWSLogsClient awsLogClient = createMock(AWSLogsClient.class);
    appender.setAwsLogsClient(awsLogClient);
    appender.setMaxBatchSize(1);
    appender.setRegion("region");
    final String logGroup = "pfqoejpfqe";
    appender.setLogGroup(logGroup);
    final String logStream = "pffqjfqjpoqoejpfqe";
    appender.setLogStream(logStream);
    PatternLayout layout = new PatternLayout();
    layout.setContext(new LoggerContext());
    layout.setPattern("[%thread] %level %logger{20} - %msg%n%xThrowable");
    layout.start();
    appender.setLayout(layout);

    LoggingEvent event = new LoggingEvent();
    event.setTimeStamp(System.currentTimeMillis());
    String loggerName = "name";
    event.setLoggerName(loggerName);
    Level level = Level.DEBUG;
    event.setLevel(level);
    String message = "fjpewjfpewjfpewjfepowf";
    event.setMessage(message);

    String threadName = Thread.currentThread().getName();
    final String fullMessage = "[" + threadName + "] " + level + " " + loggerName + " - " + message + "\n";

    final PutLogEventsResult result = new PutLogEventsResult();
    String sequence = "ewopjfewfj";
    result.setNextSequenceToken(sequence);
    expect(awsLogClient.putLogEvents(isA(PutLogEventsRequest.class))).andAnswer(new IAnswer<PutLogEventsResult>() {
        @Override
        public PutLogEventsResult answer() {
            PutLogEventsRequest request = (PutLogEventsRequest) getCurrentArguments()[0];
            assertEquals(logGroup, request.getLogGroupName());
            assertEquals(logStream, request.getLogStreamName());
            List<InputLogEvent> events = request.getLogEvents();
            assertEquals(1, events.size());
            assertEquals(fullMessage, events.get(0).getMessage());
            return result;
        }
    }).times(2);
    awsLogClient.shutdown();

    // =====================================

    replay(awsLogClient);
    appender.start();
    // for coverage
    appender.start();
    appender.append(event);
    Thread.sleep(10);
    appender.append(event);
    while (appender.getEventsWrittenCount() < 2) {
        Thread.sleep(10);
    }
    appender.stop();
    verify(awsLogClient);
}
@Test
public void testMessageErrorHandling() throws Exception {
    // WARNING: this test may break if the internal implementation changes
    initialize("TestCloudWatchAppender/testMessageErrorHandling.properties");

    // the mock client -- will throw on odd invocations
    MockCloudWatchClient mockClient = new MockCloudWatchClient() {
        @Override
        protected PutLogEventsResult putLogEvents(PutLogEventsRequest request) {
            if (putLogEventsInvocationCount % 2 == 1) {
                throw new TestingException("anything");
            } else {
                return super.putLogEvents(request);
            }
        }
    };

    appender.setThreadFactory(new DefaultThreadFactory());
    appender.setWriterFactory(mockClient.newWriterFactory());

    for (int ii = 0; ii < 10; ii++) {
        logger.debug("message " + ii);
    }

    mockClient.allowWriterThread();
    assertEquals("first batch, number of events in request", 10, mockClient.mostRecentEvents.size());
    List<InputLogEvent> preservedEvents = new ArrayList<InputLogEvent>(mockClient.mostRecentEvents);

    // the first batch should have been returned to the message queue, in order
    mockClient.allowWriterThread();
    assertEquals("second batch, number of events in request", 10, mockClient.mostRecentEvents.size());
    for (int ii = 0; ii < mockClient.mostRecentEvents.size(); ii++) {
        assertEquals("event #" + ii, preservedEvents.get(ii), mockClient.mostRecentEvents.get(ii));
    }

    // now assert that those messages will not be resent
    for (int ii = 100; ii < 102; ii++) {
        logger.debug("message " + ii);
    }

    mockClient.allowWriterThread();
    assertEquals("third batch, number of events in request", 2, mockClient.mostRecentEvents.size());
}
static int eventSize(InputLogEvent event) {
    return event.getMessage().getBytes(EVENT_SIZE_CHARSET).length + EVENT_SIZE_PADDING;
}
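These size helpers lean on a handful of constants that mirror the documented CloudWatch Logs quotas: each event is counted as its UTF-8 message bytes plus 26 bytes of overhead, a single event may not exceed 262,144 bytes, and a PutLogEvents batch may not exceed 1,048,576 bytes. Plausible definitions (the values are the published limits; the exact declarations are assumed):

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

static final Charset EVENT_SIZE_CHARSET = StandardCharsets.UTF_8;
static final int EVENT_SIZE_PADDING = 26;   // per-event overhead counted by CloudWatch
static final int MAX_EVENT_SIZE = 262144;   // 256 KiB per-event limit
static final int MAX_BATCH_SIZE = 1048576;  // 1 MiB PutLogEvents payload limit
static final String ELLIPSIS = "...";       // marker appended to trimmed messages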
@Override
public void append(ILoggingEvent event) {
    // don't log if discardThreshold is met and event is not important (< WARN)
    if (queue.size() >= discardThreshold && !event.getLevel().isGreaterOrEqual(Level.WARN)) {
        lostCount.incrementAndGet();
        synchronized (running) {
            running.notifyAll();
        }
        return;
    }
    InputLogEvent logEvent = asInputLogEvent(event);
    // are we allowed to block?
    if (getAwsLogsAppender().getMaxBlockTimeMillis() > 0) {
        // we are allowed to block: offer uninterruptibly for the configured maximum blocking time
        boolean interrupted = false;
        long until = System.currentTimeMillis() + getAwsLogsAppender().getMaxBlockTimeMillis();
        try {
            long now = System.currentTimeMillis();
            while (now < until) {
                try {
                    if (!queue.offer(logEvent, until - now, TimeUnit.MILLISECONDS)) {
                        lostCount.incrementAndGet();
                    }
                    break;
                } catch (InterruptedException e) {
                    interrupted = true;
                    now = System.currentTimeMillis();
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    } else {
        // we are not allowed to block: offer without blocking
        if (!queue.offer(logEvent)) {
            lostCount.incrementAndGet();
        }
    }
    // trigger a flush if queue is full
    if (queue.size() >= maxBatchLogEvents) {
        synchronized (running) {
            running.notifyAll();
        }
    }
}
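The notifyAll() calls only make sense against a worker loop parked on the same monitor. That loop is not shown in the original; a minimal sketch of what it plausibly looks like, where the flush interval accessor getMaxFlushTimeMillis() is an assumption:

@Override
public void run() {
    while (running.get()) {
        try {
            // drain whatever is queued (see flush above)
            flush(false);
            synchronized (running) {
                // park until the next interval or an early wake-up from append()
                running.wait(getAwsLogsAppender().getMaxFlushTimeMillis()); // assumed accessor
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
    // final drain on shutdown
    flush(true);
}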
private static Collection<InputLogEvent> anyInputLogEvents() {
    return anyCollection();
}
@Theory
public void eventShouldNotBeTrimmed(@FromDataPoints("UNTRIMMED") String message) {
    InputLogEvent event = worker.asInputLogEvent(asEvent(message));
    assertFalse(event.getMessage().endsWith("..."));
}
@Theory
public void eventShouldBeTrimmed(@FromDataPoints("TRIMMED") String message) {
    InputLogEvent event = worker.asInputLogEvent(asEvent(message));
    assertTrue(event.getMessage().endsWith("..."));
}
@Theory
public void trimmingShouldNotChopMultibyteCharacter(@FromDataPoints("TRIMMED_MB") String message) {
    InputLogEvent event = worker.asInputLogEvent(asEvent(message));
    assertTrue(event.getMessage().endsWith("ö..."));
}
@Theory
public void eventShouldNeverExceed262144Bytes(String message) throws UnsupportedEncodingException {
    InputLogEvent event = worker.asInputLogEvent(asEvent(message));
    // 26 bytes is the per-event overhead CloudWatch counts; 262144 is the per-event size limit
    int eventSize = event.getMessage().getBytes("UTF-8").length + 26;
    assertTrue(eventSize <= 262144);
}