plc4go/internal/cbus/Reader.go (212 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package cbus import ( "context" "runtime/debug" "sync" "time" "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/rs/zerolog/log" apiModel "github.com/apache/plc4x/plc4go/pkg/api/model" apiValues "github.com/apache/plc4x/plc4go/pkg/api/values" readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model" "github.com/apache/plc4x/plc4go/spi" spiModel "github.com/apache/plc4x/plc4go/spi/model" "github.com/apache/plc4x/plc4go/spi/options" "github.com/apache/plc4x/plc4go/spi/transactions" ) type Reader struct { alphaGenerator *AlphaGenerator messageCodec *MessageCodec tm transactions.RequestTransactionManager log zerolog.Logger } func NewReader(tpduGenerator *AlphaGenerator, messageCodec *MessageCodec, tm transactions.RequestTransactionManager, _options ...options.WithOption) *Reader { customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) return &Reader{ alphaGenerator: tpduGenerator, messageCodec: messageCodec, tm: tm, log: customLogger, } } func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult { m.log.Trace().Msg("Reading") result := make(chan apiModel.PlcReadRequestResult, 1) go m.readSync(ctx, readRequest, result) return result } func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadRequest, result chan apiModel.PlcReadRequestResult) { defer func() { if err := recover(); err != nil { result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack())) } }() numTags := len(readRequest.GetTagNames()) if numTags > 20 { // letters g-z result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.New("Only 20 tags can be handled at once")) return } messages := make(map[string]readWriteModel.CBusMessage) for _, tagName := range readRequest.GetTagNames() { tag := readRequest.GetTag(tagName) message, supportsRead, _, _, err := TagToCBusMessage(tag, nil, m.alphaGenerator, m.messageCodec) switch { case err != nil: result <- spiModel.NewDefaultPlcReadRequestResult( readRequest, nil, errors.Wrapf(err, "Error encoding cbus message for tag %s", tagName), ) return case !supportsRead: // Note this should not be reachable panic("this should not be possible as we always should then get the error above") } messages[tagName] = message } responseMu := sync.Mutex{} responseCodes := map[string]apiModel.PlcResponseCode{} addResponseCode := func(name string, responseCode apiModel.PlcResponseCode) { responseMu.Lock() defer responseMu.Unlock() responseCodes[name] = responseCode } valueMu := sync.Mutex{} plcValues := map[string]apiValues.PlcValue{} addPlcValue := func(name string, plcValue apiValues.PlcValue) { valueMu.Lock() defer valueMu.Unlock() plcValues[name] = plcValue } for tagName, messageToSend := range messages { if err := ctx.Err(); err != nil { result <- spiModel.NewDefaultPlcReadRequestResult( readRequest, nil, err, ) return } m.createMessageTransactionAndWait(ctx, messageToSend, addResponseCode, tagName, addPlcValue) } readResponse := spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues) result <- spiModel.NewDefaultPlcReadRequestResult( readRequest, readResponse, nil, ) } func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) { // Start a new request-transaction (Is ended in the response-handler) transaction := m.tm.StartTransaction() transaction.Submit(func(transaction transactions.RequestTransaction) { m.log.Trace().Stringer("transaction", transaction).Msg("Transaction getting handled") m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue) }) if err := transaction.AwaitCompletion(ctx); err != nil { m.log.Warn().Err(err).Msg("Error while awaiting completion") } m.log.Trace().Msg("Finished waiting for transaction to end") } func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transactions.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) { // Send the over the wire m.log.Trace().Msg("send over the wire") ttl := 5 * time.Second if deadline, ok := ctx.Deadline(); ok { ttl = -time.Since(deadline) m.log.Debug().Dur("ttl", ttl).Msg("setting ttl") } m.log.Trace().Interface("ctx", ctx).Msg("sending with ctx") if err := m.messageCodec.SendRequest( ctx, messageToSend, func(cbusMessage spi.Message) bool { m.log.Trace().Type("cbusMessageType", cbusMessage).Msg("Checking") messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient) if !ok { m.log.Trace().Msg("Not a message to client") return false } // Check if this errored if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReply); ok { // This means we must handle this below m.log.Trace().Msg("It is a error, we will handle it") return true } confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmation) if !ok { m.log.Trace().Msg("it is not a confirmation") return false } receivedAlpha := confirmation.GetConfirmation().GetAlpha() // TODO: assert that this is a CBusMessageToServer indeed (by changing param for example) alphaRetriever, ok := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }) if !ok { m.log.Trace().Msg("no alpha there") return false } expectedAlpha := alphaRetriever.GetAlpha() m.log.Trace(). Stringer("expectedAlpha", expectedAlpha). Stringer("receivedAlpha", receivedAlpha). Msgf("Comparing expected alpha to received alpha") return receivedAlpha.GetCharacter() == expectedAlpha.GetCharacter() }, func(receivedMessage spi.Message) error { // Convert the response into an m.log.Trace().Type("receivedMessage", receivedMessage).Msg("convert message") messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient) if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReply); ok { m.log.Trace().Msg("We got a server failure") addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA) return transaction.EndRequest() } replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmation) if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() { var responseCode apiModel.PlcResponseCode switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() { case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS: responseCode = apiModel.PlcResponseCode_REMOTE_ERROR case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION: responseCode = apiModel.PlcResponseCode_INVALID_DATA case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS: responseCode = apiModel.PlcResponseCode_REMOTE_BUSY case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG: responseCode = apiModel.PlcResponseCode_INVALID_DATA default: return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType())) } m.log.Trace(). Str("tagName", tagName). Stringer("responseCode", responseCode). Msg("Was no success") addResponseCode(tagName, responseCode) return transaction.EndRequest() } alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha() // TODO: it could be double confirmed but this is not implemented yet embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReply) if !ok { m.log.Trace(). Stringer("alpha", alpha). Msg("Is a confirm only, no data") addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND) return transaction.EndRequest() } m.log.Trace().Msg("Handling confirmed data") // TODO: check if we can use a plcValueSerializer encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply() if err := MapEncodedReply(m.log, transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil { log.Error().Err(err).Msg("error encoding reply") addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR) return transaction.EndRequest() } return transaction.EndRequest() }, func(err error) error { m.log.Trace().Err(err).Msg("got and error") addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR) return transaction.FailRequest(err) }, ttl); err != nil { m.log.Debug().Err(err). Str("tagName", tagName). Msg("Error sending message for tag %s") addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR) if err := transaction.FailRequest(errors.Errorf("timeout after %s", 1*time.Second)); err != nil { m.log.Debug().Err(err).Msg("Error failing request") } } }