Files
hapz2m/bridge.go
Darell Tan 54aa0795c3 Fix for unacknowledged numeric Z2M state updates
So far Z2M state updates were only boolean (the device `state` on/off),
but after introducing `brightness` values, Z2M state updates may not be
recognized/acknowledged. Unmarshalled Z2M numeric values are always
float64, but updates sent out via MQTT might not be. As a result, the
values may not directly equal and may never match.

To solve this, cast the outgoing expected value into a float64 to
compare against the received Z2M value, which is already a float64.
2023-06-25 02:34:34 +08:00

594 lines
15 KiB
Go

package hapz2m
import (
"github.com/brutella/hap"
"github.com/brutella/hap/accessory"
mqtt "github.com/eclipse/paho.mqtt.golang"
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"reflect"
"strings"
"sync"
"time"
)
var (
ErrDeviceExists = fmt.Errorf("device already exists")
ErrDuplicateExposesMapping = fmt.Errorf("duplicate property name in Exposes mapping")
ErrUpdateTimeout = fmt.Errorf("update timeout")
ErrAlreadyConnected = fmt.Errorf("already connected")
)
const (
MQTT_TOPIC_PREFIX = "zigbee2mqtt/"
// Store name for persisting zigbee2mqtt state
Z2M_STATE_STORE = "z2m_state"
// timeout for UpdateZ2MState
Z2M_UPDATE_TIMEOUT = 3 * time.Second
// timeout for marking devices as non-responsive
Z2M_LAST_SEEN_TIMEOUT = 24 * time.Hour
)
const BRIDGE_DEBUG = false
type Bridge struct {
// MQTT broker and credentials
Server string
Username string
Password string
ctx context.Context
bridgeAcc *accessory.Bridge
devices map[string]*BridgeDevice
server *hap.Server
store hap.Store
// RWMutex protects hap init variables below
hapInitMutex sync.RWMutex
hapInitDone bool
hapInitCh chan struct{}
pendingUpdates sync.Map // queued updates before HAP init
mqttClient mqtt.Client
updateListeners sync.Map
}
type BridgeDevice struct {
Device *Device
Accessory *accessory.A
Mappings map[string]*ExposeMapping
LastSeen time.Time
}
// Creates and initializes a Bridge.
func NewBridge(ctx context.Context, storeDir string) *Bridge {
br := &Bridge{
ctx: ctx,
store: hap.NewFsStore(storeDir),
hapInitCh: make(chan struct{}),
devices: make(map[string]*BridgeDevice),
}
br.bridgeAcc = accessory.NewBridge(accessory.Info{
Name: "hap-z2m Bridge",
Manufacturer: "geekman",
})
return br
}
// Initializes the hap.Server and calls ListenAndServe().
// ListenAndServe() will block until the context is cancelled
func (br *Bridge) StartHAP() error {
if br.bridgeAcc == nil {
return fmt.Errorf("bridge accessory not created yet")
}
br.hapInitMutex.RLock()
if !br.hapInitDone {
br.hapInitMutex.RUnlock()
return fmt.Errorf("HAP accessories not yet initialized")
}
acc := br.accessories()
br.hapInitMutex.RUnlock()
var err error
br.server, err = hap.NewServer(br.store, br.bridgeAcc.A, acc...)
if err != nil {
return err
}
err = br.server.ListenAndServe(br.ctx)
// disconnect from MQTT
br.mqttClient.Disconnect(1000)
// flush z2m state to disk
if err := br.saveZ2MState(); err != nil {
log.Printf("cannot persist Z2M state: %s", err)
}
return err
}
// Waits until the Bridge has configured itself from Z2M state.
// Once that happens, subsequent calls return immediately.
func (br *Bridge) WaitConfigured() {
for !br.hapInitDone {
<-br.hapInitCh
}
}
// Connects to the MQTT server.
// Blocks until the connection is established, then auto-reconnect logic takes over
func (br *Bridge) ConnectMQTT() error {
if br.mqttClient != nil && br.mqttClient.IsConnected() {
return ErrAlreadyConnected
}
opts := mqtt.NewClientOptions().
AddBroker(br.Server).
SetUsername(br.Username).
SetPassword(br.Password).
SetClientID("hap-z2m").
SetKeepAlive(60 * time.Second).
SetPingTimeout(2 * time.Second).
SetConnectRetry(true)
opts.SetOnConnectHandler(func(c mqtt.Client) {
log.Printf("connected to MQTT broker")
tok := c.Subscribe(MQTT_TOPIC_PREFIX+"#", 0, br.handleMqttMessage)
if tok.Wait() && tok.Error() != nil {
log.Fatal(tok.Error())
}
log.Printf("subscribed to MQTT topic")
})
br.mqttClient = mqtt.NewClient(opts)
if tok := br.mqttClient.Connect(); tok.Wait() && tok.Error() != nil {
return tok.Error()
}
return nil
}
// Load the Z2M state from hap.Store into the sync.Map.
// If the state was blank or not found, a nil error will be returned.
// Existing state for devices in the Map will not be overwritten.
func (br *Bridge) loadZ2MState(m *sync.Map) error {
state, err := br.store.Get(Z2M_STATE_STORE)
if err != nil || len(state) == 0 {
return nil
}
var stateMap map[string][]byte
err = json.Unmarshal(state, &stateMap)
if err != nil {
return err
}
for k, v := range stateMap {
// add a last_seen timestamp for saved states without one
var devState map[string]any
err = json.Unmarshal(v, &devState)
if err != nil {
log.Printf("cannot unmarshal device %s JSON: %v", k, err)
continue
}
if _, hasLastSeen := devState["last_seen"]; !hasLastSeen {
devState["last_seen"] = time.Now().Add(-Z2M_LAST_SEEN_TIMEOUT)
// re-marshal the JSON
newJson, err := json.Marshal(devState)
if err == nil {
v = newJson
} else {
log.Printf("cannot re-marshal JSON state after adding last_seen for %s: %v", k, err)
}
}
// LoadOrStore retains existing data, only storing if empty
if _, exists := m.LoadOrStore(k, v); exists {
log.Printf("skipping %s, newer data is available", k)
}
}
return nil
}
// Persists the Z2M state into hap.Store
// Returns an error if there was a problem with translation, serialization or storing.
func (br *Bridge) saveZ2MState() error {
devices := make(map[string][]byte)
for name, dev := range br.devices {
devState := make(map[string]any)
for prop, mapping := range dev.Mappings {
// don't bother persisting property if it is "zero"
if reflect.ValueOf(mapping.Characteristic.Val).IsZero() {
continue
}
v, err := mapping.ToExposedValue(mapping.Characteristic.Val)
if err != nil {
return err
}
devState[prop] = v
}
// serialize into JSON
lastSeenSince := time.Since(dev.LastSeen)
if len(devState) > 0 || lastSeenSince < Z2M_LAST_SEEN_TIMEOUT {
devState["last_seen"] = dev.LastSeen.Unix() * 1000 // timestamp in millis
jsonState, err := json.Marshal(devState)
if err != nil {
return err
}
devices[name] = jsonState
}
}
// return early if there was nothing to persist
if len(devices) == 0 {
return nil
}
allJson, err := json.Marshal(devices)
if err != nil {
return err
}
return br.store.Set(Z2M_STATE_STORE, allJson)
}
func (br *Bridge) handleMqttMessage(_ mqtt.Client, msg mqtt.Message) {
topic, payload := msg.Topic(), msg.Payload()
//log.Printf("received %s %s", topic, payload)
// check for topic prefix and remove it
l := len(MQTT_TOPIC_PREFIX)
if len(topic) <= l || topic[:l] != MQTT_TOPIC_PREFIX {
return
}
topic = topic[l:]
// strip leading slashes if we have to
if topic[0] == '/' {
topic = topic[1:]
}
// ignore /set and /get requests, not sent by z2m
l = len(topic)
if l > len("/get") {
topicSuffix := topic[l-4:]
if topicSuffix == "/get" || topicSuffix == "/set" {
return
}
}
// spawn a goroutine to handle message, since mutex might block
go func() {
br.hapInitMutex.RLock()
defer br.hapInitMutex.RUnlock()
// check if HAP bridge device has been initialized
if !br.hapInitDone {
if strings.HasPrefix(topic, "bridge/") {
// need to look out for bridge/devices for intiial setup
if topic == "bridge/devices" {
err := br.AddDevicesFromJSON(payload)
if err != nil {
log.Printf("unable to add devices from JSON: %v", err)
}
// populate initial characteristic values from cache, if available
err = br.loadZ2MState(&br.pendingUpdates)
if err != nil {
log.Printf("cannot load z2m state: %s", err)
}
// "upgrade" to write lock for modifications
br.hapInitMutex.RUnlock() // unlock now, relock again later
defer br.hapInitMutex.RLock() // ... before defer kicks in
br.hapInitMutex.Lock()
defer br.hapInitMutex.Unlock()
// dequeue and apply state updates
log.Print("applying deferred updates...")
br.pendingUpdates.Range(func(k, v any) bool {
devName := k.(string)
br.UpdateAccessoryState(devName, v.([]byte))
return true // continue
})
br.hapInitDone = true
close(br.hapInitCh) // signal to waiting threads
}
} else {
// queue other messages for state updating after setup
// only keep latest message for each device
log.Printf("queueing updates for %s", topic)
br.pendingUpdates.Store(topic, payload)
}
} else {
br.UpdateAccessoryState(topic, payload)
}
}()
}
// Gets a list of all added accessories
func (br *Bridge) accessories() []*accessory.A {
var acc []*accessory.A
for _, d := range br.devices {
acc = append(acc, d.Accessory)
}
return acc
}
// Creates and calls AddDevice() based on the JSON definitions from zigbee2mqtt/bridge/devices.
func (br *Bridge) AddDevicesFromJSON(devJson []byte) error {
var devices []Device
err := json.Unmarshal(devJson, &devices)
if err != nil {
return err
}
for _, dev := range devices {
dev := dev // make a copy
acc, exp, err := createAccessory(&dev)
if err != nil {
if err == ErrDeviceSkipped {
continue
}
return err
}
err = br.AddDevice(&dev, acc, exp)
if err != nil {
return err
}
}
return nil
}
// Adds a device to this Bridge
func (br *Bridge) AddDevice(dev *Device, acc *accessory.A, mappings []*ExposeMapping) error {
name := dev.FriendlyName
if _, exists := br.devices[name]; exists {
return ErrDeviceExists
}
// put ExposeMapping into a map
em := make(map[string]*ExposeMapping)
for _, m := range mappings {
prop := m.ExposesEntry.Property
if _, exists := em[prop]; exists {
return ErrDuplicateExposesMapping
}
em[prop] = m
}
brdev := &BridgeDevice{dev, acc, em, time.Date(2000, 01, 01, 23, 59, 00, 0, time.Local)}
// wire up accessory's remote value update functions
for _, m := range mappings {
m := m
if m.ExposesEntry.IsSettable() {
m.Characteristic.SetValueRequestFunc = func(newVal any, req *http.Request) (any, int) {
// handle remote value updates only
if req != nil {
updated, err := br.UpdateZ2MState(dev, m, newVal)
if !updated {
log.Printf("error updating z2m for %s: %s", dev.FriendlyName, err)
return nil, hap.JsonStatusServiceCommunicationFailure
}
}
return nil, 0
}
}
m.Characteristic.ValueRequestFunc = func(req *http.Request) (any, int) {
lastSeenSince := time.Since(brdev.LastSeen)
errCode := 0
if lastSeenSince >= Z2M_LAST_SEEN_TIMEOUT {
//log.Printf("dev %s last seen too long ago", name)
errCode = hap.JsonStatusServiceCommunicationFailure
}
return m.Characteristic.Val, errCode
}
}
br.devices[name] = brdev
return nil
}
func makeUpdateKey(dev *Device, mapping *ExposeMapping) string {
return dev.FriendlyName + "/" + mapping.ExposesEntry.Property
}
// Updates zigbee2mqtt device state over MQTT.
// It then waits (with timeout) for zigbee2mqtt to send the updated state via MQTT,
// as acknowledgement of receipt, before returning with an updated status.
// If z2m doesn't respond in time, ErrUpdateTimeout is returned.
func (br *Bridge) UpdateZ2MState(dev *Device, mapping *ExposeMapping, newVal any) (updated bool, err error) {
prop := mapping.ExposesEntry.Property
// map Characteristic to exposed value
expVal, err := mapping.ToExposedValue(newVal)
if err != nil {
return updated, err
}
ch := make(chan any, 2)
defer close(ch)
// only one update should occur at a time
key := makeUpdateKey(dev, mapping)
if _, exists := br.updateListeners.LoadOrStore(key, ch); exists {
return updated, fmt.Errorf("already a pending update on property %s", key)
}
ctx, cancel := context.WithTimeout(br.ctx, Z2M_UPDATE_TIMEOUT)
defer cancel()
log.Printf("updating Z2M state %q -> %+v", prop, expVal)
br.PublishState(dev, map[string]any{prop: expVal})
wait:
for {
select {
case updatedVal := <-ch:
if BRIDGE_DEBUG {
log.Printf("received value %q (expected %q) for %s", updatedVal, expVal, key)
}
if updatedVal == expVal ||
// updatedVal is float64 coz that's how Z2M JSON values are, but expVal may not be
mapping.ExposesEntry.Type == "numeric" && cmpFloat64Numeric(updatedVal, expVal) {
updated = true
break wait
}
case <-ctx.Done():
break wait
}
}
//br.updateListeners.CompareAndDelete(key, ch) // needs go 1.20
br.updateListeners.Delete(key)
if !updated {
err = ErrUpdateTimeout
}
return updated, err
}
// Compare float64 f to numeric value n
// Both parameters are marked as `any`. f will be type-asserted to float64,
// whereas n will be converted to float64 before doing the comparison.
func cmpFloat64Numeric(f, n any) bool {
if ff, ok := f.(float64); ok {
nn, ok := valToFloat64(n)
return ff == nn && ok
}
return false
}
// Publish to the MQTT broker for the specific device
func (br *Bridge) PublishState(dev *Device, payload map[string]any) error {
topic := MQTT_TOPIC_PREFIX + dev.FriendlyName + "/set"
jsonPayload, err := json.Marshal(payload)
if err != nil {
return err
}
if BRIDGE_DEBUG {
log.Printf("publishing %q payload %s", topic, jsonPayload)
}
br.mqttClient.Publish(topic, 0, false, jsonPayload)
return nil
}
// Handle MQTT message to update accessory state
func (br *Bridge) UpdateAccessoryState(devName string, payload []byte) {
dev := br.devices[devName]
if dev == nil {
// skip unknown device
//log.Printf("unknown device %q", devName)
return
}
if BRIDGE_DEBUG {
log.Printf("received update for device %q", devName)
}
var newState map[string]any
err := json.Unmarshal([]byte(payload), &newState)
if err != nil {
log.Printf("unable to parse JSON payload: %v", err)
return
}
lastSeen := time.Now()
if lastSeenProp, found := newState["last_seen"]; found {
switch v := lastSeenProp.(type) {
case float64:
lastSeen = time.Unix(int64(v/1000), 0)
case string:
if lastSeenDate, err := time.Parse(time.RFC3339, v); err == nil {
lastSeen = lastSeenDate
} else {
log.Printf("invalid last_seen timestamp %v", v)
}
default:
log.Printf("invalid last_seen %T %[1]v", v)
}
}
// update LastSeen only if it was valid
if lastSeen.After(dev.LastSeen) {
//log.Printf("updating last seen for %s to %s", devName, lastSeen)
dev.LastSeen = lastSeen
}
for prop, mapping := range dev.Mappings {
newVal, exists := newState[prop]
if !exists {
continue
}
// send updates via channel if requested
if mapping.ExposesEntry.IsSettable() {
updateKey := makeUpdateKey(dev.Device, mapping)
if ch, waiting := br.updateListeners.Load(updateKey); waiting {
if BRIDGE_DEBUG {
log.Printf("sending new value for %q via chan", updateKey)
}
select {
case ch.(chan any) <- newVal:
default:
// couldn't send message
log.Printf("cannot deliver updated value via channel")
}
continue // next property
}
}
// update value into Characteristic
if BRIDGE_DEBUG {
log.Printf("updating %q to %+v", prop, newVal)
}
_, errCode := mapping.SetCharacteristicValue(newVal)
if errCode != 0 {
log.Printf("unable to update characteristic value for %q: %d", prop, errCode)
}
}
}