Tuya2MQTT : libérer ses appareils Tuya du cloud

Préambule

Ce projet Tuya2MQTT a connu plusieurs vies. Il a d’abord vu le jour sous forme d’un simple flux Node-RED qui appelait un script Python via un nœud exec. Ensuite, j’ai tenté un portage (avorté) en C++ sur ESP32, avant d’opter pour un service local en Python, pour finalement le containeriser dans Docker.

Je ne vais pas prétendre m’être réconcilié avec l’univers Tuya. Mais faute d’une vraie solution domotique 32A compatible V2G et intégrable avec Victron ou Enphase, j’ai temporairement investi dans un câble de recharge EV pilotable via Tuya. Le but : attendre que ces fabricants sortent des produits V2G grand public… et que je dispose moi-même d’un véhicule compatible.

Après avoir galéré (et le mot est faible) pour faire fonctionner ce chargeur en commande locale, je me suis dit que ça valait le coup de documenter la solution.
Non, je ne mettrai pas de lien vers ce chargeur trouvé dans la caverne d’Ali, car d’ici là, l’annonce aura disparu 😏

Si tu es arrivé jusqu’ici, c’est que tu sais probablement déjà comment obtenir les device_id et local_key de ton appareil Tuya. Mais si ce n’est pas encore le cas, voici un bon point de départ :
🔗 Article LinkdHome : Contrôle local des appareils Tuya

Pourquoi tinytuya ?

J’ai utilisé la bibliothèque TinyTuya, un module Python léger, pour interagir localement avec les périphériques. Le point crucial : il faut absolument connaître la version de protocole utilisée. Pour mon chargeur, c’est du 3.5, et pas 3.1, sinon rien ne répond.

Un point qui m’énerve avec Tuya : l’obligation de renouveler son compte développeur tous les 6 mois. Une vraie plaie.
En passant par tinytuya, tout se fait en local, sans dépendre du cloud ni d’un token qui expire en douce. Et ça, c’est un vrai soulagement.

Même si ce script a été conçu pour un chargeur EV, il est entièrement réutilisable pour tout autre appareil Tuya. Tant qu’on a créé un compte développeur une bonne fois pour toutes, autant en profiter pour récupérer les DPS utiles pour chaque fonction.

Intégration dans Jeedom

Une fois le script en place, on peut créer un objet MQTT dans jMQTT.
L’avantage : partager facilement les templates avec la communauté.

Pré-requis

Avant de commencer, assure-toi d’avoir :
✔️ Une machine virtuelle (VM) ou un nano-ordinateur (Raspberry Pi, Debian, etc.).
✔️ Un accès SSH et les droits root.
✔️ Docker installé (article en préparation).

Etape par étape

(A venir l’article : Le Dockerfile du Destin : une images pour les gouverner tous, pour que tu puisses regrouper tous les bidule2mqtt avec la même image).

Création du containers Docker

Je crée le répertoire d’installation :

mkdir -p ~/docker/tuya2mqtt && cd ~/docker/tuya2mqtt

Création d’un fichier docker-compose.yml :

nano docker-compose.yml

Contenu du fichier :

services:
  tuya2mqtt_status:
    container_name: tuya2mqtt_status
    restart: unless-stopped
    build: ./
    volumes:
      - ./tuya2mqtt_status.py:/app/tuya2mqtt_status.py
      - ./config.json:/app/config.json
    logging:
      driver: journald
    environment:
      - TZ=Europe/Paris
    command: ["python", "tuya2mqtt_status.py"]

  tuya2mqtt_cmd:
    container_name: tuya2mqtt_cmd
    restart: unless-stopped
    build: ./
    volumes:
      - ./tuya2mqtt_cmd.py:/app/tuya2mqtt_cmd.py
      - ./config.json:/app/config.json
    logging:
      driver: journald
    environment:
      - TZ=Europe/Paris
    command: ["python", "tuya2mqtt_cmd.py"]

Création d’un fichier dockerfile :

nano dockerfile

Contenu du fichier :

# Image de base légère avec Python
FROM python:3.11-slim

# Variables d’environnement par défaut (modifiable via docker run ou compose)
ENV PYTHONUNBUFFERED=1

# Dossier de travail dans le conteneur
WORKDIR /app

# Copie des fichiers dans le conteneur
COPY requirements.txt ./

# Installation des dépendances
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
    build-essential \
    libssl-dev \
    libffi-dev \
    python3-dev \
    && rm -rf /var/lib/apt/lists/*

RUN pip install --no-cache-dir -r requirements.txt

Création d’un fichier requirements.txt :

nano requirements.txt

Contenu du fichier :

paho-mqtt
tinytuya

Le script tuya2mqtt_status.py

Ce script est chargé d’interroger périodiquement les statuts des périphériques Tuya définis dans config.json, et de les publier sur MQTT.

nano tuya2mqtt_status.py
import tinytuya
import json
import time
import paho.mqtt.client as mqtt
import logging
import os
import signal
import sys

# Chargement du fichier de configuration JSON
def load_config():
    with open('./config.json', 'r') as f:
        config = json.load(f)
    return config

config = load_config()

# Configuration
LOG_LEVEL = config.get("log_level", "INFO").upper()
# Conf -> MQTT
MQTT_BROKER = config["mqtt"]["broker"]
MQTT_PORT = config["mqtt"]["port"]
MQTT_USERNAME = config["mqtt"].get("username")
MQTT_PASSWORD = config["mqtt"].get("password")
MQTT_PREFIX = config["mqtt"]["prefix_topic"]
INTERVAL = config["interval"]["basic_info_period"]

# Configuration du logger
logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO),
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("Tuya2MQTT_Status")

# Initialisation MQTT
mqtt_client = None
def init_mqtt():
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    if MQTT_USERNAME:
        client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
    client.on_connect = on_connect
    client.connect(MQTT_BROKER, MQTT_PORT, 60)
    client.loop_start()
    return client

# Publication MQTT
def publish(client, topic, payload, retain=False):
    client.publish(topic, json.dumps(payload), retain=retain)
    logger.debug(f"Publié: {topic} -> {payload}")

# Fonction appelée en cas de connexion au broker MQTT
def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code == 0:
        logger.info("Connecté au broker MQTT")
    else:
        logger.error(f"Erreur de connexion MQTT : {reason_code}")

# Boucle principale
def main():
    global mqtt_client
    mqtt_client = init_mqtt()
    logger.info("Démarrage du daemon Tuya2MQTT Status")

    last_status = {}
    devices = config.get("devices", [])

    while True:
        for device in devices:
            d_name = device["d_name"]
            device_id = f"{d_name}-{device['d_id'][:12]}"
            topic_status = f"{MQTT_PREFIX}/{device_id}/status"
            topic_error = f"{MQTT_PREFIX}/{device_id}/error"
            topic_info = f"{MQTT_PREFIX}/{device_id}/info"

            try:
                # Initialisation sécurisée du périphérique
                try:
                    d = tinytuya.OutletDevice(
                        dev_id=device["d_id"],
                        address=device["d_add"],
                        local_key=device["d_lk"],
                        version=device["d_v"]
                    )
                    d.set_version(float(device["d_v"]))
                    d.set_socketTimeout(2)
                    d.set_socketPersistent(False)
                    d.set_socketRetryLimit(1)
                except Exception as e:
                    publish(mqtt_client, topic_error, {"error": f"Échec init {d_name}: {str(e)}"}, retain=True)
                    logger.warning(f"[X] Erreur init {d_name}: {e}")
                    continue

                if device_id not in last_status or last_status[device_id] == "error":
                    device_info = {
                        "d_id": device["d_id"],
                        "d_add": device["d_add"],
                        "d_v": device["d_v"],
                        "d_lk": device["d_lk"]
                    }
                    publish(mqtt_client, topic_info, device_info, retain=True)

                try:
                    status = d.status()
                    publish(mqtt_client, topic_status, status)
                    last_status[device_id] = "ok"
                except Exception as e:
                    publish(mqtt_client, topic_error, {"error": str(e)}, retain=True)
                    logger.warning(f"[X] Erreur {d_name}: {e}")
                    last_status[device_id] = "error"
                    continue

            except Exception as e:
                logger.error(f"[CRITICAL] Erreur inattendue: {e}")

        time.sleep(INTERVAL)

# Gestion des signaux pour arrêter proprement le daemon
def signal_handler(sig, frame):
    logger.info("Arrêt du daemon Tuya2MQTT Status")
    mqtt_client.loop_stop()
    mqtt_client.disconnect()
    sys.exit(0)

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    main()

Le script tuya2mqtt_cmd.py

Ce deuxième script écoute les commandes MQTT sur les topics correspondants et les relaie aux appareils Tuya.

nano tuya2mqtt_cmd.py
import tinytuya
import json
import paho.mqtt.client as mqtt
import logging
import os
import signal
import sys
import asyncio

# Chargement du fichier de configuration JSON
def load_config():
    with open('./config.json', 'r') as f:
        config = json.load(f)
    return config

config = load_config()

# Configuration
LOG_LEVEL = config.get("log_level", "INFO").upper()
# Conf -> MQTT
MQTT_BROKER = config["mqtt"]["broker"]
MQTT_PORT = config["mqtt"]["port"]
MQTT_USERNAME = config["mqtt"].get("username")
MQTT_PASSWORD = config["mqtt"].get("password")
MQTT_PREFIX = config["mqtt"]["prefix_topic"]

# Configuration du logger
logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO),
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("Tuya2MQTT_Cmd")

# Initialisation MQTT
mqtt_client = None
def init_mqtt():
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    if MQTT_USERNAME:
        client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(MQTT_BROKER, MQTT_PORT, 60)
    client.loop_start()
    return client

# Publication MQTT
def publish(topic, payload, retain=False):
    client.publish(topic, json.dumps(payload), retain=retain)
    logger.debug(f"Publié: {topic} -> {payload}")

# Fonction appelée en cas de connexion au broker MQTT
def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code == 0:
        logger.info("Connecté au broker MQTT")
        client.subscribe(f"{MQTT_PREFIX}/#")
        logger.info(f"Souscription au topic de commande")
    else:
        logger.error(f"Erreur de connexion MQTT : {reason_code}")

# Callback MQTT
def on_message(client, userdata, msg):
    try:
        topic_parts = msg.topic.split("/")
        if len(topic_parts) < 3 or topic_parts[-1] != "command":
            return

        device_id = topic_parts[1]
        topic_ack = f"tuya/{device_id}/ack"
        topic_error = f"tuya/{device_id}/error"
        payload = json.loads(msg.payload)

        devices = config.get("devices", [])
        device = next((d for d in devices if f"{d['d_name']}-{d['d_id'][:12]}" == device_id), None)

        if not device:
            logger.error(f"Périphérique non trouvé pour {device_id}")
            return

        d = tinytuya.OutletDevice(
            dev_id=device["d_id"],
            address=device["d_add"],
            local_key=device["d_lk"],
            version=device["d_v"]
        )
        d.set_version(float(device["d_v"]))

        if "command" in payload and "dps" in payload and "value" in payload:
            try:
                payload["dps"] = int(payload["dps"])
            except ValueError:
                logger.error(f"DPS invalide: {payload['dps']}")

            if isinstance(payload["value"], str):
                val = payload["value"].lower()
                if val == "true":
                    payload["value"] = True
                elif val == "false":
                    payload["value"] = False
                else:
                    try:
                        if "." in val:
                            payload["value"] = float(val)
                        else:
                            payload["value"] = int(val)
                    except ValueError:
                        pass

            logger.info(f"Commande : dps={payload['dps']}, value={payload['value']}")
            d.set_value(payload["dps"], payload["value"])
            publish(topic_ack, {"dps": payload["dps"], "value": payload["value"], "status": "OK"})
        else:
            logger.warning(f"Format de commande invalide: {payload}")
            publish(topic_error, {"status": "FAILED", "error": "Format invalide"})
    except Exception as e:
        logger.error(f"Erreur traitement commande: {e}")
        publish(msg.topic.replace("command", "error"), {"status": "FAILED", "error": str(e)})

# Signal handler
running = True

def signal_handler(sig, frame):
    global running
    logger.info("Arrêt du daemon Tuya2MQTT Command")
    running = False

# Boucle principale async
async def main_loop():
    global mqtt_client
    mqtt_client = init_mqtt()
    logger.info("Démarrage du daemon Tuya2MQTT Command")

    while running:
        await asyncio.sleep(1)
    mqtt_client.loop_stop()
    mqtt_client.disconnect()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main_loop())
    finally:
        loop.close()

Le fichier de configuration

Je créé ensuite le fichier de configuration config.json qui permet d’ajuster les périphériques, les paramètres MQTT et le niveau de log.

nano config.json

Contenu du fichier :

{
  "log_level": "WARNING",
  "tuya": {
    "user": "ton_email@example.com",
    "password": "ton_mot_de_passe"
  },
  "mqtt": {
    "prefix_topic": "tuya",
    "broker": "localhost",
    "port": 1883,
    "username": "mon_utilisateur",
    "password": "mon_mot_de_passe"
  },
  "interval": {
    "basic_info_period": 60
  },
  "devices": [
    {
      "d_name": "EV_Charger",
      "d_id": "bf14e4f03bb3bv3eg",
      "d_add": "192.168.1.207",
      "d_lk": "4Ic_4Ya}>s/J",
      "d_v": 3.5
    },
    {
      "d_name": "lumiere_cuisine",
      "d_id": "bfc91e7e91d6e7j2",
      "d_add": "192.168.1.208",
      "d_lk": "6_4Ydxa}>k/T",
      "d_v": 3.3
    }
  ]
}

Et côté journald, tu règles une limite avec :

mkdir -p /etc/systemd/journald.conf.d
nano /etc/systemd/journald.conf.d/tuya2mqtt_cmd.conf
nano /etc/systemd/journald.conf.d/tuya2mqtt_status.conf

Contenu des fichier :

[Journal]
SystemMaxUse=100M
SystemMaxFileSize=10M
MaxRetentionSec=1month

Puis :

sudo systemctl restart systemd-journald

Compilation du container

La structure doit ressembler à ça :

.
├── config.json
├── docker-compose.yml
├── dockerfile
├── requirements.txt
├── tuya2mqtt_cmd.py
└── tuya2mqtt_status.yml

Maintenant que tous fichiers sont présent dans mon dossier d’installation je lance la compilation :

docker compose build

Puis exécute :

docker compose up -d

Si tu as besoin de modifier le fichier config.json, il te suffit de relancer le container :

docker compose restart tuya2mqtt_cmd
docker compose restart tuya2mqtt_status

Et pour consulter le journal (il existe d’autre méthode) :

journalctl CONTAINER_NAME=tuya2mqtt_cmd -f
journalctl CONTAINER_NAME=tuya2mqtt_status -f

Conclusion

Avec cette solution, tu disposes d’un pont local fiable entre Tuya et MQTT, sans dépendance cloud, sans abonnement développeur, et avec une souplesse totale côté domotique.

C’est un petit pas vers l’indépendance domotique… en attendant mieux (et surtout en attendant les vraies bornes V2G).

Les commandes sont envoyé sous forme de JSON :

{
  "command":"set",
  "dps":"18",
  "value":true
}

sur le topic {mqtt_prefix}/{device_id}/command du périphérique.

L’article devrait à l’avenir être compléter avec :
– un explicatif pour connaitre les DPS d’un périphérique.
– une interface Web de configuration.
– éventuellement, une page qui dans l’interface Web qui interrogerais le Cloud pour lister les DPS.

Sources :
https://github.com/jasonacox/tinytuya

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *