Préambule
Quand on parle de domotique maison, il y a un petit plaisir geek qu’on ne boude jamais : brancher un modem TIC sur son compteur Linky pour récupérer, en quasi temps réel, toutes les infos de consommation électrique. Un index qui s’incrémente ? Une bascule tarifaire heure pleine/heure creuse ? Une info relais ou un état de délestage ? Boum, tout ça arrive directement dans ton broker MQTT.
Mais pourquoi refaire un script alors qu’il en existe déjà plein, dont celui très connu de Charles Hallard ou le flow Node-RED présent sur son forum. Eh bien parce que j’ai voulu une version qui colle pile poil à mon écosystème — un truc propre, sobre, et surtout homogène avec mes autres scripts comme Ze2MQTT, Tuya2MQTT, etc.
Ce TeleInfo2MQTT utilise le modem en série (/dev/ttyACM0), décode la trame TIC (mode standard), traite intelligemment les champs spéciaux comme STGE et RELAIS, et balance le tout sur MQTT avec un format JSON lisible et exploitable dans Jeedom ou n’importe quelle plateforme domotique. Et bien sûr, le tout est prévu pour tourner dans un container Docker à la sauce “une image pour les gouverner tous”.
Pré-requis
Avant de commencer, assure-toi d’avoir :
✔️ Une machine virtuelle (VM) ou un nano-ordinateur (Raspberry Pi, Debian, etc.).
✔️ Un accès SSH et les droits root.
✔️ Docker installé.
✔️ Un modem TIC branché sur le port série (/dev/ttyACM0 par exemple).
Etape par étape
(A venir l’article : Le Dockerfile du Destin : une images pour les gouverner tous, pour que tu puisses regrouper tous les bidule2mqtt avec la même image).
Création du container Docker
Je crée le répertoire d’installation :
mkdir -p ~/docker/teleinfo2mqtt && cd ~/docker/teleinfo2mqtt
Création d’un fichier docker-compose.yml :
nano docker-compose.yml
Contenu du fichier :
services:
teleinfo2mqtt:
build: .
container_name: teleinfo2mqtt
restart: unless-stopped
devices:
- /dev/ttyACM0:/dev/ttyACM0
volumes:
- ./teleinfo2mqtt.py:/app/teleinfo2mqtt.py
- ./config.json:/app/config.json
logging:
driver: journald
environment:
- TZ=Europe/Paris # Pour que les logs soient à la bonne heure
Création d’un fichier dockerfile :
nano dockerfile
Contenu du fichier :
# Image de base légère avec Python FROM python:3.11-slim # Variables d’environnement par défaut (modifiable via docker run ou compose) ENV PYTHONUNBUFFERED=1 # Dossier de travail dans le conteneur WORKDIR /app # Copie des fichiers dans le conteneur COPY requirements.txt ./ RUN pip install --no-cache-dir -r requirements.txt # Commande lancée automatiquement CMD ["python", "teleinfo2mqtt.py"]
Création d’un fichier requirements.txt :
nano requirements.txt
Contenu du fichier :
paho-mqtt pyserial
Le script teleinfo2mqtt.py
Le script teleinfo2mqtt.py s’occupe de lire la trame TIC via pyserial, vérifier les checksums, convertir certains champs (notamment STGE et RELAIS) en texte lisible et de publier les données en JSON sur un broker MQTT, avec un topic configurable.
nano teleinfo2mqtt.py
Contenu du fichier :
import serial
import json
import time
import signal
import sys
import paho.mqtt.client as mqtt
import logging
from datetime import datetime
# Chargement du fichier de configuration JSON
def load_config():
with open('./config.json', 'r') as f:
config = json.load(f)
return config
config = load_config()
# Configuration
LOG_LEVEL = config.get("log_level", "INFO").upper()
# Conf -> MODEM
SERIAL_PORT = config["modem"]["port"]
BAUDRATE = config["modem"].get("baudrate", 9600)
MODE = config["modem"].get("mode", "standard")
## Conf -> MQTT
MQTT_BROKER = config["mqtt"]["broker"]
MQTT_PORT = config["mqtt"]["port"]
MQTT_USERNAME = config["mqtt"].get("username")
MQTT_PASSWORD = config["mqtt"].get("password")
MQTT_PREFIX = config["mqtt"]["prefix_topic"]
# Configuration du logger
logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO),
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("TeleInfo2MQTT")
# Initialisation MQTT
mqtt_client = None
def init_mqtt():
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
if MQTT_USERNAME:
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.on_connect = on_connect
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start()
return client
# Publication MQTT
def publish(client, topic, payload, retain=False):
client.publish(topic, json.dumps(payload), retain=retain)
logger.debug(f"Publié: {topic} -> {payload}")
# Fonction appelée en cas de connexion au broker MQTT
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code == 0:
logger.info("Connecté au broker MQTT")
else:
logger.error(f"Erreur de connexion MQTT : {reason_code}")
def checksum_ok(frame, checksum):
"""Vérifie le checksum d'une ligne (mode standard)"""
parts = frame.strip().split('\t')
if len(parts) < 2:
return False
# Cas standard : name + value ou name + date + value
if len(parts) == 3:
data = '\t'.join(parts[:2])
elif len(parts) == 4:
data = '\t'.join(parts[:3])
else:
return False
cksum = sum(ord(c) for c in data) + 9
expected = chr((cksum & 0b111111) + 0x20)
return expected == checksum[0:1]
def parse_standard_frame(ser):
"""Lit une trame complète en mode standard"""
content = {}
patience = 0
while True:
line = ser.readline().decode(errors="ignore").strip()
if "ADSC" in line:
break
content.clear()
patience += 1
if patience > 50:
logging.warning("Attente longue de début de trame (ADSC)")
patience = 0
while True:
line = ser.readline().decode(errors="ignore").strip()
parts = line.split('\t')
if len(parts) == 4:
name, maybe_date, value, checksum = parts
if maybe_date.startswith("E2") and maybe_date[1:13].isdigit():
value = maybe_date + " " + value
else:
value = maybe_date
elif len(parts) == 3:
name, value, checksum = parts
else:
continue
if not checksum_ok(line, checksum):
logging.warning("Checksum invalide pour %s", line)
continue
# force les valeurs en int si possible
try:
if value.isdigit():
content[name] = int(value)
else:
content[name] = value.strip()
except Exception:
content[name] = value.strip()
if name == "STGE":
try:
stgebin = bin(int(value, 16))[2:].zfill(32)
bits = [stgebin[31 - i] for i in range(33)] # même logique qu'avant
content["STGE01"] = switch_mot01(int(bits[1]))
content["STGE02"] = switch_mot02(int("".join([bits[4], bits[3], bits[2]]), 2))
content["STGE03"] = switch_mot03(int(bits[5]))
content["STGE04"] = switch_mot04(int(bits[6]))
content["STGE05"] = switch_mot05(int(bits[7]))
content["STGE06"] = switch_mot06(int(bits[8]))
content["STGE07"] = switch_mot07(int(bits[9]))
content["STGE08"] = switch_mot08(int(bits[10]))
content["STGE09"] = switch_mot09(int("".join([bits[14], bits[13], bits[12], bits[11]]), 2))
content["STGE10"] = switch_mot10(int("".join([bits[16], bits[15]]), 2))
content["STGE11"] = switch_mot11(int(bits[17]))
content["STGE12"] = switch_mot12(int(bits[18]))
content["STGE13"] = switch_mot13(int(bits[19]))
content["STGE14"] = switch_mot14(int("".join([bits[21], bits[20]]), 2))
content["STGE15"] = switch_mot15(int("".join([bits[23], bits[22]]), 2))
content["STGE16"] = switch_mot16(int(bits[24]))
content["STGE17"] = switch_mot17(int("".join([bits[26], bits[25]]), 2))
content["STGE18"] = switch_mot18(int("".join([bits[28], bits[27]]), 2))
content["STGE19"] = switch_mot19(int("".join([bits[30], bits[29]]), 2))
content["STGE20"] = switch_mot20(int("".join([bits[32], bits[31]]), 2))
except Exception as e:
logging.warning(f"[STGE] Erreur dans le décodage : {e}")
if name == "RELAIS":
try:
relaisbin = bin(int(value))[2:].zfill(8)
for i in range(8):
bit = int(relaisbin[::-1][i]) # inversion du binaire
content[f"RELAIS0{i+1}"] = switch_relais(bit)
except Exception as e:
logging.warning(f"[RELAIS] Erreur dans le décodage : {e}")
if name == "ADSC":
break
logging.debug(f"[TIC] Trame capturée avec {len(content)} champs : {content}")
return content
# ----------------------------
# STGE Info Translation Fields
# ----------------------------
def switch_mot01(argument):
switcher = {
0: "Ferme",
1: "Ouvert",
}
return switcher.get(argument, "Invalide")
def switch_mot02(argument):
switcher = {
0: "Ferme",
1: "Ouvert sur surpuissance",
2: "ouvert sur surtension",
3: "Ouvert sur delestage",
4: "Ouvert sur ordre CPL ou Euridis",
5: "Ouvert sur surchauffe avec I > Imax",
6: "Ouvert sur surchauffe avec I < Imax",
}
return switcher.get(argument, "Invalide")
def switch_mot03(argument):
switcher = {
0: "Ferme",
1: "Ouvert",
}
return switcher.get(argument, "Invalide")
def switch_mot04(argument):
switcher = {
0: "Toujours a 0",
1: "Anormal",
}
return switcher.get(argument, "Invalide")
def switch_mot05(argument):
switcher = {
0: "Pas de surtension",
1: "Surtension",
}
return switcher.get(argument, "Invalide")
def switch_mot06(argument):
switcher = {
0: "Pas de depassement",
1: "Depassement en cours",
}
return switcher.get(argument, "Invalide")
def switch_mot07(argument):
switcher = {
0: "Consommateur",
1: "Producteur",
}
return switcher.get(argument, "Invalide")
def switch_mot08(argument):
switcher = {
0: "Energie active positive",
1: "Energie active negative",
}
return switcher.get(argument, "Invalide")
def switch_mot09(argument):
switcher = {
0: "Energie ventilee sur Index 1",
1: "Energie ventilee sur Index 2",
2: "Energie ventilee sur Index 3",
3: "Energie ventilee sur Index 4",
4: "Energie ventilee sur Index 5",
5: "Energie ventilee sur Index 6",
6: "Energie ventilee sur Index 7",
7: "Energie ventilee sur Index 8",
8: "Energie ventilee sur Index 9",
9: "Energie ventilee sur Index 10",
}
return switcher.get(argument, "Invalide")
def switch_mot10(argument):
switcher = {
0: "Energie ventilee sur Index 1",
1: "Energie ventilee sur Index 2",
2: "Energie ventilee sur Index 3",
3: "Energie ventilee sur Index 4",
}
return switcher.get(argument, "Invalide")
def switch_mot11(argument):
switcher = {
0: "Horloge correcte",
1: "Horloge en mode degrade",
}
return switcher.get(argument, "Invalide")
def switch_mot12(argument):
switcher = {
0: "Mode Historique",
1: "Mode Standard",
}
return switcher.get(argument, "Invalide")
def switch_mot13(argument):
switcher = {
0: "Non utilise",
1: "Non utilise",
}
return switcher.get(argument, "Invalide")
def switch_mot14(argument):
switcher = {
0: "Desactivee",
1: "Activee sans securite",
2: "Invalide",
3: "Activee avec securite",
}
return switcher.get(argument, "Invalide")
def switch_mot15(argument):
switcher = {
0: "New/Unlock",
1: "New/Lock",
2: "Registered",
3: "Invalide",
}
return switcher.get(argument, "Invalide")
def switch_mot16(argument):
switcher = {
0: "Compteur non synchronise",
1: "Compteur synchronise",
}
return switcher.get(argument, "Invalide")
def switch_mot17(argument):
switcher = {
0: "Pas d‘annonce",
1: "Bleu",
2: "Blanc",
3: "Rouge",
}
return switcher.get(argument, "Invalide")
def switch_mot18(argument):
switcher = {
0: "Pas d‘annonce",
1: "Bleu",
2: "Blanc",
3: "Rouge",
}
return switcher.get(argument, "Invalide")
def switch_mot19(argument):
switcher = {
0: "Pas de preavis en cours",
1: "Preavis PM1 en cours",
2: "Preavis PM2 en cours",
3: "Preavis PM3 en cours",
}
return switcher.get(argument, "Invalide")
def switch_mot20(argument):
switcher = {
0: "Pas de pointe mobile",
1: "PM1 en cours",
2: "PM2 en cours",
3: "PM3 en cours",
}
return switcher.get(argument, "Invalide")
# ------------------------------
# RELAIS Info Translation Fields
# ------------------------------
def switch_relais(argument):
switcher = {
0: "Ouvert",
1: "Ferme",
}
return switcher.get(argument, "Invalide")
def main():
global mqtt_client
mqtt_client = init_mqtt()
logger.info("Démarrage du daemon TeleInfo2MQTT")
# Port série
ser = serial.Serial(
port=SERIAL_PORT,
baudrate=BAUDRATE,
bytesize=serial.SEVENBITS,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
timeout=1
)
logging.info(f"Connexion au port série : {SERIAL_PORT} ({MODE})")
while True:
try:
if MODE == "standard":
frame = parse_standard_frame(ser)
else:
logging.error("Mode %s non encore implémenté", MODE)
continue
if frame:
payload = {k: v for k, v in frame.items() if not k.endswith("_val") and not k.startswith("STGE0")}
# Extraire les champs pour les mettre au début
adsc = payload.pop("ADSC", None)
timestamp = int(time.time() * 1000) if config.get("timestamp", False) else None
# Construire le payload dans l'ordre souhaité
ordered_payload = {}
if timestamp is not None:
ordered_payload["timestamp"] = timestamp
if adsc is not None:
ordered_payload["ADSC"] = adsc
# Ajouter les autres champs
ordered_payload.update(payload)
# Appliquer le mode Jeedom
jeedom_mode = config["mqtt"].get("jeedom", True)
payload = {"TIC": ordered_payload} if jeedom_mode else ordered_payload
topic = f"{MQTT_PREFIX}/{adsc}" if adsc else f"{MQTT_PREFIX}/TIC"
publish(mqtt_client, topic, payload)
except KeyboardInterrupt:
logging.info("Arrêt demandé par l'utilisateur.")
break
except Exception as e:
logging.error("Erreur : %s", e)
time.sleep(2)
ser.close()
# Gestion des signaux pour arrêter proprement le daemon
def signal_handler(sig, frame):
logger.info("Arrêt du daemon TeleInfo2MQTT")
mqtt_client.loop_stop()
mqtt_client.disconnect()
sys.exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
main()
Le fichier de configuration
Il est entièrement piloté par un fichier config.json qui permet d’ajuster le port série, les paramètres MQTT et le niveau de log.
nano config.json
Contenu du fichier :
{
"log_level": "WARNING",
"timestamp": false,
"modem": {
"port": "/dev/ttyACM0",
"baudrate": 9600,
"mode": "standard"
},
"mqtt": {
"prefix_topic": "teleinfo",
"broker": "localhost",
"port": 1883,
"username": "mon_utilisateur",
"password": "mon_mot_de_passe",
"jeedom": true
}
}
Et côté journald, tu règles une limite avec :
mkdir -p /etc/systemd/journald.conf.d nano /etc/systemd/journald.conf.d/teleinfo2mqtt.conf
Contenu du fichier :
[Journal] SystemMaxUse=100M SystemMaxFileSize=10M MaxRetentionSec=1month
Puis :
sudo systemctl restart systemd-journald
Compilation du container
La structure doit ressembler à ça :
.
├── config.json
├── docker-compose.yml
├── dockerfile
├── requirements.txt
└── teleinfo2mqtt.py
Maintenant que tous fichiers sont présent dans mon dossier d’installation je lance la compilation :
docker compose build
Puis exécute :
docker compose up -d
Une fois démarré, le script capte les trames, filtre ce qu’il faut, et publie les valeurs pertinentes dans des topics du genre :
TIC/ADSC→ adresse compteurTIC/IINST→ intensité instantanéeTIC/BASEouTIC/HCHC/HCHP→ index de consommationTIC/LTARF→ nom de la plage tarifaireTIC/STGExxetTIC/RELAISxx→ décodage binaire complet à partir de l’extension Jeedom
Et tout ça… dans la seconde.
Si tu as besoin de modifier le fichier config.json, il te suffit de relancer le container :
docker compose restart teleinfo2mqtt
Et pour consulter le journal (il existe d’autre méthode) :
journalctl CONTAINER_NAME=teleinfo2mqtt -f
Conclusion
Le script TeleInfo2MQTT est ta passerelle simple et rapide entre ton compteur Linky (via le port TIC) et ton univers domotique MQTT. En quelques minutes, tu récupères en direct tes index de consommation, tu sais si tu es en heures pleines ou creuses, et tu peux déclencher des scénarios domotiques en fonction de ton tarif, de ta consommation ou d’un délestage actif.
Bref : un must-have pour toute installation domotique qui veut garder l’œil sur la facture EDF tout en restant élégante et bien structurée.
Et si tu as besoin d’un modem USB passe par la boutique de Charles Hallard.
Sources :
https://github.com/hallard/uTeleinfo
https://community.ch2i.eu/topic/961/r%C3%A9cup%C3%A9ration-et-publication-mqtt-des-donn%C3%A9es-de-la-t%C3%A9l%C3%A9information-avec-raspberry-pi-et-docker?_=1744811984947
https://github.com/Noyax-37/plugin-teleinfo
