Maintain MQTT message order during processing

Previously the message handler spawned a goroutine for each message and
hoped for the best, but obviously it didn't work. Z2M often emitted the
current (old) state first, before sending out the updated state. Without
ordering, the old state might get processed later and overwrote the new,
correct state.

Message handling is now serialized, but decoupled, using a buffered
channel and a separate goroutine for processing.
This commit is contained in:
Darell Tan
2026-01-16 18:31:47 +08:00
parent 8f39074f36
commit c52973f225

View File

@@ -79,10 +79,17 @@ type Bridge struct {
pendingUpdates sync.Map // queued updates before HAP init pendingUpdates sync.Map // queued updates before HAP init
mqttClient mqtt.Client mqttClient mqtt.Client
mqttMessages chan *mqttMessage
updateListeners sync.Map updateListeners sync.Map
} }
type mqttMessage struct {
Tstamp time.Time
Topic string
Payload []byte
}
type BridgeDevice struct { type BridgeDevice struct {
Device *Device Device *Device
Accessory *accessory.A Accessory *accessory.A
@@ -98,6 +105,8 @@ func NewBridge(ctx context.Context, storeDir string) *Bridge {
hapInitCh: make(chan struct{}), hapInitCh: make(chan struct{}),
devices: make(map[string]*BridgeDevice), devices: make(map[string]*BridgeDevice),
mqttMessages: make(chan *mqttMessage, 100),
} }
br.bridgeAcc = accessory.NewBridge(accessory.Info{ br.bridgeAcc = accessory.NewBridge(accessory.Info{
@@ -259,6 +268,16 @@ func (br *Bridge) ConnectMQTT() error {
return tok.Error() return tok.Error()
} }
// start MQTT message processor
go func() {
// only start processing after init
br.WaitConfigured()
for m := range br.mqttMessages {
br.UpdateAccessoryState(m.Topic, m.Payload, m.Tstamp)
}
}()
return nil return nil
} }
@@ -383,8 +402,8 @@ func (br *Bridge) handleMqttMessage(_ mqtt.Client, msg mqtt.Message) {
} }
} }
// spawn a goroutine to handle message, since mutex might block isBridgeTopic := strings.HasPrefix(topic, "bridge/")
go func() {
if br.DebugMode && topic != "bridge/logging" { if br.DebugMode && topic != "bridge/logging" {
log.Printf("received MQTT %s: %s", topic, payload) log.Printf("received MQTT %s: %s", topic, payload)
} }
@@ -392,13 +411,19 @@ func (br *Bridge) handleMqttMessage(_ mqtt.Client, msg mqtt.Message) {
br.hapInitMutex.RLock() br.hapInitMutex.RLock()
defer br.hapInitMutex.RUnlock() defer br.hapInitMutex.RUnlock()
isBridgeTopic := strings.HasPrefix(topic, "bridge/")
// check if HAP bridge device has been initialized // check if HAP bridge device has been initialized
if !br.hapInitDone { if !br.hapInitDone {
if isBridgeTopic {
// need to look out for bridge/devices for intiial setup // need to look out for bridge/devices for intiial setup
if topic == "bridge/devices" { if topic == "bridge/devices" {
go func() {
br.hapInitMutex.Lock()
defer br.hapInitMutex.Unlock()
// re-verify that init has not been done
if br.hapInitDone {
return
}
err := br.AddDevicesFromJSON(payload) err := br.AddDevicesFromJSON(payload)
if err != nil { if err != nil {
log.Printf("unable to add devices from JSON: %v", err) log.Printf("unable to add devices from JSON: %v", err)
@@ -410,13 +435,6 @@ func (br *Bridge) handleMqttMessage(_ mqtt.Client, msg mqtt.Message) {
log.Printf("cannot load z2m state: %s", err) log.Printf("cannot load z2m state: %s", err)
} }
// "upgrade" to write lock for modifications
br.hapInitMutex.RUnlock() // unlock now, relock again later
defer br.hapInitMutex.RLock() // ... before defer kicks in
br.hapInitMutex.Lock()
defer br.hapInitMutex.Unlock()
// dequeue and apply state updates // dequeue and apply state updates
log.Print("applying deferred updates...") log.Print("applying deferred updates...")
br.pendingUpdates.Range(func(k, v any) bool { br.pendingUpdates.Range(func(k, v any) bool {
@@ -428,17 +446,21 @@ func (br *Bridge) handleMqttMessage(_ mqtt.Client, msg mqtt.Message) {
br.hapInitDone = true br.hapInitDone = true
close(br.hapInitCh) // signal to waiting threads close(br.hapInitCh) // signal to waiting threads
} }()
} else { } else if !isBridgeTopic {
// queue other messages for state updating after setup // queue other messages for state updating after setup
// only keep latest message for each device // only keep latest message for each device
log.Printf("queueing updates for %s", topic) log.Printf("queueing updates for %s", topic)
br.pendingUpdates.Store(topic, payload) br.pendingUpdates.Store(topic, payload)
} }
} else if !isBridgeTopic { } else if !isBridgeTopic {
br.UpdateAccessoryState(topic, payload, recvTime) // queue up MQTT messages
select {
case br.mqttMessages <- &mqttMessage{recvTime, topic, payload}:
default:
log.Printf("cannot queue MQTT message %s: %s", topic, payload)
}
} }
}()
} }
// Gets a list of all added accessories // Gets a list of all added accessories
@@ -597,8 +619,7 @@ wait:
} }
} }
//br.updateListeners.CompareAndDelete(key, ch) // needs go 1.20 br.updateListeners.CompareAndDelete(key, ch)
br.updateListeners.Delete(key)
if !updated { if !updated {
err = ErrUpdateTimeout err = ErrUpdateTimeout
@@ -704,7 +725,7 @@ func (br *Bridge) UpdateAccessoryState(devName string, payload []byte, tstamp ti
updateKey := makeUpdateKey(dev.Device, mapping) updateKey := makeUpdateKey(dev.Device, mapping)
if ch, waiting := br.updateListeners.Load(updateKey); waiting { if ch, waiting := br.updateListeners.Load(updateKey); waiting {
if BRIDGE_DEVMODE { if BRIDGE_DEVMODE {
log.Printf("sending new value for %q via chan", updateKey) log.Printf("sending new value %+v for %q via chan", newVal, updateKey)
} }
select { select {
case ch.(chan any) <- newVal: case ch.(chan any) <- newVal: