我得到以下异常,因为我从一个项目生产而消费者从另一个项目消费。我怎样才能解决这个问题。显然,包不一样。因此,如何确保有正确的json序列化。
The class 'com.lte.assessment.assessments.AssessmentAttemptRequest' is not in the trusted packages: [java.util, java.lang, com.lte.assessmentanalytics.model
消费者配置
@EnableKafka @Configuration public class KafkaConfig { static Map<String, Object> config = new HashMap(); static { config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id"); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); } @Bean public ConsumerFactory<String, AssessmentAttemptRequest> assessmentAttemptDetailsEntityConsumerFactory() { JsonDeserializer<AssessmentAttemptRequest> deserializer = new JsonDeserializer<>(); deserializer.addTrustedPackages("com.lte.assessment.assessments"); return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer); } }
生产者配置
@Configuration public class KafkaConfiguration { @Bean public ProducerFactory producerConfig() { Map<String, Object> config = new HashMap(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory(config); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate(producerConfig()); } @Bean public ConcurrentKafkaListenerContainerFactory aaKafkaListenerFactory() { ConcurrentKafkaListenerContainerFactory<String, AssessmentAttemptDetailsEntity> factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(assessmentAttemptDetailsEntityConsumerFactory()); return factory; } }
您可以通过以下方式将软件包列入白名单assessmentAttemptDetailsEntityConsumerFactory():
assessmentAttemptDetailsEntityConsumerFactory()
@Bean public ConsumerFactory<String, AssessmentAttemptDetailsEntity> assessmentAttemptDetailsEntityConsumerFactory() { JsonDeserializer<AssessmentAttemptDetailsEntity> deserializer = new JsonDeserializer<>(); deserializer.addTrustedPackages("com.lte.assessment.assessments");//your package return new DefaultKafkaConsumerFactory(config,deserializer); }