In questo articolo vediamo come applicare il pattern CQRS (Command Query Responsibility Segregation) in un progetto Laravel, usando Kafka come event bus e Docker per orchestrare l'ambiente di sviluppo.
1. Cos'è CQRS e perché usarlo con Laravel
CQRS separa in modo esplicito le operazioni che modificano lo stato dell'applicazione (Command) da quelle che leggono lo stato (Query). In un tipico progetto Laravel, spesso i controller chiamano direttamente i model Eloquent sia per scrivere che per leggere, con il rischio di mescolare logica, accoppiamenti forti e difficoltà di scalare.
Con CQRS avremo:
- Command side: responsabile di accettare i comandi (scritture), validare, applicare la logica di dominio e pubblicare eventi.
- Query side: responsabile di leggere i dati, spesso da un modello di lettura ottimizzato (read model).
Kafka entra in gioco come message broker per distribuire gli eventi generati dal lato Command verso uno o più consumer che aggiornano i read model o integrano altri servizi.
2. Architettura di riferimento
L'architettura a grandi linee sarà questa:
- Laravel App
- Endpoint HTTP (API REST) che espongono Command e Query.
- Strato di dominio con Command, Query, Handler ed Event.
- Producer Kafka che pubblica eventi sul broker.
- Consumer Kafka (eseguiti come processi separati) che aggiornano i read model.
- Kafka + Zookeeper (in Docker): broker che riceve gli eventi dal producer e li consegna ai consumer.
- Database (es. MySQL o PostgreSQL): può avere una o più tabelle dedicate ai read model.
3. Setup del progetto Laravel
3.1 Creazione del progetto
composer create-project laravel/laravel cqrs-kafka-demo
cd cqrs-kafka-demo
3.2 Dipendenze per Kafka
Esistono vari package per integrare Kafka con Laravel. Uno dei più usati è, ad esempio,
junges/laravel-kafka. Installiamolo:
composer require mateusjunges/laravel-kafka
Pubblica eventualmente la configurazione (il comando può variare a seconda della versione del package):
php artisan vendor:publish --provider="Junges\Kafka\KafkaServiceProvider"
4. Modellare Command e Query
Supponiamo di gestire un caso semplice: creazione e lettura di ordini. Vogliamo:
- Un comando
CreateOrderCommandche crea un ordine. - Un evento
OrderCreatedpubblicato su Kafka. - Un consumer Kafka che aggiorna una tabella
orders_read. - Una query
GetOrderByIdQueryche legge dal read model.
4.1 Struttura delle cartelle
Possiamo creare una semplice organizzazione all'interno di app/:
app/
Domain/
Orders/
Commands/
Queries/
Events/
Handlers/
Models/
ReadModel/
Orders/
4.2 Definizione del Command
In app/Domain/Orders/Commands/CreateOrderCommand.php:
<?php
namespace App\Domain\Orders\Commands;
class CreateOrderCommand
{
public function __construct(
public readonly string $customerId,
public readonly array $items,
public readonly float $total
) {
}
}
4.3 Handler del Command
In app/Domain/Orders/Handlers/CreateOrderHandler.php:
<?php
namespace App\Domain\Orders\Handlers;
use App\Domain\Orders\Commands\CreateOrderCommand;
use App\Domain\Orders\Events\OrderCreated;
use App\Models\Order;
use Junges\Kafka\Contracts\KafkaProducer;
class CreateOrderHandler
{
public function __construct(
private KafkaProducer $producer
) {
}
public function handle(CreateOrderCommand $command): Order
{
$order = new Order();
$order->customer_id = $command->customerId;
$order->items = $command->items;
$order->total = $command->total;
$order->status = 'created';
$order->save();
$event = new OrderCreated(
orderId: $order->id,
customerId: $order->customer_id,
total: $order->total
);
$this->producer
->withBodyKey('order_id', $event->orderId)
->withBodyKey('customer_id', $event->customerId)
->withBodyKey('total', $event->total)
->withTopic('orders.created')
->send();
return $order;
}
}
4.4 Evento di dominio
In app/Domain/Orders/Events/OrderCreated.php:
<?php
namespace App\Domain\Orders\Events;
class OrderCreated
{
public function __construct(
public readonly int $orderId,
public readonly string $customerId,
public readonly float $total
) {
}
}
4.5 Controller per esporre il Command
Invece di mettere logica di business nel controller, invochiamo il Command handler.
In app/Http/Controllers/OrderCommandController.php:
<?php
namespace App\Http\Controllers;
use App\Domain\Orders\Commands\CreateOrderCommand;
use App\Domain\Orders\Handlers\CreateOrderHandler;
use Illuminate\Http\Request;
use Illuminate\Http\JsonResponse;
class OrderCommandController extends Controller
{
public function __construct(
private CreateOrderHandler $handler
) {
}
public function store(Request $request): JsonResponse
{
$validated = $request->validate([
'customer_id' => ['required', 'string'],
'items' => ['required', 'array'],
'total' => ['required', 'numeric'],
]);
$command = new CreateOrderCommand(
customerId: $validated['customer_id'],
items: $validated['items'],
total: (float) $validated['total']
);
$order = $this->handler->handle($command);
return response()->json([
'id' => $order->id,
'status' => $order->status,
], 201);
}
}
Aggiungiamo la route in routes/api.php:
<?php
use App\Http\Controllers\OrderCommandController;
use Illuminate\Support\Facades\Route;
Route::post('/orders', [OrderCommandController::class, 'store']);
5. Lato Query: read model e Query object
5.1 Read model
Possiamo creare una tabella dedicata ai read model, ad esempio orders_read. Migrazione semplificata:
<?php
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;
return new class extends Migration {
public function up(): void
{
Schema::create('orders_read', function (Blueprint $table) {
$table->id();
$table->unsignedBigInteger('order_id')->unique();
$table->string('customer_id');
$table->decimal('total', 10, 2);
$table->string('status');
$table->timestamps();
});
}
public function down(): void
{
Schema::dropIfExists('orders_read');
}
};
E il relativo model app/ReadModel/Orders/OrderRead.php:
<?php
namespace App\ReadModel\Orders;
use Illuminate\Database\Eloquent\Model;
class OrderRead extends Model
{
protected $table = 'orders_read';
protected $fillable = [
'order_id',
'customer_id',
'total',
'status',
];
}
5.2 Query object e handler
In app/Domain/Orders/Queries/GetOrderByIdQuery.php:
<?php
namespace App\Domain\Orders\Queries;
class GetOrderByIdQuery
{
public function __construct(
public readonly int $orderId
) {
}
}
E il relativo handler in app/Domain/Orders/Handlers/GetOrderByIdHandler.php:
<?php
namespace App\Domain\Orders\Handlers;
use App\Domain\Orders\Queries\GetOrderByIdQuery;
use App\ReadModel\Orders\OrderRead;
use Illuminate\Database\Eloquent\ModelNotFoundException;
class GetOrderByIdHandler
{
public function handle(GetOrderByIdQuery $query): OrderRead
{
$order = OrderRead::where('order_id', $query->orderId)->first();
if (! $order) {
throw new ModelNotFoundException('Order not found in read model.');
}
return $order;
}
}
5.3 Controller per le Query
In app/Http/Controllers/OrderQueryController.php:
<?php
namespace App\Http\Controllers;
use App\Domain\Orders\Handlers\GetOrderByIdHandler;
use App\Domain\Orders\Queries\GetOrderByIdQuery;
use Illuminate\Database\Eloquent\ModelNotFoundException;
use Illuminate\Http\JsonResponse;
class OrderQueryController extends Controller
{
public function __construct(
private GetOrderByIdHandler $handler
) {
}
public function show(int $orderId): JsonResponse
{
try {
$query = new GetOrderByIdQuery($orderId);
$order = $this->handler->handle($query);
return response()->json($order);
} catch (ModelNotFoundException $e) {
return response()->json(['message' => 'Order not found'], 404);
}
}
}
Route corrispondente in routes/api.php:
Route::get('/orders/{id}', [OrderQueryController::class, 'show']);
6. Integrazione con Kafka: consumer e proiezioni
6.1 Consumer Kafka in Laravel
Per aggiornare il read model, creiamo un consumer Kafka che ascolta il topic
orders.created e aggiorna la tabella orders_read.
Possiamo incapsulare la logica dentro un command artisan. In
app/Console/Commands/ConsumeOrdersCreated.php:
<?php
namespace App\Console\Commands;
use App\ReadModel\Orders\OrderRead;
use Illuminate\Console\Command;
use Junges\Kafka\Contracts\KafkaConsumer;
use Junges\Kafka\Facades\Kafka;
class ConsumeOrdersCreated extends Command
{
protected $signature = 'kafka:consume-orders-created';
protected $description = 'Consuma gli eventi orders.created da Kafka e aggiorna il read model.';
public function handle(): int
{
$consumer = Kafka::createConsumer()
->subscribe('orders.created')
->withHandler(function ($message) {
$payload = $message->getBody();
OrderRead::updateOrCreate(
['order_id' => $payload['order_id']],
[
'customer_id' => $payload['customer_id'],
'total' => $payload['total'],
'status' => 'created',
]
);
$this->info('Order read model updated for order ' . $payload['order_id']);
})
->build();
$consumer->consume();
return self::SUCCESS;
}
}
6.2 Avviare il consumer
Una volta che l'ambiente Docker è in esecuzione, potrai lanciare il consumer:
php artisan kafka:consume-orders-created
In produzione, questo processo dovrebbe essere gestito da un supervisor (es. Supervisor, systemd, Kubernetes, ecc.).
7. Configurare Kafka e Laravel con Docker
7.1 Dockerfile per Laravel
Esempio minimalista di Dockerfile per l'app Laravel (PHP FPM):
FROM php:8.3-fpm
RUN apt-get update && apt-get install -y git unzip libpq-dev libzip-dev && docker-php-ext-install pdo pdo_mysql pdo_pgsql zip
COPY --from=composer:2 /usr/bin/composer /usr/bin/composer
WORKDIR /var/www/html
COPY . .
RUN composer install --no-interaction --prefer-dist --optimize-autoloader
RUN cp .env.example .env && php artisan key:generate
CMD ["php-fpm"]
7.2 docker-compose.yml con Kafka
Esempio di docker-compose.yml semplificato con:
- App Laravel (php-fpm)
- Nginx
- MySQL
- Kafka + Zookeeper
version: "3.8"
services:
app:
build: .
container_name: cqrs_app
volumes:
- .:/var/www/html
depends_on:
- db
- kafka
nginx:
image: nginx:1.27
container_name: cqrs_nginx
ports:
- "8080:80"
volumes:
- .:/var/www/html
- ./docker/nginx.conf:/etc/nginx/conf.d/default.conf
depends_on:
- app
db:
image: mysql:8.0
container_name: cqrs_db
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: cqrs
MYSQL_USER: cqrs
MYSQL_PASSWORD: cqrs
ports:
- "3306:3306"
zookeeper:
image: confluentinc/cp-zookeeper:7.7.0
container_name: cqrs_zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.7.0
container_name: cqrs_kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
7.3 Configurazione di Laravel per Kafka
Nel file .env aggiungi ad esempio:
KAFKA_BROKERS=kafka:9092
E in config/kafka.php verifica che il valore dei broker venga letto dall'environment:
<?php
return [
'broker' => env('KAFKA_BROKERS', 'kafka:9092'),
// Altri parametri specifici del package...
];
7.4 Avvio dell'ambiente
Avvia tutto con:
docker compose up -d
Esegui eventualmente le migrazioni:
docker exec -it cqrs_app php artisan migrate
8. Flusso end-to-end
Una volta che tutto è in esecuzione, il flusso di lavoro sarà:
- Il client invia una richiesta POST a
/api/orderscon i dati dell'ordine. - Il controller chiama il
CreateOrderHandlerche:- salva l'ordine nel database di scrittura,
- pubblica un evento
orders.createdsu Kafka.
- Il command artisan
kafka:consume-orders-createdlegge il messaggio da Kafka e aggiornaorders_read. - Una richiesta GET a
/api/orders/{id}chiama ilGetOrderByIdHandlerche legge dal read model.
9. Buone pratiche e passi successivi
- Definisci bene i confini del dominio: CQRS è più utile per domini complessi, non per CRUD banali.
- Evita ottimizzazioni premature: puoi introdurre CQRS gradualmente, a partire dalle parti più critiche.
- Monitora i consumer: il flusso di eventi è fondamentale, quindi log e alert dei consumer Kafka sono essenziali.
- Versiona gli eventi: in sistemi reali, gli eventi cambiano nel tempo; usa versioning del payload per evitare rotture.
- Read model multipli: puoi avere più read model per casi d'uso diversi, alimentati dagli stessi eventi.
Con questa base hai una struttura concreta per usare CQRS in Laravel sfruttando Kafka come bus di eventi e Docker per isolare e replicare facilmente l'intero ambiente. Da qui puoi estendere il dominio, aggiungere nuovi eventi, curare la sicurezza, gestire la scalabilità e integrare altri servizi che consumano gli stessi stream Kafka.