Java 类com.amazonaws.services.kinesis.producer.Attempt 实例源码

项目:flink    文件:FlinkKinesisProducer.java   
/**
 * Check if there are any asynchronous exceptions. If so, rethrow the exception.
 */
private void checkAndPropagateAsyncError() throws Exception {
    if (thrownException != null) {
        String errorMessages = "";
        if (thrownException instanceof UserRecordFailedException) {
            List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
            for (Attempt attempt: attempts) {
                if (attempt.getErrorMessage() != null) {
                    errorMessages += attempt.getErrorMessage() + "\n";
                }
            }
        }
        if (failOnError) {
            throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
        } else {
            LOG.warn("An exception was thrown while processing a record: {}", thrownException, errorMessages);

            // reset, prevent double throwing
            thrownException = null;
        }
    }
}
项目:koupler    文件:KinesisEventProducer.java   
public void send(String event) throws UnsupportedEncodingException {
    byte[] bytes = event.getBytes("UTF-8");
    this.metrics.queueEvent(bytes.length);
    ByteBuffer data = ByteBuffer.wrap(bytes);
    String partitionKey = getPartitionKey(event);
    if (partitionKey != null) {
        ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, partitionKey, data);
        Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
            @Override
            public void onFailure(Throwable t) {
                if (t instanceof UserRecordFailedException) {
                    Attempt last = Iterables.getLast(((UserRecordFailedException) t).getResult().getAttempts());
                    LOGGER.error(String.format("Record failed to put - %s : %s", last.getErrorCode(), last.getErrorMessage()));
                }
                LOGGER.error("Exception during put", t);
            }

            @Override
            public void onSuccess(UserRecordResult result) {
                metrics.ackEvent();
            }
        });
    }
}
项目:kinesis-kafka-connector    文件:AmazonKinesisSinkTask.java   
@Override
public void onFailure(Throwable t) {
    if (t instanceof UserRecordFailedException) {
        Attempt last = Iterables.getLast(((UserRecordFailedException) t).getResult().getAttempts());
        throw new DataException("Kinesis Producer was not able to publish data - " + last.getErrorCode() + "-"
                + last.getErrorMessage());

    }
    throw new DataException("Exception during Kinesis put", t);
}
项目:aws-kinesis-zombies    文件:Drone.java   
@Override
public void onFailure(Throwable t) {
    if (t instanceof UserRecordFailedException ){
        Attempt last = Iterables.getLast(
                ((UserRecordFailedException) t).getResult().getAttempts());
        log.error(format(
                "Record failed to put - %s : %s",
                last.getErrorCode(), last.getErrorMessage()));
    }
    log.error("Exception during put", t);
}
项目:datacollector    文件:KinesisTarget.java   
private void getAndCheck(Future<UserRecordResult> future) throws StageException {
  try {
    UserRecordResult result = future.get();
    if (!result.isSuccessful()) {
      for (Attempt attempt : result.getAttempts()) {
        LOG.error("Failed to put record: {}", attempt.getErrorMessage());
      }
      throw new StageException(Errors.KINESIS_00, result.getAttempts().get(0).getErrorMessage());
    }
  } catch (InterruptedException | ExecutionException e) {
    LOG.error("Pipeline is shutting down.", e);
    // We should flush if we encounter an error.
    kinesisProducer.flushSync();
  }
}
项目:flink    文件:FlinkKinesisProducer.java   
@Override
public void invoke(OUT value) throws Exception {
    if (this.producer == null) {
        throw new RuntimeException("Kinesis producer has been closed");
    }
    if (thrownException != null) {
        String errorMessages = "";
        if (thrownException instanceof UserRecordFailedException) {
            List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
            for (Attempt attempt: attempts) {
                if (attempt.getErrorMessage() != null) {
                    errorMessages += attempt.getErrorMessage() +"\n";
                }
            }
        }
        if (failOnError) {
            throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
        } else {
            LOG.warn("An exception was thrown while processing a record: {}", thrownException, errorMessages);
            thrownException = null; // reset
        }
    }

    String stream = defaultStream;
    String partition = defaultPartition;

    ByteBuffer serialized = schema.serialize(value);

    // maybe set custom stream
    String customStream = schema.getTargetStream(value);
    if (customStream != null) {
        stream = customStream;
    }

    String explicitHashkey = null;
    // maybe set custom partition
    if (customPartitioner != null) {
        partition = customPartitioner.getPartitionId(value);
        explicitHashkey = customPartitioner.getExplicitHashKey(value);
    }

    if (stream == null) {
        if (failOnError) {
            throw new RuntimeException("No target stream set");
        } else {
            LOG.warn("No target stream set. Skipping record");
            return;
        }
    }

    ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized);
    Futures.addCallback(cb, callback);
}