diff --git a/bridge.go b/bridge.go index 9c6f564..dcf6ca5 100644 --- a/bridge.go +++ b/bridge.go @@ -78,11 +78,18 @@ type Bridge struct { hapInitCh chan struct{} pendingUpdates sync.Map // queued updates before HAP init - mqttClient mqtt.Client + mqttClient mqtt.Client + mqttMessages chan *mqttMessage updateListeners sync.Map } +type mqttMessage struct { + Tstamp time.Time + Topic string + Payload []byte +} + type BridgeDevice struct { Device *Device Accessory *accessory.A @@ -98,6 +105,8 @@ func NewBridge(ctx context.Context, storeDir string) *Bridge { hapInitCh: make(chan struct{}), devices: make(map[string]*BridgeDevice), + + mqttMessages: make(chan *mqttMessage, 100), } br.bridgeAcc = accessory.NewBridge(accessory.Info{ @@ -259,6 +268,16 @@ func (br *Bridge) ConnectMQTT() error { return tok.Error() } + // start MQTT message processor + go func() { + // only start processing after init + br.WaitConfigured() + + for m := range br.mqttMessages { + br.UpdateAccessoryState(m.Topic, m.Payload, m.Tstamp) + } + }() + return nil } @@ -383,62 +402,65 @@ func (br *Bridge) handleMqttMessage(_ mqtt.Client, msg mqtt.Message) { } } - // spawn a goroutine to handle message, since mutex might block - go func() { - if br.DebugMode && topic != "bridge/logging" { - log.Printf("received MQTT %s: %s", topic, payload) - } + isBridgeTopic := strings.HasPrefix(topic, "bridge/") - br.hapInitMutex.RLock() - defer br.hapInitMutex.RUnlock() + if br.DebugMode && topic != "bridge/logging" { + log.Printf("received MQTT %s: %s", topic, payload) + } - isBridgeTopic := strings.HasPrefix(topic, "bridge/") + br.hapInitMutex.RLock() + defer br.hapInitMutex.RUnlock() - // check if HAP bridge device has been initialized - if !br.hapInitDone { - if isBridgeTopic { - // need to look out for bridge/devices for intiial setup - if topic == "bridge/devices" { - err := br.AddDevicesFromJSON(payload) - if err != nil { - log.Printf("unable to add devices from JSON: %v", err) - } + // check if HAP bridge device has been initialized + if !br.hapInitDone { + // need to look out for bridge/devices for intiial setup + if topic == "bridge/devices" { + go func() { + br.hapInitMutex.Lock() + defer br.hapInitMutex.Unlock() - // populate initial characteristic values from cache, if available - err = br.loadZ2MState(&br.pendingUpdates) - if err != nil { - log.Printf("cannot load z2m state: %s", err) - } - - // "upgrade" to write lock for modifications - br.hapInitMutex.RUnlock() // unlock now, relock again later - defer br.hapInitMutex.RLock() // ... before defer kicks in - - br.hapInitMutex.Lock() - defer br.hapInitMutex.Unlock() - - // dequeue and apply state updates - log.Print("applying deferred updates...") - br.pendingUpdates.Range(func(k, v any) bool { - devName := k.(string) - br.UpdateAccessoryState(devName, v.([]byte), recvTime) - return true // continue - }) - - br.hapInitDone = true - - close(br.hapInitCh) // signal to waiting threads + // re-verify that init has not been done + if br.hapInitDone { + return } - } else { - // queue other messages for state updating after setup - // only keep latest message for each device - log.Printf("queueing updates for %s", topic) - br.pendingUpdates.Store(topic, payload) - } + + err := br.AddDevicesFromJSON(payload) + if err != nil { + log.Printf("unable to add devices from JSON: %v", err) + } + + // populate initial characteristic values from cache, if available + err = br.loadZ2MState(&br.pendingUpdates) + if err != nil { + log.Printf("cannot load z2m state: %s", err) + } + + // dequeue and apply state updates + log.Print("applying deferred updates...") + br.pendingUpdates.Range(func(k, v any) bool { + devName := k.(string) + br.UpdateAccessoryState(devName, v.([]byte), recvTime) + return true // continue + }) + + br.hapInitDone = true + + close(br.hapInitCh) // signal to waiting threads + }() } else if !isBridgeTopic { - br.UpdateAccessoryState(topic, payload, recvTime) + // queue other messages for state updating after setup + // only keep latest message for each device + log.Printf("queueing updates for %s", topic) + br.pendingUpdates.Store(topic, payload) } - }() + } else if !isBridgeTopic { + // queue up MQTT messages + select { + case br.mqttMessages <- &mqttMessage{recvTime, topic, payload}: + default: + log.Printf("cannot queue MQTT message %s: %s", topic, payload) + } + } } // Gets a list of all added accessories @@ -597,8 +619,7 @@ wait: } } - //br.updateListeners.CompareAndDelete(key, ch) // needs go 1.20 - br.updateListeners.Delete(key) + br.updateListeners.CompareAndDelete(key, ch) if !updated { err = ErrUpdateTimeout @@ -704,7 +725,7 @@ func (br *Bridge) UpdateAccessoryState(devName string, payload []byte, tstamp ti updateKey := makeUpdateKey(dev.Device, mapping) if ch, waiting := br.updateListeners.Load(updateKey); waiting { if BRIDGE_DEVMODE { - log.Printf("sending new value for %q via chan", updateKey) + log.Printf("sending new value %+v for %q via chan", newVal, updateKey) } select { case ch.(chan any) <- newVal: