Python e i protocolli per IoT

L’Internet of Things (IoT) richiede scelte tecniche ponderate: protocolli leggeri, sicurezza end-to-end, modelli asincroni e payload compatti. Python, grazie all’ecosistema di librerie e alla semplicità del linguaggio, è un ottimo collante tra firmware, gateway edge e servizi cloud. In questo articolo analizziamo i principali protocolli (MQTT, CoAP, HTTP/REST, WebSocket, BLE), come usarli in Python e come sceglierli in base ai vincoli del progetto.

Panoramica rapida dei protocolli

Protocollo Pattern Trasporto Peso Quando usarlo Note
MQTT Pub/Sub TCP + TLS Basso Telemetria, comandi, migliaia di device QoS 0/1/2, Retain, LWT, topic gerarchici
CoAP Request/Response, Observe UDP + DTLS Molto basso Reti lossy (LPWAN), batterie CBOR/JSON, blockwise, multicast
HTTP/REST Request/Response TCP + TLS Medio Integrazioni web, tool esistenti Idempotenza, caching
WebSocket Full-duplex TCP + TLS Medio Streaming eventi, pannelli real-time Stateful, ping/pong
BLE GATT (client/server) Bluetooth LE Molto basso Breve raggio, mobile, provisioning MTU ridotta, discovery

MQTT con Python

MQTT è lo standard de facto per telemetria IoT. In Python si usa spesso paho-mqtt. Di seguito un esempio robusto con TLS, Last Will e gestione riconnessioni.

import json, ssl, time
from threading import Event
from paho.mqtt import client as mqtt

BROKER = "mqtts.example.com"
PORT = 8883
TOPIC_PUB = "sensors/{device_id}/telemetry".format(device_id="edge-001")
TOPIC_SUB = "sensors/edge-001/cmd"
STOP = Event()

def on_connect(client, userdata, flags, rc, properties=None):
  print("Connected:", rc)
  client.subscribe(TOPIC_SUB, qos=1)

def on_message(client, userdata, msg):
  print("CMD:", msg.topic, msg.payload.decode())

def make_client(client_id="edge-001"):
  c = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id, clean_session=False)
  c.tls_set(
    ca_certs="ca.pem",
    certfile="device.crt",
    keyfile="device.key",
    cert_reqs=ssl.CERT_REQUIRED,
    tls_version=ssl.PROTOCOL_TLS_CLIENT,
  )
  c.will_set("sensors/edge-001/status", payload="offline", qos=1, retain=True)
  c.on_connect = on_connect
  c.on_message = on_message
  return c

client = make_client()
client.connect(BROKER, PORT, keepalive=60)
client.loop_start()

client.publish("sensors/edge-001/status", "online", qos=1, retain=True)

try:
  while not STOP.is_set():
  payload = {"t": time.time(), "temp": 22.7, "hum": 0.51}
  client.publish(TOPIC_PUB, json.dumps(payload), qos=1)
  time.sleep(5)
except KeyboardInterrupt:
  pass
finally:
  client.publish("sensors/edge-001/status", "offline", qos=1, retain=True)
  client.loop_stop()
  client.disconnect() 

Linee guida MQTT

  • Topic design: usare gerarchie stabili (es. sensors/<site>/<device>/<signal>) e nomi snake_case. Evitare dati sensibili nei topic.
  • QoS: 0 per dati rumorosi, 1 per telemetria critica, 2 raramente (overhead).
  • Retain: utile per ultimo stato; pulire pubblicando retain con payload vuoto per cancellare.
  • LWT: essenziale per presenza dispositivi.
  • Security: TLS obbligatorio, mutual TLS per dispositivi, ACL per topic, rotazione certificati.

CoAP con Python (aiocoap)

CoAP offre un’API stile REST su UDP, ideale su reti a bassa potenza. aiocoap supporta asyncio e DTLS.

# Server CoAP minimal con aiocoap
import asyncio
from aiocoap import resource, Context, Message, CONTENT

class TempResource(resource.Resource):
  async def render_get(self, request):
    payload = b'{"temp":22.7}'
    return Message(payload=payload, code=CONTENT, content_format=50)  # 50 = application/json

async def main():
  root = resource.Site()
  root.add_resource(["sensors", "temp"], TempResource())
  await Context.create_server_context(root)
  await asyncio.get_running_loop().create_future()

asyncio.run(main()) 
# Client CoAP
import asyncio
from aiocoap import *

async def get_temp():
    protocol = await Context.create_client_context()
    req = Message(code=GET, uri="coap://localhost/sensors/temp")
    resp = await protocol.request(req).response
    print(resp.code, resp.payload)

asyncio.run(get_temp())

Per payload compatti usare CBOR; con blockwise transfer si gestiscono messaggi grandi; la modalità Observe abilita stream di aggiornamenti.

HTTP/REST e FastAPI

HTTP resta utile per integrazioni, provisioning e callback. Con FastAPI si ottiene un gateway leggero e tipizzato.

# Gateway REST con FastAPI
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()

class Telemetry(BaseModel):
  device_id: str
  t: float
  temp: float
  hum: float | None = None

@app.post("/ingest")
def ingest(data: Telemetry):
  # TODO: validazione, persistenza, backpressure
  return {"ok": True, "device": data.device_id} 
# Client dispositivo con requests
import time, requests
while True:
    r = requests.post("https://api.example.com/ingest", json={
        "device_id":"edge-001","t":time.time(),"temp":22.7})
    time.sleep(5)

WebSocket per eventi in tempo reale

Utile per telemetria verso dashboard o controlli bidirezionali con bassa latenza.

# Client WebSocket
import asyncio, json, websockets

async def run():
  async with websockets.connect("wss://ws.example.com/stream") as ws:
    await ws.send(json.dumps({"hello":"edge-001"}))
    while True:
      msg = await ws.recv()
      print("EVENT:", msg)

asyncio.run(run()) 

BLE (Bluetooth Low Energy) con bleak

Per provisioning locale, pairing o letture a corto raggio. Con bleak si interagisce con servizi GATT.

# Scan BLE e lettura caratteristica GATT
import asyncio
from bleak import BleakScanner, BleakClient

TARGET_NAME = "sensor-tag"
CHAR_UUID = "00002a6e-0000-1000-8000-00805f9b34fb"  # Temperature

async def main():
  devices = await BleakScanner.discover(timeout=5.0)
  target = next((d for d in devices if d.name == TARGET_NAME), None)
  if not target:
    print("Device non trovato"); return
  async with BleakClient(target.address) as client:
    value = await client.read_gatt_char(CHAR_UUID)
    print("Temp raw:", int.from_bytes(value, "little"))

asyncio.run(main()) 

Sicurezza: TLS/DTLS, identity, principle of least privilege

  • Mutual TLS: certificati per dispositivo; CRL/OCSP o short-lived cert per revoca rapida.
  • DTLS per CoAP: cifratura leggera su UDP.
  • Segreti: evitare chiavi hardcoded; usare secure element o KMS lato backend.
  • ACL: limitare i topic MQTT e gli endpoint per device/tenant.
  • Firmware: secure boot, firma aggiornamenti OTA, anti-rollback.
# Esempio SSLContext avanzato per client MQTT/HTTP
import ssl
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.set_ciphers("ECDHE+AESGCM:@STRENGTH")
ctx.load_verify_locations("ca.pem")
ctx.load_cert_chain("device.crt","device.key")
ctx.check_hostname = True

Formati payload: JSON, CBOR, Protobuf

JSON è leggibile ma verboso; CBOR e Protobuf riducono dimensione e CPU. CBOR è ottimo con CoAP.

# CBOR con cbor2
import cbor2, time
msg = {"t": time.time(), "temp": 22.7, "ok": True}
blob = cbor2.dumps(msg)
restored = cbor2.loads(blob)

Asyncio ed edge computing

Sui gateway è comune orchestrare più protocolli in parallelo con asyncio, integrando buffer, retry e backpressure.

# Pipeline asincrona: BLE -> normalizza -> MQTT
import asyncio, json, time
from bleak import BleakClient
from paho.mqtt.client import Client

async def read_ble(addr, uuid):
  async with BleakClient(addr) as c:
    while True:
    raw = await c.read_gatt_char(uuid)
    yield int.from_bytes(raw, "little")
  await asyncio.sleep(2)

async def publisher(mqttc: Client, topic: str, source):
  for _ in range(1): pass  
  async for v in source:
    payload = {"t": time.time(), "temp": v}
    mqttc.publish(topic, json.dumps(payload), qos=1)
  await asyncio.sleep(0)

async def main():
  mqttc = Client()
  mqttc.connect("broker.example.com", 1883, 60)
  mqttc.loop_start()
  ble_stream = read_ble("01:23:45:67:89:AB", "00002a6e-0000-1000-8000-00805f9b34fb")
  await publisher(mqttc, "sensors/edge-001/telemetry", ble_stream)

asyncio.run(main()) 

Scelta del protocollo: checklist pratica

  1. Energia e rete: batteria e link instabili? Preferire CoAP/UDP o MQTT con QoS 1 e backoff.
  2. Pattern: broadcast e fan-out? MQTT. Richieste puntuali? HTTP/CoAP.
  3. Latenza: necessità di stream bidirezionale? WebSocket o MQTT.
  4. Interoperabilità: integrazione con sistemi IT? HTTP/REST o bridge MQTT->HTTP.
  5. Sicurezza: disponibilità PKI e gestione certificati? Se no, valutare token a breve scadenza e rotazione.
  6. Scalabilità: milioni di messaggi/ora? MQTT con partizionamento e cluster broker.

Pattern architetturali comuni

  • Bridge: device MQTT → gateway edge (Python) → normalizza → pubblica su Kafka/HTTP.
  • Digital Twin: uno stream per stato e uno per comandi; usare retained e snapshot periodici.
  • Provisioning sicuro: BLE o AP locale per bootstrap, poi onboarding con certificati mTLS.
  • Store-and-forward: coda locale su disco quando il broker non è raggiungibile.

Test e osservabilità

  • Contract test: schemi JSON/Protobuf versionati.
  • Telemetria: metriche su reconnect, RTT, rate di drop, latency P99.
  • Chaos: simulare perdita pacchetti, jitter e blackout rete.

Esempio end-to-end: dal sensore al cloud

  1. Provisioning via BLE con bleak per impostare Wi-Fi e ottenere certificato mTLS.
  2. Telemetria periodica via MQTT (QoS 1, retained stato, LWT).
  3. Comandi downlink su topic dedicato con autorizzazioni per device.
  4. Gateway HTTP per integrazioni legacy e dashboard WebSocket per monitoraggio live.

Conclusioni

Non esiste un protocollo “migliore” in assoluto: la scelta dipende da energia, rete, requisiti di sicurezza, latenza e tooling del team. Python consente di prototipare velocemente e portare in produzione pipeline affidabili, unificando protocolli diversi con poche decine di righe di codice e ottime librerie.

Torna su