plc4go/internal/cbus/Connection.go (560 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package cbus
import (
"context"
"runtime/debug"
"sync"
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/default"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/tracer"
"github.com/apache/plc4x/plc4go/spi/transactions"
)
//go:generate go tool plc4xGenerator -type=AlphaGenerator
type AlphaGenerator struct {
currentAlpha byte `hasLocker:"lock"`
lock sync.Mutex
}
func (t *AlphaGenerator) getAndIncrement() byte {
t.lock.Lock()
defer t.lock.Unlock()
// If we've reached the max value 'z', reset back to 'g'
if t.currentAlpha > 'z' {
t.currentAlpha = 'g'
}
result := t.currentAlpha
t.currentAlpha += 1
return result
}
//go:generate go tool plc4xGenerator -type=Connection
type Connection struct {
_default.DefaultConnection
alphaGenerator AlphaGenerator `stringer:"true"`
messageCodec *MessageCodec
subscribers []*Subscriber
tm transactions.RequestTransactionManager
configuration Configuration `stringer:"true"`
driverContext DriverContext `stringer:"true"`
handlerWaitGroup sync.WaitGroup
connectionId string
tracer tracer.Tracer
wg sync.WaitGroup // use to track spawned go routines
log zerolog.Logger `ignore:"true"`
_options []options.WithOption `ignore:"true"` // Used to pass them downstream
}
func NewConnection(messageCodec *MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, connectionOptions map[string][]string, _options ...options.WithOption) *Connection {
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
connection := &Connection{
alphaGenerator: AlphaGenerator{currentAlpha: 'g'},
messageCodec: messageCodec,
configuration: configuration,
driverContext: driverContext,
tm: tm,
log: customLogger,
_options: _options,
}
if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
connection.tracer = tracer.NewTracer(connection.connectionId, _options...)
}
}
connection.DefaultConnection = _default.NewDefaultConnection(
connection,
append(_options,
_default.WithPlcTagHandler(tagHandler),
_default.WithPlcValueHandler(NewValueHandler(_options...)),
)...,
)
return connection
}
func (c *Connection) GetConnectionId() string {
return c.connectionId
}
func (c *Connection) IsTraceEnabled() bool {
return c.tracer != nil
}
func (c *Connection) GetTracer() tracer.Tracer {
return c.tracer
}
func (c *Connection) GetConnection() plc4go.PlcConnection {
return c
}
func (c *Connection) GetMessageCodec() spi.MessageCodec {
return c.messageCodec
}
func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
c.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer func() {
if err := recover(); err != nil {
c.fireConnectionError(errors.Errorf("panic-ed %v. Stack:\n%s", err, debug.Stack()), ch)
}
}()
if err := c.messageCodec.ConnectWithContext(ctx); err != nil {
c.fireConnectionError(errors.Wrap(err, "Error connecting codec"), ch)
return
}
// For testing purposes we can skip the waiting for a complete connection
if !c.driverContext.awaitSetupComplete {
go c.setupConnection(ctx, ch)
c.log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
// Here we write directly and don't wait till the connection is "really" connected
// Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete
ch <- _default.NewDefaultPlcConnectionConnectResult(c, nil)
c.SetConnected(true)
return
}
c.setupConnection(ctx, ch)
}()
return ch
}
func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
results := make(chan plc4go.PlcConnectionCloseResult, 1)
c.wg.Add(1)
go func() {
defer c.wg.Done()
result := <-c.DefaultConnection.Close()
c.log.Trace().Msg("Waiting for handlers to stop")
c.handlerWaitGroup.Wait()
c.log.Trace().Msg("handlers stopped, dispatching result")
results <- result
}()
return results
}
func (c *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
return &_default.DefaultConnectionMetadata{
ProvidesReading: true,
ProvidesWriting: true,
ProvidesSubscribing: true,
ProvidesBrowsing: true,
}
}
func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
return spiModel.NewDefaultPlcReadRequestBuilder(
c.GetPlcTagHandler(),
NewReader(
&c.alphaGenerator,
c.messageCodec,
c.tm,
append(c._options, options.WithCustomLogger(c.log))...,
),
)
}
func (c *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
return spiModel.NewDefaultPlcWriteRequestBuilder(c.GetPlcTagHandler(), c.GetPlcValueHandler(), NewWriter(&c.alphaGenerator, c.messageCodec, c.tm))
}
func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
return spiModel.NewDefaultPlcSubscriptionRequestBuilder(
c.GetPlcTagHandler(),
c.GetPlcValueHandler(),
NewSubscriber(
c.addSubscriber,
append(c._options, options.WithCustomLogger(c.log))...,
),
)
}
func (c *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder {
return spiModel.NewDefaultPlcUnsubscriptionRequestBuilder()
}
func (c *Connection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder {
return spiModel.NewDefaultPlcBrowseRequestBuilder(
c.GetPlcTagHandler(),
NewBrowser(
c,
append(c._options, options.WithCustomLogger(c.log))...,
),
)
}
func (c *Connection) addSubscriber(subscriber *Subscriber) {
for _, sub := range c.subscribers {
if sub == subscriber {
c.log.Debug().
Stringer("subscriber", subscriber).
Msg("Subscriber already added")
return
}
}
c.subscribers = append(c.subscribers, subscriber)
}
func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
cbusOptions := &c.messageCodec.cbusOptions
requestContext := &c.messageCodec.requestContext
if !c.sendReset(ctx, ch, cbusOptions, requestContext, false) {
c.log.Warn().Msg("First reset failed")
// We try a second reset in case we get a power up
if !c.sendReset(ctx, ch, cbusOptions, requestContext, true) {
c.log.Trace().Msg("Reset failed")
return
}
}
if !c.setApplicationFilter(ctx, ch, requestContext, cbusOptions) {
c.log.Trace().Msg("Set application filter failed")
return
}
if !c.setInterfaceOptions3(ctx, ch, requestContext, cbusOptions) {
c.log.Trace().Msg("Set interface options 3 failed")
return
}
if !c.setInterface1PowerUpSettings(ctx, ch, requestContext, cbusOptions) {
c.log.Trace().Msg("Set interface options 1 power up settings failed")
return
}
if !c.setInterfaceOptions1(ctx, ch, requestContext, cbusOptions) {
c.log.Trace().Msg("Set interface options 1 failed")
return
}
c.log.Trace().Msg("Connection setup done")
c.fireConnected(ch)
c.log.Trace().Msg("Connect fired")
c.startSubscriptionHandler()
c.log.Trace().Msg("subscription handler started")
}
func (c *Connection) startSubscriptionHandler() {
c.log.Debug().Msg("Starting SAL handler")
c.handlerWaitGroup.Add(1)
c.wg.Add(1)
go func() {
defer c.wg.Done()
salLogger := c.log.With().Str("handlerType", "SAL").Logger()
defer c.handlerWaitGroup.Done()
defer func() {
if err := recover(); err != nil {
salLogger.Error().
Str("stack", string(debug.Stack())).
Interface("err", err).
Msg("panic-ed")
}
}()
salLogger.Debug().Msg("SAL handler started")
for c.IsConnected() {
for monitoredSal := range c.messageCodec.monitoredSALs {
if monitoredSal == nil {
salLogger.Trace().Msg("monitoredSal chan closed")
break
}
salLogger.Trace().
Stringer("monitoredSal", monitoredSal).
Msg("got a SAL")
handled := false
for _, subscriber := range c.subscribers {
if ok := subscriber.handleMonitoredSAL(monitoredSal); ok {
salLogger.Debug().
Stringer("monitoredSal", monitoredSal).
Stringer("subscriber", subscriber).
Msg("handled")
handled = true
}
}
if !handled {
salLogger.Debug().
Stringer("monitoredSal", monitoredSal).
Msg("SAL was not handled")
}
}
}
salLogger.Info().Msg("handler ended")
}()
c.log.Debug().Msg("Starting MMI handler")
c.handlerWaitGroup.Add(1)
c.wg.Add(1)
go func() {
defer c.wg.Done()
mmiLogger := c.log.With().Str("handlerType", "MMI").Logger()
defer c.handlerWaitGroup.Done()
defer func() {
if err := recover(); err != nil {
mmiLogger.Error().
Str("stack", string(debug.Stack())).
Interface("err", err).
Msg("panic-ed")
}
}()
mmiLogger.Debug().Msg("default MMI started")
for c.IsConnected() {
for calReply := range c.messageCodec.monitoredMMIs {
if calReply == nil {
mmiLogger.Trace().Msg("channel closed")
break
}
mmiLogger.Trace().Msg("got a MMI")
handled := false
for _, subscriber := range c.subscribers {
if ok := subscriber.handleMonitoredMMI(calReply); ok {
mmiLogger.Debug().
Stringer("subscriber", subscriber).
Msg("handled")
handled = true
}
}
if !handled {
mmiLogger.Debug().Msg("MMI was not handled")
}
}
}
mmiLogger.Info().Msg("handler ended")
}()
}
func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, cbusOptions *readWriteModel.CBusOptions, requestContext *readWriteModel.RequestContext, sendOutErrorNotification bool) (ok bool) {
c.log.Debug().Bool("sendOutErrorNotification", sendOutErrorNotification).Msg("Send a reset")
requestTypeReset := readWriteModel.RequestType_RESET
requestReset := readWriteModel.NewRequestReset(
requestTypeReset,
nil,
&requestTypeReset,
requestTypeReset,
readWriteModel.NewRequestTermination(),
requestTypeReset,
&requestTypeReset,
requestTypeReset,
&requestTypeReset,
*cbusOptions,
)
cBusMessage := readWriteModel.NewCBusMessageToServer(requestReset, *requestContext, *cbusOptions)
receivedResetEchoChan := make(chan bool, 1)
receivedResetEchoErrorChan := make(chan error, 1)
if err := c.messageCodec.SendRequest(
ctx,
cBusMessage,
func(message spi.Message) bool {
c.log.Trace().Msg("Checking message")
switch message := message.(type) {
case readWriteModel.CBusMessageToClient:
switch reply := message.GetReply().(type) {
case readWriteModel.ReplyOrConfirmationReply:
switch reply.GetReply().(type) {
case readWriteModel.PowerUpReply:
c.log.Debug().Msg("Received a PUN reply")
return true
default:
c.log.Trace().Type("reply", reply).Msg("not relevant")
return false
}
default:
c.log.Trace().Type("reply", reply).Msg("not relevant")
return false
}
case readWriteModel.CBusMessageToServer:
switch request := message.GetRequest().(type) {
case readWriteModel.RequestReset:
c.log.Debug().Msg("Received a Reset reply")
return true
default:
c.log.Trace().Type("request", request).Msg("not relevant")
return false
}
default:
c.log.Trace().Type("message", message).Msg("not relevant")
return false
}
},
func(message spi.Message) error {
c.log.Trace().Msg("Handling message")
switch message.(type) {
case readWriteModel.CBusMessageToClient:
// This is the powerup notification
select {
case receivedResetEchoChan <- false:
c.log.Trace().Msg("notified reset chan from message to client")
default:
}
case readWriteModel.CBusMessageToServer:
// This is the echo
select {
case receivedResetEchoChan <- true:
c.log.Trace().Msg("notified reset chan from message to server")
default:
}
default:
return errors.Errorf("Unmapped type %T", message)
}
return nil
},
func(err error) error {
select {
case receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request"):
c.log.Trace().Msg("notified error chan")
default:
}
return nil
}, c.GetTtl()); err != nil {
if sendOutErrorNotification {
c.fireConnectionError(errors.Wrap(err, "Error during sending of Reset Request"), ch)
} else {
c.log.Warn().Err(err).Msg("connect failed")
}
return false
}
startTime := time.Now()
timeout := time.NewTimer(time.Millisecond * 500)
select {
case <-receivedResetEchoChan:
c.log.Debug().Msg("We received the echo")
case err := <-receivedResetEchoErrorChan:
if sendOutErrorNotification {
c.fireConnectionError(errors.Wrap(err, "Error receiving of Reset"), ch)
} else {
c.log.Trace().Err(err).Msg("connect failed")
}
return false
case timeout := <-timeout.C:
if sendOutErrorNotification {
c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
} else {
c.log.Trace().Dur("timeout", timeout.Sub(startTime)).Msg("Timeout")
}
return false
}
c.log.Debug().Msg("Reset done")
return true
}
func (c *Connection) setApplicationFilter(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) (ok bool) {
c.log.Debug().Msg("Set application filter to all")
applicationAddress1 := readWriteModel.NewParameterValueApplicationAddress1(readWriteModel.NewApplicationAddress1(c.configuration.MonitoredApplication1), nil, 1)
if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_APPLICATION_ADDRESS_1, applicationAddress1, requestContext, cbusOptions) {
return false
}
applicationAddress2 := readWriteModel.NewParameterValueApplicationAddress2(readWriteModel.NewApplicationAddress2(c.configuration.MonitoredApplication2), nil, 1)
if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_APPLICATION_ADDRESS_2, applicationAddress2, requestContext, cbusOptions) {
return false
}
c.log.Debug().Msg("Application filter set")
return true
}
func (c *Connection) setInterfaceOptions3(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) (ok bool) {
c.log.Debug().Msg("Set interface options 3")
interfaceOptions3 := readWriteModel.NewParameterValueInterfaceOptions3(readWriteModel.NewInterfaceOptions3(c.configuration.Exstat, c.configuration.Pun, c.configuration.LocalSal, c.configuration.Pcn), nil, 1)
if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_INTERFACE_OPTIONS_3, interfaceOptions3, requestContext, cbusOptions) {
return false
}
// TODO: add localsal to the options
*cbusOptions = readWriteModel.NewCBusOptions(false, false, false, c.configuration.Exstat, false, false, c.configuration.Pun, c.configuration.Pcn, false)
c.log.Debug().Msg("Interface options 3 set")
return true
}
func (c *Connection) setInterface1PowerUpSettings(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) (ok bool) {
c.log.Debug().Msg("Set interface options 1 power up settings")
interfaceOptions1PowerUpSettings := readWriteModel.NewParameterValueInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1(c.configuration.Idmon, c.configuration.Monitor, c.configuration.Smart, c.configuration.Srchk, c.configuration.XonXoff, c.configuration.Connect)), 1)
if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1_POWER_UP_SETTINGS, interfaceOptions1PowerUpSettings, requestContext, cbusOptions) {
return false
}
// TODO: what is with monall
*cbusOptions = readWriteModel.NewCBusOptions(c.configuration.Connect, c.configuration.Smart, c.configuration.Idmon, c.configuration.Exstat, c.configuration.Monitor, false, c.configuration.Pun, c.configuration.Pcn, c.configuration.Srchk)
c.log.Debug().Msg("Interface options 1 power up settings set")
return true
}
func (c *Connection) setInterfaceOptions1(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) bool {
c.log.Debug().Msg("Set interface options 1")
interfaceOptions1 := readWriteModel.NewParameterValueInterfaceOptions1(readWriteModel.NewInterfaceOptions1(c.configuration.Idmon, c.configuration.Monitor, c.configuration.Smart, c.configuration.Srchk, c.configuration.XonXoff, c.configuration.Connect), nil, 1)
if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1, interfaceOptions1, requestContext, cbusOptions) {
return false
}
// TODO: what is with monall
*cbusOptions = readWriteModel.NewCBusOptions(c.configuration.Connect, c.configuration.Smart, c.configuration.Idmon, c.configuration.Exstat, c.configuration.Monitor, false, c.configuration.Pun, c.configuration.Pcn, c.configuration.Srchk)
c.log.Debug().Msg("Interface options 1 set")
return true
}
// This is used for connection setup
func (c *Connection) sendCalDataWrite(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, paramNo readWriteModel.Parameter, parameterValue readWriteModel.ParameterValue, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) bool {
calCommandTypeContainer := readWriteModel.CALCommandTypeContainer_CALCommandWrite_2Bytes + readWriteModel.CALCommandTypeContainer(parameterValue.GetLengthInBytes(ctx))
calData := readWriteModel.NewCALDataWrite(
calCommandTypeContainer,
nil,
paramNo,
0x0,
parameterValue,
*requestContext,
)
directCommand := readWriteModel.NewRequestDirectCommandAccess(
0x40,
nil,
nil,
0x0,
readWriteModel.NewRequestTermination(),
calData,
/*we don't want an alpha otherwise the PCI will auto-switch*/ nil,
*cbusOptions,
)
cBusMessage := readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)
directCommandAckChan := make(chan bool, 1)
directCommandAckErrorChan := make(chan error, 1)
if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool {
switch message := message.(type) {
case readWriteModel.CBusMessageToClient:
switch reply := message.GetReply().(type) {
case readWriteModel.ReplyOrConfirmationReply:
switch reply := reply.GetReply().(type) {
case readWriteModel.ReplyEncodedReply:
switch encodedReply := reply.GetEncodedReply().(type) {
case readWriteModel.EncodedReplyCALReply:
switch data := encodedReply.GetCalReply().GetCalData().(type) {
case readWriteModel.CALDataAcknowledge:
if data.GetParamNo() == paramNo {
return true
}
}
}
}
}
}
return false
}, func(message spi.Message) error {
switch message := message.(type) {
case readWriteModel.CBusMessageToClient:
switch reply := message.GetReply().(type) {
case readWriteModel.ReplyOrConfirmationReply:
switch reply := reply.GetReply().(type) {
case readWriteModel.ReplyEncodedReply:
switch encodedReply := reply.GetEncodedReply().(type) {
case readWriteModel.EncodedReplyCALReply:
switch data := encodedReply.GetCalReply().GetCalData().(type) {
case readWriteModel.CALDataAcknowledge:
if data.GetParamNo() == paramNo {
select {
case directCommandAckChan <- true:
default:
}
}
}
}
}
}
}
return nil
}, func(err error) error {
select {
case directCommandAckErrorChan <- errors.Wrap(err, "got error processing request"):
default:
}
return nil
}, c.GetTtl()); err != nil {
c.fireConnectionError(errors.Wrap(err, "Error during sending of write request"), ch)
return false
}
startTime := time.Now()
timeout := time.NewTimer(2 * time.Second)
select {
case <-directCommandAckChan:
c.log.Debug().Msg("We received the ack")
case err := <-directCommandAckErrorChan:
c.fireConnectionError(errors.Wrap(err, "Error receiving of ack"), ch)
return false
case timeout := <-timeout.C:
c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
return false
}
return true
}
func (c *Connection) fireConnectionError(err error, ch chan<- plc4go.PlcConnectionConnectResult) {
if c.driverContext.awaitSetupComplete {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Error during connection"))
} else {
c.log.Error().Err(err).Msg("awaitSetupComplete set to false and we got a error during connect")
}
if err := c.messageCodec.Disconnect(); err != nil {
c.log.Debug().Err(err).Msg("Error disconnecting message codec on connection error")
}
}
func (c *Connection) fireConnected(ch chan<- plc4go.PlcConnectionConnectResult) {
if c.driverContext.awaitSetupComplete {
ch <- _default.NewDefaultPlcConnectionConnectResult(c, nil)
} else {
c.log.Info().Msg("Successfully connected")
}
c.SetConnected(true)
}