Usare Apache Kafka con Java Spring Boot e Docker

In questo articolo vediamo passo per passo come integrare Apache Kafka in un'applicazione Spring Boot e come far girare il tutto in Docker, usando un semplice esempio di produttore e consumatore di messaggi.

1. Concetti di base

Prima di entrare nel codice, riepiloghiamo i concetti essenziali:

  • Broker Kafka: il server che riceve, memorizza e distribuisce i messaggi.
  • Topic: canale logico dove i messaggi vengono scritti e letti.
  • Producer: applicazione che invia messaggi ad un topic.
  • Consumer: applicazione che legge i messaggi da uno o più topic.
  • Consumer group: gruppo di consumer che si dividono i messaggi di uno o più topic.

La nostra architettura sarà composta da:

  • Un cluster Kafka "mononodo" in Docker (con Zookeeper, se si usa una versione che lo richiede).
  • Una app Spring Boot che contiene sia il producer che il consumer.
  • Un file docker-compose.yml per orchestrare i container.

2. Preparare il progetto Spring Boot

Puoi creare il progetto con Spring Initializr oppure manualmente. I punti fondamentali sono:

  • Java 17 (o versione supportata da Spring Boot che stai usando).
  • Dipendenze principali:
    • spring-boot-starter-web
    • spring-kafka

2.1. Esempio di file pom.xml

Un esempio semplificato di pom.xml per un progetto Maven potrebbe essere:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafka-spring-docker</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.0</version>
        <relativePath/>
    </parent>

    <properties>
        <java.version>17</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

3. Configurare Kafka in Spring Boot

Usiamo un file application.yml per configurare la connessione al broker Kafka e alcune impostazioni base di producer e consumer.

3.1. Configurazione application.yml

spring:
  application:
    name: kafka-spring-docker
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: demo-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

app:
  topic: demo-topic

Nota: quando eseguiremo il tutto in Docker, l'indirizzo del broker non sarà più localhost, ma il nome del servizio Docker del broker Kafka.

3.2. Classe di configurazione Kafka

Spring Kafka può configurare automaticamente molte cose, ma vediamo una configurazione esplicita per comprendere meglio i bean creati.

package com.example.kafkaspringdocker.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

4. Implementare il producer Kafka

Creiamo un semplice servizio che espone un metodo per inviare messaggi su un topic, usando KafkaTemplate.

package com.example.kafkaspringdocker.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topic;

    public MessageProducer(KafkaTemplate<String, String> kafkaTemplate,
                           @Value("${app.topic}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    public void send(String message) {
        kafkaTemplate.send(topic, message);
    }
}

4.1. Controller REST per inviare messaggi

Per testare facilmente il producer, esponiamo un endpoint REST.

package com.example.kafkaspringdocker.web;

import com.example.kafkaspringdocker.service.MessageProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/messages")
public class MessageController {

    private final MessageProducer messageProducer;

    public MessageController(MessageProducer messageProducer) {
        this.messageProducer = messageProducer;
    }

    @PostMapping
    public ResponseEntity<String> sendMessage(@RequestBody String message) {
        messageProducer.send(message);
        return ResponseEntity.ok("Messaggio inviato a Kafka");
    }
}

5. Implementare il consumer Kafka

Per ricevere i messaggi dal topic, con Spring Kafka basta annotare un metodo con @KafkaListener specificando il topic e l'id del container.

package com.example.kafkaspringdocker.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumer {

    private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class);

    @KafkaListener(topics = "${app.topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void listen(String message) {
        log.info("Messaggio ricevuto da Kafka: {}", message);
    }
}

Avviando l'applicazione e inviando una richiesta POST al nostro endpoint, il consumer dovrebbe loggare il messaggio ricevuto.

6. Creare l'immagine Docker della app Spring Boot

Per containerizzare l'app, creiamo un Dockerfile che esegua il jar Spring Boot.

FROM eclipse-temurin:17-jdk-alpine

ARG JAR_FILE=target/kafka-spring-docker-0.0.1-SNAPSHOT.jar
COPY ${JAR_FILE} app.jar

ENTRYPOINT ["java", "-jar", "/app.jar"]

Dopo aver messo il Dockerfile nella radice del progetto, puoi costruire il jar ed eseguire la build dell'immagine con:

mvn clean package -DskipTests
docker build -t kafka-spring-docker-app .

7. Eseguire Kafka e l'app con Docker Compose

Ora ci serve un ambiente Kafka in Docker e un servizio per la nostra app. Usiamo docker-compose.yml.

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  app:
    image: kafka-spring-docker-app
    depends_on:
      - kafka
    environment:
      SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      SPRING_KAFKA_CONSUMER_BOOTSTRAP_SERVERS: kafka:9092
      SPRING_KAFKA_PRODUCER_BOOTSTRAP_SERVERS: kafka:9092
      APP_TOPIC: demo-topic
    ports:
      - "8080:8080"

7.1. Adattare la configurazione Spring al profilo Docker

Per far sì che la nostra applicazione usi l'indirizzo kafka:9092 quando gira in Docker, possiamo usare un profilo Spring specifico, ad esempio docker.

Aggiungiamo un file application-docker.yml:

spring:
  kafka:
    bootstrap-servers: kafka:9092

app:
  topic: demo-topic

E impostiamo il profilo attivo via variabile d'ambiente nel servizio Docker:

  app:
    image: kafka-spring-docker-app
    depends_on:
      - kafka
    environment:
      SPRING_PROFILES_ACTIVE: docker
    ports:
      - "8080:8080"

8. Avviare l'ambiente completo

Una volta che il jar è compilato e l'immagine Docker dell'app è stata creata, possiamo avviare tutto con:

docker-compose up

Dopo l'avvio:

  • Kafka sarà in ascolto sulla porta 9092 (interna e pubblicata su localhost).
  • L'app Spring Boot sarà in ascolto sulla porta 8080.

Possiamo inviare un messaggio a Kafka con una semplice chiamata HTTP:

curl -X POST http://localhost:8080/api/messages      -H "Content-Type: text/plain"      -d "Ciao da Kafka con Spring Boot e Docker"

Nel log del container dell'app dovresti vedere il messaggio ricevuto dal consumer.

9. Debug e problemi comuni

Alcuni errori ricorrenti quando si lavora con Kafka, Spring Boot e Docker:

  • Errore di connessione al broker (Connection refused): controlla che l'app stia usando il nome host corretto (ad esempio kafka invece di localhost quando gira in Docker).
  • Timeout nel consumer: verifica che il topic esista e che l'app sia iscritta con il group id giusto.
  • Messaggi non consumati: se hai modificato il group id, potresti aver cambiato la posizione degli offset. In sviluppo, puoi usare auto-offset-reset: earliest per leggere anche i messaggi passati.
  • Versioni incompatibili: assicurati che le versioni di Kafka, del client Kafka e di Spring Kafka siano compatibili tra loro.

10. Conclusioni e passi successivi

In questa guida abbiamo visto come:

  • Configurare Kafka in un progetto Spring Boot.
  • Creare un semplice producer e consumer con Spring Kafka.
  • Containerizzare l'app con Docker.
  • Orchestrare Kafka e l'app con Docker Compose.

Da qui puoi estendere l'esempio introducendo:

  • Messaggi strutturati (JSON) e serializzazione personalizzata.
  • Più topic e consumer group separati.
  • Pattern come event sourcing e CQRS.
  • Monitoraggio e metriche tramite strumenti come Prometheus e Grafana.

Avere Kafka integrato in Spring Boot e gestito via Docker ti offre un ambiente potente, riproducibile e facile da distribuire, adatto sia ai test locali che agli ambienti di produzione.

Torna su