public void start() throws InterruptedException { RandomGenerator random = RandomManager.getRandom(); Properties props = ConfigUtils.keyValueToProperties( "bootstrap.servers", "localhost:" + kafkaPort, "key.serializer", "org.apache.kafka.common.serialization.StringSerializer", "value.serializer", "org.apache.kafka.common.serialization.StringSerializer", "compression.type", "gzip", "batch.size", 0, "acks", 1, "max.request.size", 1 << 26 // TODO ); try (Producer<String,String> producer = new KafkaProducer<>(props)) { for (int i = 0; i < howMany; i++) { Pair<String,String> datum = datumGenerator.generate(i, random); ProducerRecord<String,String> record = new ProducerRecord<>(topic, datum.getFirst(), datum.getSecond()); producer.send(record); log.debug("Sent datum {} = {}", record.key(), record.value()); if (intervalMsec > 0) { Thread.sleep(intervalMsec); } } } }
@Test public void nullKey() throws Exception { Producer<Integer, String> producer = createProducer(); ProducerRecord<Integer, String> record = new ProducerRecord<>("messages", "test"); producer.send(record); final Map<String, Object> consumerProps = KafkaTestUtils .consumerProps("sampleRawConsumer", "false", embeddedKafka); consumerProps.put("auto.offset.reset", "earliest"); final CountDownLatch latch = new CountDownLatch(1); createConsumer(latch, null); producer.close(); }
@Before public void setUp() { super.setUp(); Properties props = new Properties(); props.setProperty(ProducerConfig.ACKS_CONFIG, "-1"); AtomicInteger failed = new AtomicInteger(0); try (Producer<String, String> producer = createProducer(props)) { for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<>("TestTopic", Integer.toString(i)), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { failed.incrementAndGet(); } } }); } } assertEquals(0, failed.get()); }
/** * Create an AbstractResourceService with the given producer * @param baseUrl the base URL * @param producer the kafka producer * @param curator the zookeeper curator * @param notifications the event service * @param idSupplier a supplier of new identifiers * @param async write cached resources asynchronously if true, synchronously if false */ public AbstractResourceService(final String baseUrl, final Producer<String, String> producer, final CuratorFramework curator, final EventService notifications, final Supplier<String> idSupplier, final Boolean async) { this.baseUrl = baseUrl; this.notifications = notifications; this.async = async; this.idSupplier = idSupplier; this.producer = producer; this.curator = curator; try { this.curator.createContainers(ZNODE_COORDINATION); } catch (final Exception ex) { LOGGER.error("Could not create zk session node: {}", ex.getMessage()); throw new RuntimeTrellisException(ex); } }
private static void sendAckInfoToCtrlTopic(String dataSourceInfo, String completedTime, String pullStatus) { try { // 在源dataSourceInfo的基础上,更新全量拉取相关信息。然后发回src topic JSONObject jsonObj = JSONObject.parseObject(dataSourceInfo); jsonObj.put(DataPullConstants.FullPullInterfaceJson.FROM_KEY, DataPullConstants.FullPullInterfaceJson.FROM_VALUE); jsonObj.put(DataPullConstants.FullPullInterfaceJson.TYPE_KEY, DataPullConstants.FullPullInterfaceJson.TYPE_VALUE); // notifyFullPullRequestor JSONObject payloadObj = jsonObj.getJSONObject(DataPullConstants.FullPullInterfaceJson.PAYLOAD_KEY); // 完成时间 payloadObj.put(DataPullConstants.FullPullInterfaceJson.COMPLETE_TIME_KEY, completedTime); // 拉取是否成功标志位 payloadObj.put(DataPullConstants.FullPullInterfaceJson.DATA_STATUS_KEY, pullStatus); jsonObj.put(DataPullConstants.FullPullInterfaceJson.PAYLOAD_KEY, payloadObj); String ctrlTopic = getFullPullProperties(Constants.ZkTopoConfForFullPull.COMMON_CONFIG, true) .getProperty(Constants.ZkTopoConfForFullPull.FULL_PULL_SRC_TOPIC); Producer producer = DbusHelper .getProducer(getFullPullProperties(Constants.ZkTopoConfForFullPull.BYTE_PRODUCER_CONFIG, true)); ProducerRecord record = new ProducerRecord<>(ctrlTopic, DataPullConstants.FullPullInterfaceJson.TYPE_VALUE, jsonObj.toString().getBytes()); Future<RecordMetadata> future = producer.send(record); RecordMetadata meta = future.get(); } catch (Exception e) { Log.error("Error occurred when report full data pulling status.", e); throw new RuntimeException(e); } }
private void loadRunningConf(String reloadMsgJson) { String notifyEvtName = reloadMsgJson == null ? "loaded" : "reloaded"; String loadResultMsg = null; try { this.confMap = FullPullHelper.loadConfProps(zkconnect, topologyId, zkTopoRoot, null); this.commonProps = (Properties) confMap.get(FullPullHelper.RUNNING_CONF_KEY_COMMON); this.dsName = commonProps.getProperty(Constants.ZkTopoConfForFullPull.DATASOURCE_NAME); this.byteProducer = (Producer) confMap.get(FullPullHelper.RUNNING_CONF_KEY_BYTE_PRODUCER); this.zkService = (ZkService) confMap.get(FullPullHelper.RUNNING_CONF_KEY_ZK_SERVICE); loadResultMsg = "Running Config is " + notifyEvtName + " successfully for DataShardsSplittingBolt!"; LOG.info(loadResultMsg); } catch (Exception e) { loadResultMsg = e.getMessage(); LOG.error(notifyEvtName + "ing running configuration encountered Exception!", loadResultMsg); } finally { if (reloadMsgJson != null) { FullPullHelper.saveReloadStatus(reloadMsgJson, "splitting-bolt", false, zkconnect); } } }
public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.77.7:9094,192.168.77.7:9093,192.168.77.7:9092"); props.put("retries", 0); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for(int i = 0; i < 10; i++) { producer.send(new ProducerRecord<>("test", Long.toString(System.currentTimeMillis()), Integer.toString(i))); System.out.println("Sent message: " + i); } producer.close(); }
/** * Publish 'numMessages' arbitrary events from live users with the provided delay, to a * Kafka topic. */ public static void publishDataToKafka(int numMessages, int delayInMillis) throws IOException { Producer<String, String> producer = new KafkaProducer<>(kafkaProps); for (int i = 0; i < Math.max(1, numMessages); i++) { Long currTime = System.currentTimeMillis(); String message = generateEvent(currTime, delayInMillis); producer.send(new ProducerRecord<String, String>("game", null, message)); //TODO(fjp): Generalize // TODO(fjp): How do we get late data working? // if (delayInMillis != 0) { // System.out.println(pubsubMessage.getAttributes()); // System.out.println("late data for: " + message); // } // pubsubMessages.add(pubsubMessage); } producer.close(); }
TestStreamTask(final TaskId id, final String applicationId, final Collection<TopicPartition> partitions, final ProcessorTopology topology, final Consumer<byte[], byte[]> consumer, final Producer<byte[], byte[]> producer, final Consumer<byte[], byte[]> restoreConsumer, final StreamsConfig config, final StreamsMetrics metrics, final StateDirectory stateDirectory) { super(id, applicationId, partitions, topology, consumer, new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000), config, metrics, stateDirectory, null, new MockTime(), producer); }
/** * Create a File-based repository service * @param partitionData the partition data configuration * @param partitionUrls the partition URL configuration * @param curator the curator framework * @param producer the kafka producer * @param notifications the notification service * @param idSupplier an identifier supplier for new resources * @param async generate cached resources asynchronously if true, synchonously if false * @throws IOException if the directory is not writable */ public FileResourceService(final Map<String, String> partitionData, final Map<String, String> partitionUrls, final CuratorFramework curator, final Producer<String, String> producer, final EventService notifications, final Supplier<String> idSupplier, final Boolean async) throws IOException { super(partitionUrls, producer, curator, notifications, idSupplier, async); requireNonNull(partitionData, "partition data configuration may not be null!"); RESERVED_PARTITION_NAMES.stream().filter(partitionData::containsKey).findAny().ifPresent(name -> { throw new IllegalArgumentException("Invalid partition name: " + name); }); this.partitionData = partitionData; init(); }
public static void sendStringMessage() throws Exception{ Properties props = new Properties(); props.put("bootstrap.servers", servers); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); //没有任何分区,默认1个分区,发送消息 int i=0; while(i<1000){ Thread.sleep(1000L); String message = "zhangsan"+i; producer.send(new ProducerRecord<>("NL_U_APP_ALARM_APP_STRING",message)); i++; producer.flush(); } producer.close(); }
public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ACKS_CONFIG, "all"); props.put(RETRIES_CONFIG, 0); props.put(BATCH_SIZE_CONFIG, 16384); props.put(LINGER_MS_CONFIG, 0); props.put(BUFFER_MEMORY_CONFIG, 33554432); props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongSerializer"); props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer<Long, String> producer = new KafkaProducer<>(props); System.out.println("Start sending!"); for(int i = 1; i <= 12; i++) { producer.send(new ProducerRecord<>("produktion", round(random() * 6) + 1, "Message: " + i)); } System.out.println("done!"); producer.close(); }
/** * Prepare Transporter for production based on Delivery type {@link DeliveryType} * <p/> * Provide {@link Producer} to transporter that will be used to communicate and * send data to kafka broker. * * @param deliveryType * @param producer * @return */ public static Transporter getTransporter(DeliveryType deliveryType, Producer<byte[], byte[]> producer) { Transporter transporter = null; switch (deliveryType) { case NORMAL: transporter = new NormalTransporter(producer); break; case YIElD: transporter = new YieldTransporter(producer); break; default: transporter = new NormalTransporter(producer); break; } return transporter; }
/** {@inheritDoc} */ @Override public void afterPropertiesSet() throws Exception { bindings.put(Exporter.class, FileExporter.class); bindings.put(Serializer.class, JavaSerializer.class); bindings.put(KeyValueManager.class, KeyValueManagerImpl.class); bindings.put(MetadataProvider.class, MetadataProviderImpl.class); bindings.put(MetadataManager.class, InMemoryMetadataManager.class); bindings.put(KeyValueProvider.class, QuasiKafkaKeyValueProvider.class); bindings.put(KeyValueReader.class, SnapshotAwareKeyValueReaderListener.class); bindings.put(IdSequencer.class, InMemoryIdSequencer.class); if (producer != null) { factories.put(Producer.class, factoryOf((Serializable)producer)); } else { factories.put(Producer.class, producerFactory); } List classes = Collections.singletonList(SnapshotAwareKeyValueReaderListener.class); factories.put(List.class, new Injection.ListOf<>(classes)); }
public static void main(String[] args) throws Exception { // set producer properties Properties prop = PropertyFileReader.readPropertyFile(); Properties properties = new Properties(); properties.put("bootstrap.servers", prop.getProperty("kafka.bootstrap.servers")); properties.put("acks", prop.getProperty("kafka.acks")); properties.put("retries",prop.getProperty("kafka.retries")); properties.put("batch.size", prop.getProperty("kafka.batch.size")); properties.put("linger.ms", prop.getProperty("kafka.linger.ms")); properties.put("max.request.size", prop.getProperty("kafka.max.request.size")); properties.put("compression.type", prop.getProperty("kafka.compression.type")); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // generate event Producer<String, String> producer = new KafkaProducer<String, String>(properties); generateIoTEvent(producer,prop.getProperty("kafka.topic"),prop.getProperty("camera.id"),prop.getProperty("camera.url")); }
public void sendData(String data) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); Map<MetricName, ? extends Metric> metrics = producer.metrics(); System.out.println(metrics); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("video_view", data)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<Long, byte[]> producer = new KafkaProducer<>(properties); LongStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, //topic number, //key String.format("record-%s", number.toString()).getBytes())) //value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<String, byte[]> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, //topic number.toString(), //key UserAvroSerdes.serialize(new User(String.format("user-%s", number.toString()))))) //value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, number, //Key String.format("record-%s", number))) //Value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 10000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key String.format("record-%s", number))) //Value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream .rangeClosed(1, 100000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key String.format("record-%s", number))) //Value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords() { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<Integer, byte[]> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 10000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key KafkaProducerUtil.createMessage(1000))) //Value .forEach(record -> { producer.send(record); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.ACKS_CONFIG, "all"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, number, //Key String.format("record-%s", number))) //Value .forEach(record -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } producer.send(record); }); producer.close(); }
@Test public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() { builder.addSource("source1", "someTopic"); final StreamThread thread = new StreamThread( builder, config, clientSupplier, applicationId, clientId, processId, metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0))); assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1))); thread.setPartitionAssignor(new MockStreamsPartitionAssignor(assignment)); thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0))); assertEquals(1, clientSupplier.producers.size()); final Producer globalProducer = clientSupplier.producers.get(0); assertSame(globalProducer, thread.threadProducer); for (final StreamTask task : thread.tasks().values()) { assertSame(globalProducer, ((RecordCollectorImpl) task.recordCollector()).producer()); } assertSame(clientSupplier.consumer, thread.consumer); assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer); }
@Test public void test() throws Exception { Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka); Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app"); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, senderProps.get("bootstrap.servers")); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); Producer<Integer, String> producer = createProducer(); ProducerRecord<Integer, String> record = new ProducerRecord<>("stream-test", 1, "test"); producer.send(record); final Serde<String> stringSerde = Serdes.String(); final Serde<Integer> intSerde = Serdes.Integer(); KStreamBuilder builder = new KStreamBuilder(); KStream<Integer, String> kStream = builder .stream(intSerde, stringSerde, "stream-test"); kStream.map((key, value) -> new KeyValue<>(key, value + "map")).to("stream-out"); KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(config), new TracingKafkaClientSupplier(mockTracer)); streams.start(); await().atMost(15, TimeUnit.SECONDS).until(reportedSpansSize(), equalTo(3)); streams.close(); producer.close(); List<MockSpan> spans = mockTracer.finishedSpans(); assertEquals(3, spans.size()); checkSpans(spans); assertNull(mockTracer.activeSpan()); }
@Override public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) { if (applicationId != null) { assertThat((String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), startsWith(applicationId + "-")); } else { assertFalse(config.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); } final MockProducer<byte[], byte[]> producer = new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER); producers.add(producer); return producer; }
@Override public Producer getObject() throws Exception { if (producer == null) { afterPropertiesSet(); } return producer; }
public static void sendWrapperMessage() throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", servers); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "com.gochinatv.spark.kafka.SerializedMessage"); Producer<String, WrapperAppMessage> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); //case 1: //没有任何分区,默认1个分区,发送消息 int i=0; while(i<1000){ Thread.sleep(1000L); WrapperAppMessage message = new WrapperAppMessage(); message.setAgreeId((i+1)%5); message.setCityId((i+1)%3); message.setConnectType((i+1)%4); message.setCount((i+100)%10); message.setInstanceId((i+1)%6); message.setProvinceId((i+1)%4); message.setTimestamp(System.currentTimeMillis()); message.setValue((float)((i+200)%4)); producer.send(new ProducerRecord<>("NL_U_APP_ALARM_APP",message)); System.out.println(message.toString()); i++; producer.flush(); } producer.close(); }
@Override public void write(Message message) { Uninterruptibles.awaitUninterruptibly(transportInitialized); LOG.debug("Sending message: {}", message); try { Producer<String, String> producer = getProducer(configuration); producer.send(new ProducerRecord<>(String.valueOf(configuration.getTopic()), "message", message.getMessage())); } catch (Exception e) { LOG.error("Failed to send message", e); } }
public Producer getProducer(Properties props){ if(producerMap.containsKey(props)){ return producerMap.get(props); }else{ Producer producer = new KafkaProducer<>(props); producerMap.put(props, producer); return producer; } }
private static void generateIoTEvent(Producer<String, String> producer, String topic, String camId, String videoUrl) throws Exception { String[] urls = videoUrl.split(","); String[] ids = camId.split(","); if(urls.length != ids.length){ throw new Exception("There should be same number of camera Id and url"); } logger.info("Total urls to process "+urls.length); for(int i=0;i<urls.length;i++){ Thread t = new Thread(new VideoEventGenerator(ids[i].trim(),urls[i].trim(),producer,topic)); t.start(); } }
private Producer<String, String> createProducer() throws Exception { Properties props = PropertiesHolder.getProperties(Constants.Properties.PRODUCER_CONFIG); props.setProperty("client.id", this.topologyId + "_wrapper_" + context.getThisTaskId()); Producer<String, String> producer = new KafkaProducer<>(props); return producer; }
private Producer<String, String> createProducer() throws Exception { Properties props = PropertiesHolder.getProperties(Constants.Properties.PRODUCER_CONFIG); props.setProperty("client.id", this.topologyId + "_writer_" + context.getThisTaskId()); Producer<String, String> producer = new KafkaProducer<>(props); return producer; }
private Producer<String, byte[]> createProducer() throws Exception { Properties props = PropertiesHolder.getProperties(Constants.Properties.PRODUCER_CONTROL); props.setProperty("client.id", this.topologyId + "_control_" + context.getThisTaskId()); Producer<String, byte[]> producer = new KafkaProducer<>(props); return producer; }
private static Producer<String, String> createProducer() throws Exception { Properties props = PropertiesHolder.getProperties(Constants.Properties.PRODUCER_CONFIG); props.setProperty("client.id", "meta-event-warning"); Producer<String, String> producer = new KafkaProducer<>(props); return producer; }
private void loadRunningConf(String reloadMsgJson) { String notifyEvtName = reloadMsgJson == null ? "loaded" : "reloaded"; String loadResultMsg = null; try { this.confMap = FullPullHelper.loadConfProps(zkconnect, topologyId, zkTopoRoot, null); this.commonProps = (Properties) confMap.get(FullPullHelper.RUNNING_CONF_KEY_COMMON); this.dsName = commonProps.getProperty(Constants.ZkTopoConfForFullPull.DATASOURCE_NAME); this.stringProducer = (Producer) confMap.get(FullPullHelper.RUNNING_CONF_KEY_STRING_PRODUCER); this.zkService = (ZkService) confMap.get(FullPullHelper.RUNNING_CONF_KEY_ZK_SERVICE); this.stringProducerProps = (Properties) confMap.get(FullPullHelper.RUNNING_CONF_KEY_STRING_PRODUCER_PROPS); String sendBatchSizeStr = stringProducerProps.getProperty(DataPullConstants.KAFKA_SEND_BATCH_SIZE); String sendRowsStr = stringProducerProps.getProperty(DataPullConstants.KAFKA_SEND_ROWS); if (StringUtils.isNotBlank(sendBatchSizeStr) && (Long.valueOf(sendBatchSizeStr) != kafkaSendBatchSize.get())) { kafkaSendBatchSize.set(Long.valueOf(sendBatchSizeStr)); } if (StringUtils.isNotBlank(sendRowsStr) && (Long.valueOf(sendRowsStr) != kafkaSendRows.get())) { kafkaSendRows.set(Long.valueOf(sendRowsStr)); } loadResultMsg = "Running Config is " + notifyEvtName + " successfully for PagedBatchDataFetchingBolt!"; LOG.info(loadResultMsg); } catch (Exception e) { loadResultMsg = e.getMessage(); LOG.error(notifyEvtName + "ing running configuration encountered Exception!", loadResultMsg); throw e; } finally { if (reloadMsgJson != null) { FullPullHelper.saveReloadStatus(reloadMsgJson, "pulling-dataFetching-bolt", false, zkconnect); } } }
public static void main(String[] args) throws IOException, ParseException { //Kafka Part Properties properties = new Properties(); //set the kafka boostrap Server properties.setProperty("bootstrap.servers", KafkaProperties.KAFKA_SERVER_URL); //tell the client if the key and value is a string or something else properties.setProperty("key.serializer", StringSerializer.class.getName()); properties.setProperty("value.serializer", StringSerializer.class.getName()); //set the acknowledge of the producer to -1, 0, 1 properties.setProperty("acks", "1"); //if there is no connection how often the client should retry it until it stops properties.setProperty("retries", "3"); //it will send ever ms a message otherwise use producer.flush() below where marked properties.setProperty("linger.ms", "1"); //use a truststore and https properties.setProperty("security.protocol",KafkaProperties.SECURITY_PROTOCOL); properties.setProperty("ssl.truststore.location", KafkaProperties.TRUSTSTORE_LOCATION); properties.setProperty("ssl.truststore.password",KafkaProperties.TRUSTSTORE_PASSWORD); properties.setProperty("ssl.endpoint.identification.algorithm",KafkaProperties.ENDPOINT_ALGORITHM); Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties); //Simple Message Producer instead of the for loop => ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("foobar", "2", "Huh!"); for (int key=0; key < 10; key++){ //change here the topic ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(KafkaProperties.TOPIC, Integer.toString(key), "My new keys are here: "+ Integer.toString(key)); producer.send(producerRecord); } //here you could use also producer.flush() to send the message producer.close(); }
/** * @param topic Kafka topic to write the data records to * @param records Data records to write to Kafka * @param producerConfig Kafka producer configuration * @param <K> Key type of the data records * @param <V> Value type of the data records */ public static <K, V> void produceKeyValuesSynchronously( String topic, Collection<KeyValue<K, V>> records, Properties producerConfig) throws ExecutionException, InterruptedException { Producer<K, V> producer = new KafkaProducer<>(producerConfig); for (KeyValue<K, V> record : records) { Future<RecordMetadata> f = producer.send( new ProducerRecord<>(topic, record.key, record.value)); f.get(); } producer.flush(); producer.close(); }
/** * 实例化 * @param config * @return */ public static Producer<byte[], String> getInstance(Map<String, Object> config) { if (producer == null) { synchronized(LazySingletonProducer.class) { if (producer == null) { producer = new KafkaProducer<byte[], String>(config); } } } return producer; }
@Override public void addBook(AddBookCommand command) { Producer<String, String> producer = new KafkaProducer<>(properties); //TODO: move topic to configuration file //TODO: decide what to use for key ProducerRecord<String, String> producerRecord = new ProducerRecord<>("my-topic", "key", command.toString()); producer.send(producerRecord); log.debug("Record sent={}", producerRecord); producer.close(); }