Dependencies

<!-- Core Spring Cloud Stream dependency. -->
<!-- NOTE(review): the configuration further below sets spring.cloud.stream.kafka.binder.*, but no Kafka binder dependency is declared in this snippet; confirm it is provided elsewhere. -->
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream</artifactId>
</dependency>
 
<!-- For tests: the test-jar of the same artifact with the test-binder classifier, limited to test scope. -->
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream</artifactId>
	<scope>test</scope>
	<classifier>test-binder</classifier>
	<type>test-jar</type>
</dependency>

Define Functions

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
 
/**
 * Declares the three functional beans of the FizzBuzz pipeline:
 * a producer of random integers, a processor that turns each integer into
 * its FizzBuzz representation, and a consumer that logs what it receives.
 *
 * <p>The bean method names are part of the contract: the binding names in the
 * application configuration ({@code fizzBuzzProducer-out-0},
 * {@code fizzBuzzProcessor-in-0}, ...) are derived from them.
 */
@Slf4j
@Configuration
public class AmqpConfiguration {

    /** Pause between two numbers emitted by {@link #fizzBuzzProducer()}. */
    private static final Duration EMIT_INTERVAL = Duration.ofSeconds(5);

    /** Largest number (inclusive) emitted by {@link #fizzBuzzProducer()}. */
    private static final int MAX_NUMBER = 999;

    /**
     * Produces a random integer in the range {@code [1, MAX_NUMBER]} every
     * {@link #EMIT_INTERVAL}.
     *
     * @return a supplier of the never-completing stream of random numbers
     */
    @Bean
    public Supplier<Flux<Integer>> fizzBuzzProducer() {
        Random random = new Random();
        return () -> Flux.interval(EMIT_INTERVAL)
                // nextInt(MAX_NUMBER) is in [0, MAX_NUMBER - 1]; shift by one so 0 is never emitted.
                .map(tick -> random.nextInt(MAX_NUMBER) + 1)
                .log();
    }

    /**
     * Converts every incoming number to its FizzBuzz representation.
     *
     * @return a function mapping a stream of integers to a stream of
     *         {@code "Fizz"}, {@code "Buzz"}, {@code "FizzBuzz"} or the decimal number
     */
    @Bean
    public Function<Flux<Integer>, Flux<String>> fizzBuzzProcessor() {
        return numbers -> numbers
                .map(AmqpConfiguration::toFizzBuzz)
                .log();
    }

    /**
     * Logs every received message at INFO level.
     *
     * @return the logging consumer
     */
    @Bean
    public Consumer<String> fizzBuzzConsumer() {
        return value -> log.info("Consumer Received : {}", value);
    }

    /**
     * Applies the FizzBuzz rules to a single number.
     *
     * @param number the number to convert
     * @return {@code "FizzBuzz"} for multiples of 15, {@code "Fizz"} for multiples of 3,
     *         {@code "Buzz"} for multiples of 5, otherwise the number as a decimal string
     */
    private static String toFizzBuzz(int number) {
        // Check the combined case first; a multiple of 15 is also a multiple of 3 and 5.
        if (number % 15 == 0) {
            return "FizzBuzz";
        }
        if (number % 3 == 0) {
            return "Fizz";
        }
        if (number % 5 == 0) {
            return "Buzz";
        }
        return String.valueOf(number);
    }
}

Configure Functions

The configuration is similar to the annotation-based approach, but the binding names are determined by the framework using this naming convention: <function name>-in-<index> for inputs and <function name>-out-<index> for outputs, where <index> is always 0 unless the function has multiple inputs or outputs.

# Binds the three functional beans to their destinations.
# Comment lines are indented with spaces only: tab-indented lines make the YAML parser fail.
spring:
  cloud:
    stream:
      function:
        # Function beans to bind, separated by ';'.
        # NOTE(review): newer Spring Cloud Stream releases prefer `spring.cloud.function.definition`;
        # verify against the version in use.
        definition: fizzBuzzProducer;fizzBuzzProcessor;fizzBuzzConsumer

      bindings:
        # tells fizzBuzzProducer() to publish to topic `numbers`
        fizzBuzzProducer-out-0:
          destination: numbers

        # tells fizzBuzzProcessor() to subscribe to topic `numbers`
        fizzBuzzProcessor-in-0:
          destination: numbers
        # tells fizzBuzzProcessor() to publish to topic `fizz-buzz`
        fizzBuzzProcessor-out-0:
          destination: fizz-buzz

        # tells fizzBuzzConsumer() to subscribe to topic `fizz-buzz`
        fizzBuzzConsumer-in-0:
          destination: fizz-buzz

      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true

Resources