小编典典

如何在Spring Cloud Stream中配置函数的绑定以将其输入绑定到Web终结点并将其输出绑定到Kafka主题

spring-boot

我有一个普通的java Function; 我试图绑定:

  1. 输入到Web端点
  2. 其输出为kafka主题。

当我在Web上下文中使用函数时,它总是将返回的结果值Function单独返回给Web客户端。我可以做这样的事情吗:

spring.cloud.stream.bindings.input.binder=web
spring.cloud.stream.bindings.output.binder=kafka

我目前甚至尝试将分成Function2个:

  • 一个将输入绑定到Web客户端,将其输出动态绑定到第二个函数(使用spring.cloud.stream.sendto.destination
  • 另一个函数,其输出绑定到kafka绑定。

仍然这种方法也不起作用。动态路由(spring.cloud.stream.sendto.destination)重新显示在Web客户端上;但不会Message发送到kafka绑定本身。这是我在第二种方法(2个函数)中使用的代码,希望简单地获得一个Spring功能应用程序以将其输入绑定到Web端点并输出到kafka主题。

WebToKafkaApp.java

@SpringBootApplication
public class WebToKafkaApp {
    public static void main(String[] args) {
        SpringApplication.run(WebToKafkaApp.class, args);
    }

    @Bean
    public Function<String, Message<String>> webFunction() {
        return payload -> createPayloadMapperToMessage("kafkaFunction").apply(payload);
    }

    @Bean
    public Function<Flux<Message<String>>, Flux<Message<String>>> kafkaFunction() {
        return flux -> flux.map(msg -> createPayloadMapperToMessage("").apply(msg.getPayload()));
    }

    private Function<String, Message<String>> createPayloadMapperToMessage(String destination) {
        return payload -> MessageBuilder
                .withPayload(payload.toUpperCase())
                .setHeader("spring.cloud.stream.sendto.destination", destination)
                .build();
    }
}

application.yml

spring.cloud.stream.bindings.webFunction-in-0:
  destination: webFunctionIN
  contentType: application/json
spring.cloud.stream.bindings.webFunction-out-0:
  destination: webFunctionOUT
  contentType: application/json
spring.cloud.stream.bindings.kafkaFunction-in-0:
  destination: kafkaFunctionIN
  contentType: application/json
  binder: kafka
spring.cloud.stream.bindings.kafkaFunction-out-0:
  destination: kafkaFunctionOUT
  contentType: application/json
  binder: kafka

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092

spring.cloud.stream.function.routing.enabled: true
spring.cloud.function.definition: webFunction

build.gradle

plugins {
    id 'org.springframework.boot' version '2.2.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
repositories {
    mavenCentral()
}
ext {
    set('springCloudVersion', "Hoxton.RELEASE")
}
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.cloud:spring-cloud-starter-function-web'
    implementation 'org.springframework.cloud:spring-cloud-starter-function-webflux'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}
dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}
test {
    useJUnitPlatform()
}

任何帮助,将不胜感激。


阅读 890

收藏
2020-05-30

共1个答案

小编典典

感谢奥列格张贴这一解决方案背后的想法。从本质上讲,我增强了他的建议,以便大致处理以下两者之间的桥梁:

  1. 功能齐全的网络控制器;可以接收网络请求。
  2. 流供应商;可以将任何消息转发到消息传递基础结构。

此解决方案将Oleg示例中描述的问题封装在的自定义实现中Supplier。这种实现公开了一个API来触发,Supplier以发出作为参数传递的消息。这样的类如下所示:

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.function.Supplier;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

public class StreamSupplier implements Supplier<Flux<?>> {

    private static final String SPRING_CLOUD_STREAM_SENDTO_DESTINATION =
            "spring.cloud.stream.sendto.destination";

    public static <T> Message<?> createMessage(T payload, String destination) {
        MessageBuilder<T> builder = MessageBuilder.withPayload(payload);
        if (destination != null && !destination.isEmpty())
            builder.setHeader(SPRING_CLOUD_STREAM_SENDTO_DESTINATION, destination);
        return builder.build();
    }

    private String defaultDestination;
    private EmitterProcessor<? super Object> processor = EmitterProcessor.create();

    public StreamSupplier() {
        this(null);
    }

    public StreamSupplier(String defaultDestination) {
        this.defaultDestination = defaultDestination;
    }

    // SEND APIs

    public <T> Message<?> sendMessage(T payload) {
        return sendMessage(payload, defaultDestination);
    }

    public <T> Message<?> sendMessage(T payload, String destination) {
        return sendBody(createMessage(payload, destination));
    }

    public <T> T sendBody(T body) {
        processor.onNext(body);
        return body;
    }

    /**
     * Returns {@link EmitterProcessor} used internally to programmatically publish messages onto
     * the output binding associated with this {@link Supplier}. Such programmatic publications
     * are available through the {@code sendXXX} API methods available in this class.
     */
    @Override
    public Flux<?> get() {
        return processor;
    }
}

然后,开发人员只需:

  1. 在应用程序中将该特定Supplier实现的实例注册为;然后将其扫描到中。bean``Spring``spring-cloud-function``bean``FunctionCatalog
  2. 创建一个Web功能,以使用先前注册的功能将任何消息转发到流式基础Supplier结构-可以使用的所有功能进行配置spring-cloud-stream

下面的示例演示了这一点:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Controller;

import java.util.function.Function;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;

@SpringBootApplication
@Controller
public class MyApp {

    public static void main(String[] args) {
        SpringApplication.run(MyApp.class,
                "--spring.cloud.function.definition=streamSupplierFunction;webToStreamFunction");
    }

    // Functional Web Controller
    @Bean
    public Function<String, String> webToStreamFunction() {
        return msg -> streamSupplier().sendBody(msg);
    }

    // Functional Stream Supplier
    @Bean
    public Supplier<Flux<?>> streamSupplierFunction() {
        return new StreamSupplier();
    }

    // DOUBLE REGISTRATION TO AVOID POLLABLE CONFIGURATION
    // LIMITATION OF SPRING-CLOUD-FUNCTION
    @Bean
    public StreamSupplier streamSupplier() {
        return (StreamSupplier) streamSupplierFunction();
    }
}
2020-05-30