Un'applicazione web per la telemetria con Laravel e Apache Kafka

La telemetria moderna richiede infrastrutture capaci di gestire flussi continui di dati provenienti da migliaia di dispositivi, sensori e applicazioni distribuite. In questo articolo costruiremo un'applicazione web completa basata su Laravel e Apache Kafka per raccogliere, elaborare e visualizzare metriche in tempo reale. L'obiettivo è realizzare un sistema robusto, scalabile e pronto per la produzione, capace di ingerire eventi a elevata frequenza e di esporli tramite API e dashboard.

Architettura generale

Il sistema è composto da tre macro-componenti che cooperano in modo asincrono. Il primo è il producer HTTP, ossia un endpoint Laravel che riceve i payload di telemetria via REST e li pubblica su Kafka. Il secondo è il consumer worker, un processo Laravel persistente che legge dai topic Kafka e persiste i dati in un database time-series. Il terzo è la dashboard web, una vista Laravel che interroga i dati aggregati e li mostra all'utente con aggiornamenti push tramite WebSocket.

Apache Kafka funge da spina dorsale del sistema, disaccoppiando l'ingestione dalla persistenza e permettendo a più consumer di processare lo stesso flusso per scopi differenti, ad esempio archiviazione, alerting e analytics. Useremo PHP-RdKafka come libreria di basso livello e mateusjunges/laravel-kafka come integrazione idiomatica con il framework.

Prerequisiti e ambiente

L'ambiente di sviluppo si basa su Docker Compose per orchestrare Kafka, Zookeeper, PostgreSQL con estensione TimescaleDB, e l'applicazione Laravel su PHP 8.4-FPM dietro Nginx. Questa scelta riflette uno stack moderno e portabile, in cui ogni componente è isolato in un container e configurabile tramite variabili d'ambiente.

# File: docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  database:
    image: timescale/timescaledb:latest-pg16
    environment:
      POSTGRES_DB: telemetry
      POSTGRES_USER: telemetry
      POSTGRES_PASSWORD: secret
    ports:
      - "5432:5432"
    volumes:
      - db_data:/var/lib/postgresql/data

  app:
    build: ./docker/php
    volumes:
      - ./:/var/www/html
    depends_on:
      - kafka
      - database

volumes:
  db_data:

L'immagine PHP deve includere l'estensione rdkafka, che è il binding nativo della libreria librdkafka. Senza questa estensione la libreria Laravel non può funzionare, poiché si appoggia interamente al client C per le operazioni di rete con i broker.

# File: docker/php/Dockerfile
FROM php:8.4-fpm

RUN apt-get update && apt-get install -y \
    git unzip libpq-dev librdkafka-dev \
    && docker-php-ext-install pdo pdo_pgsql \
    && pecl install rdkafka \
    && docker-php-ext-enable rdkafka

COPY --from=composer:2 /usr/bin/composer /usr/bin/composer

WORKDIR /var/www/html

Installazione delle dipendenze Laravel

All'interno del progetto Laravel installiamo il pacchetto per Kafka e configuriamo le variabili d'ambiente. Il pacchetto espone una facade Kafka che consente di costruire producer e consumer con un'API fluente, mantenendo la familiarità tipica del framework.

composer require mateusjunges/laravel-kafka
php artisan vendor:publish --tag=laravel-kafka-config

Le variabili d'ambiente principali nel file .env definiscono il broker e le opzioni di consumo. È buona pratica definire un identificatore di gruppo consumer specifico per ciascun worker logico, in modo da poter scalare orizzontalmente i processi senza duplicare i messaggi.

# File: .env
KAFKA_BROKERS=kafka:29092
KAFKA_AUTO_OFFSET_RESET=earliest
KAFKA_CONSUMER_GROUP_ID=telemetry-consumer
KAFKA_SECURITY_PROTOCOL=PLAINTEXT

Modello dei dati e migrazioni

Il cuore della persistenza è una tabella ipertabellare TimescaleDB, ottimizzata per query temporali su grandi volumi. Ogni evento di telemetria contiene un timestamp, l'identificativo del dispositivo, il nome della metrica, il valore numerico e un payload JSON con metadati arbitrari. Indicizziamo i campi più filtrati per garantire query veloci sulla dashboard.

<?php
// File: database/migrations/2026_04_01_000001_create_telemetry_events_table.php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Schema;

return new class extends Migration {
    public function up(): void
    {
        // Creazione della tabella principale degli eventi
        Schema::create('telemetry_events', function (Blueprint $table) {
            $table->timestampTz('recorded_at');
            $table->string('device_id', 64);
            $table->string('metric_name', 128);
            $table->double('metric_value');
            $table->jsonb('metadata')->nullable();
            $table->index(['device_id', 'recorded_at']);
            $table->index(['metric_name', 'recorded_at']);
        });

        // Conversione in hypertable di TimescaleDB
        DB::statement("SELECT create_hypertable('telemetry_events', 'recorded_at')");
    }

    public function down(): void
    {
        Schema::dropIfExists('telemetry_events');
    }
};

La conversione in hypertable è l'aspetto chiave: TimescaleDB partiziona automaticamente i dati per finestra temporale, mantenendo l'API SQL standard ma offrendo prestazioni di query e aggregazione paragonabili a database time-series specializzati.

Il producer HTTP

Il producer è esposto come endpoint REST su /api/telemetry. Riceve un payload JSON, lo valida con Form Request, e lo pubblica su un topic Kafka chiamato telemetry-events. La chiave del messaggio è l'identificativo del dispositivo, scelta che garantisce l'ordinamento per dispositivo grazie al partizionamento di Kafka.

<?php
// File: app/Http/Requests/StoreTelemetryRequest.php

namespace App\Http\Requests;

use Illuminate\Foundation\Http\FormRequest;

class StoreTelemetryRequest extends FormRequest
{
    public function authorize(): bool
    {
        return true;
    }

    public function rules(): array
    {
        return [
            'device_id' => ['required', 'string', 'max:64'],
            'metric_name' => ['required', 'string', 'max:128'],
            'metric_value' => ['required', 'numeric'],
            'recorded_at' => ['nullable', 'date'],
            'metadata' => ['nullable', 'array'],
        ];
    }
}

Il controller delega la pubblicazione a un servizio dedicato, mantenendo una separazione netta tra il livello HTTP e il livello di messaggistica. Questa scelta architetturale facilita il testing e permette di sostituire il producer Kafka con altre implementazioni, ad esempio una coda Redis, senza toccare il controller.

<?php
// File: app/Services/TelemetryProducer.php

namespace App\Services;

use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;

class TelemetryProducer
{
    private const TOPIC = 'telemetry-events';

    public function publish(array $payload): void
    {
        $deviceId = $payload['device_id'];

        // Costruzione del messaggio Kafka con chiave per partizionamento
        $message = new Message(
            topicName: self::TOPIC,
            partition: null,
            headers: ['content-type' => 'application/json'],
            body: $payload,
            key: $deviceId
        );

        Kafka::publish()
            ->onTopic(self::TOPIC)
            ->withMessage($message)
            ->send();
    }
}
<?php
// File: app/Http/Controllers/TelemetryController.php

namespace App\Http\Controllers;

use App\Http\Requests\StoreTelemetryRequest;
use App\Services\TelemetryProducer;
use Illuminate\Http\JsonResponse;

class TelemetryController extends Controller
{
    public function __construct(private TelemetryProducer $producer) {}

    public function store(StoreTelemetryRequest $request): JsonResponse
    {
        $data = $request->validated();
        $data['recorded_at'] ??= now()->toIso8601String();

        // Pubblicazione asincrona su Kafka
        $this->producer->publish($data);

        return response()->json(['status' => 'accepted'], 202);
    }
}

La rotta restituisce 202 Accepted per riflettere la natura asincrona dell'elaborazione: il messaggio è stato accettato per la pubblicazione, ma la persistenza avverrà successivamente nel consumer. Questo pattern è fondamentale per ottenere alti throughput, poiché il client HTTP non deve attendere la scrittura sul database.

Il consumer worker

Il consumer è un comando Artisan che gira come processo persistente, tipicamente sotto Supervisor o systemd. Si sottoscrive al topic telemetry-events, riceve i messaggi e li scrive in batch sul database. La scrittura in batch è un'ottimizzazione cruciale per evitare di saturare il database con migliaia di INSERT al secondo.

<?php
// File: app/Console/Commands/ConsumeTelemetryCommand.php

namespace App\Console\Commands;

use App\Services\TelemetryBatchWriter;
use Illuminate\Console\Command;
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Facades\Kafka;

class ConsumeTelemetryCommand extends Command
{
    protected $signature = 'telemetry:consume';
    protected $description = 'Consume telemetry events from Kafka';

    public function handle(TelemetryBatchWriter $writer): int
    {
        $consumer = Kafka::consumer(['telemetry-events'])
            ->withConsumerGroupId(config('kafka.consumer_group_id'))
            ->withAutoCommit(false)
            ->withHandler(function (ConsumerMessage $message) use ($writer) {
                // Aggiunta del messaggio al buffer di scrittura
                $writer->add($message->getBody());

                // Flush periodico in batch
                if ($writer->shouldFlush()) {
                    $writer->flush();
                }
            })
            ->build();

        $consumer->consume();

        return self::SUCCESS;
    }
}

Il TelemetryBatchWriter incapsula la logica di accumulo e scrittura. Mantiene un buffer in memoria e lo svuota quando raggiunge una certa soglia di dimensione o di tempo, qualunque condizione si verifichi prima. Questo bilanciamento tra latenza e throughput è classico nei sistemi di stream processing.

<?php
// File: app/Services/TelemetryBatchWriter.php

namespace App\Services;

use Illuminate\Support\Facades\DB;

class TelemetryBatchWriter
{
    private const MAX_BATCH_SIZE = 500;
    private const MAX_BATCH_AGE_SECONDS = 2;

    private array $buffer = [];
    private float $bufferStartedAt;

    public function __construct()
    {
        $this->bufferStartedAt = microtime(true);
    }

    public function add(array $event): void
    {
        $this->buffer[] = [
            'recorded_at' => $event['recorded_at'],
            'device_id' => $event['device_id'],
            'metric_name' => $event['metric_name'],
            'metric_value' => $event['metric_value'],
            'metadata' => isset($event['metadata']) ? json_encode($event['metadata']) : null,
        ];
    }

    public function shouldFlush(): bool
    {
        // Flush se il buffer è pieno o se è trascorso troppo tempo
        $isFull = count($this->buffer) >= self::MAX_BATCH_SIZE;
        $isStale = (microtime(true) - $this->bufferStartedAt) >= self::MAX_BATCH_AGE_SECONDS;
        return $isFull || $isStale;
    }

    public function flush(): void
    {
        if (empty($this->buffer)) {
            return;
        }

        // Inserimento massivo in una singola transazione
        DB::table('telemetry_events')->insert($this->buffer);

        $this->buffer = [];
        $this->bufferStartedAt = microtime(true);
    }
}

Gestione degli errori e dead letter queue

In un sistema di produzione un messaggio può fallire l'elaborazione per molte ragioni: payload malformato, errore transitorio del database, vincolo violato. La strategia consigliata è introdurre un topic telemetry-events-dlq dove inviare i messaggi che hanno fallito ripetutamente, in modo da non bloccare il flusso principale e poterli analizzare in seguito.

<?php
// File: app/Services/TelemetryDeadLetterPublisher.php

namespace App\Services;

use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;

class TelemetryDeadLetterPublisher
{
    private const TOPIC = 'telemetry-events-dlq';

    public function publish(array $originalPayload, string $reason): void
    {
        // Invio del messaggio fallito al topic di quarantena
        $message = new Message(
            topicName: self::TOPIC,
            body: [
                'original' => $originalPayload,
                'failure_reason' => $reason,
                'failed_at' => now()->toIso8601String(),
            ],
            key: $originalPayload['device_id'] ?? null
        );

        Kafka::publish()
            ->onTopic(self::TOPIC)
            ->withMessage($message)
            ->send();
    }
}

Nel consumer è sufficiente avvolgere la chiamata a add() in un blocco try/catch e delegare al publisher di dead letter quando la validazione fallisce. È buona pratica registrare anche in un canale di log dedicato per facilitare il monitoraggio operativo.

API di lettura per la dashboard

La dashboard ha bisogno di interrogare i dati aggregati per finestre temporali. Sfruttiamo le funzioni time_bucket di TimescaleDB per raggruppare gli eventi in intervalli regolari, ottenendo serie storiche pronte da disegnare in un grafico.

<?php
// File: app/Http/Controllers/MetricsController.php

namespace App\Http\Controllers;

use Illuminate\Http\JsonResponse;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\DB;

class MetricsController extends Controller
{
    public function timeseries(Request $request): JsonResponse
    {
        $validated = $request->validate([
            'device_id' => ['required', 'string'],
            'metric_name' => ['required', 'string'],
            'from' => ['required', 'date'],
            'to' => ['required', 'date', 'after:from'],
            'bucket' => ['nullable', 'string'],
        ]);

        $bucket = $validated['bucket'] ?? '1 minute';

        // Aggregazione per finestre temporali con TimescaleDB
        $rows = DB::select(
            "SELECT time_bucket(?, recorded_at) AS window_start,
                    AVG(metric_value) AS avg_value,
                    MAX(metric_value) AS max_value,
                    MIN(metric_value) AS min_value,
                    COUNT(*) AS sample_count
             FROM telemetry_events
             WHERE device_id = ?
               AND metric_name = ?
               AND recorded_at BETWEEN ? AND ?
             GROUP BY window_start
             ORDER BY window_start ASC",
            [
                $bucket,
                $validated['device_id'],
                $validated['metric_name'],
                $validated['from'],
                $validated['to'],
            ]
        );

        return response()->json(['data' => $rows]);
    }
}

Aggiornamenti push tramite WebSocket

Per offrire una dashboard live introduciamo Laravel Reverb, il server WebSocket nativo del framework. Un secondo consumer Kafka, in parallelo a quello che persiste i dati, pubblica gli eventi su un canale broadcast. I client connessi ricevono così aggiornamenti in tempo reale senza dover effettuare polling.

<?php
// File: app/Events/TelemetryEventReceived.php

namespace App\Events;

use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class TelemetryEventReceived implements ShouldBroadcast
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    public function __construct(public array $payload) {}

    public function broadcastOn(): Channel
    {
        // Canale dedicato al singolo dispositivo
        return new Channel('devices.' . $this->payload['device_id']);
    }

    public function broadcastAs(): string
    {
        return 'telemetry.received';
    }
}

Il consumer di broadcasting è quasi identico a quello di persistenza, ma invece di scrivere sul database emette un evento Laravel. Grazie ai consumer group differenti, lo stesso topic Kafka serve entrambi i flussi senza interferenze.

<?php
// File: app/Console/Commands/BroadcastTelemetryCommand.php

namespace App\Console\Commands;

use App\Events\TelemetryEventReceived;
use Illuminate\Console\Command;
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Facades\Kafka;

class BroadcastTelemetryCommand extends Command
{
    protected $signature = 'telemetry:broadcast';
    protected $description = 'Broadcast telemetry events over WebSocket';

    public function handle(): int
    {
        $consumer = Kafka::consumer(['telemetry-events'])
            ->withConsumerGroupId('telemetry-broadcaster')
            ->withHandler(function (ConsumerMessage $message) {
                // Inoltro dell'evento al canale broadcast
                event(new TelemetryEventReceived($message->getBody()));
            })
            ->build();

        $consumer->consume();

        return self::SUCCESS;
    }
}

Considerazioni sulla scalabilità

Kafka scala orizzontalmente attraverso il partizionamento dei topic. Aumentando il numero di partizioni di telemetry-events e avviando più istanze del comando telemetry:consume con lo stesso consumer group, Kafka distribuisce automaticamente le partizioni tra i worker. La regola pratica è non superare il numero di partizioni con quello di consumer, poiché ogni partizione è assegnata a un solo consumer del gruppo.

Per quanto riguarda il database, TimescaleDB offre policy di compressione e retention che permettono di mantenere snella la tabella principale spostando i dati storici in segmenti compressi o eliminandoli oltre una certa età. Una compression policy a 7 giorni e una retention policy a 90 giorni sono valori di partenza ragionevoli per la maggior parte dei casi d'uso di telemetria.

Osservabilità del sistema stesso

Un sistema di telemetria deve essere a sua volta osservabile. È buona pratica esportare metriche operative come throughput dei messaggi, lag dei consumer, latenza end-to-end e tasso di errore. Strumenti come Prometheus e Grafana, abbinati a esportatori per Kafka e PostgreSQL, completano lo stack senza richiedere modifiche significative al codice applicativo.

Conclusioni

Abbiamo costruito un'applicazione Laravel completa per la telemetria, sfruttando Apache Kafka come backbone per il disaccoppiamento tra ingestione e persistenza, TimescaleDB per lo storage efficiente di serie temporali, e Reverb per la distribuzione push degli aggiornamenti. L'architettura è modulare, testabile e pronta per crescere orizzontalmente. Le prossime evoluzioni naturali includono l'introduzione di Kafka Streams per aggregazioni in tempo reale lato broker, di Schema Registry per la validazione dei payload con Avro o Protobuf, e di un layer di alerting basato su soglie configurabili dall'utente. Il pattern qui presentato resta valido come fondamento per sistemi IoT, monitoraggio applicativo e analisi operativa su larga scala.