diff --git a/mqtt.go b/mqtt.go index 3745509aa04060bfe863a0915d70951713bad0f6..f377f6722d8ebebec942b8db95cbd60cf14b1003 100644 --- a/mqtt.go +++ b/mqtt.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "log/slog" + "time" mqtt "github.com/eclipse/paho.mqtt.golang" ) @@ -14,19 +15,35 @@ func MQTTSetup(cfg *Config, publishHandler mqtt.MessageHandler) mqtt.Client { opts := mqtt.NewClientOptions(). AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.brokerHost, cfg.brokerPort)). SetClientID(mqttClientID). - SetDefaultPublishHandler(publishHandler).SetAutoReconnect(true) + SetDefaultPublishHandler(publishHandler). + SetConnectionLostHandler(MQTTLostCnx). + SetMaxReconnectInterval(time.Minute * 2) + + opts = opts.SetOnConnectHandler(func(client mqtt.Client) { + MQTTSubscribe(client, cfg) + }) client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) - } - slog.Debug("Connected to", "Broker", cfg.brokerHost, "Port", cfg.brokerPort, "Client", mqttClientID) - if token := client.Subscribe(cfg.topic+"/#", 0, nil); token.Wait() && token.Error() != nil { - panic(token.Error()) + // panic(token.Error()) + slog.Warn("MQTTSetup failed", "err", token.Error()) } return client } +func MQTTSubscribe(client mqtt.Client, cfg *Config) error { + token := client.Subscribe(cfg.topic+"/#", 0, nil) + if err := token.Wait(); token.Error() != nil { + return fmt.Errorf("failed to subscribe to topic '%s/#': %v", cfg.topic, err) + } + slog.Debug("Subscribed to", "Topic", cfg.topic+"/#") + return nil +} + +func MQTTLostCnx(client mqtt.Client, err error) { + slog.Warn("MQTT connection lost") +} + // MQTTDumpMsg displays the MQTT message func MQTTDumpMsg(msg mqtt.Message) { var data map[string]interface{}