private void initProducer(String bootstrapServer) { producer = TestUtils.createNewProducer( bootstrapServer, 1, 60 * 1000L, 1024L * 1024L, 0, 0L, 5 * 1000L, SecurityProtocol.PLAINTEXT, null, Option$.MODULE$.apply(new Properties()), new StringSerializer(), new ByteArraySerializer(), Option$.MODULE$.apply(new Properties())); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<Long, byte[]> producer = new KafkaProducer<>(properties); LongStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, //topic number, //key String.format("record-%s", number.toString()).getBytes())) //value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<String, byte[]> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, //topic number.toString(), //key UserAvroSerdes.serialize(new User(String.format("user-%s", number.toString()))))) //value .forEach(record -> producer.send(record)); producer.close(); }
public static void main(String[] args) { final ActorSystem system = ActorSystem.create("KafkaProducerSystem"); final Materializer materializer = ActorMaterializer.create(system); final ProducerSettings<byte[], String> producerSettings = ProducerSettings .create(system, new ByteArraySerializer(), new StringSerializer()) .withBootstrapServers("localhost:9092"); CompletionStage<Done> done = Source.range(1, 100) .map(n -> n.toString()) .map(elem -> new ProducerRecord<byte[], String>( "topic1-ts", 0, Instant.now().getEpochSecond(), null, elem)) .runWith(Producer.plainSink(producerSettings), materializer); done.whenComplete((d, ex) -> System.out.println("sent")); }
private static void produceRecords() { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<Integer, byte[]> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 10000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key KafkaProducerUtil.createMessage(1000))) //Value .forEach(record -> { producer.send(record); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<Integer, byte[]> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 10000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key KafkaProducerUtil.createMessage(1000))) //Value .forEach(record -> { producer.send(record); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); producer.close(); }
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) { Map<String, Object> producerProps = new HashMap<>(); producerProps.putAll(config.originals()); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); Map<String, Object> consumerProps = new HashMap<>(); consumerProps.putAll(config.originals()); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); Map<String, Object> adminProps = new HashMap<>(config.originals()); NewTopic topicDescription = TopicAdmin.defineTopic(topic). compacted(). partitions(1). replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)). build(); return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps); }
@Test public void testWindowedSerializerNoArgConstructors() { Map<String, String> props = new HashMap<>(); // test key[value].serializer.inner.class takes precedence over serializer.inner.class WindowedSerializer<StringSerializer> windowedSerializer = new WindowedSerializer<>(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1"); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); props.put("key.serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer"); props.put("serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer"); windowedSerializer.configure(props, true); Serializer<?> inner = windowedSerializer.innerSerializer(); assertNotNull("Inner serializer should be not null", inner); assertTrue("Inner serializer type should be StringSerializer", inner instanceof StringSerializer); // test serializer.inner.class props.put("serializer.inner.class", "org.apache.kafka.common.serialization.ByteArraySerializer"); props.remove("key.serializer.inner.class"); props.remove("value.serializer.inner.class"); WindowedSerializer<?> windowedSerializer1 = new WindowedSerializer<>(); windowedSerializer1.configure(props, false); Serializer<?> inner1 = windowedSerializer1.innerSerializer(); assertNotNull("Inner serializer should be not null", inner1); assertTrue("Inner serializer type should be ByteArraySerializer", inner1 instanceof ByteArraySerializer); }
private Properties setProduceConsumeProperties(final String clientId) { Properties props = new Properties(); props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); // the socket buffer needs to be large, especially when running in AWS with // high latency. if running locally the default is fine. props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // the socket buffer needs to be large, especially when running in AWS with // high latency. if running locally the default is fine. props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS); return props; }
@Test public void testConstructorFailureCloseResource() { Properties props = new Properties(); props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose"); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar.local:9999"); props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); try { KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>( props, new ByteArraySerializer(), new ByteArraySerializer()); } catch (KafkaException e) { assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get()); assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get()); assertEquals("Failed to construct kafka producer", e.getMessage()); return; } fail("should have caught an exception and returned"); }
public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final String zkServers, final String mail, final String rpc, final String app, final String host, final Property[] properties) { super(loggerContext, name); this.topic = topic; this.zkServers = zkServers; this.mail = mail; this.rpc = rpc; this.app = app; this.orginApp = app; this.host = host; this.checkAndSetConfig(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); this.checkAndSetConfig(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置分区类, 使用自定义的KeyModPartitioner,同样的key进入相同的partition this.checkAndSetConfig(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyModPartitioner.class.getName()); // xml配置里面的参数 for (final Property property : properties) { this.config.put(property.getName(), property.getValue()); } // 由于容器部署需要从外部获取host this.config.put(ProducerConfig.CLIENT_ID_CONFIG, this.app + Constants.MIDDLE_LINE + this.host + Constants.MIDDLE_LINE + "log4j2"); }
@Test public void testConstructorFailureCloseResource() { Properties props = new Properties(); props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose"); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999"); props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); try { KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>( props, new ByteArraySerializer(), new ByteArraySerializer()); } catch (KafkaException e) { Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get()); Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get()); Assert.assertEquals("Failed to construct kafka producer", e.getMessage()); return; } Assert.fail("should have caught an exception and returned"); }
@Test public void testZeroLengthValue() throws Exception { Properties producerPropertyOverrides = new Properties(); producerPropertyOverrides.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); try (LiKafkaProducer producer = createProducer(producerPropertyOverrides)) { producer.send(new ProducerRecord<>("testZeroLengthValue", "key", new byte[0])).get(); } Properties consumerProps = new Properties(); consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); try (LiKafkaConsumer consumer = createConsumer(consumerProps)) { consumer.subscribe(Collections.singleton("testZeroLengthValue")); long startMs = System.currentTimeMillis(); ConsumerRecords records = ConsumerRecords.empty(); while (records.isEmpty() && System.currentTimeMillis() < startMs + 30000) { records = consumer.poll(100); } assertEquals(1, records.count()); ConsumerRecord record = (ConsumerRecord) records.iterator().next(); assertEquals("key", record.key()); assertEquals(((byte[]) record.value()).length, 0); } }
@Override public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() { checkArgument(topics != null && topics.size() == 1, "Only one topic can be acceptable as output."); return new PTransform<PCollection<BeamRecord>, PDone>() { @Override public PDone expand(PCollection<BeamRecord> input) { return input.apply("out_reformat", getPTransformForOutput()).apply("persistent", KafkaIO.<byte[], byte[]>write() .withBootstrapServers(bootstrapServers) .withTopic(topics.get(0)) .withKeySerializer(ByteArraySerializer.class) .withValueSerializer(ByteArraySerializer.class)); } }; }
private KafkaProducer<String, byte[]> createKafkaProducer() { Properties props = new Properties(); String kafkaServers = System.getenv().get("KAFKA_SERVERS"); if (null == kafkaServers) { kafkaServers = Constants.KAFKA_SERVERS; } props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props); return producer; }
/** * Configures a Kafka Producer. * * @return new Kafka Producer Object */ public KafkaProducer<String, byte[]> createProducer() { logger.info("Creating Kafka Producer"); Properties props = new Properties(); String kafkaServers = System.getenv().get("KAFKA_SERVERS"); if (null == kafkaServers) { kafkaServers = Constants.KAFKA_SERVERS; } logger.info("Using Kafka servers: " + kafkaServers); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ACKS_CONFIG, "1"); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 200); return new KafkaProducer<>(props); }
public ProducerDriver createProducerDriver(CountDownLatch latch, String topic, int messageCount) { Map<String, Object> producerConfigs = new ProducerConfigsBuilder() .bootstrapServers("127.0.0.1:" + broker.getPort()) .requestRequiredAcks(ProducerConfigsBuilder.RequestRequiredAcks.ackFromLeader) .producerType(ProducerConfigsBuilder.ProducerType.sync) .keySerializer(ByteArraySerializer.class) .valueSerializer(ByteArraySerializer.class) .batchSize(0) .build(); ProducerDefinition producerDefinition = new ProducerDefinition(); producerDefinition.setConfig(producerConfigs); producerDefinition.setTopic(topic); producerDefinition.setMessageSize(100 * 1024); producerDefinition.setMessagesToSend(messageCount); producerDefinition.setSendBlocking(true); return new ProducerDriver(producerDefinition, latch); }
@Test public void testKafkaProducer09WriteFailsRecordTooLarge() throws IOException, StageException { HashMap<String, Object> kafkaProducerConfigs = new HashMap<>(); kafkaProducerConfigs.put("retries", 0); kafkaProducerConfigs.put("batch.size", 100); kafkaProducerConfigs.put("linger.ms", 0); kafkaProducerConfigs.put(KafkaConstants.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); kafkaProducerConfigs.put(KafkaConstants.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); // Set the message size to 510 as "message.max.bytes" is set to 500 final String message = StringUtils.leftPad("a", 510, "b"); SdcKafkaProducer sdcKafkaProducer = createSdcKafkaProducer(port, kafkaProducerConfigs); sdcKafkaProducer.init(); String topic = getNextTopic(); sdcKafkaProducer.enqueueMessage(topic, message.getBytes(), "0"); try { sdcKafkaProducer.write(); fail("Expected exception but didn't get any"); } catch (StageException se) { assertEquals(KafkaErrors.KAFKA_69, se.getErrorCode()); } catch (Exception e) { fail("Expected Stage Exception but got " + e); } }
protected Properties getProducerConfig() { final Properties producerProperties = (Properties) kafkaTopicDeliveryEndPoint.getProperties().clone(); if (producerProperties.getProperty("value.serializer") != null || producerProperties.getProperty("key.serializer") != null) { logger.warn("serializer cannot be provided as producer properties. " + "Overriding manually to be the correct serialization type."); } producerProperties.setProperty("key.serializer", StringSerializer.class.getName()); producerProperties.setProperty("value.serializer", ByteArraySerializer.class.getName()); producerProperties.setProperty("client.id", kafkaTopicDeliveryEndPoint.getProperties().getProperty("client.id", UUID.randomUUID().toString())); producerProperties.setProperty("bootstrap.servers", kafkaTopicDeliveryEndPoint.getBrokerList()); producerProperties.setProperty("metric.reporters", "com.outbrain.aletheia.datum.metrics.kafka.KafkaMetrics"); logger.warn("Using producer config: {}", producerProperties); return producerProperties; }
@Override public void start() { super.start(); // Important to not initialize this until we are started, because ServerInfo itself logs... if (clientId == null) { clientId = OptionalServerInfo.getDefaultClientName(this::addError); } final Properties config = new Properties(); config.put("bootstrap.servers", brokerList); config.put("acks", "1"); config.put("compression.type", compressionCodec); config.put("client.id", clientId); producer = new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()); }
public KafkaProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) { this.closer = Closer.create(); this.topic = topic; Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); props.put(ProducerConfig.ACKS_CONFIG, "1"); // add the kafka scoped config. if any of the above are specified then they are overridden if (kafkaConfig.isPresent()) { props.putAll(ConfigUtils.configToProperties(kafkaConfig.get())); } this.producer = createProducer(props); }
@Override public void configure(Map<String, ?> config) { _partitionMetricSampleStoreTopic = (String) config.get(PARTITION_METRIC_SAMPLE_STORE_TOPIC_CONFIG); _brokerMetricSampleStoreTopic = (String) config.get(BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG); if (_partitionMetricSampleStoreTopic == null || _brokerMetricSampleStoreTopic == null || _partitionMetricSampleStoreTopic.isEmpty() || _brokerMetricSampleStoreTopic.isEmpty()) { throw new IllegalArgumentException("The sample store topic names must be configured."); } String numProcessingThreadsString = (String) config.get(NUM_SAMPLE_LOADING_THREADS); int numProcessingThreads = numProcessingThreadsString == null || numProcessingThreadsString.isEmpty() ? 8 : Integer.parseInt(numProcessingThreadsString); _metricProcessorExecutor = Executors.newFixedThreadPool(numProcessingThreads); _consumers = new ArrayList<>(numProcessingThreads); for (int i = 0; i < numProcessingThreads; i++) { _consumers.add(createConsumers(config)); } Properties producerProps = new Properties(); producerProps.putAll(config); producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, (String) config.get(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG)); producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, PRODUCER_CLIENT_ID); // Set batch.size and linger.ms to a big number to have better batching. producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000"); producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "800000"); producerProps.setProperty(ProducerConfig.RETRIES_CONFIG, "5"); producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); _producer = new KafkaProducer<>(producerProps); _loadingProgress = -1.0; ensureTopicCreated(config); }
private static KafkaProducer<byte[], byte[]> getProducer(String brokerList) { Properties prop = new Properties(); prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); return new KafkaProducer<>(prop); }
public KafkaAppender() { // setting these as config values sidesteps an unnecessary warning (minor bug in KafkaProducer) addProducerConfigValue(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); addProducerConfigValue(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Properties properties = PropertyUtils.getProperties(); if (properties != null && !properties.isEmpty()) { addProducerConfigValue(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("logcenter.bootstrap.servers", bootstrapServers)); addProducerConfigValue(ProducerConfig.ACKS_CONFIG, properties.getProperty("logcenter.acks", "0")); addProducerConfigValue(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, properties.getProperty("logcenter.metadata.fetch.timeout.ms", "1000")); batchNum = Integer.parseInt(properties.getProperty("logcenter.batchNum", "100")); maxCommitInterval = Integer.parseInt(properties.getProperty("logcenter.maxCommitInterval", "10000")); } }
Producer(Properties kafkaProducerConfig) { // Mandatory settings, not changeable. kafkaProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); kafkaProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); kafkaProducerConfig.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SixtPartitioner.class.getName()); kafka = new org.apache.kafka.clients.producer.KafkaProducer<>(kafkaProducerConfig); logger.info("Created producer."); }
private void initProducer() { Properties props = new Properties(); props.put("bootstrap.servers", HOST + ":" + serverPort); props.put("acks", "1"); producer = new KafkaProducer<String,byte[]>(props, new StringSerializer(), new ByteArraySerializer()); }
@Override public void configure(final WorkerConfig config) { this.topic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG); if (topic.equals("")) throw new ConfigException("Must specify topic for connector status."); Map<String, Object> producerProps = new HashMap<>(); producerProps.putAll(config.originals()); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class Map<String, Object> consumerProps = new HashMap<>(); consumerProps.putAll(config.originals()); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); Map<String, Object> adminProps = new HashMap<>(config.originals()); NewTopic topicDescription = TopicAdmin.defineTopic(topic). compacted(). partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)). replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)). build(); Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() { @Override public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) { read(record); } }; this.kafkaLog = createKafkaBasedLog(topic, producerProps, consumerProps, readCallback, topicDescription, adminProps); }
@Override public void configure(final WorkerConfig config) { String topic = config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG); if (topic.equals("")) throw new ConfigException("Offset storage topic must be specified"); data = new HashMap<>(); Map<String, Object> producerProps = new HashMap<>(); producerProps.putAll(config.originals()); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); Map<String, Object> consumerProps = new HashMap<>(); consumerProps.putAll(config.originals()); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); Map<String, Object> adminProps = new HashMap<>(config.originals()); NewTopic topicDescription = TopicAdmin.defineTopic(topic). compacted(). partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)). replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG)). build(); offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminProps); }
@Test public void testOsDefaultSocketBufferSizes() throws Exception { Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); config.put(ProducerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE); config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE); KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>( config, new ByteArraySerializer(), new ByteArraySerializer()); producer.close(); }
@Test(expected = KafkaException.class) public void testInvalidSocketSendBufferSize() throws Exception { Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); config.put(ProducerConfig.SEND_BUFFER_CONFIG, -2); new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()); }
@Test(expected = KafkaException.class) public void testInvalidSocketReceiveBufferSize() throws Exception { Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, -2); new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()); }
@Test public void closeShouldBeIdempotent() { Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); Producer producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer()); producer.close(); producer.close(); }
@Test public void testMetricConfigRecordingLevel() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) { assertEquals(Sensor.RecordingLevel.INFO, producer.metrics.config().recordLevel()); } props.put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) { assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel()); } }
public void testProduce() throws Exception { Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer); ByteArraySerializer serializer = new ByteArraySerializer(); KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, serializer, serializer); ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(testConfig.topic, message1); Future<RecordMetadata> future1 = producer.send(record1); ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(testConfig.topic, message2); Future<RecordMetadata> future2 = producer.send(record2); producer.flush(); future1.get(); future2.get(); producer.close(); }
/** * Prepare final configuration map for producer to initiate kafka producer instance * Configurations will be picked up from {@link ModuleAware} * * @return */ private static Map<String, Object> prepareConfiguration() { Map<String, Object> properties = new HashMap<>(); properties.put(BOOTSTRAP_SERVERS_CONFIG, ModuleAware.CONTEXT.getBrokers()); properties.put(KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); properties.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); properties.put(BATCH_SIZE_CONFIG, ModuleAware.CONTEXT.getBatchSize()); properties.put(ACKS_CONFIG, ModuleAware.CONTEXT.getAcks()); properties.put(BUFFER_MEMORY_CONFIG, ModuleAware.CONTEXT.getBufferMemory()); return properties; }
/** * 构造方法 */ public KafkaAppender() { this.checkAndSetConfig(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); this.checkAndSetConfig(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置分区类, 使用自定义的KeyModPartitioner,同样的key进入相同的partition this.checkAndSetConfig(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyModPartitioner.class.getName()); // 添加hook Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { close(); } }); }
/** * 构造方法 */ public KafkaAppender() { this.checkAndSetConfig(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); this.checkAndSetConfig(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置分区类, 使用自定义的KeyModPartitioner,同样的key进入相同的partition this.checkAndSetConfig(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyModPartitioner.class.getName()); // 由于容器部署需要从外部获取host this.checkAndSetConfig(ProducerConfig.CLIENT_ID_CONFIG, this.app + Constants.MIDDLE_LINE + this.host + Constants.MIDDLE_LINE + "logback"); shutdownHook = new DelayingShutdownHook(); }