package hapz2m import ( "github.com/brutella/hap" "github.com/brutella/hap/accessory" haplog "github.com/brutella/hap/log" mqtt "github.com/eclipse/paho.mqtt.golang" "crypto/tls" "net/url" "context" "crypto/rand" "encoding/json" "fmt" "log" "math/big" "net" "net/http" "reflect" "strings" "sync" "time" ) var ( ErrDeviceExists = fmt.Errorf("device already exists") ErrDuplicateExposesMapping = fmt.Errorf("duplicate property name in Exposes mapping") ErrUpdateTimeout = fmt.Errorf("update timeout") ErrAlreadyConnected = fmt.Errorf("already connected") ) const ( // Store name for persisting zigbee2mqtt state Z2M_STATE_STORE = "z2m_state" // Store name for server PIN code Z2M_PIN_STORE = "z2m_pin" // timeout for UpdateZ2MState Z2M_UPDATE_TIMEOUT = 3 * time.Second // timeout for marking devices as non-responsive Z2M_LAST_SEEN_TIMEOUT = 24 * time.Hour ) // show more messages for developers const BRIDGE_DEVMODE = false type Bridge struct { // MQTT broker and credentials Server string Username string Password string TopicPrefix string // address and interfaces to bind to ListenAddr string Interfaces []string DebugMode bool QuietMode bool ctx context.Context bridgeAcc *accessory.Bridge devices map[string]*BridgeDevice server *hap.Server store hap.Store pin string // RWMutex protects hap init variables below hapInitMutex sync.RWMutex hapInitDone bool hapInitCh chan struct{} pendingUpdates sync.Map // queued updates before HAP init mqttClient mqtt.Client mqttMessages chan *mqttMessage updateListeners sync.Map } type mqttMessage struct { Tstamp time.Time Topic string Payload []byte } type BridgeDevice struct { Device *Device Accessory *accessory.A Mappings map[string]*ExposeMapping LastSeen time.Time } // Creates and initializes a Bridge. func NewBridge(ctx context.Context, storeDir string) *Bridge { br := &Bridge{ ctx: ctx, store: hap.NewFsStore(storeDir), hapInitCh: make(chan struct{}), devices: make(map[string]*BridgeDevice), mqttMessages: make(chan *mqttMessage, 100), } br.bridgeAcc = accessory.NewBridge(accessory.Info{ Name: "hap-z2m Bridge", Manufacturer: "geekman", }) return br } // Sets the PIN code for the HAP server. // If the given pin is empty, it will be read from the store, or failing that, // one will be generated func (br *Bridge) SetPin(pin string) (string, error) { // if PIN was not explicitly specified, we re-use the existing one from store if pin == "" { if storePin, err := br.store.Get(Z2M_PIN_STORE); err == nil { pin = string(storePin) } } savePin := pin == "" if pin == "" { for { rnd, err := rand.Int(rand.Reader, big.NewInt(99999999+1)) if err != nil { return "", fmt.Errorf("can't generate PIN: %v", err) } // pad if necessary pin = rnd.Text(10) + "00000000" pin = pin[:8] // ensure it's not an insecure PIN if !hap.InvalidPins[pin] { break } } } else if hap.InvalidPins[pin] { return "", fmt.Errorf("insecure pin %s", pin) } // persist the PIN if savePin { br.store.Set(Z2M_PIN_STORE, []byte(pin)) } br.pin = pin return pin, nil } // Returns the PIN func (br *Bridge) GetPin() string { return br.pin } // Initializes the hap.Server and calls ListenAndServe(). // ListenAndServe() will block until the context is cancelled func (br *Bridge) StartHAP() error { if br.bridgeAcc == nil { return fmt.Errorf("bridge accessory not created yet") } // initialize PIN, either from store or dynamically generated if br.pin == "" { if _, err := br.SetPin(""); err != nil { return err } } br.hapInitMutex.RLock() if !br.hapInitDone { br.hapInitMutex.RUnlock() return fmt.Errorf("HAP accessories not yet initialized") } acc := br.accessories() br.hapInitMutex.RUnlock() var err error br.server, err = hap.NewServer(br.store, br.bridgeAcc.A, acc...) if err != nil { return err } br.server.Pin = br.pin br.server.Addr = br.ListenAddr br.server.Ifaces = br.Interfaces if br.DebugMode { haplog.Debug.Enable() if BRIDGE_DEVMODE { // add microseconds to log output log.SetFlags(log.LstdFlags | log.Lmicroseconds) log.Println("Dev mode enabled") } } err = br.server.ListenAndServe(br.ctx) // disconnect from MQTT br.mqttClient.Disconnect(1000) // flush z2m state to disk if err := br.saveZ2MState(); err != nil { log.Printf("cannot persist Z2M state: %s", err) } return err } // Waits until the Bridge has configured itself from Z2M state. // Once that happens, subsequent calls return immediately. func (br *Bridge) WaitConfigured() { for !br.hapInitDone { <-br.hapInitCh } } // Return number of devices added to the bridge. func (br *Bridge) NumDevices() int { return len(br.devices) } // Connects to the MQTT server. // Blocks until the connection is established, then auto-reconnect logic takes over func (br *Bridge) ConnectMQTT() error { if br.mqttClient != nil && br.mqttClient.IsConnected() { return ErrAlreadyConnected } opts := mqtt.NewClientOptions(). AddBroker(br.Server). SetUsername(br.Username). SetPassword(br.Password). SetClientID("hap-z2m"). SetDialer(&net.Dialer{KeepAlive: -1}). SetKeepAlive(60 * time.Second). SetPingTimeout(2 * time.Second). SetConnectRetry(true) opts.SetOnConnectHandler(func(c mqtt.Client) { log.Printf("connected to MQTT broker") tok := c.Subscribe(br.TopicPrefix+"#", 0, br.handleMqttMessage) if tok.Wait() && tok.Error() != nil { log.Fatal(tok.Error()) } log.Printf("subscribed to MQTT topic") }) opts.SetConnectionAttemptHandler(func(broker *url.URL, cfg *tls.Config) *tls.Config { log.Printf("connecting to MQTT %s...", broker) return cfg }) br.mqttClient = mqtt.NewClient(opts) if tok := br.mqttClient.Connect(); tok.Wait() && tok.Error() != nil { return tok.Error() } // start MQTT message processor go func() { // only start processing after init br.WaitConfigured() for m := range br.mqttMessages { br.UpdateAccessoryState(m.Topic, m.Payload, m.Tstamp) } }() return nil } // Load the Z2M state from hap.Store into the sync.Map. // If the state was blank or not found, a nil error will be returned. // Existing state for devices in the Map will not be overwritten. func (br *Bridge) loadZ2MState(m *sync.Map) error { state, err := br.store.Get(Z2M_STATE_STORE) if err != nil || len(state) == 0 { return nil } var stateMap map[string][]byte err = json.Unmarshal(state, &stateMap) if err != nil { return err } for k, v := range stateMap { if BRIDGE_DEVMODE { log.Printf("%s %s\n", k, v) } // add a last_seen timestamp for saved states without one var devState map[string]any err = json.Unmarshal(v, &devState) if err != nil { log.Printf("cannot unmarshal device %s JSON: %v", k, err) continue } if _, hasLastSeen := devState["last_seen"]; !hasLastSeen { devState["last_seen"] = time.Now().Add(-Z2M_LAST_SEEN_TIMEOUT) // re-marshal the JSON newJson, err := json.Marshal(devState) if err == nil { v = newJson } else { log.Printf("cannot re-marshal JSON state after adding last_seen for %s: %v", k, err) } } // LoadOrStore retains existing data, only storing if empty if _, exists := m.LoadOrStore(k, v); exists { log.Printf("skipping %s, newer data is available", k) } } return nil } // Persists the Z2M state into hap.Store // Returns an error if there was a problem with translation, serialization or storing. func (br *Bridge) saveZ2MState() error { devices := make(map[string][]byte) for name, dev := range br.devices { devState := make(map[string]any) for prop, mapping := range dev.Mappings { // don't bother persisting property if it is "zero" if reflect.ValueOf(mapping.Characteristic.Val).IsZero() { continue } v, err := mapping.ToExposedValue(mapping.Characteristic.Val) if err != nil { return err } devState[prop] = v } // serialize into JSON lastSeenSince := time.Since(dev.LastSeen) if len(devState) > 0 || lastSeenSince < Z2M_LAST_SEEN_TIMEOUT { devState["last_seen"] = dev.LastSeen.Unix() * 1000 // timestamp in millis jsonState, err := json.Marshal(devState) if err != nil { return err } devices[name] = jsonState } } // return early if there was nothing to persist if len(devices) == 0 { return nil } allJson, err := json.Marshal(devices) if err != nil { return err } return br.store.Set(Z2M_STATE_STORE, allJson) } func (br *Bridge) handleMqttMessage(_ mqtt.Client, msg mqtt.Message) { topic, payload, recvTime := msg.Topic(), msg.Payload(), time.Now() // check for topic prefix and remove it l := len(br.TopicPrefix) if len(topic) <= l || topic[:l] != br.TopicPrefix { return } topic = topic[l:] // strip leading slashes if we have to if topic[0] == '/' { topic = topic[1:] } // ignore /set and /get requests, not sent by z2m l = len(topic) if l > len("/get") { topicSuffix := topic[l-4:] if topicSuffix == "/get" || topicSuffix == "/set" { return } } isBridgeTopic := strings.HasPrefix(topic, "bridge/") if br.DebugMode && topic != "bridge/logging" { log.Printf("received MQTT %s: %s", topic, payload) } br.hapInitMutex.RLock() defer br.hapInitMutex.RUnlock() // check if HAP bridge device has been initialized if !br.hapInitDone { // need to look out for bridge/devices for intiial setup if topic == "bridge/devices" { go func() { br.hapInitMutex.Lock() defer br.hapInitMutex.Unlock() // re-verify that init has not been done if br.hapInitDone { return } err := br.AddDevicesFromJSON(payload) if err != nil { log.Printf("unable to add devices from JSON: %v", err) } // populate initial characteristic values from cache, if available err = br.loadZ2MState(&br.pendingUpdates) if err != nil { log.Printf("cannot load z2m state: %s", err) } // dequeue and apply state updates log.Print("applying deferred updates...") br.pendingUpdates.Range(func(k, v any) bool { devName := k.(string) br.UpdateAccessoryState(devName, v.([]byte), recvTime) return true // continue }) br.hapInitDone = true close(br.hapInitCh) // signal to waiting threads }() } else if !isBridgeTopic { // queue other messages for state updating after setup // only keep latest message for each device log.Printf("queueing updates for %s", topic) br.pendingUpdates.Store(topic, payload) } } else if !isBridgeTopic { // queue up MQTT messages select { case br.mqttMessages <- &mqttMessage{recvTime, topic, payload}: default: log.Printf("cannot queue MQTT message %s: %s", topic, payload) } } } // Gets a list of all added accessories func (br *Bridge) accessories() []*accessory.A { var acc []*accessory.A for _, d := range br.devices { acc = append(acc, d.Accessory) } return acc } func deviceJsonDescriptor(d Device) []byte { d.Definition = nil j, _ := json.Marshal(d) return j } // Creates and calls AddDevice() based on the JSON definitions from zigbee2mqtt/bridge/devices. func (br *Bridge) AddDevicesFromJSON(devJson []byte) error { var devices []Device err := json.Unmarshal(devJson, &devices) if err != nil { return err } for _, dev := range devices { dev := dev // make a copy acc, exp, err := createAccessory(&dev) if err != nil { if err == ErrDeviceSkipped || err == ErrUnknownDeviceType { continue } return fmt.Errorf("createAccessory failed: %+v %s", err, deviceJsonDescriptor(dev)) } err = br.AddDevice(&dev, acc, exp) if err != nil { return fmt.Errorf("AddDevice failed: %+v %s", err, deviceJsonDescriptor(dev)) } } return nil } // Adds a device to this Bridge func (br *Bridge) AddDevice(dev *Device, acc *accessory.A, mappings []*ExposeMapping) error { name := dev.FriendlyName if _, exists := br.devices[name]; exists { return ErrDeviceExists } // put ExposeMapping into a map em := make(map[string]*ExposeMapping) for _, m := range mappings { prop := m.ExposesEntry.Property if _, exists := em[prop]; exists { return ErrDuplicateExposesMapping } em[prop] = m } brdev := &BridgeDevice{dev, acc, em, time.Date(2000, 01, 01, 23, 59, 00, 0, time.Local)} // wire up accessory's remote value update functions for _, m := range mappings { m := m if m.ExposesEntry.IsSettable() { m.Characteristic.SetValueRequestFunc = func(newVal any, req *http.Request) (any, int) { // handle remote value updates only if req != nil { updated, err := br.UpdateZ2MState(dev, m, newVal) if !updated { log.Printf("error updating z2m for %s: %s", dev.FriendlyName, err) return nil, hap.JsonStatusServiceCommunicationFailure } } return nil, 0 } } m.Characteristic.ValueRequestFunc = func(req *http.Request) (any, int) { lastSeenSince := time.Since(brdev.LastSeen) errCode := 0 if lastSeenSince >= Z2M_LAST_SEEN_TIMEOUT { //log.Printf("dev %s last seen too long ago", name) errCode = hap.JsonStatusServiceCommunicationFailure } return m.Characteristic.Val, errCode } } br.devices[name] = brdev return nil } func makeUpdateKey(dev *Device, mapping *ExposeMapping) string { return dev.FriendlyName + "/" + mapping.ExposesEntry.Property } // Updates zigbee2mqtt device state over MQTT. // It then waits (with timeout) for zigbee2mqtt to send the updated state via MQTT, // as acknowledgement of receipt, before returning with an updated status. // If z2m doesn't respond in time, ErrUpdateTimeout is returned. func (br *Bridge) UpdateZ2MState(dev *Device, mapping *ExposeMapping, newVal any) (updated bool, err error) { prop := mapping.ExposesEntry.Property // map Characteristic to exposed value expVal, err := mapping.ToExposedValue(newVal) if err != nil { return updated, err } ch := make(chan any, 2) defer close(ch) // only one update should occur at a time key := makeUpdateKey(dev, mapping) if _, exists := br.updateListeners.LoadOrStore(key, ch); exists { return updated, fmt.Errorf("already a pending update on property %s", key) } ctx, cancel := context.WithTimeout(br.ctx, Z2M_UPDATE_TIMEOUT) defer cancel() if br.DebugMode { log.Printf("updating Z2M state %q -> %+v", prop, expVal) } br.PublishState(dev, map[string]any{prop: expVal}) wait: for { select { case updatedVal := <-ch: if BRIDGE_DEVMODE { log.Printf("received value %q (expected %q) for %s", updatedVal, expVal, key) } if updatedVal == expVal || // updatedVal is float64 coz that's how Z2M JSON values are, but expVal may not be mapping.ExposesEntry.Type == "numeric" && cmpFloat64Numeric(updatedVal, expVal) { if br.DebugMode { log.Printf("Z2M state %q for %s updated to %q", prop, key, updatedVal) } updated = true break wait } case <-ctx.Done(): if br.DebugMode { log.Printf("Z2M state update %s timed out", key) } break wait } } br.updateListeners.CompareAndDelete(key, ch) if !updated { err = ErrUpdateTimeout } return updated, err } // Compare float64 f to numeric value n // Both parameters are marked as `any`. f will be type-asserted to float64, // whereas n will be converted to float64 before doing the comparison. func cmpFloat64Numeric(f, n any) bool { if ff, ok := f.(float64); ok { nn, ok := valToFloat64(n) return ff == nn && ok } return false } // Publish to the MQTT broker for the specific device func (br *Bridge) PublishState(dev *Device, payload map[string]any) error { topic := br.TopicPrefix + dev.FriendlyName + "/set" jsonPayload, err := json.Marshal(payload) if err != nil { return err } if br.DebugMode { log.Printf("publishing %s: %s", topic, jsonPayload) } br.mqttClient.Publish(topic, 0, false, jsonPayload) return nil } func parseLastSeen(ts any) (time.Time, error) { var lastSeen time.Time var err error switch v := ts.(type) { case float64: lastSeen = time.Unix(int64(v/1000), 0) case string: if lastSeenDate, err := time.Parse(time.RFC3339, v); err == nil { lastSeen = lastSeenDate } else { err = fmt.Errorf("invalid last_seen timestamp %v", v) } default: err = fmt.Errorf("invalid last_seen %T %[1]v", v) } return lastSeen, err } // Handle MQTT message to update accessory state func (br *Bridge) UpdateAccessoryState(devName string, payload []byte, tstamp time.Time) { dev := br.devices[devName] if dev == nil { if br.DebugMode || BRIDGE_DEVMODE { log.Printf("unknown device %q", devName) } // skip unknown device return } if br.DebugMode || (!br.QuietMode && time.Since(dev.LastSeen) > 30*time.Second) { log.Printf("received update for device %q", devName) } var newState map[string]any err := json.Unmarshal([]byte(payload), &newState) if err != nil { log.Printf("unable to parse JSON payload: %v", err) return } lastSeen := tstamp if lastSeenProp, found := newState["last_seen"]; found { ts, err := parseLastSeen(lastSeenProp) if err == nil { lastSeen = ts } else { log.Println(err) } } // update LastSeen only if it was valid if lastSeen.After(dev.LastSeen) { //log.Printf("updating last seen for %s to %s", devName, lastSeen) dev.LastSeen = lastSeen } for prop, mapping := range dev.Mappings { newVal, exists := newState[prop] if !exists { continue } // send updates via channel if requested if mapping.ExposesEntry.IsSettable() { updateKey := makeUpdateKey(dev.Device, mapping) if ch, waiting := br.updateListeners.Load(updateKey); waiting { if BRIDGE_DEVMODE { log.Printf("sending new value %+v for %q via chan", newVal, updateKey) } select { case ch.(chan any) <- newVal: default: // couldn't send message log.Printf("cannot deliver updated value via channel") } continue // next property } } // update value into Characteristic if BRIDGE_DEVMODE { log.Printf("updating %q to %+v", prop, newVal) } // convert to Characteristic value to determine if it has changed // since we don't know what the Exposed -> Characteristic mapping would be newCv, err := mapping.ToCharacteristicValue(newVal) if err != nil { log.Printf("unable to convert characteristic value for %q: %v", prop, err) continue } oldCv := mapping.SetCurrentValue(newCv) changed := oldCv != newCv // call the Set func if defined doDefault := true setFunc := mapping.SetCharacteristicValueFunc if setFunc != nil { doDefault, err = setFunc(mapping, newCv, changed) if err != nil { log.Printf("SetCharacteristicValueFunc for %s error: %+v", mapping, err) continue } } if doDefault { _, errCode := mapping.Characteristic.SetValueRequest(newCv, nil) if errCode != 0 { log.Printf("unable to update characteristic value for %q: %d", prop, errCode) } } } }