From c52973f22545a547cee421a8d55812990600896a Mon Sep 17 00:00:00 2001 From: Darell Tan Date: Fri, 16 Jan 2026 18:31:47 +0800 Subject: [PATCH] Maintain MQTT message order during processing Previously the message handler spawned a goroutine for each message and hoped for the best, but obviously it didn't work. Z2M often emitted the current (old) state first, before sending out the updated state. Without ordering, the old state might get processed later and overwrote the new, correct state. Message handling is now serialized, but decoupled, using a buffered channel and a separate goroutine for processing. --- bridge.go | 127 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 74 insertions(+), 53 deletions(-) diff --git a/bridge.go b/bridge.go index 9c6f564..dcf6ca5 100644 --- a/bridge.go +++ b/bridge.go @@ -78,11 +78,18 @@ type Bridge struct { hapInitCh chan struct{} pendingUpdates sync.Map // queued updates before HAP init - mqttClient mqtt.Client + mqttClient mqtt.Client + mqttMessages chan *mqttMessage updateListeners sync.Map } +type mqttMessage struct { + Tstamp time.Time + Topic string + Payload []byte +} + type BridgeDevice struct { Device *Device Accessory *accessory.A @@ -98,6 +105,8 @@ func NewBridge(ctx context.Context, storeDir string) *Bridge { hapInitCh: make(chan struct{}), devices: make(map[string]*BridgeDevice), + + mqttMessages: make(chan *mqttMessage, 100), } br.bridgeAcc = accessory.NewBridge(accessory.Info{ @@ -259,6 +268,16 @@ func (br *Bridge) ConnectMQTT() error { return tok.Error() } + // start MQTT message processor + go func() { + // only start processing after init + br.WaitConfigured() + + for m := range br.mqttMessages { + br.UpdateAccessoryState(m.Topic, m.Payload, m.Tstamp) + } + }() + return nil } @@ -383,62 +402,65 @@ func (br *Bridge) handleMqttMessage(_ mqtt.Client, msg mqtt.Message) { } } - // spawn a goroutine to handle message, since mutex might block - go func() { - if br.DebugMode && topic != "bridge/logging" { - log.Printf("received MQTT %s: %s", topic, payload) - } + isBridgeTopic := strings.HasPrefix(topic, "bridge/") - br.hapInitMutex.RLock() - defer br.hapInitMutex.RUnlock() + if br.DebugMode && topic != "bridge/logging" { + log.Printf("received MQTT %s: %s", topic, payload) + } - isBridgeTopic := strings.HasPrefix(topic, "bridge/") + br.hapInitMutex.RLock() + defer br.hapInitMutex.RUnlock() - // check if HAP bridge device has been initialized - if !br.hapInitDone { - if isBridgeTopic { - // need to look out for bridge/devices for intiial setup - if topic == "bridge/devices" { - err := br.AddDevicesFromJSON(payload) - if err != nil { - log.Printf("unable to add devices from JSON: %v", err) - } + // check if HAP bridge device has been initialized + if !br.hapInitDone { + // need to look out for bridge/devices for intiial setup + if topic == "bridge/devices" { + go func() { + br.hapInitMutex.Lock() + defer br.hapInitMutex.Unlock() - // populate initial characteristic values from cache, if available - err = br.loadZ2MState(&br.pendingUpdates) - if err != nil { - log.Printf("cannot load z2m state: %s", err) - } - - // "upgrade" to write lock for modifications - br.hapInitMutex.RUnlock() // unlock now, relock again later - defer br.hapInitMutex.RLock() // ... before defer kicks in - - br.hapInitMutex.Lock() - defer br.hapInitMutex.Unlock() - - // dequeue and apply state updates - log.Print("applying deferred updates...") - br.pendingUpdates.Range(func(k, v any) bool { - devName := k.(string) - br.UpdateAccessoryState(devName, v.([]byte), recvTime) - return true // continue - }) - - br.hapInitDone = true - - close(br.hapInitCh) // signal to waiting threads + // re-verify that init has not been done + if br.hapInitDone { + return } - } else { - // queue other messages for state updating after setup - // only keep latest message for each device - log.Printf("queueing updates for %s", topic) - br.pendingUpdates.Store(topic, payload) - } + + err := br.AddDevicesFromJSON(payload) + if err != nil { + log.Printf("unable to add devices from JSON: %v", err) + } + + // populate initial characteristic values from cache, if available + err = br.loadZ2MState(&br.pendingUpdates) + if err != nil { + log.Printf("cannot load z2m state: %s", err) + } + + // dequeue and apply state updates + log.Print("applying deferred updates...") + br.pendingUpdates.Range(func(k, v any) bool { + devName := k.(string) + br.UpdateAccessoryState(devName, v.([]byte), recvTime) + return true // continue + }) + + br.hapInitDone = true + + close(br.hapInitCh) // signal to waiting threads + }() } else if !isBridgeTopic { - br.UpdateAccessoryState(topic, payload, recvTime) + // queue other messages for state updating after setup + // only keep latest message for each device + log.Printf("queueing updates for %s", topic) + br.pendingUpdates.Store(topic, payload) } - }() + } else if !isBridgeTopic { + // queue up MQTT messages + select { + case br.mqttMessages <- &mqttMessage{recvTime, topic, payload}: + default: + log.Printf("cannot queue MQTT message %s: %s", topic, payload) + } + } } // Gets a list of all added accessories @@ -597,8 +619,7 @@ wait: } } - //br.updateListeners.CompareAndDelete(key, ch) // needs go 1.20 - br.updateListeners.Delete(key) + br.updateListeners.CompareAndDelete(key, ch) if !updated { err = ErrUpdateTimeout @@ -704,7 +725,7 @@ func (br *Bridge) UpdateAccessoryState(devName string, payload []byte, tstamp ti updateKey := makeUpdateKey(dev.Device, mapping) if ch, waiting := br.updateListeners.Load(updateKey); waiting { if BRIDGE_DEVMODE { - log.Printf("sending new value for %q via chan", updateKey) + log.Printf("sending new value %+v for %q via chan", newVal, updateKey) } select { case ch.(chan any) <- newVal: