L’Internet of Things (IoT) richiede scelte tecniche ponderate: protocolli leggeri, sicurezza end-to-end, modelli asincroni e payload compatti. Python, grazie all’ecosistema di librerie e alla semplicità del linguaggio, è un ottimo collante tra firmware, gateway edge e servizi cloud. In questo articolo analizziamo i principali protocolli (MQTT, CoAP, HTTP/REST, WebSocket, BLE), come usarli in Python e come sceglierli in base ai vincoli del progetto.
Panoramica rapida dei protocolli
| Protocollo | Pattern | Trasporto | Peso | Quando usarlo | Note |
|---|---|---|---|---|---|
| MQTT | Pub/Sub | TCP + TLS | Basso | Telemetria, comandi, migliaia di device | QoS 0/1/2, Retain, LWT, topic gerarchici |
| CoAP | Request/Response, Observe | UDP + DTLS | Molto basso | Reti lossy (LPWAN), batterie | CBOR/JSON, blockwise, multicast |
| HTTP/REST | Request/Response | TCP + TLS | Medio | Integrazioni web, tool esistenti | Idempotenza, caching |
| WebSocket | Full-duplex | TCP + TLS | Medio | Streaming eventi, pannelli real-time | Stateful, ping/pong |
| BLE | GATT (client/server) | Bluetooth LE | Molto basso | Breve raggio, mobile, provisioning | MTU ridotta, discovery |
MQTT con Python
MQTT è lo standard de facto per telemetria IoT. In Python si usa spesso paho-mqtt. Di seguito un esempio robusto con TLS, Last Will e gestione riconnessioni.
import json, ssl, time
from threading import Event
from paho.mqtt import client as mqtt
BROKER = "mqtts.example.com"
PORT = 8883
TOPIC_PUB = "sensors/{device_id}/telemetry".format(device_id="edge-001")
TOPIC_SUB = "sensors/edge-001/cmd"
STOP = Event()
def on_connect(client, userdata, flags, rc, properties=None):
print("Connected:", rc)
client.subscribe(TOPIC_SUB, qos=1)
def on_message(client, userdata, msg):
print("CMD:", msg.topic, msg.payload.decode())
def make_client(client_id="edge-001"):
c = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id, clean_session=False)
c.tls_set(
ca_certs="ca.pem",
certfile="device.crt",
keyfile="device.key",
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS_CLIENT,
)
c.will_set("sensors/edge-001/status", payload="offline", qos=1, retain=True)
c.on_connect = on_connect
c.on_message = on_message
return c
client = make_client()
client.connect(BROKER, PORT, keepalive=60)
client.loop_start()
client.publish("sensors/edge-001/status", "online", qos=1, retain=True)
try:
while not STOP.is_set():
payload = {"t": time.time(), "temp": 22.7, "hum": 0.51}
client.publish(TOPIC_PUB, json.dumps(payload), qos=1)
time.sleep(5)
except KeyboardInterrupt:
pass
finally:
client.publish("sensors/edge-001/status", "offline", qos=1, retain=True)
client.loop_stop()
client.disconnect()
Linee guida MQTT
- Topic design: usare gerarchie stabili (es.
sensors/<site>/<device>/<signal>) e nomi snake_case. Evitare dati sensibili nei topic. - QoS: 0 per dati rumorosi, 1 per telemetria critica, 2 raramente (overhead).
- Retain: utile per ultimo stato; pulire pubblicando
retaincon payload vuoto per cancellare. - LWT: essenziale per presenza dispositivi.
- Security: TLS obbligatorio, mutual TLS per dispositivi, ACL per topic, rotazione certificati.
CoAP con Python (aiocoap)
CoAP offre un’API stile REST su UDP, ideale su reti a bassa potenza. aiocoap supporta asyncio e DTLS.
# Server CoAP minimal con aiocoap
import asyncio
from aiocoap import resource, Context, Message, CONTENT
class TempResource(resource.Resource):
async def render_get(self, request):
payload = b'{"temp":22.7}'
return Message(payload=payload, code=CONTENT, content_format=50) # 50 = application/json
async def main():
root = resource.Site()
root.add_resource(["sensors", "temp"], TempResource())
await Context.create_server_context(root)
await asyncio.get_running_loop().create_future()
asyncio.run(main())
# Client CoAP
import asyncio
from aiocoap import *
async def get_temp():
protocol = await Context.create_client_context()
req = Message(code=GET, uri="coap://localhost/sensors/temp")
resp = await protocol.request(req).response
print(resp.code, resp.payload)
asyncio.run(get_temp())
Per payload compatti usare CBOR; con blockwise transfer si gestiscono messaggi grandi; la modalità Observe abilita stream di aggiornamenti.
HTTP/REST e FastAPI
HTTP resta utile per integrazioni, provisioning e callback. Con FastAPI si ottiene un gateway leggero e tipizzato.
# Gateway REST con FastAPI
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Telemetry(BaseModel):
device_id: str
t: float
temp: float
hum: float | None = None
@app.post("/ingest")
def ingest(data: Telemetry):
# TODO: validazione, persistenza, backpressure
return {"ok": True, "device": data.device_id}
# Client dispositivo con requests
import time, requests
while True:
r = requests.post("https://api.example.com/ingest", json={
"device_id":"edge-001","t":time.time(),"temp":22.7})
time.sleep(5)
WebSocket per eventi in tempo reale
Utile per telemetria verso dashboard o controlli bidirezionali con bassa latenza.
# Client WebSocket
import asyncio, json, websockets
async def run():
async with websockets.connect("wss://ws.example.com/stream") as ws:
await ws.send(json.dumps({"hello":"edge-001"}))
while True:
msg = await ws.recv()
print("EVENT:", msg)
asyncio.run(run())
BLE (Bluetooth Low Energy) con bleak
Per provisioning locale, pairing o letture a corto raggio. Con bleak si interagisce con servizi GATT.
# Scan BLE e lettura caratteristica GATT
import asyncio
from bleak import BleakScanner, BleakClient
TARGET_NAME = "sensor-tag"
CHAR_UUID = "00002a6e-0000-1000-8000-00805f9b34fb" # Temperature
async def main():
devices = await BleakScanner.discover(timeout=5.0)
target = next((d for d in devices if d.name == TARGET_NAME), None)
if not target:
print("Device non trovato"); return
async with BleakClient(target.address) as client:
value = await client.read_gatt_char(CHAR_UUID)
print("Temp raw:", int.from_bytes(value, "little"))
asyncio.run(main())
Sicurezza: TLS/DTLS, identity, principle of least privilege
- Mutual TLS: certificati per dispositivo; CRL/OCSP o short-lived cert per revoca rapida.
- DTLS per CoAP: cifratura leggera su UDP.
- Segreti: evitare chiavi hardcoded; usare secure element o KMS lato backend.
- ACL: limitare i topic MQTT e gli endpoint per device/tenant.
- Firmware: secure boot, firma aggiornamenti OTA, anti-rollback.
# Esempio SSLContext avanzato per client MQTT/HTTP
import ssl
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.set_ciphers("ECDHE+AESGCM:@STRENGTH")
ctx.load_verify_locations("ca.pem")
ctx.load_cert_chain("device.crt","device.key")
ctx.check_hostname = True
Formati payload: JSON, CBOR, Protobuf
JSON è leggibile ma verboso; CBOR e Protobuf riducono dimensione e CPU. CBOR è ottimo con CoAP.
# CBOR con cbor2
import cbor2, time
msg = {"t": time.time(), "temp": 22.7, "ok": True}
blob = cbor2.dumps(msg)
restored = cbor2.loads(blob)
Asyncio ed edge computing
Sui gateway è comune orchestrare più protocolli in parallelo con asyncio, integrando buffer, retry e backpressure.
# Pipeline asincrona: BLE -> normalizza -> MQTT
import asyncio, json, time
from bleak import BleakClient
from paho.mqtt.client import Client
async def read_ble(addr, uuid):
async with BleakClient(addr) as c:
while True:
raw = await c.read_gatt_char(uuid)
yield int.from_bytes(raw, "little")
await asyncio.sleep(2)
async def publisher(mqttc: Client, topic: str, source):
for _ in range(1): pass
async for v in source:
payload = {"t": time.time(), "temp": v}
mqttc.publish(topic, json.dumps(payload), qos=1)
await asyncio.sleep(0)
async def main():
mqttc = Client()
mqttc.connect("broker.example.com", 1883, 60)
mqttc.loop_start()
ble_stream = read_ble("01:23:45:67:89:AB", "00002a6e-0000-1000-8000-00805f9b34fb")
await publisher(mqttc, "sensors/edge-001/telemetry", ble_stream)
asyncio.run(main())
Scelta del protocollo: checklist pratica
- Energia e rete: batteria e link instabili? Preferire CoAP/UDP o MQTT con QoS 1 e backoff.
- Pattern: broadcast e fan-out? MQTT. Richieste puntuali? HTTP/CoAP.
- Latenza: necessità di stream bidirezionale? WebSocket o MQTT.
- Interoperabilità: integrazione con sistemi IT? HTTP/REST o bridge MQTT->HTTP.
- Sicurezza: disponibilità PKI e gestione certificati? Se no, valutare token a breve scadenza e rotazione.
- Scalabilità: milioni di messaggi/ora? MQTT con partizionamento e cluster broker.
Pattern architetturali comuni
- Bridge: device MQTT → gateway edge (Python) → normalizza → pubblica su Kafka/HTTP.
- Digital Twin: uno stream per stato e uno per comandi; usare retained e snapshot periodici.
- Provisioning sicuro: BLE o AP locale per bootstrap, poi onboarding con certificati mTLS.
- Store-and-forward: coda locale su disco quando il broker non è raggiungibile.
Test e osservabilità
- Contract test: schemi JSON/Protobuf versionati.
- Telemetria: metriche su reconnect, RTT, rate di drop, latency P99.
- Chaos: simulare perdita pacchetti, jitter e blackout rete.
Esempio end-to-end: dal sensore al cloud
- Provisioning via BLE con bleak per impostare Wi-Fi e ottenere certificato mTLS.
- Telemetria periodica via MQTT (QoS 1, retained stato, LWT).
- Comandi downlink su topic dedicato con autorizzazioni per device.
- Gateway HTTP per integrazioni legacy e dashboard WebSocket per monitoraggio live.
Conclusioni
Non esiste un protocollo “migliore” in assoluto: la scelta dipende da energia, rete, requisiti di sicurezza, latenza e tooling del team. Python consente di prototipare velocemente e portare in produzione pipeline affidabili, unificando protocolli diversi con poche decine di righe di codice e ottime librerie.