plc4go/internal/cbus/Reader.go (212 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package cbus
import (
"context"
"runtime/debug"
"sync"
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transactions"
)
type Reader struct {
alphaGenerator *AlphaGenerator
messageCodec *MessageCodec
tm transactions.RequestTransactionManager
log zerolog.Logger
}
func NewReader(tpduGenerator *AlphaGenerator, messageCodec *MessageCodec, tm transactions.RequestTransactionManager, _options ...options.WithOption) *Reader {
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
return &Reader{
alphaGenerator: tpduGenerator,
messageCodec: messageCodec,
tm: tm,
log: customLogger,
}
}
func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
m.log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult, 1)
go m.readSync(ctx, readRequest, result)
return result
}
func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadRequest, result chan apiModel.PlcReadRequestResult) {
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
}
}()
numTags := len(readRequest.GetTagNames())
if numTags > 20 { // letters g-z
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.New("Only 20 tags can be handled at once"))
return
}
messages := make(map[string]readWriteModel.CBusMessage)
for _, tagName := range readRequest.GetTagNames() {
tag := readRequest.GetTag(tagName)
message, supportsRead, _, _, err := TagToCBusMessage(tag, nil, m.alphaGenerator, m.messageCodec)
switch {
case err != nil:
result <- spiModel.NewDefaultPlcReadRequestResult(
readRequest,
nil,
errors.Wrapf(err, "Error encoding cbus message for tag %s", tagName),
)
return
case !supportsRead: // Note this should not be reachable
panic("this should not be possible as we always should then get the error above")
}
messages[tagName] = message
}
responseMu := sync.Mutex{}
responseCodes := map[string]apiModel.PlcResponseCode{}
addResponseCode := func(name string, responseCode apiModel.PlcResponseCode) {
responseMu.Lock()
defer responseMu.Unlock()
responseCodes[name] = responseCode
}
valueMu := sync.Mutex{}
plcValues := map[string]apiValues.PlcValue{}
addPlcValue := func(name string, plcValue apiValues.PlcValue) {
valueMu.Lock()
defer valueMu.Unlock()
plcValues[name] = plcValue
}
for tagName, messageToSend := range messages {
if err := ctx.Err(); err != nil {
result <- spiModel.NewDefaultPlcReadRequestResult(
readRequest,
nil,
err,
)
return
}
m.createMessageTransactionAndWait(ctx, messageToSend, addResponseCode, tagName, addPlcValue)
}
readResponse := spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues)
result <- spiModel.NewDefaultPlcReadRequestResult(
readRequest,
readResponse,
nil,
)
}
func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
transaction.Submit(func(transaction transactions.RequestTransaction) {
m.log.Trace().Stringer("transaction", transaction).Msg("Transaction getting handled")
m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue)
})
if err := transaction.AwaitCompletion(ctx); err != nil {
m.log.Warn().Err(err).Msg("Error while awaiting completion")
}
m.log.Trace().Msg("Finished waiting for transaction to end")
}
func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transactions.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
// Send the over the wire
m.log.Trace().Msg("send over the wire")
ttl := 5 * time.Second
if deadline, ok := ctx.Deadline(); ok {
ttl = -time.Since(deadline)
m.log.Debug().Dur("ttl", ttl).Msg("setting ttl")
}
m.log.Trace().Interface("ctx", ctx).Msg("sending with ctx")
if err := m.messageCodec.SendRequest(
ctx,
messageToSend,
func(cbusMessage spi.Message) bool {
m.log.Trace().Type("cbusMessageType", cbusMessage).Msg("Checking")
messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient)
if !ok {
m.log.Trace().Msg("Not a message to client")
return false
}
// Check if this errored
if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReply); ok {
// This means we must handle this below
m.log.Trace().Msg("It is a error, we will handle it")
return true
}
confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmation)
if !ok {
m.log.Trace().Msg("it is not a confirmation")
return false
}
receivedAlpha := confirmation.GetConfirmation().GetAlpha()
// TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
alphaRetriever, ok := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha })
if !ok {
m.log.Trace().Msg("no alpha there")
return false
}
expectedAlpha := alphaRetriever.GetAlpha()
m.log.Trace().
Stringer("expectedAlpha", expectedAlpha).
Stringer("receivedAlpha", receivedAlpha).
Msgf("Comparing expected alpha to received alpha")
return receivedAlpha.GetCharacter() == expectedAlpha.GetCharacter()
},
func(receivedMessage spi.Message) error {
// Convert the response into an
m.log.Trace().Type("receivedMessage", receivedMessage).Msg("convert message")
messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReply); ok {
m.log.Trace().Msg("We got a server failure")
addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
return transaction.EndRequest()
}
replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmation)
if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
var responseCode apiModel.PlcResponseCode
switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
responseCode = apiModel.PlcResponseCode_INVALID_DATA
case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
responseCode = apiModel.PlcResponseCode_INVALID_DATA
default:
return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
}
m.log.Trace().
Str("tagName", tagName).
Stringer("responseCode", responseCode).
Msg("Was no success")
addResponseCode(tagName, responseCode)
return transaction.EndRequest()
}
alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
// TODO: it could be double confirmed but this is not implemented yet
embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReply)
if !ok {
m.log.Trace().
Stringer("alpha", alpha).
Msg("Is a confirm only, no data")
addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
return transaction.EndRequest()
}
m.log.Trace().Msg("Handling confirmed data")
// TODO: check if we can use a plcValueSerializer
encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
if err := MapEncodedReply(m.log, transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
log.Error().Err(err).Msg("error encoding reply")
addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
return transaction.EndRequest()
}
return transaction.EndRequest()
},
func(err error) error {
m.log.Trace().Err(err).Msg("got and error")
addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
return transaction.FailRequest(err)
},
ttl); err != nil {
m.log.Debug().Err(err).
Str("tagName", tagName).
Msg("Error sending message for tag %s")
addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
if err := transaction.FailRequest(errors.Errorf("timeout after %s", 1*time.Second)); err != nil {
m.log.Debug().Err(err).Msg("Error failing request")
}
}
}