Un'applicazione web per la telemetria con Laravel e Apache Kafka
La telemetria moderna richiede infrastrutture capaci di gestire flussi continui di dati provenienti da migliaia di dispositivi, sensori e applicazioni distribuite. In questo articolo costruiremo un'applicazione web completa basata su Laravel e Apache Kafka per raccogliere, elaborare e visualizzare metriche in tempo reale. L'obiettivo è realizzare un sistema robusto, scalabile e pronto per la produzione, capace di ingerire eventi a elevata frequenza e di esporli tramite API e dashboard.
Architettura generale
Il sistema è composto da tre macro-componenti che cooperano in modo asincrono. Il primo è il producer HTTP, ossia un endpoint Laravel che riceve i payload di telemetria via REST e li pubblica su Kafka. Il secondo è il consumer worker, un processo Laravel persistente che legge dai topic Kafka e persiste i dati in un database time-series. Il terzo è la dashboard web, una vista Laravel che interroga i dati aggregati e li mostra all'utente con aggiornamenti push tramite WebSocket.
Apache Kafka funge da spina dorsale del sistema, disaccoppiando l'ingestione dalla persistenza e permettendo a più consumer di processare lo stesso flusso per scopi differenti, ad esempio archiviazione, alerting e analytics. Useremo PHP-RdKafka come libreria di basso livello e mateusjunges/laravel-kafka come integrazione idiomatica con il framework.
Prerequisiti e ambiente
L'ambiente di sviluppo si basa su Docker Compose per orchestrare Kafka, Zookeeper, PostgreSQL con estensione TimescaleDB, e l'applicazione Laravel su PHP 8.4-FPM dietro Nginx. Questa scelta riflette uno stack moderno e portabile, in cui ogni componente è isolato in un container e configurabile tramite variabili d'ambiente.
# File: docker-compose.yml
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
database:
image: timescale/timescaledb:latest-pg16
environment:
POSTGRES_DB: telemetry
POSTGRES_USER: telemetry
POSTGRES_PASSWORD: secret
ports:
- "5432:5432"
volumes:
- db_data:/var/lib/postgresql/data
app:
build: ./docker/php
volumes:
- ./:/var/www/html
depends_on:
- kafka
- database
volumes:
db_data:
L'immagine PHP deve includere l'estensione rdkafka, che è il binding nativo della libreria librdkafka. Senza questa estensione la libreria Laravel non può funzionare, poiché si appoggia interamente al client C per le operazioni di rete con i broker.
# File: docker/php/Dockerfile
FROM php:8.4-fpm
RUN apt-get update && apt-get install -y \
git unzip libpq-dev librdkafka-dev \
&& docker-php-ext-install pdo pdo_pgsql \
&& pecl install rdkafka \
&& docker-php-ext-enable rdkafka
COPY --from=composer:2 /usr/bin/composer /usr/bin/composer
WORKDIR /var/www/html
Installazione delle dipendenze Laravel
All'interno del progetto Laravel installiamo il pacchetto per Kafka e configuriamo le variabili d'ambiente. Il pacchetto espone una facade Kafka che consente di costruire producer e consumer con un'API fluente, mantenendo la familiarità tipica del framework.
composer require mateusjunges/laravel-kafka
php artisan vendor:publish --tag=laravel-kafka-config
Le variabili d'ambiente principali nel file .env definiscono il broker e le opzioni di consumo. È buona pratica definire un identificatore di gruppo consumer specifico per ciascun worker logico, in modo da poter scalare orizzontalmente i processi senza duplicare i messaggi.
# File: .env
KAFKA_BROKERS=kafka:29092
KAFKA_AUTO_OFFSET_RESET=earliest
KAFKA_CONSUMER_GROUP_ID=telemetry-consumer
KAFKA_SECURITY_PROTOCOL=PLAINTEXT
Modello dei dati e migrazioni
Il cuore della persistenza è una tabella ipertabellare TimescaleDB, ottimizzata per query temporali su grandi volumi. Ogni evento di telemetria contiene un timestamp, l'identificativo del dispositivo, il nome della metrica, il valore numerico e un payload JSON con metadati arbitrari. Indicizziamo i campi più filtrati per garantire query veloci sulla dashboard.
<?php
// File: database/migrations/2026_04_01_000001_create_telemetry_events_table.php
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Schema;
return new class extends Migration {
public function up(): void
{
// Creazione della tabella principale degli eventi
Schema::create('telemetry_events', function (Blueprint $table) {
$table->timestampTz('recorded_at');
$table->string('device_id', 64);
$table->string('metric_name', 128);
$table->double('metric_value');
$table->jsonb('metadata')->nullable();
$table->index(['device_id', 'recorded_at']);
$table->index(['metric_name', 'recorded_at']);
});
// Conversione in hypertable di TimescaleDB
DB::statement("SELECT create_hypertable('telemetry_events', 'recorded_at')");
}
public function down(): void
{
Schema::dropIfExists('telemetry_events');
}
};
La conversione in hypertable è l'aspetto chiave: TimescaleDB partiziona automaticamente i dati per finestra temporale, mantenendo l'API SQL standard ma offrendo prestazioni di query e aggregazione paragonabili a database time-series specializzati.
Il producer HTTP
Il producer è esposto come endpoint REST su /api/telemetry. Riceve un payload JSON, lo valida con Form Request, e lo pubblica su un topic Kafka chiamato telemetry-events. La chiave del messaggio è l'identificativo del dispositivo, scelta che garantisce l'ordinamento per dispositivo grazie al partizionamento di Kafka.
<?php
// File: app/Http/Requests/StoreTelemetryRequest.php
namespace App\Http\Requests;
use Illuminate\Foundation\Http\FormRequest;
class StoreTelemetryRequest extends FormRequest
{
public function authorize(): bool
{
return true;
}
public function rules(): array
{
return [
'device_id' => ['required', 'string', 'max:64'],
'metric_name' => ['required', 'string', 'max:128'],
'metric_value' => ['required', 'numeric'],
'recorded_at' => ['nullable', 'date'],
'metadata' => ['nullable', 'array'],
];
}
}
Il controller delega la pubblicazione a un servizio dedicato, mantenendo una separazione netta tra il livello HTTP e il livello di messaggistica. Questa scelta architetturale facilita il testing e permette di sostituire il producer Kafka con altre implementazioni, ad esempio una coda Redis, senza toccare il controller.
<?php
// File: app/Services/TelemetryProducer.php
namespace App\Services;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
class TelemetryProducer
{
private const TOPIC = 'telemetry-events';
public function publish(array $payload): void
{
$deviceId = $payload['device_id'];
// Costruzione del messaggio Kafka con chiave per partizionamento
$message = new Message(
topicName: self::TOPIC,
partition: null,
headers: ['content-type' => 'application/json'],
body: $payload,
key: $deviceId
);
Kafka::publish()
->onTopic(self::TOPIC)
->withMessage($message)
->send();
}
}
<?php
// File: app/Http/Controllers/TelemetryController.php
namespace App\Http\Controllers;
use App\Http\Requests\StoreTelemetryRequest;
use App\Services\TelemetryProducer;
use Illuminate\Http\JsonResponse;
class TelemetryController extends Controller
{
public function __construct(private TelemetryProducer $producer) {}
public function store(StoreTelemetryRequest $request): JsonResponse
{
$data = $request->validated();
$data['recorded_at'] ??= now()->toIso8601String();
// Pubblicazione asincrona su Kafka
$this->producer->publish($data);
return response()->json(['status' => 'accepted'], 202);
}
}
La rotta restituisce 202 Accepted per riflettere la natura asincrona dell'elaborazione: il messaggio è stato accettato per la pubblicazione, ma la persistenza avverrà successivamente nel consumer. Questo pattern è fondamentale per ottenere alti throughput, poiché il client HTTP non deve attendere la scrittura sul database.
Il consumer worker
Il consumer è un comando Artisan che gira come processo persistente, tipicamente sotto Supervisor o systemd. Si sottoscrive al topic telemetry-events, riceve i messaggi e li scrive in batch sul database. La scrittura in batch è un'ottimizzazione cruciale per evitare di saturare il database con migliaia di INSERT al secondo.
<?php
// File: app/Console/Commands/ConsumeTelemetryCommand.php
namespace App\Console\Commands;
use App\Services\TelemetryBatchWriter;
use Illuminate\Console\Command;
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Facades\Kafka;
class ConsumeTelemetryCommand extends Command
{
protected $signature = 'telemetry:consume';
protected $description = 'Consume telemetry events from Kafka';
public function handle(TelemetryBatchWriter $writer): int
{
$consumer = Kafka::consumer(['telemetry-events'])
->withConsumerGroupId(config('kafka.consumer_group_id'))
->withAutoCommit(false)
->withHandler(function (ConsumerMessage $message) use ($writer) {
// Aggiunta del messaggio al buffer di scrittura
$writer->add($message->getBody());
// Flush periodico in batch
if ($writer->shouldFlush()) {
$writer->flush();
}
})
->build();
$consumer->consume();
return self::SUCCESS;
}
}
Il TelemetryBatchWriter incapsula la logica di accumulo e scrittura. Mantiene un buffer in memoria e lo svuota quando raggiunge una certa soglia di dimensione o di tempo, qualunque condizione si verifichi prima. Questo bilanciamento tra latenza e throughput è classico nei sistemi di stream processing.
<?php
// File: app/Services/TelemetryBatchWriter.php
namespace App\Services;
use Illuminate\Support\Facades\DB;
class TelemetryBatchWriter
{
private const MAX_BATCH_SIZE = 500;
private const MAX_BATCH_AGE_SECONDS = 2;
private array $buffer = [];
private float $bufferStartedAt;
public function __construct()
{
$this->bufferStartedAt = microtime(true);
}
public function add(array $event): void
{
$this->buffer[] = [
'recorded_at' => $event['recorded_at'],
'device_id' => $event['device_id'],
'metric_name' => $event['metric_name'],
'metric_value' => $event['metric_value'],
'metadata' => isset($event['metadata']) ? json_encode($event['metadata']) : null,
];
}
public function shouldFlush(): bool
{
// Flush se il buffer è pieno o se è trascorso troppo tempo
$isFull = count($this->buffer) >= self::MAX_BATCH_SIZE;
$isStale = (microtime(true) - $this->bufferStartedAt) >= self::MAX_BATCH_AGE_SECONDS;
return $isFull || $isStale;
}
public function flush(): void
{
if (empty($this->buffer)) {
return;
}
// Inserimento massivo in una singola transazione
DB::table('telemetry_events')->insert($this->buffer);
$this->buffer = [];
$this->bufferStartedAt = microtime(true);
}
}
Gestione degli errori e dead letter queue
In un sistema di produzione un messaggio può fallire l'elaborazione per molte ragioni: payload malformato, errore transitorio del database, vincolo violato. La strategia consigliata è introdurre un topic telemetry-events-dlq dove inviare i messaggi che hanno fallito ripetutamente, in modo da non bloccare il flusso principale e poterli analizzare in seguito.
<?php
// File: app/Services/TelemetryDeadLetterPublisher.php
namespace App\Services;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
class TelemetryDeadLetterPublisher
{
private const TOPIC = 'telemetry-events-dlq';
public function publish(array $originalPayload, string $reason): void
{
// Invio del messaggio fallito al topic di quarantena
$message = new Message(
topicName: self::TOPIC,
body: [
'original' => $originalPayload,
'failure_reason' => $reason,
'failed_at' => now()->toIso8601String(),
],
key: $originalPayload['device_id'] ?? null
);
Kafka::publish()
->onTopic(self::TOPIC)
->withMessage($message)
->send();
}
}
Nel consumer è sufficiente avvolgere la chiamata a add() in un blocco try/catch e delegare al publisher di dead letter quando la validazione fallisce. È buona pratica registrare anche in un canale di log dedicato per facilitare il monitoraggio operativo.
API di lettura per la dashboard
La dashboard ha bisogno di interrogare i dati aggregati per finestre temporali. Sfruttiamo le funzioni time_bucket di TimescaleDB per raggruppare gli eventi in intervalli regolari, ottenendo serie storiche pronte da disegnare in un grafico.
<?php
// File: app/Http/Controllers/MetricsController.php
namespace App\Http\Controllers;
use Illuminate\Http\JsonResponse;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\DB;
class MetricsController extends Controller
{
public function timeseries(Request $request): JsonResponse
{
$validated = $request->validate([
'device_id' => ['required', 'string'],
'metric_name' => ['required', 'string'],
'from' => ['required', 'date'],
'to' => ['required', 'date', 'after:from'],
'bucket' => ['nullable', 'string'],
]);
$bucket = $validated['bucket'] ?? '1 minute';
// Aggregazione per finestre temporali con TimescaleDB
$rows = DB::select(
"SELECT time_bucket(?, recorded_at) AS window_start,
AVG(metric_value) AS avg_value,
MAX(metric_value) AS max_value,
MIN(metric_value) AS min_value,
COUNT(*) AS sample_count
FROM telemetry_events
WHERE device_id = ?
AND metric_name = ?
AND recorded_at BETWEEN ? AND ?
GROUP BY window_start
ORDER BY window_start ASC",
[
$bucket,
$validated['device_id'],
$validated['metric_name'],
$validated['from'],
$validated['to'],
]
);
return response()->json(['data' => $rows]);
}
}
Aggiornamenti push tramite WebSocket
Per offrire una dashboard live introduciamo Laravel Reverb, il server WebSocket nativo del framework. Un secondo consumer Kafka, in parallelo a quello che persiste i dati, pubblica gli eventi su un canale broadcast. I client connessi ricevono così aggiornamenti in tempo reale senza dover effettuare polling.
<?php
// File: app/Events/TelemetryEventReceived.php
namespace App\Events;
use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class TelemetryEventReceived implements ShouldBroadcast
{
use Dispatchable, InteractsWithSockets, SerializesModels;
public function __construct(public array $payload) {}
public function broadcastOn(): Channel
{
// Canale dedicato al singolo dispositivo
return new Channel('devices.' . $this->payload['device_id']);
}
public function broadcastAs(): string
{
return 'telemetry.received';
}
}
Il consumer di broadcasting è quasi identico a quello di persistenza, ma invece di scrivere sul database emette un evento Laravel. Grazie ai consumer group differenti, lo stesso topic Kafka serve entrambi i flussi senza interferenze.
<?php
// File: app/Console/Commands/BroadcastTelemetryCommand.php
namespace App\Console\Commands;
use App\Events\TelemetryEventReceived;
use Illuminate\Console\Command;
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Facades\Kafka;
class BroadcastTelemetryCommand extends Command
{
protected $signature = 'telemetry:broadcast';
protected $description = 'Broadcast telemetry events over WebSocket';
public function handle(): int
{
$consumer = Kafka::consumer(['telemetry-events'])
->withConsumerGroupId('telemetry-broadcaster')
->withHandler(function (ConsumerMessage $message) {
// Inoltro dell'evento al canale broadcast
event(new TelemetryEventReceived($message->getBody()));
})
->build();
$consumer->consume();
return self::SUCCESS;
}
}
Considerazioni sulla scalabilità
Kafka scala orizzontalmente attraverso il partizionamento dei topic. Aumentando il numero di partizioni di telemetry-events e avviando più istanze del comando telemetry:consume con lo stesso consumer group, Kafka distribuisce automaticamente le partizioni tra i worker. La regola pratica è non superare il numero di partizioni con quello di consumer, poiché ogni partizione è assegnata a un solo consumer del gruppo.
Per quanto riguarda il database, TimescaleDB offre policy di compressione e retention che permettono di mantenere snella la tabella principale spostando i dati storici in segmenti compressi o eliminandoli oltre una certa età. Una compression policy a 7 giorni e una retention policy a 90 giorni sono valori di partenza ragionevoli per la maggior parte dei casi d'uso di telemetria.
Osservabilità del sistema stesso
Un sistema di telemetria deve essere a sua volta osservabile. È buona pratica esportare metriche operative come throughput dei messaggi, lag dei consumer, latenza end-to-end e tasso di errore. Strumenti come Prometheus e Grafana, abbinati a esportatori per Kafka e PostgreSQL, completano lo stack senza richiedere modifiche significative al codice applicativo.
Conclusioni
Abbiamo costruito un'applicazione Laravel completa per la telemetria, sfruttando Apache Kafka come backbone per il disaccoppiamento tra ingestione e persistenza, TimescaleDB per lo storage efficiente di serie temporali, e Reverb per la distribuzione push degli aggiornamenti. L'architettura è modulare, testabile e pronta per crescere orizzontalmente. Le prossime evoluzioni naturali includono l'introduzione di Kafka Streams per aggregazioni in tempo reale lato broker, di Schema Registry per la validazione dei payload con Avro o Protobuf, e di un layer di alerting basato su soglie configurabili dall'utente. Il pattern qui presentato resta valido come fondamento per sistemi IoT, monitoraggio applicativo e analisi operativa su larga scala.