In questo articolo vediamo passo per passo come integrare Apache Kafka in un'applicazione Spring Boot e come far girare il tutto in Docker, usando un semplice esempio di produttore e consumatore di messaggi.
1. Concetti di base
Prima di entrare nel codice, riepiloghiamo i concetti essenziali:
- Broker Kafka: il server che riceve, memorizza e distribuisce i messaggi.
- Topic: canale logico dove i messaggi vengono scritti e letti.
- Producer: applicazione che invia messaggi ad un topic.
- Consumer: applicazione che legge i messaggi da uno o più topic.
- Consumer group: gruppo di consumer che si dividono i messaggi di uno o più topic.
La nostra architettura sarà composta da:
- Un cluster Kafka "mononodo" in Docker (con Zookeeper, se si usa una versione che lo richiede).
- Una app Spring Boot che contiene sia il producer che il consumer.
- Un file
docker-compose.ymlper orchestrare i container.
2. Preparare il progetto Spring Boot
Puoi creare il progetto con Spring Initializr oppure manualmente. I punti fondamentali sono:
- Java 17 (o versione supportata da Spring Boot che stai usando).
- Dipendenze principali:
spring-boot-starter-webspring-kafka
2.1. Esempio di file pom.xml
Un esempio semplificato di pom.xml per un progetto Maven potrebbe essere:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kafka-spring-docker</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.0</version>
<relativePath/>
</parent>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3. Configurare Kafka in Spring Boot
Usiamo un file application.yml per configurare la connessione al broker Kafka e alcune impostazioni base
di producer e consumer.
3.1. Configurazione application.yml
spring:
application:
name: kafka-spring-docker
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: demo-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
app:
topic: demo-topic
Nota: quando eseguiremo il tutto in Docker, l'indirizzo del broker non sarà più
localhost, ma il nome del servizio Docker del broker Kafka.
3.2. Classe di configurazione Kafka
Spring Kafka può configurare automaticamente molte cose, ma vediamo una configurazione esplicita per comprendere meglio i bean creati.
package com.example.kafkaspringdocker.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String consumerGroupId;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
4. Implementare il producer Kafka
Creiamo un semplice servizio che espone un metodo per inviare messaggi su un topic,
usando KafkaTemplate.
package com.example.kafkaspringdocker.service;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class MessageProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final String topic;
public MessageProducer(KafkaTemplate<String, String> kafkaTemplate,
@Value("${app.topic}") String topic) {
this.kafkaTemplate = kafkaTemplate;
this.topic = topic;
}
public void send(String message) {
kafkaTemplate.send(topic, message);
}
}
4.1. Controller REST per inviare messaggi
Per testare facilmente il producer, esponiamo un endpoint REST.
package com.example.kafkaspringdocker.web;
import com.example.kafkaspringdocker.service.MessageProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/messages")
public class MessageController {
private final MessageProducer messageProducer;
public MessageController(MessageProducer messageProducer) {
this.messageProducer = messageProducer;
}
@PostMapping
public ResponseEntity<String> sendMessage(@RequestBody String message) {
messageProducer.send(message);
return ResponseEntity.ok("Messaggio inviato a Kafka");
}
}
5. Implementare il consumer Kafka
Per ricevere i messaggi dal topic, con Spring Kafka basta annotare un metodo con
@KafkaListener specificando il topic e l'id del container.
package com.example.kafkaspringdocker.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class MessageConsumer {
private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class);
@KafkaListener(topics = "${app.topic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(String message) {
log.info("Messaggio ricevuto da Kafka: {}", message);
}
}
Avviando l'applicazione e inviando una richiesta POST al nostro endpoint, il consumer dovrebbe loggare il messaggio ricevuto.
6. Creare l'immagine Docker della app Spring Boot
Per containerizzare l'app, creiamo un Dockerfile che esegua il jar Spring Boot.
FROM eclipse-temurin:17-jdk-alpine
ARG JAR_FILE=target/kafka-spring-docker-0.0.1-SNAPSHOT.jar
COPY ${JAR_FILE} app.jar
ENTRYPOINT ["java", "-jar", "/app.jar"]
Dopo aver messo il Dockerfile nella radice del progetto, puoi costruire il jar ed
eseguire la build dell'immagine con:
mvn clean package -DskipTests
docker build -t kafka-spring-docker-app .
7. Eseguire Kafka e l'app con Docker Compose
Ora ci serve un ambiente Kafka in Docker e un servizio per la nostra app.
Usiamo docker-compose.yml.
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
app:
image: kafka-spring-docker-app
depends_on:
- kafka
environment:
SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SPRING_KAFKA_CONSUMER_BOOTSTRAP_SERVERS: kafka:9092
SPRING_KAFKA_PRODUCER_BOOTSTRAP_SERVERS: kafka:9092
APP_TOPIC: demo-topic
ports:
- "8080:8080"
7.1. Adattare la configurazione Spring al profilo Docker
Per far sì che la nostra applicazione usi l'indirizzo kafka:9092
quando gira in Docker, possiamo usare un profilo Spring specifico, ad esempio
docker.
Aggiungiamo un file application-docker.yml:
spring:
kafka:
bootstrap-servers: kafka:9092
app:
topic: demo-topic
E impostiamo il profilo attivo via variabile d'ambiente nel servizio Docker:
app:
image: kafka-spring-docker-app
depends_on:
- kafka
environment:
SPRING_PROFILES_ACTIVE: docker
ports:
- "8080:8080"
8. Avviare l'ambiente completo
Una volta che il jar è compilato e l'immagine Docker dell'app è stata creata, possiamo avviare tutto con:
docker-compose up
Dopo l'avvio:
- Kafka sarà in ascolto sulla porta
9092(interna e pubblicata su localhost). - L'app Spring Boot sarà in ascolto sulla porta
8080.
Possiamo inviare un messaggio a Kafka con una semplice chiamata HTTP:
curl -X POST http://localhost:8080/api/messages -H "Content-Type: text/plain" -d "Ciao da Kafka con Spring Boot e Docker"
Nel log del container dell'app dovresti vedere il messaggio ricevuto dal consumer.
9. Debug e problemi comuni
Alcuni errori ricorrenti quando si lavora con Kafka, Spring Boot e Docker:
-
Errore di connessione al broker (Connection refused):
controlla che l'app stia usando il nome host corretto (ad esempio
kafkainvece dilocalhostquando gira in Docker). - Timeout nel consumer: verifica che il topic esista e che l'app sia iscritta con il group id giusto.
-
Messaggi non consumati:
se hai modificato il group id, potresti aver cambiato la posizione degli offset.
In sviluppo, puoi usare
auto-offset-reset: earliestper leggere anche i messaggi passati. - Versioni incompatibili: assicurati che le versioni di Kafka, del client Kafka e di Spring Kafka siano compatibili tra loro.
10. Conclusioni e passi successivi
In questa guida abbiamo visto come:
- Configurare Kafka in un progetto Spring Boot.
- Creare un semplice producer e consumer con Spring Kafka.
- Containerizzare l'app con Docker.
- Orchestrare Kafka e l'app con Docker Compose.
Da qui puoi estendere l'esempio introducendo:
- Messaggi strutturati (JSON) e serializzazione personalizzata.
- Più topic e consumer group separati.
- Pattern come event sourcing e CQRS.
- Monitoraggio e metriche tramite strumenti come Prometheus e Grafana.
Avere Kafka integrato in Spring Boot e gestito via Docker ti offre un ambiente potente, riproducibile e facile da distribuire, adatto sia ai test locali che agli ambienti di produzione.