Skip to content
Snippets Groups Projects
Commit 13457112 authored by KERDREUX Jerome's avatar KERDREUX Jerome
Browse files

Code refactoring in progress

Refactoring Go code is not that hard (thanks to types), so let's give it
a try
parent 0485310b
No related branches found
No related tags found
No related merge requests found
mqtt.go 0 → 100644
package main
import (
"encoding/json"
"fmt"
"log/slog"
"sort"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/jedib0t/go-pretty/v6/table"
)
// mqttSetup creates a new MQTT client
func mqttSetup(mqttBroker string, port int) MQTT.Client {
// This JS style of creating a client is awfully verbose
opts := MQTT.NewClientOptions().
AddBroker(fmt.Sprintf("tcp://%s:%d", mqttBroker, port)).
SetClientID(mqttClientID).
SetDefaultPublishHandler(mqttPublishHander).SetAutoReconnect(true)
client := MQTT.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
slog.Debug("Connected to", "Broker", mqttBroker, "Port", port, "Client", mqttClientID)
if token := client.Subscribe(mqttTopic+"/#", 0, nil); token.Wait() && token.Error() != nil {
panic(token.Error())
}
return client
}
// mqttDumpMsg displays the MQTT message
func mqttDumpMsg(msg MQTT.Message) {
var data map[string]interface{}
err := json.Unmarshal(msg.Payload(), &data)
if err != nil {
slog.Error("Error decoding JSON %v", "err", err)
}
// sort keys
keys := make([]string, 0, len(data))
for key := range data {
keys = append(keys, key)
}
sort.Strings(keys)
// dump keys
tab := table.NewWriter()
tab.SetTitle("MQTT update")
tab.SetStyle(table.StyleRounded)
for _, key := range keys {
if key != "update" {
tab.AppendRow(table.Row{key, data[key]})
}
}
fmt.Println(tab.Render())
}
......@@ -5,7 +5,6 @@ import (
"fmt"
"log/slog"
"slices"
"sort"
"strings"
MQTT "github.com/eclipse/paho.mqtt.golang"
......@@ -148,6 +147,17 @@ func (zDev *Z2MDevice) GetExpose(name string) *Expose {
return nil
}
// updates the xAAL device with the MQTT message
func (zDev *Z2MDevice) HandleMessage(msg MQTT.Message) {
var data map[string]interface{}
err := json.Unmarshal(msg.Payload(), &data)
if err != nil {
slog.Error("Error decoding JSON", "err", err)
} else {
updateXAALDevice(zDev, data)
}
}
// jsonParseDevices parses the bridge/devices json and creates new xAAL devices
// if they don't exist
func jsonParseDevices(jsonData []byte) {
......@@ -172,43 +182,6 @@ func jsonParseDevices(jsonData []byte) {
}
}
// mqttDeviceHandler updates the xAAL device with the MQTT message
func mqttDeviceHandler(zDev *Z2MDevice, msg MQTT.Message) {
var data map[string]interface{}
err := json.Unmarshal(msg.Payload(), &data)
if err != nil {
slog.Error("Error decoding JSON", "err", err)
} else {
updateXAALDevice(zDev, data)
}
}
// mqttDumpMsg displays the MQTT message
func mqttDumpMsg(msg MQTT.Message) {
var data map[string]interface{}
err := json.Unmarshal(msg.Payload(), &data)
if err != nil {
slog.Error("Error decoding JSON %v", "err", err)
}
// sort keys
keys := make([]string, 0, len(data))
for key := range data {
keys = append(keys, key)
}
sort.Strings(keys)
// dump keys
tab := table.NewWriter()
tab.SetTitle("MQTT update")
tab.SetStyle(table.StyleRounded)
for _, key := range keys {
if key != "update" {
tab.AppendRow(table.Row{key, data[key]})
}
}
fmt.Println(tab.Render())
}
// mqttPublishHander handles all incoming MQTT messages
// If the topic is /bridge/devices it will parse the json and create new devices
// Else it will find the device with the topic and call the mqttDeviceHandler
......@@ -226,26 +199,7 @@ func mqttPublishHander(client MQTT.Client, msg MQTT.Message) {
dev := GetGW().GetZDeviceByTopic(msg.Topic())
mqttDumpMsg(msg)
if dev != nil {
mqttDeviceHandler(dev, msg)
}
}
}
// mqttSetup creates a new MQTT client
func mqttSetup(mqttBroker string, port int) MQTT.Client {
// This JS style of creating a client is awfully verbose
opts := MQTT.NewClientOptions().
AddBroker(fmt.Sprintf("tcp://%s:%d", mqttBroker, port)).
SetClientID(mqttClientID).
SetDefaultPublishHandler(mqttPublishHander).SetAutoReconnect(true)
client := MQTT.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
dev.HandleMessage(msg)
}
slog.Debug("Connected to", "Broker", mqttBroker, "Port", port, "Client", mqttClientID)
if token := client.Subscribe(mqttTopic+"/#", 0, nil); token.Wait() && token.Error() != nil {
panic(token.Error())
}
return client
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment