Préambule
Ce projet Tuya2MQTT a connu plusieurs vies. Il a d’abord vu le jour sous forme d’un simple flux Node-RED qui appelait un script Python via un nœud exec. Ensuite, j’ai tenté un portage (avorté) en C++ sur ESP32, avant d’opter pour un service local en Python, pour finalement le containeriser dans Docker.
Je ne vais pas prétendre m’être réconcilié avec l’univers Tuya. Mais faute d’une vraie solution domotique 32A compatible V2G et intégrable avec Victron ou Enphase, j’ai temporairement investi dans un câble de recharge EV pilotable via Tuya. Le but : attendre que ces fabricants sortent des produits V2G grand public… et que je dispose moi-même d’un véhicule compatible.
Après avoir galéré (et le mot est faible) pour faire fonctionner ce chargeur en commande locale, je me suis dit que ça valait le coup de documenter la solution.
Non, je ne mettrai pas de lien vers ce chargeur trouvé dans la caverne d’Ali, car d’ici là, l’annonce aura disparu 😏
Si tu es arrivé jusqu’ici, c’est que tu sais probablement déjà comment obtenir les device_id et local_key de ton appareil Tuya. Mais si ce n’est pas encore le cas, voici un bon point de départ :
🔗 Article LinkdHome : Contrôle local des appareils Tuya
Pourquoi tinytuya ?
J’ai utilisé la bibliothèque TinyTuya, un module Python léger, pour interagir localement avec les périphériques. Le point crucial : il faut absolument connaître la version de protocole utilisée. Pour mon chargeur, c’est du 3.5, et pas 3.1, sinon rien ne répond.
Un point qui m’énerve avec Tuya : l’obligation de renouveler son compte développeur tous les 6 mois. Une vraie plaie.
En passant par tinytuya, tout se fait en local, sans dépendre du cloud ni d’un token qui expire en douce. Et ça, c’est un vrai soulagement.
Même si ce script a été conçu pour un chargeur EV, il est entièrement réutilisable pour tout autre appareil Tuya. Tant qu’on a créé un compte développeur une bonne fois pour toutes, autant en profiter pour récupérer les DPS utiles pour chaque fonction.
Intégration dans Jeedom
Une fois le script en place, on peut créer un objet MQTT dans jMQTT.
L’avantage : partager facilement les templates avec la communauté.
Pré-requis
Avant de commencer, assure-toi d’avoir :
✔️ Une machine virtuelle (VM) ou un nano-ordinateur (Raspberry Pi, Debian, etc.).
✔️ Un accès SSH et les droits root.
✔️ Docker installé (article en préparation).
Etape par étape
(A venir l’article : Le Dockerfile du Destin : une images pour les gouverner tous, pour que tu puisses regrouper tous les bidule2mqtt avec la même image).
Création du containers Docker
Je crée le répertoire d’installation :
mkdir -p ~/docker/tuya2mqtt && cd ~/docker/tuya2mqtt
Création d’un fichier docker-compose.yml :
nano docker-compose.yml
Contenu du fichier :
services:
tuya2mqtt_status:
container_name: tuya2mqtt_status
restart: unless-stopped
build: ./
volumes:
- ./tuya2mqtt_status.py:/app/tuya2mqtt_status.py
- ./config.json:/app/config.json
logging:
driver: journald
environment:
- TZ=Europe/Paris
command: ["python", "tuya2mqtt_status.py"]
tuya2mqtt_cmd:
container_name: tuya2mqtt_cmd
restart: unless-stopped
build: ./
volumes:
- ./tuya2mqtt_cmd.py:/app/tuya2mqtt_cmd.py
- ./config.json:/app/config.json
logging:
driver: journald
environment:
- TZ=Europe/Paris
command: ["python", "tuya2mqtt_cmd.py"]
Création d’un fichier dockerfile :
nano dockerfile
Contenu du fichier :
# Image de base légère avec Python
FROM python:3.11-slim
# Variables d’environnement par défaut (modifiable via docker run ou compose)
ENV PYTHONUNBUFFERED=1
# Dossier de travail dans le conteneur
WORKDIR /app
# Copie des fichiers dans le conteneur
COPY requirements.txt ./
# Installation des dépendances
RUN apt-get update && \
apt-get install -y --no-install-recommends \
build-essential \
libssl-dev \
libffi-dev \
python3-dev \
&& rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir -r requirements.txt
Création d’un fichier requirements.txt :
nano requirements.txt
Contenu du fichier :
paho-mqtt tinytuya
Le script tuya2mqtt_status.py
Ce script est chargé d’interroger périodiquement les statuts des périphériques Tuya définis dans config.json, et de les publier sur MQTT.
nano tuya2mqtt_status.py
import tinytuya
import json
import time
import paho.mqtt.client as mqtt
import logging
import os
import signal
import sys
# Chargement du fichier de configuration JSON
def load_config():
with open('./config.json', 'r') as f:
config = json.load(f)
return config
config = load_config()
# Configuration
LOG_LEVEL = config.get("log_level", "INFO").upper()
# Conf -> MQTT
MQTT_BROKER = config["mqtt"]["broker"]
MQTT_PORT = config["mqtt"]["port"]
MQTT_USERNAME = config["mqtt"].get("username")
MQTT_PASSWORD = config["mqtt"].get("password")
MQTT_PREFIX = config["mqtt"]["prefix_topic"]
INTERVAL = config["interval"]["basic_info_period"]
# Configuration du logger
logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO),
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("Tuya2MQTT_Status")
# Initialisation MQTT
mqtt_client = None
def init_mqtt():
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
if MQTT_USERNAME:
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.on_connect = on_connect
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start()
return client
# Publication MQTT
def publish(client, topic, payload, retain=False):
client.publish(topic, json.dumps(payload), retain=retain)
logger.debug(f"Publié: {topic} -> {payload}")
# Fonction appelée en cas de connexion au broker MQTT
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code == 0:
logger.info("Connecté au broker MQTT")
else:
logger.error(f"Erreur de connexion MQTT : {reason_code}")
# Boucle principale
def main():
global mqtt_client
mqtt_client = init_mqtt()
logger.info("Démarrage du daemon Tuya2MQTT Status")
last_status = {}
devices = config.get("devices", [])
while True:
for device in devices:
d_name = device["d_name"]
device_id = f"{d_name}-{device['d_id'][:12]}"
topic_status = f"{MQTT_PREFIX}/{device_id}/status"
topic_error = f"{MQTT_PREFIX}/{device_id}/error"
topic_info = f"{MQTT_PREFIX}/{device_id}/info"
try:
# Initialisation sécurisée du périphérique
try:
d = tinytuya.OutletDevice(
dev_id=device["d_id"],
address=device["d_add"],
local_key=device["d_lk"],
version=device["d_v"]
)
d.set_version(float(device["d_v"]))
d.set_socketTimeout(2)
d.set_socketPersistent(False)
d.set_socketRetryLimit(1)
except Exception as e:
publish(mqtt_client, topic_error, {"error": f"Échec init {d_name}: {str(e)}"}, retain=True)
logger.warning(f"[X] Erreur init {d_name}: {e}")
continue
if device_id not in last_status or last_status[device_id] == "error":
device_info = {
"d_id": device["d_id"],
"d_add": device["d_add"],
"d_v": device["d_v"],
"d_lk": device["d_lk"]
}
publish(mqtt_client, topic_info, device_info, retain=True)
try:
status = d.status()
publish(mqtt_client, topic_status, status)
last_status[device_id] = "ok"
except Exception as e:
publish(mqtt_client, topic_error, {"error": str(e)}, retain=True)
logger.warning(f"[X] Erreur {d_name}: {e}")
last_status[device_id] = "error"
continue
except Exception as e:
logger.error(f"[CRITICAL] Erreur inattendue: {e}")
time.sleep(INTERVAL)
# Gestion des signaux pour arrêter proprement le daemon
def signal_handler(sig, frame):
logger.info("Arrêt du daemon Tuya2MQTT Status")
mqtt_client.loop_stop()
mqtt_client.disconnect()
sys.exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
main()
Le script tuya2mqtt_cmd.py
Ce deuxième script écoute les commandes MQTT sur les topics correspondants et les relaie aux appareils Tuya.
nano tuya2mqtt_cmd.py
import tinytuya
import json
import paho.mqtt.client as mqtt
import logging
import os
import signal
import sys
import asyncio
# Chargement du fichier de configuration JSON
def load_config():
with open('./config.json', 'r') as f:
config = json.load(f)
return config
config = load_config()
# Configuration
LOG_LEVEL = config.get("log_level", "INFO").upper()
# Conf -> MQTT
MQTT_BROKER = config["mqtt"]["broker"]
MQTT_PORT = config["mqtt"]["port"]
MQTT_USERNAME = config["mqtt"].get("username")
MQTT_PASSWORD = config["mqtt"].get("password")
MQTT_PREFIX = config["mqtt"]["prefix_topic"]
# Configuration du logger
logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO),
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("Tuya2MQTT_Cmd")
# Initialisation MQTT
mqtt_client = None
def init_mqtt():
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
if MQTT_USERNAME:
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start()
return client
# Publication MQTT
def publish(topic, payload, retain=False):
client.publish(topic, json.dumps(payload), retain=retain)
logger.debug(f"Publié: {topic} -> {payload}")
# Fonction appelée en cas de connexion au broker MQTT
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code == 0:
logger.info("Connecté au broker MQTT")
client.subscribe(f"{MQTT_PREFIX}/#")
logger.info(f"Souscription au topic de commande")
else:
logger.error(f"Erreur de connexion MQTT : {reason_code}")
# Callback MQTT
def on_message(client, userdata, msg):
try:
topic_parts = msg.topic.split("/")
if len(topic_parts) < 3 or topic_parts[-1] != "command":
return
device_id = topic_parts[1]
topic_ack = f"tuya/{device_id}/ack"
topic_error = f"tuya/{device_id}/error"
payload = json.loads(msg.payload)
devices = config.get("devices", [])
device = next((d for d in devices if f"{d['d_name']}-{d['d_id'][:12]}" == device_id), None)
if not device:
logger.error(f"Périphérique non trouvé pour {device_id}")
return
d = tinytuya.OutletDevice(
dev_id=device["d_id"],
address=device["d_add"],
local_key=device["d_lk"],
version=device["d_v"]
)
d.set_version(float(device["d_v"]))
if "command" in payload and "dps" in payload and "value" in payload:
try:
payload["dps"] = int(payload["dps"])
except ValueError:
logger.error(f"DPS invalide: {payload['dps']}")
if isinstance(payload["value"], str):
val = payload["value"].lower()
if val == "true":
payload["value"] = True
elif val == "false":
payload["value"] = False
else:
try:
if "." in val:
payload["value"] = float(val)
else:
payload["value"] = int(val)
except ValueError:
pass
logger.info(f"Commande : dps={payload['dps']}, value={payload['value']}")
d.set_value(payload["dps"], payload["value"])
publish(topic_ack, {"dps": payload["dps"], "value": payload["value"], "status": "OK"})
else:
logger.warning(f"Format de commande invalide: {payload}")
publish(topic_error, {"status": "FAILED", "error": "Format invalide"})
except Exception as e:
logger.error(f"Erreur traitement commande: {e}")
publish(msg.topic.replace("command", "error"), {"status": "FAILED", "error": str(e)})
# Signal handler
running = True
def signal_handler(sig, frame):
global running
logger.info("Arrêt du daemon Tuya2MQTT Command")
running = False
# Boucle principale async
async def main_loop():
global mqtt_client
mqtt_client = init_mqtt()
logger.info("Démarrage du daemon Tuya2MQTT Command")
while running:
await asyncio.sleep(1)
mqtt_client.loop_stop()
mqtt_client.disconnect()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if __name__ == "__main__":
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main_loop())
finally:
loop.close()
Le fichier de configuration
Je créé ensuite le fichier de configuration config qui permet d’ajuster les périphériques, les paramètres MQTT et le niveau de log..json
nano config.json
Contenu du fichier :
{
"log_level": "WARNING",
"tuya": {
"user": "ton_email@example.com",
"password": "ton_mot_de_passe"
},
"mqtt": {
"prefix_topic": "tuya",
"broker": "localhost",
"port": 1883,
"username": "mon_utilisateur",
"password": "mon_mot_de_passe"
},
"interval": {
"basic_info_period": 60
},
"devices": [
{
"d_name": "EV_Charger",
"d_id": "bf14e4f03bb3bv3eg",
"d_add": "192.168.1.207",
"d_lk": "4Ic_4Ya}>s/J",
"d_v": 3.5
},
{
"d_name": "lumiere_cuisine",
"d_id": "bfc91e7e91d6e7j2",
"d_add": "192.168.1.208",
"d_lk": "6_4Ydxa}>k/T",
"d_v": 3.3
}
]
}
Et côté journald, tu règles une limite avec :
mkdir -p /etc/systemd/journald.conf.d nano /etc/systemd/journald.conf.d/tuya2mqtt_cmd.conf nano /etc/systemd/journald.conf.d/tuya2mqtt_status.conf
Contenu des fichier :
[Journal] SystemMaxUse=100M SystemMaxFileSize=10M MaxRetentionSec=1month
Puis :
sudo systemctl restart systemd-journald
Compilation du container
La structure doit ressembler à ça :
. ├── config.json ├── docker-compose.yml ├── dockerfile ├── requirements.txt ├── tuya2mqtt_cmd.py └── tuya2mqtt_status.yml
Maintenant que tous fichiers sont présent dans mon dossier d’installation je lance la compilation :
docker compose build
Puis exécute :
docker compose up -d
Si tu as besoin de modifier le fichier config.json, il te suffit de relancer le container :
docker compose restart tuya2mqtt_cmd docker compose restart tuya2mqtt_status
Et pour consulter le journal (il existe d’autre méthode) :
journalctl CONTAINER_NAME=tuya2mqtt_cmd -f journalctl CONTAINER_NAME=tuya2mqtt_status -f
Conclusion
Avec cette solution, tu disposes d’un pont local fiable entre Tuya et MQTT, sans dépendance cloud, sans abonnement développeur, et avec une souplesse totale côté domotique.
C’est un petit pas vers l’indépendance domotique… en attendant mieux (et surtout en attendant les vraies bornes V2G).
Les commandes sont envoyé sous forme de JSON :
{
"command":"set",
"dps":"18",
"value":true
}
sur le topic {mqtt_prefix}/{device_id}/command du périphérique.
L’article devrait à l’avenir être compléter avec :
– un explicatif pour connaitre les DPS d’un périphérique.
– une interface Web de configuration.
– éventuellement, une page qui dans l’interface Web qui interrogerais le Cloud pour lister les DPS.
