Usare il pattern CQRS in Laravel con Kafka e Docker

In questo articolo vediamo come applicare il pattern CQRS (Command Query Responsibility Segregation) in un progetto Laravel, usando Kafka come event bus e Docker per orchestrare l'ambiente di sviluppo.

1. Cos'è CQRS e perché usarlo con Laravel

CQRS separa in modo esplicito le operazioni che modificano lo stato dell'applicazione (Command) da quelle che leggono lo stato (Query). In un tipico progetto Laravel, spesso i controller chiamano direttamente i model Eloquent sia per scrivere che per leggere, con il rischio di mescolare logica, accoppiamenti forti e difficoltà di scalare.

Con CQRS avremo:

  • Command side: responsabile di accettare i comandi (scritture), validare, applicare la logica di dominio e pubblicare eventi.
  • Query side: responsabile di leggere i dati, spesso da un modello di lettura ottimizzato (read model).

Kafka entra in gioco come message broker per distribuire gli eventi generati dal lato Command verso uno o più consumer che aggiornano i read model o integrano altri servizi.

2. Architettura di riferimento

L'architettura a grandi linee sarà questa:

  • Laravel App
    • Endpoint HTTP (API REST) che espongono Command e Query.
    • Strato di dominio con Command, Query, Handler ed Event.
    • Producer Kafka che pubblica eventi sul broker.
    • Consumer Kafka (eseguiti come processi separati) che aggiornano i read model.
  • Kafka + Zookeeper (in Docker): broker che riceve gli eventi dal producer e li consegna ai consumer.
  • Database (es. MySQL o PostgreSQL): può avere una o più tabelle dedicate ai read model.

3. Setup del progetto Laravel

3.1 Creazione del progetto

composer create-project laravel/laravel cqrs-kafka-demo
cd cqrs-kafka-demo

3.2 Dipendenze per Kafka

Esistono vari package per integrare Kafka con Laravel. Uno dei più usati è, ad esempio, junges/laravel-kafka. Installiamolo:

composer require mateusjunges/laravel-kafka

Pubblica eventualmente la configurazione (il comando può variare a seconda della versione del package):

php artisan vendor:publish --provider="Junges\Kafka\KafkaServiceProvider"

4. Modellare Command e Query

Supponiamo di gestire un caso semplice: creazione e lettura di ordini. Vogliamo:

  • Un comando CreateOrderCommand che crea un ordine.
  • Un evento OrderCreated pubblicato su Kafka.
  • Un consumer Kafka che aggiorna una tabella orders_read.
  • Una query GetOrderByIdQuery che legge dal read model.

4.1 Struttura delle cartelle

Possiamo creare una semplice organizzazione all'interno di app/:

app/
  Domain/
    Orders/
      Commands/
      Queries/
      Events/
      Handlers/
      Models/
  ReadModel/
    Orders/

4.2 Definizione del Command

In app/Domain/Orders/Commands/CreateOrderCommand.php:

<?php

namespace App\Domain\Orders\Commands;

class CreateOrderCommand
{
    public function __construct(
        public readonly string $customerId,
        public readonly array $items,
        public readonly float $total
    ) {
    }
}

4.3 Handler del Command

In app/Domain/Orders/Handlers/CreateOrderHandler.php:

<?php

namespace App\Domain\Orders\Handlers;

use App\Domain\Orders\Commands\CreateOrderCommand;
use App\Domain\Orders\Events\OrderCreated;
use App\Models\Order;
use Junges\Kafka\Contracts\KafkaProducer;

class CreateOrderHandler
{
    public function __construct(
        private KafkaProducer $producer
    ) {
    }

    public function handle(CreateOrderCommand $command): Order
    {
        $order = new Order();
        $order->customer_id = $command->customerId;
        $order->items = $command->items;
        $order->total = $command->total;
        $order->status = 'created';
        $order->save();

        $event = new OrderCreated(
            orderId: $order->id,
            customerId: $order->customer_id,
            total: $order->total
        );

        $this->producer
            ->withBodyKey('order_id', $event->orderId)
            ->withBodyKey('customer_id', $event->customerId)
            ->withBodyKey('total', $event->total)
            ->withTopic('orders.created')
            ->send();

        return $order;
    }
}

4.4 Evento di dominio

In app/Domain/Orders/Events/OrderCreated.php:

<?php

namespace App\Domain\Orders\Events;

class OrderCreated
{
    public function __construct(
        public readonly int $orderId,
        public readonly string $customerId,
        public readonly float $total
    ) {
    }
}

4.5 Controller per esporre il Command

Invece di mettere logica di business nel controller, invochiamo il Command handler. In app/Http/Controllers/OrderCommandController.php:

<?php

namespace App\Http\Controllers;

use App\Domain\Orders\Commands\CreateOrderCommand;
use App\Domain\Orders\Handlers\CreateOrderHandler;
use Illuminate\Http\Request;
use Illuminate\Http\JsonResponse;

class OrderCommandController extends Controller
{
    public function __construct(
        private CreateOrderHandler $handler
    ) {
    }

    public function store(Request $request): JsonResponse
    {
        $validated = $request->validate([
            'customer_id' => ['required', 'string'],
            'items' => ['required', 'array'],
            'total' => ['required', 'numeric'],
        ]);

        $command = new CreateOrderCommand(
            customerId: $validated['customer_id'],
            items: $validated['items'],
            total: (float) $validated['total']
        );

        $order = $this->handler->handle($command);

        return response()->json([
            'id' => $order->id,
            'status' => $order->status,
        ], 201);
    }
}

Aggiungiamo la route in routes/api.php:

<?php

use App\Http\Controllers\OrderCommandController;
use Illuminate\Support\Facades\Route;

Route::post('/orders', [OrderCommandController::class, 'store']);

5. Lato Query: read model e Query object

5.1 Read model

Possiamo creare una tabella dedicata ai read model, ad esempio orders_read. Migrazione semplificata:

<?php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

return new class extends Migration {
    public function up(): void
    {
        Schema::create('orders_read', function (Blueprint $table) {
            $table->id();
            $table->unsignedBigInteger('order_id')->unique();
            $table->string('customer_id');
            $table->decimal('total', 10, 2);
            $table->string('status');
            $table->timestamps();
        });
    }

    public function down(): void
    {
        Schema::dropIfExists('orders_read');
    }
};

E il relativo model app/ReadModel/Orders/OrderRead.php:

<?php

namespace App\ReadModel\Orders;

use Illuminate\Database\Eloquent\Model;

class OrderRead extends Model
{
    protected $table = 'orders_read';

    protected $fillable = [
        'order_id',
        'customer_id',
        'total',
        'status',
    ];
}

5.2 Query object e handler

In app/Domain/Orders/Queries/GetOrderByIdQuery.php:

<?php

namespace App\Domain\Orders\Queries;

class GetOrderByIdQuery
{
    public function __construct(
        public readonly int $orderId
    ) {
    }
}

E il relativo handler in app/Domain/Orders/Handlers/GetOrderByIdHandler.php:

<?php

namespace App\Domain\Orders\Handlers;

use App\Domain\Orders\Queries\GetOrderByIdQuery;
use App\ReadModel\Orders\OrderRead;
use Illuminate\Database\Eloquent\ModelNotFoundException;

class GetOrderByIdHandler
{
    public function handle(GetOrderByIdQuery $query): OrderRead
    {
        $order = OrderRead::where('order_id', $query->orderId)->first();

        if (! $order) {
            throw new ModelNotFoundException('Order not found in read model.');
        }

        return $order;
    }
}

5.3 Controller per le Query

In app/Http/Controllers/OrderQueryController.php:

<?php

namespace App\Http\Controllers;

use App\Domain\Orders\Handlers\GetOrderByIdHandler;
use App\Domain\Orders\Queries\GetOrderByIdQuery;
use Illuminate\Database\Eloquent\ModelNotFoundException;
use Illuminate\Http\JsonResponse;

class OrderQueryController extends Controller
{
    public function __construct(
        private GetOrderByIdHandler $handler
    ) {
    }

    public function show(int $orderId): JsonResponse
    {
        try {
            $query = new GetOrderByIdQuery($orderId);
            $order = $this->handler->handle($query);

            return response()->json($order);
        } catch (ModelNotFoundException $e) {
            return response()->json(['message' => 'Order not found'], 404);
        }
    }
}

Route corrispondente in routes/api.php:

Route::get('/orders/{id}', [OrderQueryController::class, 'show']);

6. Integrazione con Kafka: consumer e proiezioni

6.1 Consumer Kafka in Laravel

Per aggiornare il read model, creiamo un consumer Kafka che ascolta il topic orders.created e aggiorna la tabella orders_read.

Possiamo incapsulare la logica dentro un command artisan. In app/Console/Commands/ConsumeOrdersCreated.php:

<?php

namespace App\Console\Commands;

use App\ReadModel\Orders\OrderRead;
use Illuminate\Console\Command;
use Junges\Kafka\Contracts\KafkaConsumer;
use Junges\Kafka\Facades\Kafka;

class ConsumeOrdersCreated extends Command
{
    protected $signature = 'kafka:consume-orders-created';

    protected $description = 'Consuma gli eventi orders.created da Kafka e aggiorna il read model.';

    public function handle(): int
    {
        $consumer = Kafka::createConsumer()
            ->subscribe('orders.created')
            ->withHandler(function ($message) {
                $payload = $message->getBody();

                OrderRead::updateOrCreate(
                    ['order_id' => $payload['order_id']],
                    [
                        'customer_id' => $payload['customer_id'],
                        'total' => $payload['total'],
                        'status' => 'created',
                    ]
                );

                $this->info('Order read model updated for order ' . $payload['order_id']);
            })
            ->build();

        $consumer->consume();

        return self::SUCCESS;
    }
}

6.2 Avviare il consumer

Una volta che l'ambiente Docker è in esecuzione, potrai lanciare il consumer:

php artisan kafka:consume-orders-created

In produzione, questo processo dovrebbe essere gestito da un supervisor (es. Supervisor, systemd, Kubernetes, ecc.).

7. Configurare Kafka e Laravel con Docker

7.1 Dockerfile per Laravel

Esempio minimalista di Dockerfile per l'app Laravel (PHP FPM):

FROM php:8.3-fpm

RUN apt-get update && apt-get install -y     git unzip libpq-dev libzip-dev     && docker-php-ext-install pdo pdo_mysql pdo_pgsql zip

COPY --from=composer:2 /usr/bin/composer /usr/bin/composer

WORKDIR /var/www/html

COPY . .

RUN composer install --no-interaction --prefer-dist --optimize-autoloader

RUN cp .env.example .env && php artisan key:generate

CMD ["php-fpm"]

7.2 docker-compose.yml con Kafka

Esempio di docker-compose.yml semplificato con:

  • App Laravel (php-fpm)
  • Nginx
  • MySQL
  • Kafka + Zookeeper
version: "3.8"

services:
  app:
    build: .
    container_name: cqrs_app
    volumes:
      - .:/var/www/html
    depends_on:
      - db
      - kafka

  nginx:
    image: nginx:1.27
    container_name: cqrs_nginx
    ports:
      - "8080:80"
    volumes:
      - .:/var/www/html
      - ./docker/nginx.conf:/etc/nginx/conf.d/default.conf
    depends_on:
      - app

  db:
    image: mysql:8.0
    container_name: cqrs_db
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: cqrs
      MYSQL_USER: cqrs
      MYSQL_PASSWORD: cqrs
    ports:
      - "3306:3306"

  zookeeper:
    image: confluentinc/cp-zookeeper:7.7.0
    container_name: cqrs_zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.7.0
    container_name: cqrs_kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

7.3 Configurazione di Laravel per Kafka

Nel file .env aggiungi ad esempio:

KAFKA_BROKERS=kafka:9092

E in config/kafka.php verifica che il valore dei broker venga letto dall'environment:

<?php

return [
    'broker' => env('KAFKA_BROKERS', 'kafka:9092'),

    // Altri parametri specifici del package...
];

7.4 Avvio dell'ambiente

Avvia tutto con:

docker compose up -d

Esegui eventualmente le migrazioni:

docker exec -it cqrs_app php artisan migrate

8. Flusso end-to-end

Una volta che tutto è in esecuzione, il flusso di lavoro sarà:

  1. Il client invia una richiesta POST a /api/orders con i dati dell'ordine.
  2. Il controller chiama il CreateOrderHandler che:
    • salva l'ordine nel database di scrittura,
    • pubblica un evento orders.created su Kafka.
  3. Il command artisan kafka:consume-orders-created legge il messaggio da Kafka e aggiorna orders_read.
  4. Una richiesta GET a /api/orders/{id} chiama il GetOrderByIdHandler che legge dal read model.

9. Buone pratiche e passi successivi

  • Definisci bene i confini del dominio: CQRS è più utile per domini complessi, non per CRUD banali.
  • Evita ottimizzazioni premature: puoi introdurre CQRS gradualmente, a partire dalle parti più critiche.
  • Monitora i consumer: il flusso di eventi è fondamentale, quindi log e alert dei consumer Kafka sono essenziali.
  • Versiona gli eventi: in sistemi reali, gli eventi cambiano nel tempo; usa versioning del payload per evitare rotture.
  • Read model multipli: puoi avere più read model per casi d'uso diversi, alimentati dagli stessi eventi.

Con questa base hai una struttura concreta per usare CQRS in Laravel sfruttando Kafka come bus di eventi e Docker per isolare e replicare facilmente l'intero ambiente. Da qui puoi estendere il dominio, aggiungere nuovi eventi, curare la sicurezza, gestire la scalabilità e integrare altri servizi che consumano gli stessi stream Kafka.

Torna su