/** * Check if there are any asynchronous exceptions. If so, rethrow the exception. */ private void checkAndPropagateAsyncError() throws Exception { if (thrownException != null) { String errorMessages = ""; if (thrownException instanceof UserRecordFailedException) { List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts(); for (Attempt attempt: attempts) { if (attempt.getErrorMessage() != null) { errorMessages += attempt.getErrorMessage() + "\n"; } } } if (failOnError) { throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException); } else { LOG.warn("An exception was thrown while processing a record: {}", thrownException, errorMessages); // reset, prevent double throwing thrownException = null; } } }
/**
 * Encodes the event as UTF-8, records its size with the metrics, and hands it to the producer.
 *
 * <p>If {@code getPartitionKey} returns {@code null} for the event, nothing is handed to the
 * producer (the size has already been reported to the metrics at that point). Otherwise a
 * callback is attached to the returned future: success acknowledges the event with the
 * metrics, failure is logged.
 *
 * @param event the event text to publish
 * @throws UnsupportedEncodingException if the "UTF-8" charset name is not supported
 */
public void send(String event) throws UnsupportedEncodingException {
    final byte[] payload = event.getBytes("UTF-8");
    this.metrics.queueEvent(payload.length);
    final ByteBuffer buffer = ByteBuffer.wrap(payload);

    final String key = getPartitionKey(event);
    if (key == null) {
        // No partition key: the event is not submitted.
        return;
    }

    final ListenableFuture<UserRecordResult> pending =
            producer.addUserRecord(streamName, key, buffer);

    final FutureCallback<UserRecordResult> completionHandler =
            new FutureCallback<UserRecordResult>() {
                @Override
                public void onSuccess(UserRecordResult result) {
                    metrics.ackEvent();
                }

                @Override
                public void onFailure(Throwable t) {
                    if (t instanceof UserRecordFailedException) {
                        // Report the error code and message of the final attempt.
                        final UserRecordFailedException failure = (UserRecordFailedException) t;
                        final Attempt finalAttempt =
                                Iterables.getLast(failure.getResult().getAttempts());
                        LOGGER.error(String.format(
                                "Record failed to put - %s : %s",
                                finalAttempt.getErrorCode(),
                                finalAttempt.getErrorMessage()));
                    }
                    LOGGER.error("Exception during put", t);
                }
            };

    Futures.addCallback(pending, completionHandler);
}
/**
 * Converts a failed put into a {@code DataException}.
 *
 * <p>For a {@code UserRecordFailedException} the message carries the error code and error
 * message of the last attempt; any other throwable is reported with a generic message. In
 * both cases the original throwable is attached as the cause so its stack trace is kept.
 *
 * @param t the failure reported for the put
 */
@Override
public void onFailure(Throwable t) {
    if (t instanceof UserRecordFailedException) {
        Attempt last = Iterables.getLast(((UserRecordFailedException) t).getResult().getAttempts());
        // Pass t as the cause: previously this branch dropped the original exception.
        throw new DataException(
                "Kinesis Producer was not able to publish data - "
                        + last.getErrorCode() + "-" + last.getErrorMessage(),
                t);
    }
    throw new DataException("Exception during Kinesis put", t);
}
/**
 * Logs a failed put.
 *
 * <p>When the failure is a {@code UserRecordFailedException}, the error code and error message
 * of its last attempt are logged first; the throwable itself is then always logged.
 *
 * @param t the failure reported for the put
 */
@Override
public void onFailure(Throwable t) {
    final boolean recordFailed = t instanceof UserRecordFailedException;
    if (recordFailed) {
        final UserRecordFailedException failure = (UserRecordFailedException) t;
        final Attempt finalAttempt = Iterables.getLast(failure.getResult().getAttempts());
        log.error(format(
                "Record failed to put - %s : %s",
                finalAttempt.getErrorCode(),
                finalAttempt.getErrorMessage()));
    }
    log.error("Exception during put", t);
}
@Override public void invoke(OUT value) throws Exception { if (this.producer == null) { throw new RuntimeException("Kinesis producer has been closed"); } if (thrownException != null) { String errorMessages = ""; if (thrownException instanceof UserRecordFailedException) { List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts(); for (Attempt attempt: attempts) { if (attempt.getErrorMessage() != null) { errorMessages += attempt.getErrorMessage() +"\n"; } } } if (failOnError) { throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException); } else { LOG.warn("An exception was thrown while processing a record: {}", thrownException, errorMessages); thrownException = null; // reset } } String stream = defaultStream; String partition = defaultPartition; ByteBuffer serialized = schema.serialize(value); // maybe set custom stream String customStream = schema.getTargetStream(value); if (customStream != null) { stream = customStream; } String explicitHashkey = null; // maybe set custom partition if (customPartitioner != null) { partition = customPartitioner.getPartitionId(value); explicitHashkey = customPartitioner.getExplicitHashKey(value); } if (stream == null) { if (failOnError) { throw new RuntimeException("No target stream set"); } else { LOG.warn("No target stream set. Skipping record"); return; } } ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized); Futures.addCallback(cb, callback); }