Dependencies
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<!-- For Tests -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<scope>test</scope>
<classifier>test-binder</classifier>
<type>test-jar</type>
</dependency>
Define Functions
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@Slf4j
@Configuration
public class AmqpConfiguration {
/**
 * Registers a supplier whose flux emits one pseudo-random integer in the
 * range 1..999 (inclusive) per five-second tick, logging every signal.
 *
 * <p>A single {@link Random} is created when this bean method runs and is
 * captured by the returned supplier.
 *
 * @return a supplier producing the periodic flux of random integers
 */
@Bean
public Supplier<Flux<Integer>> fizzBuzzProducer() {
    final Random generator = new Random();
    return () -> {
        // One tick every five seconds; the tick counter itself is discarded.
        Flux<Long> ticks = Flux.interval(Duration.ofSeconds(5));
        // nextInt(999) yields 0..998, so the emitted value lies in 1..999.
        Flux<Integer> numbers = ticks.map(tick -> generator.nextInt(999) + 1);
        return numbers.log();
    };
}
/**
 * Registers a function that doubles each incoming integer and emits the
 * result as text, logging every signal.
 *
 * <p>The declared output type is {@code Flux<String>}, so each doubled value
 * is converted with {@link String#valueOf(int)}; mapping to the bare
 * {@code Integer} sum would not type-check against the declared return type.
 *
 * <p>NOTE(review): despite the "fizzBuzz" name, no fizz/buzz classification
 * is performed here — confirm whether doubling is the intended transformation.
 *
 * @return a function mapping a flux of integers to a flux of their doubled
 *         values rendered as strings
 */
@Bean
public Function<Flux<Integer>, Flux<String>> fizzBuzzProcessor() {
    return numbers -> numbers
            .map(number -> String.valueOf(number + number))
            .log();
}
/**
 * Registers a consumer that logs each received value at INFO level,
 * prefixed with {@code "Consumer Received : "}.
 *
 * @return a consumer that writes every received string to the class logger
 */
@Bean
public Consumer<String> fizzBuzzConsumer() {
    // Parameterized message: same rendered text, without eager string concatenation.
    return value -> log.info("Consumer Received : {}", value);
}
}
Configure Functions
The configuration is similar to the annotation-based approach, but the binding names are determined by the framework using this naming convention: <function name>-in-<index> for inputs and <function name>-out-<index> for outputs, where <index> is 0 in most cases; it only differs for functions with multiple inputs or outputs.
# Wires the three functional beans into a pipeline:
# fizzBuzzProducer -> `numbers` -> fizzBuzzProcessor -> `fizz-buzz` -> fizzBuzzConsumer
spring:
  cloud:
    stream:
      function:
        # Semicolon-separated list of function beans to bind
        definition: fizzBuzzProducer;fizzBuzzProcessor;fizzBuzzConsumer
      bindings:
        # tells fizzBuzzProducer() to publish to topic `numbers`
        fizzBuzzProducer-out-0:
          destination: numbers
        # tells fizzBuzzProcessor() to subscribe from topic `numbers`
        fizzBuzzProcessor-in-0:
          destination: numbers
        # tells fizzBuzzProcessor() to publish to topic `fizz-buzz`
        fizzBuzzProcessor-out-0:
          destination: fizz-buzz
        # tells fizzBuzzConsumer() to subscribe from topic `fizz-buzz`
        fizzBuzzConsumer-in-0:
          destination: fizz-buzz
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true