/**
 * Drains pending events from {@code queue} and sends them to the log stream in batches of at
 * most {@code AWS_DRAIN_LIMIT} events.
 *
 * <p>For every batch this method:
 * <ol>
 *   <li>sorts the events by timestamp;</li>
 *   <li>raises any timestamp that is older than {@code lastReportedTimestamp} up to that value,
 *       so a batch never starts before the end of the previous one;</li>
 *   <li>records the newest timestamp of the batch in {@code lastReportedTimestamp};</li>
 *   <li>submits the batch using the cached sequence token and stores the token to use for the
 *       next call.</li>
 * </ol>
 *
 * <p>If the service rejects the cached token with {@link InvalidSequenceTokenException}, the
 * batch is re-sent once with the token reported by the exception instead of being dropped.
 * A {@link DataAlreadyAcceptedException} only refreshes the cached token. Any other failure is
 * logged and the batch is discarded; nothing is propagated to the caller.
 *
 * <p>Draining continues for as long as a drain call returns a full batch.
 */
private void flush() {
    // One initial attempt plus a single retry with the token the service told us to use.
    final int maxAttempts = 2;

    int drained;
    final List<InputLogEvent> logEvents = new ArrayList<>(AWS_DRAIN_LIMIT);
    do {
        drained = queue.drainTo(logEvents, AWS_DRAIN_LIMIT);
        if (logEvents.isEmpty()) {
            break;
        }

        // NOTE(review): PutLogEvents is documented to require chronological order within a
        // batch; the sort below appears to exist for that reason.
        logEvents.sort(Comparator.comparing(InputLogEvent::getTimestamp));

        // Clamp events that are older than what was already reported in a previous batch.
        if (lastReportedTimestamp > 0) {
            for (final InputLogEvent event : logEvents) {
                if (event.getTimestamp() < lastReportedTimestamp) {
                    event.setTimestamp(lastReportedTimestamp);
                }
            }
        }
        // The list is sorted, so the last element carries the newest timestamp.
        lastReportedTimestamp = logEvents.get(logEvents.size() - 1).getTimestamp();

        final PutLogEventsRequest putLogEventsRequest =
                new PutLogEventsRequest(logGroupName, logStreamName, logEvents);

        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Re-read the cache on every attempt: a failed attempt may have refreshed it.
            putLogEventsRequest.setSequenceToken(sequenceTokenCache);
            try {
                final PutLogEventsResult putLogEventsResult =
                        awsLogsClient.putLogEvents(putLogEventsRequest);
                sequenceTokenCache = putLogEventsResult.getNextSequenceToken();
                break;
            } catch (final DataAlreadyAcceptedException daae) {
                // The batch is already stored; only the token needs refreshing.
                sequenceTokenCache = daae.getExpectedSequenceToken();
                break;
            } catch (final InvalidSequenceTokenException iste) {
                // The batch was NOT stored. Adopt the expected token and let the loop re-send
                // the same batch rather than silently losing it.
                sequenceTokenCache = iste.getExpectedSequenceToken();
                if (attempt == maxAttempts) {
                    // Retry budget exhausted: make the dropped batch visible.
                    LOGGER.error(iste.getMessage(), iste);
                }
            } catch (final Exception e) {
                LOGGER.error(e.getMessage(), e);
                break;
            }
        }

        // Reuse the same list for the next drain.
        logEvents.clear();
    } while (drained >= AWS_DRAIN_LIMIT);
}