plc4go/internal/cbus/Subscriber.go (395 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package cbus
import (
"context"
"fmt"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/options"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
)
// Subscriber keeps track of registered subscription event consumers and offers
// monitored MMI and SAL messages to them (see handleMonitoredMMI and
// handleMonitoredSAL).
//
//go:generate go tool plc4xGenerator -type=Subscriber
type Subscriber struct {
	// consumers maps every consumer registration created by Register to the
	// consumer callback it was created for. Access is guarded by consumersMutex.
	consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer `hasLocker:"consumersMutex"`
	// addSubscriber is called with this subscriber each time Subscribe runs.
	addSubscriber func(subscriber *Subscriber)

	// consumersMutex guards consumers: Register/Unregister take the write lock,
	// the handleMonitored* methods take the read lock.
	consumersMutex sync.RWMutex

	wg sync.WaitGroup // use to track spawned go routines

	// log is the logger extracted from the options in NewSubscriber.
	log      zerolog.Logger       `ignore:"true"`
	_options []options.WithOption `ignore:"true"` // Used to pass them downstream
}
// NewSubscriber creates a Subscriber with an empty consumer registry.
//
// addSubscriber is stored and later invoked by Subscribe. The logger is taken
// from _options (falling back to the global one), and _options itself is kept
// so it can be handed on to objects created later.
func NewSubscriber(addSubscriber func(subscriber *Subscriber), _options ...options.WithOption) *Subscriber {
	subscriber := new(Subscriber)
	subscriber.log = options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
	subscriber.addSubscriber = addSubscriber
	subscriber.consumers = map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer{}
	subscriber._options = _options
	return subscriber
}
// Subscribe processes subscriptionRequest asynchronously and returns a buffered
// channel on which exactly one result is delivered.
//
// The work runs in a goroutine tracked by s.wg. It registers this subscriber via
// addSubscriber, creates one SubscriptionHandle per tag name (each answered with
// PlcResponseCode_OK) and registers the pre-registered consumers of the request
// on the matching handle. A panic inside the goroutine (for example when the
// request is not a *spiModel.DefaultPlcSubscriptionRequest) is recovered and
// reported as an error result. The context argument is currently ignored.
func (s *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
	result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		defer func() {
			if err := recover(); err != nil {
				result <- spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
			}
		}()
		internalPlcSubscriptionRequest := subscriptionRequest.(*spiModel.DefaultPlcSubscriptionRequest)

		// Add this subscriber to the connection.
		s.addSubscriber(s)

		// Just populate all requests with an OK
		responseCodes := map[string]apiModel.PlcResponseCode{}
		subscriptionValues := make(map[string]apiModel.PlcSubscriptionHandle)
		for _, tagName := range internalPlcSubscriptionRequest.GetTagNames() {
			responseCodes[tagName] = apiModel.PlcResponseCode_OK

			handle := NewSubscriptionHandle(
				s,
				tagName,
				internalPlcSubscriptionRequest.GetTag(tagName),
				internalPlcSubscriptionRequest.GetType(tagName),
				internalPlcSubscriptionRequest.GetInterval(tagName),
			)
			preRegisteredConsumers := internalPlcSubscriptionRequest.GetPreRegisteredConsumers(tagName)
			for _, consumer := range preRegisteredConsumers {
				_ = handle.Register(consumer)
			}
			subscriptionValues[tagName] = handle
		}

		// Clip the capacity so append always copies into a fresh backing array.
		// Appending to s._options directly could write into spare capacity that
		// is shared with the caller's slice and with concurrently running
		// Subscribe goroutines.
		responseOptions := append(s._options[:len(s._options):len(s._options)], options.WithCustomLogger(s.log))

		result <- spiModel.NewDefaultPlcSubscriptionRequestResult(
			subscriptionRequest,
			spiModel.NewDefaultPlcSubscriptionResponse(
				subscriptionRequest,
				responseCodes,
				subscriptionValues,
				responseOptions...,
			),
			nil,
		)
	}()
	return result
}
// Unsubscribe is not implemented yet: the returned buffered channel already
// holds a single result carrying a "Not Implemented" error for
// unsubscriptionRequest.
func (s *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
	// TODO: handle context
	_ = ctx

	// TODO: As soon as we establish a connection, we start getting data...
	// subscriptions are more a internal handling of which values to pass where.
	notImplemented := spiModel.NewDefaultPlcUnsubscriptionRequestResult(
		unsubscriptionRequest,
		nil,
		errors.New("Not Implemented"),
	)

	results := make(chan apiModel.PlcUnsubscriptionRequestResult, 1)
	results <- notImplemented
	return results
}
// handleMonitoredMMI offers the CAL data of calReply to every subscription
// handle of every registered consumer and reports whether at least one of them
// accepted it.
//
// The source address string is derived from the reply: "u<unit>" for a long
// reply addressed by unit, "b<bridge>[-b<bridge>...]-u<unit>" for a bridged long
// reply, and "u0" for every other reply form. The consumer registry is read
// under the read lock for the whole duration of the call.
func (s *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
	s.log.Debug().Stringer("calReply", calReply).Msg("handling")

	unitAddressString := "u0" // On short form it should be always unit 0 TODO: double check that
	if longReply, isLong := calReply.(readWriteModel.CALReplyLong); isLong {
		if longReply.GetIsUnitAddress() {
			unitAddressString = fmt.Sprintf("u%d", longReply.GetUnitAddress().GetAddress())
		} else {
			// Bridged reply: first bridge, any additional bridges, then the unit.
			segments := []string{fmt.Sprintf("b%d", longReply.GetBridgeAddress().GetAddress())}
			network := longReply.GetReplyNetwork()
			for _, bridge := range network.GetNetworkRoute().GetAdditionalBridgeAddresses() {
				segments = append(segments, fmt.Sprintf("b%d", bridge.GetAddress()))
			}
			segments = append(segments, fmt.Sprintf("u%d", network.GetUnitAddress().GetAddress()))
			unitAddressString = strings.Join(segments, "-")
		}
	}
	s.log.Debug().Str("unitAddressString", unitAddressString).Msg("Unit address string")

	calData := calReply.GetCalData()
	anyHandled := false

	s.consumersMutex.RLock()
	defer s.consumersMutex.RUnlock()
	for registration, consumer := range s.consumers {
		s.log.Debug().
			Stringer("registration", registration).
			Interface("consumer", consumer).
			Msg("Checking with registration and consumer")
		for _, candidate := range registration.GetSubscriptionHandles() {
			s.log.Debug().Stringer("subscriptionHandle", candidate).Msg("offering to")
			accepted := s.offerMMI(unitAddressString, calData, candidate.(*SubscriptionHandle), consumer)
			s.log.Debug().Bool("handleHandled", accepted).Msg("handle handled")
			if accepted {
				anyHandled = true
			}
		}
	}
	s.log.Debug().Bool("handled", anyHandled).Msg("final handled")
	return anyHandled
}
// offerMMI offers the status information in calData to a single subscription
// handle and, if the handle matches, delivers one subscription event to consumer.
//
// False is returned (and consumer is not called) when:
//   - the handle's tag is not a *mmiMonitorTag,
//   - the tag names a unit address and unitAddressString does not end in
//     "u<address>",
//   - calData is neither a CALDataStatus nor a CALDataStatusExtended,
//   - the tag names an application whose id differs from the one in calData.
//
// Otherwise the event is passed to consumer and true is returned. The event
// address has the form "status/binary/<application>" or
// "status/level=<blockStart>/<application>", and the event source is
// unitAddressString.
func (s *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.CALData, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
	tag, ok := subscriptionHandle.tag.(*mmiMonitorTag)
	if !ok {
		s.log.Debug().
			Interface("tag", subscriptionHandle.tag).
			Msg("Unusable tag for mmi subscription")
		return false
	}

	tags := map[string]apiModel.PlcTag{}
	types := map[string]apiModel.PlcSubscriptionType{}
	intervals := map[string]time.Duration{}
	responseCodes := map[string]apiModel.PlcResponseCode{}
	address := map[string]string{}
	sources := map[string]string{}
	plcValues := map[string]apiValues.PlcValue{}
	tagName := subscriptionHandle.tagName

	// Filter on the unit address: only the trailing "u<address>" part of the
	// (possibly bridged) address string is compared.
	if unitAddress := tag.GetUnitAddress(); unitAddress != nil {
		unitSuffix := fmt.Sprintf("u%d", unitAddress.GetAddress())
		if !strings.HasSuffix(unitAddressString, unitSuffix) {
			s.log.Debug().
				Str("unitAddressString", unitAddressString).
				Str("unitSuffix", unitSuffix).
				Msg("Current address string unitAddressString has not the suffix unitSuffix")
			return false
		}
	}
	sources[tagName] = unitAddressString

	// TODO: handle subscriptionType
	tags[tagName] = tag
	types[tagName] = subscriptionHandle.subscriptionType
	intervals[tagName] = subscriptionHandle.interval

	var applicationString string
	// NOTE(review): isLevel stays true for a plain CALDataStatus, so those
	// events get a "level=..." address even though their values are built like
	// the binary case below. Confirm this is intended.
	isLevel := true
	blockStart := byte(0x0)
	// var application readWriteModel.ApplicationIdContainer
	switch calData := calData.(type) {
	case readWriteModel.CALDataStatus:
		application := calData.GetApplication()
		applicationString = application.ApplicationId().String()
		blockStart = calData.GetBlockStart()

		statusBytes := calData.GetStatusBytes()
		responseCodes[tagName] = apiModel.PlcResponseCode_OK
		// Every status byte carries four group address states.
		plcListValues := make([]apiValues.PlcValue, len(statusBytes)*4)
		for i, statusByte := range statusBytes {
			plcListValues[i*4+0] = spiValues.NewPlcSTRING(statusByte.GetGav0().String())
			plcListValues[i*4+1] = spiValues.NewPlcSTRING(statusByte.GetGav1().String())
			plcListValues[i*4+2] = spiValues.NewPlcSTRING(statusByte.GetGav2().String())
			plcListValues[i*4+3] = spiValues.NewPlcSTRING(statusByte.GetGav3().String())
		}
		plcValues[tagName] = spiValues.NewPlcStruct(map[string]apiValues.PlcValue{
			"application": spiValues.NewPlcSTRING(application.PLC4XEnumName()),
			"blockStart":  spiValues.NewPlcBYTE(blockStart),
			"values":      spiValues.NewPlcList(plcListValues),
		})
	case readWriteModel.CALDataStatusExtended:
		application := calData.GetApplication()
		applicationString = application.ApplicationId().String()
		isLevel = calData.GetCoding() == readWriteModel.StatusCoding_LEVEL_BY_ELSEWHERE || calData.GetCoding() == readWriteModel.StatusCoding_LEVEL_BY_THIS_SERIAL_INTERFACE
		blockStart = calData.GetBlockStart()
		coding := calData.GetCoding()
		switch coding {
		case readWriteModel.StatusCoding_BINARY_BY_THIS_SERIAL_INTERFACE:
			fallthrough
		case readWriteModel.StatusCoding_BINARY_BY_ELSEWHERE:
			statusBytes := calData.GetStatusBytes()
			responseCodes[tagName] = apiModel.PlcResponseCode_OK
			plcListValues := make([]apiValues.PlcValue, len(statusBytes)*4)
			for i, statusByte := range statusBytes {
				plcListValues[i*4+0] = spiValues.NewPlcSTRING(statusByte.GetGav0().String())
				plcListValues[i*4+1] = spiValues.NewPlcSTRING(statusByte.GetGav1().String())
				plcListValues[i*4+2] = spiValues.NewPlcSTRING(statusByte.GetGav2().String())
				plcListValues[i*4+3] = spiValues.NewPlcSTRING(statusByte.GetGav3().String())
			}
			plcValues[tagName] = spiValues.NewPlcStruct(map[string]apiValues.PlcValue{
				"application": spiValues.NewPlcSTRING(application.PLC4XEnumName()),
				"blockStart":  spiValues.NewPlcBYTE(blockStart),
				"values":      spiValues.NewPlcList(plcListValues),
			})
		case readWriteModel.StatusCoding_LEVEL_BY_THIS_SERIAL_INTERFACE:
			fallthrough
		case readWriteModel.StatusCoding_LEVEL_BY_ELSEWHERE:
			levelInformation := calData.GetLevelInformation()
			responseCodes[tagName] = apiModel.PlcResponseCode_OK
			plcListValues := make([]apiValues.PlcValue, len(levelInformation))
			for i, levelInformation := range levelInformation {
				switch levelInformation := levelInformation.(type) {
				case readWriteModel.LevelInformationAbsent:
					plcListValues[i] = spiValues.NewPlcSTRING("is absent")
				case readWriteModel.LevelInformationCorrupted:
					plcListValues[i] = spiValues.NewPlcSTRING("corrupted")
				case readWriteModel.LevelInformationNormal:
					plcListValues[i] = spiValues.NewPlcUSINT(levelInformation.GetActualLevel())
				}
			}
			plcValues[tagName] = spiValues.NewPlcList(plcListValues)
		}
	default:
		s.log.Error().Type("calData", calData).Msg("Unmapped type")
		return false
	}

	// Filter on the application. The log records the application id that was
	// received (previously the unit address string was logged here by mistake).
	if application := tag.GetApplication(); application != nil {
		if actualApplicationIdString := application.ApplicationId().String(); applicationString != actualApplicationIdString {
			s.log.Debug().
				Str("applicationString", applicationString).
				Str("actualApplicationIdString", actualApplicationIdString).
				Msg("Current application id applicationString doesn't match actual id actualApplicationIdString")
			return false
		}
	}

	statusType := "binary"
	if isLevel {
		statusType = fmt.Sprintf("level=%#02X", blockStart)
	}
	address[tagName] = fmt.Sprintf("status/%s/%s", statusType, applicationString)

	// Assemble a PlcSubscription event
	event := NewSubscriptionEvent(tags, types, intervals, responseCodes, address, sources, plcValues)
	consumer(&event)
	return true
}
// handleMonitoredSAL offers sal to every subscription handle of every
// registered consumer and reports whether at least one of them accepted it.
//
// The consumer registry is read under the read lock for the whole duration of
// the call.
func (s *Subscriber) handleMonitoredSAL(sal readWriteModel.MonitoredSAL) bool {
	handled := false
	s.consumersMutex.RLock()
	defer s.consumersMutex.RUnlock()
	for registration, consumer := range s.consumers {
		for _, subscriptionHandle := range registration.GetSubscriptionHandles() {
			// The offer must be evaluated unconditionally. Writing
			// `handled || s.offerSAL(...)` short-circuits once handled is true
			// and would silently skip every remaining handle and consumer.
			if s.offerSAL(sal, subscriptionHandle.(*SubscriptionHandle), consumer) {
				handled = true
			}
		}
	}
	return handled
}
// offerSAL offers a monitored SAL to a single subscription handle and, if the
// handle matches, delivers one subscription event to consumer.
//
// False is returned (and consumer is not called) when:
//   - the handle's tag is not a *salMonitorTag,
//   - sal is neither a short form basic mode nor a long form smart mode SAL,
//   - the tag names a unit address and the SAL's address string does not end in
//     "u<address>",
//   - the tag names an application whose id differs from the SAL's,
//   - the SAL carries no SAL data.
//
// Otherwise the event is passed to consumer and true is returned. The event
// address has the form "sal/<application>/<command type>" and the value is the
// SAL data serialized into a PlcValue (or its string form if that fails).
func (s *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
	tag, ok := subscriptionHandle.tag.(*salMonitorTag)
	if !ok {
		s.log.Debug().Interface("tag", subscriptionHandle.tag).Msg("Unusable tag for sal subscription")
		return false
	}

	tags := map[string]apiModel.PlcTag{}
	types := map[string]apiModel.PlcSubscriptionType{}
	intervals := map[string]time.Duration{}
	responseCodes := map[string]apiModel.PlcResponseCode{}
	address := map[string]string{}
	sources := map[string]string{}
	plcValues := map[string]apiValues.PlcValue{}
	tagName := subscriptionHandle.tagName

	// TODO: handle subscriptionType
	tags[tagName] = tag
	types[tagName] = subscriptionHandle.subscriptionType
	intervals[tagName] = subscriptionHandle.interval

	var salData readWriteModel.SALData
	var unitAddressString, applicationString string
	switch sal := sal.(type) {
	case readWriteModel.MonitoredSALShortFormBasicMode:
		unitAddressString = "u0" // On short form it should be always unit 0 TODO: double check that
		applicationString = sal.GetApplication().ApplicationId().String()
		salData = sal.GetSalData()
	case readWriteModel.MonitoredSALLongFormSmartMode:
		if sal.GetIsUnitAddress() {
			unitAddressString = fmt.Sprintf("u%d", sal.GetUnitAddress().GetAddress())
		} else {
			unitAddressString = fmt.Sprintf("b%d", sal.GetBridgeAddress().GetAddress())
			replyNetwork := sal.GetReplyNetwork()
			for _, bridgeAddress := range replyNetwork.GetNetworkRoute().GetAdditionalBridgeAddresses() {
				unitAddressString += fmt.Sprintf("-b%d", bridgeAddress.GetAddress())
			}
			unitAddressString += fmt.Sprintf("-u%d", replyNetwork.GetUnitAddress().GetAddress())
		}
		applicationString = sal.GetApplication().ApplicationId().String()
		salData = sal.GetSalData()
	default:
		// Without a known form there is neither an address nor SAL data to offer.
		s.log.Error().Type("sal", sal).Msg("Unmapped type")
		return false
	}

	// Filter on the unit address: only the trailing "u<address>" part of the
	// (possibly bridged) address string is compared.
	if unitAddress := tag.GetUnitAddress(); unitAddress != nil {
		unitSuffix := fmt.Sprintf("u%d", unitAddress.GetAddress())
		if !strings.HasSuffix(unitAddressString, unitSuffix) {
			s.log.Debug().
				Str("unitAddressString", unitAddressString).
				Str("unitSuffix", unitSuffix).
				Msg("Current address string unitAddressString has not the suffix unitSuffix")
			return false
		}
	}
	sources[tagName] = unitAddressString

	// Filter on the application. The log records the application id that was
	// received (previously the unit address string was logged here by mistake).
	if application := tag.GetApplication(); application != nil {
		if actualApplicationIdString := application.ApplicationId().String(); applicationString != actualApplicationIdString {
			s.log.Debug().
				Str("applicationString", applicationString).
				Str("actualApplicationIdString", actualApplicationIdString).
				Msg("Current application id applicationString doesn't match actual id actualApplicationIdString")
			return false
		}
	}

	// GetSalData may have returned nil; calling SerializeWithWriteBuffer on a
	// nil interface below would panic, so there is nothing to deliver.
	if salData == nil {
		s.log.Debug().Msg("Monitored SAL carries no SAL data, nothing to offer")
		return false
	}

	var commandTypeGetter interface {
		PLC4XEnumName() string
	}
	switch salData := salData.(type) {
	case readWriteModel.SALDataAccessControl:
		commandTypeGetter = salData.GetAccessControlData().GetCommandType()
	case readWriteModel.SALDataAirConditioning:
		commandTypeGetter = salData.GetAirConditioningData().GetCommandType()
	case readWriteModel.SALDataAudioAndVideo:
		commandTypeGetter = salData.GetAudioVideoData().GetCommandType()
	case readWriteModel.SALDataClockAndTimekeeping:
		commandTypeGetter = salData.GetClockAndTimekeepingData().GetCommandType()
	case readWriteModel.SALDataEnableControl:
		commandTypeGetter = salData.GetEnableControlData().GetCommandType()
	case readWriteModel.SALDataErrorReporting:
		commandTypeGetter = salData.GetErrorReportingData().GetCommandType()
	case readWriteModel.SALDataFreeUsage:
		s.log.Info().Msg("Unknown command type")
	case readWriteModel.SALDataHeating:
		commandTypeGetter = salData.GetHeatingData().GetCommandType()
	case readWriteModel.SALDataHvacActuator:
		commandTypeGetter = salData.GetHvacActuatorData().GetCommandType()
	case readWriteModel.SALDataIrrigationControl:
		commandTypeGetter = salData.GetIrrigationControlData().GetCommandType()
	case readWriteModel.SALDataLighting:
		commandTypeGetter = salData.GetLightingData().GetCommandType()
	case readWriteModel.SALDataMeasurement:
		commandTypeGetter = salData.GetMeasurementData().GetCommandType()
	case readWriteModel.SALDataMediaTransport:
		commandTypeGetter = salData.GetMediaTransportControlData().GetCommandType()
	case readWriteModel.SALDataMetering:
		commandTypeGetter = salData.GetMeteringData().GetCommandType()
	case readWriteModel.SALDataPoolsSpasPondsFountainsControl:
		commandTypeGetter = salData.GetPoolsSpaPondsFountainsData().GetCommandType()
	case readWriteModel.SALDataReserved:
		s.log.Info().Msg("Unknown command type")
	case readWriteModel.SALDataRoomControlSystem:
		s.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
	case readWriteModel.SALDataSecurity:
		commandTypeGetter = salData.GetSecurityData().GetCommandType()
	case readWriteModel.SALDataTelephonyStatusAndControl:
		commandTypeGetter = salData.GetTelephonyData().GetCommandType()
	case readWriteModel.SALDataTemperatureBroadcast:
		commandTypeGetter = salData.GetTemperatureBroadcastData().GetCommandType()
	case readWriteModel.SALDataTesting:
		s.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
	case readWriteModel.SALDataTriggerControl:
		commandTypeGetter = salData.GetTriggerControlData().GetCommandType()
	case readWriteModel.SALDataVentilation:
		commandTypeGetter = salData.GetVentilationData().GetCommandType()
	default:
		s.log.Error().Type("salData", salData).Msg("Unmapped type")
	}
	// SAL data kinds without a command type are reported as "Unknown".
	commandType := "Unknown"
	if commandTypeGetter != nil {
		commandType = commandTypeGetter.PLC4XEnumName()
	}
	// TODO: we need to map commands e.g. if we get a MeteringDataElectricityConsumption we can map that to MeteringDataMeasureElectricity
	address[tagName] = fmt.Sprintf("sal/%s/%s", applicationString, commandType)

	rbvb := spiValues.NewWriteBufferPlcValueBased()
	err := salData.SerializeWithWriteBuffer(context.Background(), rbvb)
	if err != nil {
		s.log.Error().Err(err).Msg("Error serializing to plc value... just returning it as string")
		plcValues[tagName] = spiValues.NewPlcSTRING(fmt.Sprintf("%s", salData))
	} else {
		plcValues[tagName] = rbvb.GetPlcValue()
	}
	responseCodes[tagName] = apiModel.PlcResponseCode_OK

	// Assemble a PlcSubscription event
	event := NewSubscriptionEvent(tags, types, intervals, responseCodes, address, sources, plcValues)
	consumer(&event)
	return true
}
// Register creates a consumer registration for consumer covering handles,
// records it in the consumer registry under the write lock and returns it.
func (s *Subscriber) Register(consumer apiModel.PlcSubscriptionEventConsumer, handles []apiModel.PlcSubscriptionHandle) apiModel.PlcConsumerRegistration {
	s.consumersMutex.Lock()
	defer s.consumersMutex.Unlock()

	registration := spiModel.NewDefaultPlcConsumerRegistration(s, consumer, handles...)
	// The registry is keyed by the concrete registration type.
	key := registration.(*spiModel.DefaultPlcConsumerRegistration)
	s.consumers[key] = consumer
	return registration
}
// Unregister removes registration from the consumer registry under the write
// lock. Removing a registration that is not present is a no-op; a registration
// that is not a *spiModel.DefaultPlcConsumerRegistration makes the type
// assertion panic.
func (s *Subscriber) Unregister(registration apiModel.PlcConsumerRegistration) {
	s.log.Trace().Msg("unregister")

	s.consumersMutex.Lock()
	defer s.consumersMutex.Unlock()

	key := registration.(*spiModel.DefaultPlcConsumerRegistration)
	delete(s.consumers, key)
	s.log.Trace().Msg("registration removed")
}