I have a plain Java Function whose input I am trying to bind to a web endpoint and whose output I am trying to bind to a Kafka topic.
When I use the function in a web context, the value the Function returns always goes back to the web client only. Can I do something like this:
    spring.cloud.stream.bindings.input.binder=web
    spring.cloud.stream.bindings.output.binder=kafka
I have even tried splitting the Function into 2, with dynamic routing through the spring.cloud.stream.sendto.destination header.
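This approach does not work either. The dynamic routing header (spring.cloud.stream.sendto.destination) shows up in the response to the web client, but no Message is sent to the Kafka binding itself. Below is the code I am using for this second approach (2 functions), hoping to simply get a Spring functional application to bind its input to a web endpoint and its output to a Kafka topic.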
WebToKafkaApp.java
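    import java.util.function.Function;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;

    import reactor.core.publisher.Flux;

    @SpringBootApplication
    public class WebToKafkaApp {

        public static void main(String[] args) {
            SpringApplication.run(WebToKafkaApp.class, args);
        }

        // First function: receives the web payload and tags it for routing to kafkaFunction.
        @Bean
        public Function<String, Message<String>> webFunction() {
            return payload -> createPayloadMapperToMessage("kafkaFunction").apply(payload);
        }

        // Second function: meant to receive the routed message and publish it to Kafka.
        @Bean
        public Function<Flux<Message<String>>, Flux<Message<String>>> kafkaFunction() {
            return flux -> flux.map(msg -> createPayloadMapperToMessage("").apply(msg.getPayload()));
        }

        // Upper-cases the payload and sets the dynamic routing header to the given destination.
        private Function<String, Message<String>> createPayloadMapperToMessage(String destination) {
            return payload -> MessageBuilder
                    .withPayload(payload.toUpperCase())
                    .setHeader("spring.cloud.stream.sendto.destination", destination)
                    .build();
        }
    }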
application.yml
    spring.cloud.stream.bindings.webFunction-in-0:
      destination: webFunctionIN
      contentType: application/json
    spring.cloud.stream.bindings.webFunction-out-0:
      destination: webFunctionOUT
      contentType: application/json
    spring.cloud.stream.bindings.kafkaFunction-in-0:
      destination: kafkaFunctionIN
      contentType: application/json
      binder: kafka
    spring.cloud.stream.bindings.kafkaFunction-out-0:
      destination: kafkaFunctionOUT
      contentType: application/json
      binder: kafka
    spring:
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092
    spring.cloud.stream.function.routing.enabled: true
    spring.cloud.function.definition: webFunction
build.gradle
    plugins {
        id 'org.springframework.boot' version '2.2.1.RELEASE'
        id 'io.spring.dependency-management' version '1.0.8.RELEASE'
        id 'java'
    }

    group = 'com.example'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = '1.8'

    repositories {
        mavenCentral()
    }

    ext {
        set('springCloudVersion', "Hoxton.RELEASE")
    }

    dependencies {
        implementation 'org.springframework.boot:spring-boot-starter'
        implementation 'org.springframework.cloud:spring-cloud-starter-function-web'
        implementation 'org.springframework.cloud:spring-cloud-starter-function-webflux'
        implementation 'org.springframework.cloud:spring-cloud-stream'
        implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
        testImplementation('org.springframework.boot:spring-boot-starter-test') {
            exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
        }
    }

    dependencyManagement {
        imports {
            mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
        }
    }

    test {
        useJUnitPlatform()
    }
Any help would be greatly appreciated.
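Thanks to Oleg for posting the idea behind this solution. Essentially, I extended his suggestion to handle, in a generic way, the bridge between a functional web controller and a stream supplier.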
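This solution encapsulates the concerns described in Oleg's example inside a custom implementation of Supplier. The implementation exposes an API that triggers the Supplier to emit a message passed in as a parameter. Such a class looks like this: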
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;

    import java.util.function.Supplier;

    import reactor.core.publisher.EmitterProcessor;
    import reactor.core.publisher.Flux;

    public class StreamSupplier implements Supplier<Flux<?>> {

        private static final String SPRING_CLOUD_STREAM_SENDTO_DESTINATION =
                "spring.cloud.stream.sendto.destination";

        public static <T> Message<?> createMessage(T payload, String destination) {
            MessageBuilder<T> builder = MessageBuilder.withPayload(payload);
            if (destination != null && !destination.isEmpty()) {
                builder.setHeader(SPRING_CLOUD_STREAM_SENDTO_DESTINATION, destination);
            }
            return builder.build();
        }

        private String defaultDestination;
        private EmitterProcessor<? super Object> processor = EmitterProcessor.create();

        public StreamSupplier() {
            this(null);
        }

        public StreamSupplier(String defaultDestination) {
            this.defaultDestination = defaultDestination;
        }

        // SEND APIs

        public <T> Message<?> sendMessage(T payload) {
            return sendMessage(payload, defaultDestination);
        }

        public <T> Message<?> sendMessage(T payload, String destination) {
            return sendBody(createMessage(payload, destination));
        }

        public <T> T sendBody(T body) {
            processor.onNext(body);
            return body;
        }

        /**
         * Returns {@link EmitterProcessor} used internally to programmatically publish messages onto
         * the output binding associated with this {@link Supplier}. Such programmatic publications
         * are available through the {@code sendXXX} API methods available in this class.
         */
        @Override
        public Flux<?> get() {
            return processor;
        }
    }
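For intuition, the hand-off that StreamSupplier performs can be sketched without Spring or Reactor. The snippet below is only a simplified stand-in, not the class above: QueueSupplier and BridgeDemo are hypothetical names, and a queue that the output side polls replaces the EmitterProcessor that a binder would subscribe to.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for StreamSupplier: plain JDK, no Spring, no Reactor.
final class QueueSupplier<T> implements Supplier<T> {

    private final Queue<T> pending = new ConcurrentLinkedQueue<>();

    // Counterpart of sendBody: hand the value to the output side and return it to the caller.
    T sendBody(T body) {
        pending.add(body);
        return body;
    }

    // Counterpart of get(): the output side pulls the next value, or null if nothing is pending.
    @Override
    public T get() {
        return pending.poll();
    }
}

final class BridgeDemo {
    public static void main(String[] args) {
        QueueSupplier<String> supplier = new QueueSupplier<>();

        // Plays the role of the web-facing function: reply to the caller, forward to the supplier.
        Function<String, String> webFunction = supplier::sendBody;

        String reply = webFunction.apply("hello");
        System.out.println("caller got:  " + reply);
        System.out.println("output side: " + supplier.get());
    }
}
```

The point of the pattern is that one object serves two roles: the web-facing function calls its send method, while the output side only sees a Supplier.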
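Then a developer only has to register an instance of this class as a bean in the Spring application, so that spring-cloud-function picks that bean up into the FunctionCatalog and spring-cloud-stream can bind its output.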
The following example demonstrates this:
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Controller;

    import java.util.function.Function;
    import java.util.function.Supplier;

    import reactor.core.publisher.Flux;

    @SpringBootApplication
    @Controller
    public class MyApp {

        public static void main(String[] args) {
            SpringApplication.run(MyApp.class,
                    "--spring.cloud.function.definition=streamSupplierFunction;webToStreamFunction");
        }

        // Functional Web Controller
        @Bean
        public Function<String, String> webToStreamFunction() {
            return msg -> streamSupplier().sendBody(msg);
        }

        // Functional Stream Supplier
        @Bean
        public Supplier<Flux<?>> streamSupplierFunction() {
            return new StreamSupplier();
        }

        // DOUBLE REGISTRATION TO AVOID POLLABLE CONFIGURATION
        // LIMITATION OF SPRING-CLOUD-FUNCTION
        @Bean
        public StreamSupplier streamSupplier() {
            return (StreamSupplier) streamSupplierFunction();
        }
    }
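To exercise the example from the outside, a small client can post a payload to the web function. The sketch below rests on several assumptions that are not in the original post: the application listens on Spring Boot's default port 8080, spring-cloud-function-web exposes the function under its bean name (/webToStreamFunction), and a Java 11+ runtime is available for java.net.http (the build file in the question targets 1.8). WebToStreamClient and buildRequest are hypothetical names.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class WebToStreamClient {

    // Assumption: the app runs locally on Spring Boot's default port.
    static final String BASE_URL = "http://localhost:8080";

    // Builds a POST to /<functionName> carrying the payload as plain text.
    static HttpRequest buildRequest(String functionName, String payload) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/" + functionName))
                .header("Content-Type", "text/plain")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpRequest request = buildRequest("webToStreamFunction", "hello");
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            // The web function returns its input, so the body should echo the payload.
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            // Reached when the application is not running or not reachable.
            System.out.println("Application not reachable: " + e);
        }
    }
}
```

If the output binding is configured for Kafka, the same payload should also appear on the bound topic, which can be checked with any Kafka consumer.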