plc4go/internal/opcua/SecureChannel.go (1,435 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package opcua
import (
"bytes"
"context"
"crypto/sha1"
"encoding/binary"
"math"
"math/rand"
"net"
"net/url"
"regexp"
"slices"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/apache/plc4x/plc4go/pkg/api"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/opcua/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/utils"
)
// Protocol-level defaults used by the secure channel when talking to an OPC UA server.
const (
// VERSION is the protocol version sent in the Hello and OpenSecureChannel requests.
VERSION = uint32(0)
// DEFAULT_MAX_CHUNK_COUNT, DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_RECEIVE_BUFFER_SIZE and
// DEFAULT_SEND_BUFFER_SIZE are the protocol limits offered to the server in the Hello request.
DEFAULT_MAX_CHUNK_COUNT = 64
DEFAULT_MAX_MESSAGE_SIZE = uint32(2097152)
DEFAULT_RECEIVE_BUFFER_SIZE = uint32(65535)
DEFAULT_SEND_BUFFER_SIZE = uint32(65535)
// REQUEST_TIMEOUT is the timeout handed to the message codec for the requests sent by the channel.
REQUEST_TIMEOUT = 10 * time.Second
// REQUEST_TIMEOUT_LONG is the timeout value written into the request headers.
// NOTE(review): presumably milliseconds (which would match REQUEST_TIMEOUT) — confirm against the RequestHeader definition.
REQUEST_TIMEOUT_LONG = 10000
// PASSWORD_ENCRYPTION_ALGORITHM is the URI identifying RSA-OAEP.
// NOTE(review): presumably used when encrypting user identity token passwords — usage is not visible in this part of the file.
PASSWORD_ENCRYPTION_ALGORITHM = "http://www.w3.org/2001/04/xmlenc#rsa-oaep"
// NOTE(review): the magnitude matches the 1601-01-01 to 1970-01-01 gap counted in 100-nanosecond ticks — confirm where it is applied.
EPOCH_OFFSET = 116444736000000000 //Offset between OPC UA epoch time and linux epoch time.
)
// Shared, pre-built protocol values reused by the requests created in this file.
var (
// SECURITY_POLICY_NONE is the policy URI of the "None" security policy.
SECURITY_POLICY_NONE = readWriteModel.NewPascalString(utils.ToPtr("http://opcfoundation.org/UA/SecurityPolicy#None"))
// NULL_STRING is a PascalString built from a nil string pointer.
NULL_STRING = readWriteModel.NewPascalString(nil)
// NULL_BYTE_STRING is a PascalByteString with length -1 and no content.
NULL_BYTE_STRING = readWriteModel.NewPascalByteString(-1, nil)
// NULL_EXPANDED_NODEID is an ExpandedNodeId wrapping the two-byte node id 0,
// with neither namespace URI nor server index specified.
NULL_EXPANDED_NODEID = readWriteModel.NewExpandedNodeId(false,
false,
readWriteModel.NewNodeIdTwoByte(0),
nil,
nil,
)
// BINARY_ENCODING_MASK is an encoding mask with only its third flag set.
// NOTE(review): presumably the "binary body" flag — confirm against NewExtensionObjectEncodingMask.
BINARY_ENCODING_MASK = readWriteModel.NewExtensionObjectEncodingMask(false, false, true)
// NULL_EXTENSION_OBJECT is the empty extension object placed in the request headers built in this file.
NULL_EXTENSION_OBJECT = readWriteModel.NewNullExtensionObjectWithMask(NULL_EXPANDED_NODEID,
readWriteModel.NewExtensionObjectEncodingMask(false, false, false),
0,
false) // Body
// INET_ADDRESS_PATTERN matches the transport part of a connection string and exposes the
// named groups transportCode, transportHost and transportPort.
INET_ADDRESS_PATTERN = regexp.MustCompile(`(.(?P<transportCode>tcp))?://(?P<transportHost>[\w.-]+)(:(?P<transportPort>\d*))?`)
// URI_PATTERN extends INET_ADDRESS_PATTERN with the leading protocolCode ("opc") and the
// trailing transportEndpoint group. It must be declared after INET_ADDRESS_PATTERN because
// it embeds that pattern's source text.
URI_PATTERN = regexp.MustCompile(`^(?P<protocolCode>opc)` + INET_ADDRESS_PATTERN.String() + `(?P<transportEndpoint>[\w/=]*)[?]?`)
// APPLICATION_URI, PRODUCT_URI and APPLICATION_TEXT describe this client in the
// ApplicationDescription sent with the CreateSession request.
APPLICATION_URI = readWriteModel.NewPascalString(utils.ToPtr("urn:apache:plc4x:client"))
PRODUCT_URI = readWriteModel.NewPascalString(utils.ToPtr("urn:apache:plc4x:client"))
// NOTE(review): the text names the PLC4J project although this is the Go driver — confirm whether that is intended.
APPLICATION_TEXT = readWriteModel.NewPascalString(utils.ToPtr("OPCUA client for the Apache PLC4X:PLC4J project"))
// DEFAULT_CONNECTION_LIFETIME is the lifetime requested in the OpenSecureChannel request.
// NOTE(review): presumably milliseconds — confirm.
DEFAULT_CONNECTION_LIFETIME = uint32(36000000)
)
// SecureChannel holds the state of one OPC UA secure channel and session: the negotiated
// channel/token identifiers, the session authentication token, the encryption material and
// the transaction manager through which requests are submitted.
//
//go:generate go tool plc4xGenerator -type=SecureChannel
type SecureChannel struct {
// sessionName is generated in NewSecureChannel and sent in the CreateSession request.
sessionName string
// clientNonce is a random value generated in NewSecureChannel; it is sent in the
// OpenSecureChannel request (when encrypted) and in the CreateSession request.
clientNonce []byte
// requestHandleGenerator is initialised to 1 in NewSecureChannel.
requestHandleGenerator atomic.Uint32
policyId readWriteModel.PascalString
tokenType readWriteModel.UserTokenType `stringer:"true"`
discovery bool
certFile string
keyStoreFile string
ckp CertificateKeyPair
// endpoint is built from the configured endpoint and sent in the Hello and CreateSession requests.
endpoint readWriteModel.PascalString
username string
password string
// securityPolicy is the full policy URI derived from the configured policy name.
securityPolicy string
publicCertificate readWriteModel.PascalByteString
thumbprint readWriteModel.PascalByteString
// isEncrypted is true only when the configured security policy is "Basic256Sha256".
isEncrypted bool
// senderCertificate is the server certificate; it is replaced by the one returned in the CreateSession response.
senderCertificate []byte
// senderNonce is the server nonce returned in the CreateSession response.
senderNonce []byte
certificateThumbprint readWriteModel.PascalByteString
checkedEndpoints bool
encryptionHandler *EncryptionHandler
configuration Configuration `stringer:"true"`
// channelId and tokenId start at 1 and are updated from the OpenSecureChannel response and
// from the security header of every final message response.
channelId atomic.Uint32
tokenId atomic.Uint32
// authenticationToken is replaced by the token returned in the CreateSession response.
authenticationToken readWriteModel.NodeIdTypeDefinition
// codec is assigned from the connection in onConnect.
codec *MessageCodec
channelTransactionManager *SecureChannelTransactionManager
// lifetime is the channel lifetime requested in the OpenSecureChannel request.
lifetime uint32
keepAliveStateChange sync.Mutex
keepAliveIndicator atomic.Bool
keepAliveWg sync.WaitGroup
sendBufferSize int
maxMessageSize int
// endpoints holds the candidate host names computed in NewSecureChannel.
endpoints []string
// senderSequenceNumber tracks the last sequence number seen from the server; it is seeded
// from the OpenSecureChannel response and advanced by one for every matched response.
senderSequenceNumber atomic.Int32
wg sync.WaitGroup // use to track spawned go routines
log zerolog.Logger
}
// NewSecureChannel builds a SecureChannel from the supplied configuration.
//
// The channel, token and request-handle counters start at 1. When the configured security
// policy is "Basic256Sha256" the channel is marked as encrypted and the public certificate,
// thumbprint and sender certificate are taken from the configuration; for every other policy
// the certificate and thumbprint are the null byte string.
//
// The list of candidate endpoints is derived from configuration.Host: one randomly chosen
// address returned by the resolver (if resolution succeeds) followed by the host itself.
// If the host cannot be parsed or resolved a warning is logged and only the original host
// is used.
//
// The ctx parameter is currently not used.
func NewSecureChannel(log zerolog.Logger, ctx DriverContext, configuration Configuration) *SecureChannel {
	s := &SecureChannel{
		configuration:             configuration,
		endpoint:                  readWriteModel.NewPascalString(&configuration.Endpoint),
		username:                  configuration.Username,
		password:                  configuration.Password,
		securityPolicy:            "http://opcfoundation.org/UA/SecurityPolicy#" + configuration.SecurityPolicy,
		sessionName:               "UaSession:" + *APPLICATION_TEXT.GetStringValue() + ":" + utils.RandomString(20),
		authenticationToken:       readWriteModel.NewNodeIdTwoByte(0),
		clientNonce:               []byte(utils.RandomString(40)),
		keyStoreFile:              configuration.KeyStoreFile,
		channelTransactionManager: NewSecureChannelTransactionManager(log),
		lifetime:                  DEFAULT_CONNECTION_LIFETIME,
		log:                       log,
	}
	s.requestHandleGenerator.Store(1)
	s.channelId.Store(1)
	s.tokenId.Store(1)

	ckp := configuration.Ckp
	if configuration.SecurityPolicy == "Basic256Sha256" {
		//Sender Certificate gets populated during the 'discover' phase when encryption is enabled.
		s.senderCertificate = configuration.SenderCertificate
		s.encryptionHandler = NewEncryptionHandler(s.log, ckp, s.senderCertificate, configuration.SecurityPolicy)
		certificate := ckp.getCertificate()
		s.publicCertificate = readWriteModel.NewPascalByteString(int32(len(certificate.Raw)), certificate.Raw)
		s.isEncrypted = true
		s.thumbprint = configuration.Thumbprint
	} else {
		s.encryptionHandler = NewEncryptionHandler(s.log, ckp, s.senderCertificate, configuration.SecurityPolicy)
		s.publicCertificate = NULL_BYTE_STRING
		s.thumbprint = NULL_BYTE_STRING
		s.isEncrypted = false
	}

	// Generate a list of endpoints we can use.
	// host falls back to the raw configured value so that a parse failure never leaves us
	// dereferencing a nil *url.URL.
	host := configuration.Host
	address, err := url.Parse("none://" + configuration.Host)
	if err == nil {
		host = address.Host
		var names []string
		names, err = net.LookupHost(host)
		// Guard against an empty result: rand.Intn panics for n <= 0.
		if err == nil && len(names) > 0 {
			s.endpoints = append(s.endpoints, names[rand.Intn(len(names))])
		}
	}
	if err != nil {
		s.log.Warn().Err(err).Msg("Unable to resolve host name. Using original host from connection string which may cause issues connecting to server")
	}
	s.endpoints = append(s.endpoints, host)
	return s
}
// submit wraps the serialized request held in buffer into an OPC UA message request, sends
// it through codec and hands the payload of the matching response to consumer.
//
// The message is addressed with the current channel id and token id and uses a freshly
// obtained transaction identifier as both sequence number and request id. When the channel
// is encrypted the message is first passed through the encryption handler.
//
// A response is considered a match when it decodes successfully, is an
// OpcuaMessageResponse and carries the request id of this transaction. For a matching
// response the expected server sequence number is advanced by one; a mismatch is reported
// through errorDispatcher but the response is still processed. Only responses whose chunk
// type is FINAL are delivered to consumer; on delivery the channel id and token id are
// refreshed from the response's security header.
//
// Every failure (encoding, parsing, sending, scheduling the transaction, or an error
// reported by the codec) is reported through errorDispatcher.
func (s *SecureChannel) submit(ctx context.Context, codec *MessageCodec, errorDispatcher func(err error), consumer func(opcuaResponse []byte), buffer utils.WriteBufferByteBased) {
	transactionId := s.channelTransactionManager.getTransactionIdentifier()
	payload := buffer.GetBytes()

	//TODO: We need to split large messages up into chunks if it is larger than the sendBufferSize
	// This value is negotiated when opening a channel
	messageRequest := readWriteModel.NewOpcuaMessageRequest(
		readWriteModel.ChunkType_FINAL,
		readWriteModel.NewSecurityHeader(
			s.channelId.Load(),
			s.tokenId.Load(),
		),
		readWriteModel.NewBinaryPayload(
			readWriteModel.NewSequenceHeader(transactionId, transactionId),
			payload,
			uint32(len(payload)),
		),
		uint32(len(payload)),
		true,
	)

	var apu readWriteModel.OpcuaAPU
	if s.isEncrypted {
		message, err := s.encryptionHandler.encodeMessage(ctx, messageRequest, payload)
		if err != nil {
			errorDispatcher(err)
			return
		}
		apu, err = readWriteModel.OpcuaAPUParse(ctx, message, false, true)
		if err != nil {
			errorDispatcher(err)
			return
		}
	} else {
		apu = readWriteModel.NewOpcuaAPU(messageRequest, false, true)
	}

	requestConsumer := func(transactionId int32) {
		// Filled by the accept callback and consumed by the handle callback.
		var messageBuffer []byte
		if err := codec.SendRequest(ctx, apu,
			func(message spi.Message) bool {
				opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
				if !ok {
					s.log.Debug().Type("type", message).Msg("Not relevant")
					return false
				}
				decodedOpcuaAPU, err := s.encryptionHandler.decodeMessage(ctx, opcuaAPU)
				if err != nil {
					s.log.Debug().Err(err).Msg("error decoding")
					return false
				}
				opcuaAPU = decodedOpcuaAPU.(readWriteModel.OpcuaAPU)
				messagePDU := opcuaAPU.GetMessage()
				s.log.Trace().Stringer("messagePDU", messagePDU).Msg("looking at messagePDU")
				opcuaResponse, ok := messagePDU.(readWriteModel.OpcuaMessageResponse)
				if !ok {
					s.log.Debug().Type("type", messagePDU).Msg("Not relevant")
					return false
				}
				sequenceHeader := opcuaResponse.GetMessage().GetSequenceHeader()
				if requestId := sequenceHeader.GetRequestId(); requestId != transactionId {
					s.log.Debug().Int32("requestId", requestId).Int32("transactionId", transactionId).Msg("Not relevant")
					return false
				}
				// NOTE(review): this asserts the response itself (not its message) to
				// BinaryPayload — confirm that this is the intended object.
				messageBuffer = opcuaResponse.(readWriteModel.BinaryPayload).GetPayload()
				if s.senderSequenceNumber.Add(1) != sequenceHeader.GetSequenceNumber() {
					s.log.Error().
						Int32("senderSequenceNumber", s.senderSequenceNumber.Load()).
						Int32("responseSequenceNumber", sequenceHeader.GetSequenceNumber()).
						Msg("Sequence number isn't as expected, we might have missed a packet. - senderSequenceNumber != responseSequenceNumber")
					errorDispatcher(errors.New("unexpected sequence number"))
				}
				return true
			},
			func(message spi.Message) error {
				opcuaAPU := message.(readWriteModel.OpcuaAPU)
				var err error
				opcuaAPU, err = s.encryptionHandler.decodeMessage(ctx, opcuaAPU)
				if err != nil {
					// Previously ignored; a failed decode would have led to a nil
					// dereference below. The error is left to the codec to route.
					return err
				}
				messagePDU := opcuaAPU.GetMessage()
				s.log.Trace().Stringer("messagePDU", messagePDU).Msg("looking at messagePDU")
				opcuaResponse := messagePDU.(readWriteModel.OpcuaMessageResponse)
				if opcuaResponse.GetChunk() == readWriteModel.ChunkType_FINAL {
					s.tokenId.Store(opcuaResponse.GetSecurityHeader().GetSecureTokenId())
					s.channelId.Store(opcuaResponse.GetSecurityHeader().GetSecureChannelId())
					consumer(messageBuffer)
				} else {
					s.log.Warn().Stringer("chunk", opcuaResponse.GetChunk()).Msg("Message discarded")
				}
				return nil
			},
			func(err error) error {
				errorDispatcher(err)
				return nil
			},
			REQUEST_TIMEOUT); err != nil {
			errorDispatcher(err)
		}
	}

	s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting Transaction to TransactionManager")
	if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
		// The request will never be sent, so the caller must learn about it.
		errorDispatcher(err)
	}
}
// onConnect starts the connection handshake by sending the Hello request.
//
// The message codec of connection is stored on the channel and used for the request. The
// Hello carries the protocol version, the default protocol limits and the configured
// endpoint. When an Acknowledge response arrives the handshake continues asynchronously in
// onConnectOpenSecureChannel. Any failure — reported by the codec, while sending, or while
// scheduling the transaction — is published on ch via connection.fireConnectionError so the
// party waiting on ch always receives an outcome.
func (s *SecureChannel) onConnect(ctx context.Context, connection *Connection, ch chan plc4go.PlcConnectionConnectResult) {
	s.log.Trace().Msg("on connect")
	// Only the TCP transport supports login.
	s.log.Debug().Msg("Opcua Driver running in ACTIVE mode.")
	s.codec = connection.messageCodec // TODO: why would we need to set that?

	hello := readWriteModel.NewOpcuaHelloRequest(
		readWriteModel.ChunkType_FINAL,
		VERSION,
		readWriteModel.NewOpcuaProtocolLimits(
			DEFAULT_RECEIVE_BUFFER_SIZE,
			DEFAULT_SEND_BUFFER_SIZE,
			DEFAULT_MAX_MESSAGE_SIZE,
			DEFAULT_MAX_CHUNK_COUNT,
		),
		s.endpoint,
		true,
	)

	requestConsumer := func(transactionId int32) {
		s.log.Trace().Int32("transactionId", transactionId).Msg("request consumer called")
		if err := s.codec.SendRequest(
			ctx,
			hello,
			func(message spi.Message) bool {
				opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
				if !ok {
					s.log.Debug().Type("type", message).Msg("Not relevant")
					return false
				}
				messagePDU := opcuaAPU.GetMessage()
				if _, ok = messagePDU.(readWriteModel.OpcuaAcknowledgeResponse); !ok {
					s.log.Debug().Type("type", messagePDU).Msg("Not relevant")
					return false
				}
				return true
			},
			func(message spi.Message) error {
				// Both assertions are safe: the accept callback above only lets
				// Acknowledge responses wrapped in an OpcuaAPU through.
				opcuaAPU := message.(readWriteModel.OpcuaAPU)
				messagePDU := opcuaAPU.GetMessage()
				opcuaAcknowledgeResponse := messagePDU.(readWriteModel.OpcuaAcknowledgeResponse)
				go s.onConnectOpenSecureChannel(ctx, connection, ch, opcuaAcknowledgeResponse)
				return nil
			},
			func(err error) error {
				s.log.Debug().Err(err).Msg("error submitting")
				connection.fireConnectionError(err, ch)
				return nil
			},
			REQUEST_TIMEOUT); err != nil {
			s.log.Debug().Err(err).Msg("error sending")
			// Without this the connect attempt would never complete.
			connection.fireConnectionError(err, ch)
		}
	}

	if err := s.channelTransactionManager.submit(requestConsumer, s.channelTransactionManager.getTransactionIdentifier()); err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
		// Without this the connect attempt would never complete.
		connection.fireConnectionError(err, ch)
	}
}
// onConnectOpenSecureChannel is the second step of the connection handshake: it sends the
// OpenSecureChannel request and, on success, continues with onConnectCreateSessionRequest.
//
// The request asks for a new security token with the channel's configured lifetime. For an
// encrypted channel the security mode is SignAndEncrypt, the client nonce is included and
// the open request is passed through the encryption handler; otherwise the mode is None and
// the nonce is the null byte string.
//
// From the response the server's initial sequence number, the token id and the channel id
// are stored on the channel. A ServiceFault, an unexpected response body, or any failure
// while serializing, encoding, sending or scheduling is published on ch via
// connection.fireConnectionError.
//
// The response parameter (the Acknowledge of the preceding Hello) is currently not used.
func (s *SecureChannel) onConnectOpenSecureChannel(ctx context.Context, connection *Connection, ch chan plc4go.PlcConnectionConnectResult, response readWriteModel.OpcuaAcknowledgeResponse) {
	transactionId := s.channelTransactionManager.getTransactionIdentifier()

	requestHeader := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		0, //RequestHandle
		0,
		NULL_STRING,
		REQUEST_TIMEOUT_LONG,
		NULL_EXTENSION_OBJECT)

	// Only the security mode and the nonce differ between the encrypted and plain variant.
	securityMode := readWriteModel.MessageSecurityMode_messageSecurityModeNone
	clientNonce := NULL_BYTE_STRING
	if s.isEncrypted {
		securityMode = readWriteModel.MessageSecurityMode_messageSecurityModeSignAndEncrypt
		clientNonce = readWriteModel.NewPascalByteString(int32(len(s.clientNonce)), s.clientNonce)
	}
	openSecureChannelRequest := readWriteModel.NewOpenSecureChannelRequest(
		requestHeader,
		VERSION,
		readWriteModel.SecurityTokenRequestType_securityTokenRequestTypeIssue,
		securityMode,
		clientNonce,
		s.lifetime)

	identifier := openSecureChannelRequest.GetExtensionId()
	expandedNodeId := readWriteModel.NewExpandedNodeId(
		false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, uint16(identifier)),
		nil,
		nil,
	)
	extObject := readWriteModel.NewRootExtensionObject(
		expandedNodeId,
		openSecureChannelRequest,
		identifier,
	)

	buffer := utils.NewWriteBufferByteBased(utils.WithByteOrderForByteBasedBuffer(binary.LittleEndian))
	if err := extObject.SerializeWithWriteBuffer(ctx, buffer); err != nil {
		s.log.Debug().Err(err).Msg("error serializing")
		connection.fireConnectionError(err, ch)
		return
	}
	payload := buffer.GetBytes()

	openRequest := readWriteModel.NewOpcuaOpenRequest(
		readWriteModel.ChunkType_FINAL,
		readWriteModel.NewOpenChannelMessageRequest(
			0,
			readWriteModel.NewPascalString(&s.securityPolicy),
			s.publicCertificate,
			s.thumbprint),
		readWriteModel.NewBinaryPayload(
			readWriteModel.NewSequenceHeader(transactionId, transactionId),
			payload,
			uint32(len(payload)),
		),
		uint32(len(payload)),
		true,
	)

	var apu readWriteModel.OpcuaAPU
	if s.isEncrypted {
		message, err := s.encryptionHandler.encodeMessage(ctx, openRequest, payload)
		if err != nil {
			s.log.Debug().Err(err).Msg("error encoding")
			connection.fireConnectionError(err, ch)
			return
		}
		apu, err = readWriteModel.OpcuaAPUParse(ctx, message, false, true)
		if err != nil {
			s.log.Debug().Err(err).Msg("error parsing")
			connection.fireConnectionError(err, ch)
			return
		}
	} else {
		apu = readWriteModel.NewOpcuaAPU(openRequest, false, true)
	}

	requestConsumer := func(transactionId int32) {
		if err := s.codec.SendRequest(
			ctx,
			apu,
			func(message spi.Message) bool {
				opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
				if !ok {
					s.log.Debug().Type("type", message).Msg("Not relevant")
					return false
				}
				messagePDU := opcuaAPU.GetMessage()
				openResponse, ok := messagePDU.(readWriteModel.OpcuaOpenResponse)
				if !ok {
					s.log.Debug().Type("type", messagePDU).Msg("Not relevant")
					return false
				}
				return openResponse.GetMessage().GetSequenceHeader().GetRequestId() == transactionId
			},
			func(message spi.Message) error {
				// Safe: guaranteed by the accept callback above.
				opcuaAPU := message.(readWriteModel.OpcuaAPU)
				messagePDU := opcuaAPU.GetMessage()
				opcuaOpenResponse := messagePDU.(readWriteModel.OpcuaOpenResponse)
				// NOTE(review): this asserts the response itself (not its message) to
				// BinaryPayload — confirm that this is the intended object.
				readBuffer := utils.NewReadBufferByteBased(opcuaOpenResponse.(readWriteModel.BinaryPayload).GetPayload(), utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian))
				extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, readBuffer, false)
				if err != nil {
					return errors.Wrap(err, "error parsing")
				}
				//Store the initial sequence number from the server. there's no requirement for the server and client to use the same starting number.
				s.senderSequenceNumber.Store(opcuaOpenResponse.GetMessage().GetSequenceHeader().GetSequenceNumber())

				body := extensionObject.GetBody()
				if fault, ok := body.(readWriteModel.ServiceFault); ok {
					statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
					statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
					s.log.Error().
						Uint32("statusCode", statusCode).
						Stringer("statusCodeByValue", statusCodeByValue).
						Msg("Failed to connect to opc ua server for the following reason")
					connection.fireConnectionError(errors.New("service fault received"), ch)
					return nil
				}
				s.log.Debug().Msg("Got Secure Response Connection Response")

				// Checked assertions: an unexpected body must fail the connect attempt
				// instead of panicking inside the codec's callback.
				openSecureChannelResponse, ok := body.(readWriteModel.OpenSecureChannelResponse)
				if !ok {
					s.log.Error().Type("type", body).Msg("Unexpected response body, expected OpenSecureChannelResponse")
					connection.fireConnectionError(errors.New("unexpected response to open secure channel request"), ch)
					return nil
				}
				securityToken, ok := openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken)
				if !ok {
					s.log.Error().Type("type", openSecureChannelResponse.GetSecurityToken()).Msg("Unexpected security token, expected ChannelSecurityToken")
					connection.fireConnectionError(errors.New("unexpected security token in open secure channel response"), ch)
					return nil
				}
				s.tokenId.Store(securityToken.GetTokenId())
				s.channelId.Store(securityToken.GetChannelId())
				go s.onConnectCreateSessionRequest(ctx, connection, ch)
				return nil
			},
			func(err error) error {
				s.log.Debug().Err(err).Msg("error submitting")
				connection.fireConnectionError(err, ch)
				return nil
			},
			REQUEST_TIMEOUT,
		); err != nil {
			s.log.Debug().Err(err).Msg("a error")
			connection.fireConnectionError(err, ch)
		}
	}

	s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting OpenSecureChannel with id")
	if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
		connection.fireConnectionError(err, ch)
	}
}
// onConnectCreateSessionRequest is the third step of the connection handshake: it sends the
// CreateSession request and, on success, continues with onConnectActivateSessionRequest.
//
// The request describes this client (application URI, product URI, application name),
// names the configured endpoint and the generated session name and carries the client
// nonce. From a CreateSessionResponse the authentication token is stored on the channel.
//
// A ServiceFault, a response body of any other unexpected type, a parse failure or any
// error reported while submitting is published on ch via connection.fireConnectionError.
func (s *SecureChannel) onConnectCreateSessionRequest(ctx context.Context, connection *Connection, ch chan plc4go.PlcConnectionConnectResult) {
	requestHeader := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		0,
		0,
		NULL_STRING,
		REQUEST_TIMEOUT_LONG,
		NULL_EXTENSION_OBJECT)

	applicationName := readWriteModel.NewLocalizedText(
		true,
		true,
		readWriteModel.NewPascalString(utils.ToPtr("en")),
		APPLICATION_TEXT)

	var discoveryUrls []readWriteModel.PascalString
	clientDescription := readWriteModel.NewApplicationDescription(APPLICATION_URI,
		PRODUCT_URI,
		applicationName,
		readWriteModel.ApplicationType_applicationTypeClient,
		NULL_STRING,
		NULL_STRING,
		discoveryUrls)

	createSessionRequest := readWriteModel.NewCreateSessionRequest(
		requestHeader,
		clientDescription,
		NULL_STRING,
		s.endpoint,
		readWriteModel.NewPascalString(&s.sessionName),
		readWriteModel.NewPascalByteString(int32(len(s.clientNonce)), s.clientNonce),
		NULL_BYTE_STRING,
		120000,
		0,
	)

	identifier := createSessionRequest.GetExtensionId()
	expandedNodeId := readWriteModel.NewExpandedNodeId(
		false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, uint16(identifier)),
		nil,
		nil)
	extObject := readWriteModel.NewRootExtensionObject(
		expandedNodeId,
		createSessionRequest,
		identifier,
	)

	buffer := utils.NewWriteBufferByteBased(utils.WithByteOrderForByteBasedBuffer(binary.LittleEndian))
	if err := extObject.SerializeWithWriteBuffer(ctx, buffer); err != nil {
		s.log.Debug().Err(err).Msg("error serializing")
		connection.fireConnectionError(err, ch)
		return
	}

	consumer := func(opcuaResponse []byte) {
		extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
		if err != nil {
			s.log.Error().Err(err).Msg("error parsing")
			connection.fireConnectionError(err, ch)
			return
		}
		s.log.Trace().Stringer("extensionObject", extensionObject).Msg("looking at message")

		body := extensionObject.GetBody()
		if fault, ok := body.(readWriteModel.ServiceFault); ok {
			statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
			statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
			s.log.Error().
				Uint32("statusCode", statusCode).
				Stringer("statusCodeByValue", statusCodeByValue).
				Msg("Failed to connect to opc ua server for the following reason")
			connection.fireConnectionError(errors.New("service fault received"), ch)
			return
		}
		s.log.Debug().Msg("Got Create Session Response Connection Response")

		responseMessage, ok := body.(readWriteModel.CreateSessionResponse)
		if !ok {
			// ServiceFault was already handled above, so anything arriving here is an
			// unexpected type. Asserting it to ServiceFault (as was done before) could
			// only panic; report the failure so the connect attempt terminates.
			s.log.Error().Type("type", body).Msg("Unexpected response body, expected CreateSessionResponse")
			connection.fireConnectionError(errors.New("unexpected response to create session request"), ch)
			return
		}
		s.authenticationToken = responseMessage.GetAuthenticationToken().GetNodeId()
		go s.onConnectActivateSessionRequest(ctx, connection, ch, responseMessage, responseMessage)
	}

	errorDispatcher := func(err error) {
		s.log.Error().Err(err).Msg("Error while waiting for subscription response")
		connection.fireConnectionError(err, ch)
	}

	s.submit(ctx, connection.messageCodec, errorDispatcher, consumer, buffer)
}
// onConnectActivateSessionRequest is the final step of the connection handshake: it sends
// the ActivateSession request and, on success, starts the keep-alive and reports the
// connection as established on ch.
//
// The server certificate and server nonce from sessionResponse are stored on the channel
// and the certificate is handed to the encryption handler. selectEndpoint is then expected
// to set the policy id; if none was found the connect attempt fails. The user identity
// token is built from the selected token type and policy id.
//
// A ServiceFault, a response body of any other unexpected type, a parse failure or any
// error reported while submitting is published on ch via connection.fireConnectionError.
// A request-handle mismatch in the response is only logged.
//
// The opcuaMessageResponse parameter is currently not used.
func (s *SecureChannel) onConnectActivateSessionRequest(ctx context.Context, connection *Connection, ch chan plc4go.PlcConnectionConnectResult, opcuaMessageResponse readWriteModel.CreateSessionResponse, sessionResponse readWriteModel.CreateSessionResponse) {
	s.senderCertificate = sessionResponse.GetServerCertificate().GetStringValue()
	certificate, err := s.encryptionHandler.getCertificateX509(s.senderCertificate)
	if err != nil {
		s.log.Error().Err(err).Msg("error getting certificate")
		connection.fireConnectionError(err, ch)
		return
	}
	s.log.Debug().Interface("senderCertificate", certificate).Msg("working with senderCertificate")
	s.encryptionHandler.setServerCertificate(certificate)
	s.senderNonce = sessionResponse.GetServerNonce().GetStringValue()

	// Candidate endpoint URLs; currently only used for the diagnostic message below.
	endpoints := make([]string, 3)
	if address, err := url.Parse(s.configuration.Host); err == nil {
		// Only use the lookup result when the lookup succeeded and returned at least one
		// name: rand.Intn panics for n <= 0.
		if names, err := net.LookupAddr(address.Host); err == nil && len(names) > 0 {
			endpoints[0] = "opc.tcp://" + names[rand.Intn(len(names))] + ":" + s.configuration.Port + s.configuration.TransportEndpoint
		}
		endpoints[1] = "opc.tcp://" + address.Hostname() + ":" + s.configuration.Port + s.configuration.TransportEndpoint
		//endpoints[2] = "opc.tcp://" + address.getCanonicalHostName() + ":" + s.configuration.getPort() + s.configuration.transportEndpoint// TODO: not sure how to get that in golang
	} else {
		s.log.Debug().Err(err).Msg("error parsing host")
	}

	s.selectEndpoint(sessionResponse)
	if s.policyId == nil {
		s.log.Error().Msg("Unable to find endpoint - " + endpoints[1])
		// err is nil at this point, so a dedicated error is required here.
		connection.fireConnectionError(errors.New("unable to find endpoint - "+endpoints[1]), ch)
		return
	}

	userIdentityToken := s.getIdentityToken(s.tokenType, s.policyId.GetStringValue())
	requestHandle := s.getRequestHandle()
	requestHeader := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		requestHandle,
		0,
		NULL_STRING,
		REQUEST_TIMEOUT_LONG,
		NULL_EXTENSION_OBJECT)

	clientSignature := readWriteModel.NewSignatureData(NULL_STRING, NULL_BYTE_STRING)
	activateSessionRequest := readWriteModel.NewActivateSessionRequest(
		requestHeader,
		clientSignature,
		nil,
		nil,
		userIdentityToken,
		clientSignature,
	)

	identifier := activateSessionRequest.GetExtensionId()
	expandedNodeId := readWriteModel.NewExpandedNodeId(false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, uint16(identifier)),
		nil,
		nil)
	extObject := readWriteModel.NewRootExtensionObject(
		expandedNodeId,
		activateSessionRequest,
		identifier,
	)

	buffer := utils.NewWriteBufferByteBased(utils.WithByteOrderForByteBasedBuffer(binary.LittleEndian))
	if err := extObject.SerializeWithWriteBuffer(ctx, buffer); err != nil {
		s.log.Debug().Err(err).Msg("error serializing")
		connection.fireConnectionError(err, ch)
		return
	}

	consumer := func(opcuaResponse []byte) {
		// The response is parsed once; the result serves both the fault check and the
		// success path.
		message, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
		if err != nil {
			s.log.Error().Err(err).Msg("error parsing")
			// Without this the connect attempt would never complete.
			connection.fireConnectionError(err, ch)
			return
		}
		s.log.Trace().Stringer("message", message).Msg("looking at message")

		body := message.GetBody()
		if fault, ok := body.(readWriteModel.ServiceFault); ok {
			statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
			statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
			s.log.Error().
				Uint32("statusCode", statusCode).
				Stringer("statusCodeByValue", statusCodeByValue).
				Msg("Failed to connect to opc ua server for the following reason")
			connection.fireConnectionError(errors.New("service fault received"), ch)
			return
		}
		s.log.Debug().Msg("Got Activate Session Response Connection Response")

		responseMessage, ok := body.(readWriteModel.ActivateSessionResponse)
		if !ok {
			// ServiceFault was already handled above, so anything arriving here is an
			// unexpected type. Asserting it to ServiceFault (as was done before) could
			// only panic; report the failure so the connect attempt terminates.
			s.log.Error().Type("type", body).Msg("Unexpected response body, expected ActivateSessionResponse")
			connection.fireConnectionError(errors.New("unexpected response to activate session request"), ch)
			return
		}

		returnedRequestHandle := responseMessage.GetResponseHeader().(readWriteModel.ResponseHeader).GetRequestHandle()
		if requestHandle != returnedRequestHandle {
			s.log.Error().
				Uint32("requestHandle", requestHandle).
				Uint32("returnedRequestHandle", returnedRequestHandle).
				Msg("Request handle isn't as expected, we might have missed a packet. requestHandle != returnedRequestHandle")
		}
		// Send an event that connection setup is complete.
		s.keepAlive()
		connection.fireConnected(ch)
	}

	errorDispatcher := func(err error) {
		s.log.Error().Err(err).Msg("Error while waiting for subscription response")
		connection.fireConnectionError(err, ch)
	}

	s.submit(ctx, connection.messageCodec, errorDispatcher, consumer, buffer)
}
// onDisconnect starts the shutdown sequence by sending a CloseSession request.
//
// The keep-alive indicator is cleared first. If the channel holds no authentication token
// nothing is sent. When a CloseSessionResponse arrives the shutdown continues
// asynchronously in onDisconnectCloseSecureChannel. Failures (serialization, parsing, a
// ServiceFault, an unexpected response type, or errors reported while submitting) are only
// logged; there is no result channel to report them on.
func (s *SecureChannel) onDisconnect(ctx context.Context, connection *Connection) {
	s.log.Info().Msg("disconnecting")
	requestHandle := s.getRequestHandle()
	s.keepAliveIndicator.Store(false)

	expandedNodeId := readWriteModel.NewExpandedNodeId(false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, 473),
		nil,
		nil) // Hard-coded node id of the request sent below (CloseSessionRequest). NOTE(review): confirm 473 equals closeSessionRequest.GetExtensionId().

	if s.authenticationToken == nil {
		// TODO: this or nil?? What do we do when we don't have one?
		s.log.Error().Msg("no authentication token, so we can't disconnect")
		return
	}

	requestHeader := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		requestHandle, //RequestHandle
		0,
		NULL_STRING,
		5000,
		NULL_EXTENSION_OBJECT)
	closeSessionRequest := readWriteModel.NewCloseSessionRequest(
		requestHeader,
		true)
	extObject := readWriteModel.NewRootExtensionObject(
		expandedNodeId,
		closeSessionRequest,
		closeSessionRequest.GetExtensionId(),
	)

	buffer := utils.NewWriteBufferByteBased(utils.WithByteOrderForByteBasedBuffer(binary.LittleEndian))
	if err := extObject.SerializeWithWriteBuffer(ctx, buffer); err != nil {
		s.log.Debug().Err(err).Msg("error serializing")
		return
	}

	consumer := func(opcuaResponse []byte) {
		// The response is parsed once; the result serves both the fault check and the
		// success path.
		message, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
		if err != nil {
			s.log.Error().Err(err).Msg("error parsing")
			return
		}
		s.log.Trace().Stringer("message", message).Msg("looking at message")

		body := message.GetBody()
		if fault, ok := body.(readWriteModel.ServiceFault); ok {
			statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
			statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
			s.log.Error().
				Uint32("statusCode", statusCode).
				Stringer("statusCodeByValue", statusCodeByValue).
				Msg("Failed to connect to opc ua server for the following reason")
			return
		}
		s.log.Debug().Msg("Got Close Session Response Connection Response")

		responseMessage, ok := body.(readWriteModel.CloseSessionResponse)
		if !ok {
			// ServiceFault was already handled above, so anything arriving here is an
			// unexpected type. Asserting it to ServiceFault (as was done before) could
			// only panic.
			s.log.Error().Type("type", body).Msg("Unexpected response body, expected CloseSessionResponse")
			return
		}
		go s.onDisconnectCloseSecureChannel(ctx, connection, responseMessage, responseMessage)
	}

	errorDispatcher := func(err error) {
		s.log.Error().Err(err).Msg("Error while waiting for close session response")
	}

	s.submit(ctx, connection.messageCodec, errorDispatcher, consumer, buffer)
}
// onDisconnectCloseSecureChannel finishes the shutdown sequence by sending the
// CloseSecureChannel request over the connection's message codec.
//
// The request is addressed with the current channel id and token id and is sent
// unencrypted as an extensible payload. A response is matched by being an
// OpcuaMessageResponse carrying this transaction's request id; it is only traced. All
// failures are logged at debug level.
//
// The message and response parameters are currently not used.
func (s *SecureChannel) onDisconnectCloseSecureChannel(ctx context.Context, connection *Connection, message readWriteModel.CloseSessionResponse, response readWriteModel.CloseSessionResponse) {
	transactionId := s.channelTransactionManager.getTransactionIdentifier()

	header := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		0, //RequestHandle
		0,
		NULL_STRING,
		REQUEST_TIMEOUT_LONG,
		NULL_EXTENSION_OBJECT)
	closeChannel := readWriteModel.NewCloseSecureChannelRequest(header)
	extensionId := closeChannel.GetExtensionId()

	nodeId := readWriteModel.NewExpandedNodeId(
		false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, uint16(extensionId)),
		nil,
		nil,
	)
	securityHeader := readWriteModel.NewSecurityHeader(s.channelId.Load(), s.tokenId.Load())
	payload := readWriteModel.NewExtensiblePayload(
		readWriteModel.NewSequenceHeader(transactionId, transactionId),
		readWriteModel.NewRootExtensionObject(nodeId, closeChannel, extensionId),
		0,
	)
	closeAPU := readWriteModel.NewOpcuaAPU(
		readWriteModel.NewOpcuaCloseRequest(readWriteModel.ChunkType_FINAL, securityHeader, payload, true),
		false,
		true,
	)

	sendClose := func(transactionId int32) {
		// Accepts only message responses that answer this transaction.
		isOurResponse := func(incoming spi.Message) bool {
			wrapped, isAPU := incoming.(readWriteModel.OpcuaAPU)
			if !isAPU {
				s.log.Debug().Type("type", incoming).Msg("Not relevant")
				return false
			}
			pdu := wrapped.GetMessage()
			messageResponse, isResponse := pdu.(readWriteModel.OpcuaMessageResponse)
			if !isResponse {
				s.log.Debug().Type("type", pdu).Msg("Not relevant")
				return false
			}
			return messageResponse.GetMessage().GetSequenceHeader().GetRequestId() == transactionId
		}
		// Nothing to do with the response besides tracing it.
		onResponse := func(incoming spi.Message) error {
			pdu := incoming.(readWriteModel.OpcuaAPU).GetMessage()
			messageResponse := pdu.(readWriteModel.OpcuaMessageResponse)
			s.log.Trace().Stringer("opcuaMessageResponse", messageResponse).Msg("Got close secure channel response")
			return nil
		}
		onFailure := func(err error) error {
			s.log.Debug().Err(err).Msg("error submitting")
			return nil
		}

		err := connection.messageCodec.SendRequest(ctx, closeAPU, isOurResponse, onResponse, onFailure, REQUEST_TIMEOUT)
		if err != nil {
			s.log.Debug().Err(err).Msg("a error")
		}
	}

	s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting CloseSecureChannel with id")
	err := s.channelTransactionManager.submit(sendClose, transactionId)
	if err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
	}
}
// onDiscover starts endpoint discovery by sending a Hello request through codec.
//
// The Hello carries the protocol version, the default protocol limits and the configured
// endpoint. Once an Acknowledge response arrives, discovery continues asynchronously in
// onDiscoverOpenSecureChannel. All failures are logged at debug level.
func (s *SecureChannel) onDiscover(ctx context.Context, codec *MessageCodec) {
	s.log.Trace().Msg("onDiscover")
	// Only the TCP transport supports login.
	s.log.Debug().Msg("Opcua Driver running in ACTIVE mode, discovering endpoints")

	limits := readWriteModel.NewOpcuaProtocolLimits(
		DEFAULT_RECEIVE_BUFFER_SIZE,
		DEFAULT_SEND_BUFFER_SIZE,
		DEFAULT_MAX_MESSAGE_SIZE,
		DEFAULT_MAX_CHUNK_COUNT,
	)
	helloAPU := readWriteModel.NewOpcuaAPU(
		readWriteModel.NewOpcuaHelloRequest(readWriteModel.ChunkType_FINAL, VERSION, limits, s.endpoint, true),
		false,
		true,
	)

	// Accepts only Acknowledge responses wrapped in an OpcuaAPU.
	isAcknowledge := func(incoming spi.Message) bool {
		wrapped, isAPU := incoming.(readWriteModel.OpcuaAPU)
		if !isAPU {
			s.log.Debug().Type("type", incoming).Msg("Not relevant")
			return false
		}
		pdu := wrapped.GetMessage()
		if _, isAck := pdu.(readWriteModel.OpcuaAcknowledgeResponse); !isAck {
			s.log.Debug().Type("type", pdu).Msg("Not relevant")
			return false
		}
		return true
	}
	// Hands the Acknowledge over to the next discovery step.
	onAcknowledge := func(incoming spi.Message) error {
		pdu := incoming.(readWriteModel.OpcuaAPU).GetMessage()
		acknowledge := pdu.(readWriteModel.OpcuaAcknowledgeResponse)
		s.log.Trace().Stringer("opcuaAcknowledgeResponse", acknowledge).Msg("Got Hello Response Connection Response")
		go s.onDiscoverOpenSecureChannel(ctx, codec, acknowledge)
		return nil
	}
	onFailure := func(err error) error {
		s.log.Debug().Err(err).Msg("error submitting")
		return nil
	}

	sendHello := func(transactionId int32) {
		err := codec.SendRequest(ctx, helloAPU, isAcknowledge, onAcknowledge, onFailure, REQUEST_TIMEOUT)
		if err != nil {
			s.log.Debug().Err(err).Msg("a error")
		}
	}

	err := s.channelTransactionManager.submit(sendHello, s.channelTransactionManager.getTransactionIdentifier())
	if err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
	}
}
// onDiscoverOpenSecureChannel is the second discovery step. It sends an unsecured
// (security policy "None") OpenSecureChannel request over the supplied codec and,
// when an OpenSecureChannelResponse comes back, continues discovery in a new
// goroutine running onDiscoverGetEndpointsRequest.
//
// A ServiceFault answer is logged together with its status code and ends discovery.
// Responses of an unexpected shape are reported as errors to the codec instead of
// panicking. opcuaAcknowledgeResponse is currently not evaluated.
func (s *SecureChannel) onDiscoverOpenSecureChannel(ctx context.Context, codec *MessageCodec, opcuaAcknowledgeResponse readWriteModel.OpcuaAcknowledgeResponse) {
	s.log.Trace().Msg("onDiscoverOpenSecureChannel")
	transactionId := s.channelTransactionManager.getTransactionIdentifier()
	requestHeader := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		0, //RequestHandle
		0,
		NULL_STRING,
		REQUEST_TIMEOUT_LONG,
		NULL_EXTENSION_OBJECT)
	openSecureChannelRequest := readWriteModel.NewOpenSecureChannelRequest(
		requestHeader,
		VERSION,
		readWriteModel.SecurityTokenRequestType_securityTokenRequestTypeIssue,
		readWriteModel.MessageSecurityMode_messageSecurityModeNone,
		NULL_BYTE_STRING,
		s.lifetime,
	)
	identifier := openSecureChannelRequest.GetExtensionId()
	expandedNodeId := readWriteModel.NewExpandedNodeId(
		false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, uint16(identifier)),
		nil,
		nil,
	)
	extObject := readWriteModel.NewRootExtensionObject(
		expandedNodeId,
		openSecureChannelRequest,
		identifier,
	)
	// The request travels as an opaque, little-endian serialized extension object.
	buffer := utils.NewWriteBufferByteBased(utils.WithByteOrderForByteBasedBuffer(binary.LittleEndian))
	if err := extObject.SerializeWithWriteBuffer(ctx, buffer); err != nil {
		s.log.Debug().Err(err).Msg("error serializing")
		return
	}
	serializedRequest := buffer.GetBytes()
	openRequest := readWriteModel.NewOpcuaOpenRequest(
		readWriteModel.ChunkType_FINAL,
		readWriteModel.NewOpenChannelMessageRequest(
			0,
			SECURITY_POLICY_NONE,
			NULL_BYTE_STRING,
			NULL_BYTE_STRING,
		),
		readWriteModel.NewBinaryPayload(
			readWriteModel.NewSequenceHeader(transactionId, transactionId),
			serializedRequest,
			uint32(len(serializedRequest)),
		),
		uint32(len(serializedRequest)),
		true,
	)
	apu := readWriteModel.NewOpcuaAPU(openRequest, false, true)
	requestConsumer := func(transactionId int32) {
		if err := codec.SendRequest(
			ctx,
			apu,
			func(message spi.Message) bool {
				opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
				if !ok {
					s.log.Debug().Type("type", message).Msg("Not relevant")
					return false
				}
				messagePDU := opcuaAPU.GetMessage()
				openResponse, ok := messagePDU.(readWriteModel.OpcuaOpenResponse)
				if !ok {
					s.log.Debug().Type("type", messagePDU).Msg("Not relevant")
					return false
				}
				return openResponse.GetMessage().GetSequenceHeader().GetRequestId() == transactionId
			},
			func(message spi.Message) error {
				// The acceptor above only lets an OpcuaAPU wrapping an OpcuaOpenResponse through.
				opcuaAPU := message.(readWriteModel.OpcuaAPU)
				opcuaOpenResponse := opcuaAPU.GetMessage().(readWriteModel.OpcuaOpenResponse)
				// The serialized extension object is carried by the response's payload,
				// not by the response message itself.
				binaryPayload, ok := opcuaOpenResponse.GetMessage().(readWriteModel.BinaryPayload)
				if !ok {
					return errors.Errorf("unexpected payload type %T", opcuaOpenResponse.GetMessage())
				}
				readBuffer := utils.NewReadBufferByteBased(binaryPayload.GetPayload(), utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian))
				extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, readBuffer, false)
				if err != nil {
					return errors.Wrap(err, "error parsing")
				}
				switch body := extensionObject.GetBody().(type) {
				case readWriteModel.ServiceFault:
					responseHeader, ok := body.GetResponseHeader().(readWriteModel.ResponseHeader)
					if !ok {
						return errors.Errorf("unexpected response header type %T", body.GetResponseHeader())
					}
					statusCode := responseHeader.GetServiceResult().GetStatusCode()
					// An unknown status code is still logged by its numeric value.
					statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
					s.log.Error().
						Uint32("statusCode", statusCode).
						Stringer("statusCodeByValue", statusCodeByValue).
						Msg("Failed to connect to opc ua server for the following reason")
					return nil
				case readWriteModel.OpenSecureChannelResponse:
					s.log.Debug().Msg("Got Secure Response Connection Response")
					go s.onDiscoverGetEndpointsRequest(ctx, codec, opcuaOpenResponse, body)
					return nil
				default:
					return errors.Errorf("unexpected response body type %T", body)
				}
			},
			func(err error) error {
				s.log.Debug().Err(err).Msg("error submitting")
				return nil
			},
			REQUEST_TIMEOUT,
		); err != nil {
			s.log.Debug().Err(err).Msg("a error")
		}
	}
	if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
	}
}
// onDiscoverGetEndpointsRequest is the third discovery step. It stores the token and
// channel id issued by the server, then sends a GetEndpoints request on the freshly
// opened channel.
//
// From the returned endpoint list, the server certificate of the endpoint whose URL
// equals the configured endpoint and whose security policy URI equals the configured
// security policy is stored in the configuration. The SHA-1 digest of the configured
// sender certificate then becomes the thumbprint, and discovery finishes in a new
// goroutine running onDiscoverCloseSecureChannel.
//
// A ServiceFault answer is logged with its status code. Responses of an unexpected
// shape are reported as errors to the codec instead of panicking.
func (s *SecureChannel) onDiscoverGetEndpointsRequest(ctx context.Context, codec *MessageCodec, opcuaOpenResponse readWriteModel.OpcuaOpenResponse, openSecureChannelResponse readWriteModel.OpenSecureChannelResponse) {
	s.log.Trace().Msg("onDiscoverGetEndpointsRequest")
	securityToken, ok := openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken)
	if !ok {
		s.log.Error().Type("type", openSecureChannelResponse.GetSecurityToken()).Msg("Unexpected security token type")
		return
	}
	s.tokenId.Store(securityToken.GetTokenId())
	s.channelId.Store(securityToken.GetChannelId())
	transactionId := s.channelTransactionManager.getTransactionIdentifier()
	nextSequenceNumber := opcuaOpenResponse.GetMessage().GetSequenceHeader().GetSequenceNumber() + 1
	nextRequestId := opcuaOpenResponse.GetMessage().GetSequenceHeader().GetRequestId() + 1
	if transactionId != nextSequenceNumber {
		s.log.Error().
			Int32("transactionId", transactionId).
			Int32("nextSequenceNumber", nextSequenceNumber).
			Msg("Sequence number isn't as expected, we might have missed a packet. - transactionId != nextSequenceNumber")
		return
	}
	requestHeader := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		0, //RequestHandle
		0,
		NULL_STRING,
		REQUEST_TIMEOUT_LONG,
		NULL_EXTENSION_OBJECT)
	endpointsRequest := readWriteModel.NewGetEndpointsRequest(
		requestHeader,
		s.endpoint,
		nil,
		nil)
	identifier := endpointsRequest.GetExtensionId()
	expandedNodeId := readWriteModel.NewExpandedNodeId(
		false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, uint16(identifier)),
		nil,
		nil,
	)
	extObject := readWriteModel.NewRootExtensionObject(
		expandedNodeId,
		endpointsRequest,
		identifier,
	)
	// The request travels as an opaque, little-endian serialized extension object.
	buffer := utils.NewWriteBufferByteBased(utils.WithByteOrderForByteBasedBuffer(binary.LittleEndian))
	if err := extObject.SerializeWithWriteBuffer(ctx, buffer); err != nil {
		s.log.Debug().Err(err).Msg("error serializing")
		return
	}
	serializedRequest := buffer.GetBytes()
	messageRequest := readWriteModel.NewOpcuaMessageRequest(
		readWriteModel.ChunkType_FINAL,
		readWriteModel.NewSecurityHeader(
			s.channelId.Load(),
			s.tokenId.Load(),
		),
		readWriteModel.NewBinaryPayload(
			readWriteModel.NewSequenceHeader(nextSequenceNumber, nextRequestId),
			serializedRequest,
			uint32(len(serializedRequest)),
		),
		uint32(len(serializedRequest)),
		true,
	)
	apu := readWriteModel.NewOpcuaAPU(messageRequest, false, true)
	requestConsumer := func(transactionId int32) {
		if err := codec.SendRequest(
			ctx,
			apu,
			func(message spi.Message) bool {
				opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
				if !ok {
					s.log.Debug().Type("type", message).Msg("Not relevant")
					return false
				}
				messagePDU := opcuaAPU.GetMessage()
				messageResponse, ok := messagePDU.(readWriteModel.OpcuaMessageResponse)
				if !ok {
					s.log.Debug().Type("type", messagePDU).Msg("Not relevant")
					return false
				}
				return messageResponse.GetMessage().GetSequenceHeader().GetRequestId() == transactionId
			},
			func(message spi.Message) error {
				// The acceptor above only lets an OpcuaAPU wrapping an OpcuaMessageResponse through.
				opcuaAPU := message.(readWriteModel.OpcuaAPU)
				messageResponse := opcuaAPU.GetMessage().(readWriteModel.OpcuaMessageResponse)
				// The serialized extension object is carried by the response's payload,
				// not by the response message itself.
				binaryPayload, ok := messageResponse.GetMessage().(readWriteModel.BinaryPayload)
				if !ok {
					return errors.Errorf("unexpected payload type %T", messageResponse.GetMessage())
				}
				readBuffer := utils.NewReadBufferByteBased(binaryPayload.GetPayload(), utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian))
				extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, readBuffer, false)
				if err != nil {
					return errors.Wrap(err, "error parsing")
				}
				switch body := extensionObject.GetBody().(type) {
				case readWriteModel.ServiceFault:
					responseHeader, ok := body.GetResponseHeader().(readWriteModel.ResponseHeader)
					if !ok {
						return errors.Errorf("unexpected response header type %T", body.GetResponseHeader())
					}
					statusCode := responseHeader.GetServiceResult().GetStatusCode()
					// An unknown status code is still logged by its numeric value.
					statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
					s.log.Error().
						Uint32("statusCode", statusCode).
						Stringer("statusCodeByValue", statusCodeByValue).
						Msg("Failed to connect to opc ua server for the following reason")
				case readWriteModel.GetEndpointsResponse:
					s.log.Debug().Msg("Got Secure Response Connection Response")
					configuredUrl := s.endpoint.GetStringValue()
					for _, endpoint := range body.GetEndpoints() {
						endpointDescription, ok := endpoint.(readWriteModel.EndpointDescription)
						if !ok {
							s.log.Debug().Type("type", endpoint).Msg("Not relevant")
							continue
						}
						// GetStringValue returns *string: compare the pointed-to values,
						// never the pointers, and treat null strings as "no match".
						offeredUrl := endpointDescription.GetEndpointUrl().GetStringValue()
						offeredPolicy := endpointDescription.GetSecurityPolicyUri().GetStringValue()
						if configuredUrl == nil || offeredUrl == nil || offeredPolicy == nil {
							continue
						}
						if *offeredUrl == *configuredUrl && *offeredPolicy == s.securityPolicy {
							s.log.Info().Str("stringValue", *configuredUrl).Msg("Found OPC UA endpoint")
							s.configuration.SenderCertificate = endpointDescription.GetServerCertificate().GetStringValue()
						}
					}
					digest := sha1.Sum(s.configuration.SenderCertificate)
					s.thumbprint = readWriteModel.NewPascalByteString(int32(len(digest)), digest[:])
					go s.onDiscoverCloseSecureChannel(ctx, codec, body)
				default:
					return errors.Errorf("unexpected response body type %T", body)
				}
				return nil
			},
			func(err error) error {
				s.log.Debug().Err(err).Msg("error submitting")
				return nil
			},
			REQUEST_TIMEOUT,
		); err != nil {
			s.log.Debug().Err(err).Msg("a error")
		}
	}
	if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
	}
}
// onDiscoverCloseSecureChannel is the last discovery step: it submits a
// CloseSecureChannel request for the channel and token currently stored on s.
// An OpcuaMessageResponse with a matching request id is merely traced. The
// response argument is not evaluated. Failures are only logged at debug level.
func (s *SecureChannel) onDiscoverCloseSecureChannel(ctx context.Context, codec *MessageCodec, response readWriteModel.GetEndpointsResponse) {
	s.log.Trace().Msg("onDiscoverCloseSecureChannel")
	transactionId := s.channelTransactionManager.getTransactionIdentifier()

	closeSecureChannelRequest := readWriteModel.NewCloseSecureChannelRequest(
		readWriteModel.NewRequestHeader(
			s.getAuthenticationToken(),
			s.getCurrentDateTime(),
			0, //RequestHandle
			0,
			NULL_STRING,
			REQUEST_TIMEOUT_LONG,
			NULL_EXTENSION_OBJECT,
		),
	)
	extensionId := closeSecureChannelRequest.GetExtensionId()
	typeNodeId := readWriteModel.NewExpandedNodeId(
		false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, uint16(extensionId)),
		nil,
		nil,
	)
	securityHeader := readWriteModel.NewSecurityHeader(s.channelId.Load(), s.tokenId.Load())
	payload := readWriteModel.NewExtensiblePayload(
		readWriteModel.NewSequenceHeader(transactionId, transactionId),
		readWriteModel.NewRootExtensionObject(typeNodeId, closeSecureChannelRequest, extensionId),
		uint32(0),
	)
	closeAPU := readWriteModel.NewOpcuaAPU(
		readWriteModel.NewOpcuaCloseRequest(readWriteModel.ChunkType_FINAL, securityHeader, payload, true),
		false,
		true,
	)

	sendClose := func(requestId int32) {
		matchesRequest := func(message spi.Message) bool {
			candidate, isAPU := message.(readWriteModel.OpcuaAPU)
			if !isAPU {
				s.log.Debug().Type("type", message).Msg("Not relevant")
				return false
			}
			pdu := candidate.GetMessage()
			messageResponse, isResponse := pdu.(readWriteModel.OpcuaMessageResponse)
			if !isResponse {
				s.log.Debug().Type("type", pdu).Msg("Not relevant")
				return false
			}
			return messageResponse.GetMessage().GetSequenceHeader().GetRequestId() == requestId
		}
		onResponse := func(message spi.Message) error {
			pdu := message.(readWriteModel.OpcuaAPU).GetMessage()
			messageResponse := pdu.(readWriteModel.OpcuaMessageResponse)
			s.log.Trace().Stringer("opcuaMessageResponse", messageResponse).Msg("Got close secure channel response")
			return nil
		}
		onFailure := func(err error) error {
			s.log.Debug().Err(err).Msg("error submitting")
			return nil
		}
		sendErr := codec.SendRequest(ctx, closeAPU, matchesRequest, onResponse, onFailure, REQUEST_TIMEOUT)
		if sendErr != nil {
			s.log.Debug().Err(sendErr).Msg("a error")
		}
	}

	s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting CloseSecureChannel with id")
	if submitErr := s.channelTransactionManager.submit(sendClose, transactionId); submitErr != nil {
		s.log.Debug().Err(submitErr).Msg("error submitting")
	}
}
// keepAlive starts the background loop that periodically re-issues an
// OpenSecureChannel request so the security token is replaced before it expires.
//
// Only one loop runs at a time: the running flag is raised while the state-change
// lock is still held, so a concurrent second call logs a warning and returns. The
// loop sleeps for 75% of the current lifetime (in milliseconds) between requests
// and ends when the flag is cleared or the codec stops running. On a successful
// response the token id, channel id and revised lifetime are stored on s.
func (s *SecureChannel) keepAlive() {
	s.keepAliveStateChange.Lock()
	defer s.keepAliveStateChange.Unlock()
	if s.keepAliveIndicator.Load() {
		s.log.Warn().Msg("keepalive already running")
		return
	}
	// Raise the flag before the goroutine is scheduled; otherwise a second caller
	// could pass the check above and start a duplicate loop.
	s.keepAliveIndicator.Store(true)
	s.keepAliveWg.Add(1)
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		defer s.keepAliveWg.Done()
		defer s.keepAliveIndicator.Store(false)
		defer s.log.Info().Msg("ending keepalive")
		ctx := context.Background()
		for (s.codec == nil || s.codec.IsRunning()) && s.keepAliveIndicator.Load() {
			sleepTime := time.Duration(math.Ceil(float64(s.lifetime)*0.75)) * time.Millisecond
			s.log.Trace().Dur("sleepTime", sleepTime).Msg("Sleeping")
			time.Sleep(sleepTime)
			// The channel may have been stopped while we slept; don't send on a dead channel.
			if !s.keepAliveIndicator.Load() {
				return
			}
			codec := s.codec
			if codec == nil {
				// The loop condition tolerates a missing codec, but there is nothing to send on yet.
				continue
			}
			if !codec.IsRunning() {
				return
			}
			transactionId := s.channelTransactionManager.getTransactionIdentifier()
			requestHeader := readWriteModel.NewRequestHeader(
				s.getAuthenticationToken(),
				s.getCurrentDateTime(),
				0, //RequestHandle
				0,
				NULL_STRING,
				REQUEST_TIMEOUT_LONG,
				NULL_EXTENSION_OBJECT,
			)
			// Encrypted channels sign and encrypt and send the client nonce; plain ones send neither.
			securityMode := readWriteModel.MessageSecurityMode_messageSecurityModeNone
			clientNonce := NULL_BYTE_STRING
			if s.isEncrypted {
				securityMode = readWriteModel.MessageSecurityMode_messageSecurityModeSignAndEncrypt
				clientNonce = readWriteModel.NewPascalByteString(int32(len(s.clientNonce)), s.clientNonce)
			}
			openSecureChannelRequest := readWriteModel.NewOpenSecureChannelRequest(
				requestHeader,
				VERSION,
				readWriteModel.SecurityTokenRequestType_securityTokenRequestTypeIssue,
				securityMode,
				clientNonce,
				uint32(s.lifetime))
			identifier := openSecureChannelRequest.GetExtensionId()
			expandedNodeId := readWriteModel.NewExpandedNodeId(
				false, //Namespace Uri Specified
				false, //Server Index Specified
				readWriteModel.NewNodeIdFourByte(0, uint16(identifier)),
				nil,
				nil)
			extObject := readWriteModel.NewRootExtensionObject(
				expandedNodeId,
				openSecureChannelRequest,
				identifier,
			)
			buffer := utils.NewWriteBufferByteBased(utils.WithByteOrderForByteBasedBuffer(binary.LittleEndian))
			if err := extObject.SerializeWithWriteBuffer(ctx, buffer); err != nil {
				s.log.Error().Err(err).Msg("error serializing")
				return
			}
			serializedRequest := buffer.GetBytes()
			openRequest := readWriteModel.NewOpcuaOpenRequest(
				readWriteModel.ChunkType_FINAL,
				readWriteModel.NewOpenChannelMessageRequest(0,
					readWriteModel.NewPascalString(&s.securityPolicy),
					s.publicCertificate,
					s.thumbprint,
				),
				readWriteModel.NewBinaryPayload(
					readWriteModel.NewSequenceHeader(transactionId, transactionId),
					serializedRequest,
					uint32(len(serializedRequest)),
				),
				uint32(len(serializedRequest)),
				true,
			)
			var apu readWriteModel.OpcuaAPU
			if s.isEncrypted {
				// The encryption handler produces the wire bytes; parse them back into an APU for the codec.
				message, err := s.encryptionHandler.encodeMessage(ctx, openRequest, serializedRequest)
				if err != nil {
					s.log.Error().Err(err).Msg("error encoding")
					return
				}
				apu, err = readWriteModel.OpcuaAPUParse(ctx, message, false, true)
				if err != nil {
					s.log.Error().Err(err).Msg("error parsing")
					return
				}
			} else {
				apu = readWriteModel.NewOpcuaAPU(openRequest, false, true)
			}
			requestConsumer := func(transactionId int32) {
				if err := codec.SendRequest(
					ctx,
					apu,
					func(message spi.Message) bool {
						opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
						if !ok {
							s.log.Debug().Type("type", message).Msg("Not relevant")
							return false
						}
						messagePDU := opcuaAPU.GetMessage()
						openResponse, ok := messagePDU.(readWriteModel.OpcuaOpenResponse)
						if !ok {
							s.log.Debug().Type("type", messagePDU).Msg("Not relevant")
							return false
						}
						return openResponse.GetMessage().GetSequenceHeader().GetRequestId() == transactionId
					},
					func(message spi.Message) error {
						// The acceptor above only lets an OpcuaAPU wrapping an OpcuaOpenResponse through.
						opcuaAPU := message.(readWriteModel.OpcuaAPU)
						opcuaOpenResponse := opcuaAPU.GetMessage().(readWriteModel.OpcuaOpenResponse)
						// The serialized extension object is carried by the response's payload,
						// not by the response message itself.
						binaryPayload, ok := opcuaOpenResponse.GetMessage().(readWriteModel.BinaryPayload)
						if !ok {
							return errors.Errorf("unexpected payload type %T", opcuaOpenResponse.GetMessage())
						}
						readBuffer := utils.NewReadBufferByteBased(binaryPayload.GetPayload(), utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian))
						extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, readBuffer, false)
						if err != nil {
							return errors.Wrap(err, "error parsing")
						}
						switch body := extensionObject.GetBody().(type) {
						case readWriteModel.ServiceFault:
							responseHeader, ok := body.GetResponseHeader().(readWriteModel.ResponseHeader)
							if !ok {
								return errors.Errorf("unexpected response header type %T", body.GetResponseHeader())
							}
							statusCode := responseHeader.GetServiceResult().GetStatusCode()
							// An unknown status code is still logged by its numeric value.
							statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
							s.log.Error().
								Uint32("statusCode", statusCode).
								Stringer("statusCodeByValue", statusCodeByValue).
								Msg("Failed to connect to opc ua server for the following reason")
						case readWriteModel.OpenSecureChannelResponse:
							s.log.Debug().Msg("Got Secure Response Connection Response")
							token, ok := body.GetSecurityToken().(readWriteModel.ChannelSecurityToken)
							if !ok {
								return errors.Errorf("unexpected security token type %T", body.GetSecurityToken())
							}
							s.tokenId.Store(token.GetTokenId())
							s.channelId.Store(token.GetChannelId())
							s.lifetime = token.GetRevisedLifetime()
						default:
							return errors.Errorf("unexpected response body type %T", body)
						}
						return nil
					},
					func(err error) error {
						s.log.Debug().Err(err).Msg("error submitting")
						return nil
					},
					REQUEST_TIMEOUT,
				); err != nil {
					s.log.Debug().Err(err).Msg("a error")
				}
			}
			s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting OpenSecureChannel with id")
			if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
				s.log.Debug().Err(err).Msg("error submitting")
			}
		}
	}()
}
// getRequestHandle hands out the next request handle: it advances the handle
// generator by one and returns the value the generator held before this call.
func (s *SecureChannel) getRequestHandle() uint32 {
	advanced := s.requestHandleGenerator.Add(1)
	return advanced - 1
}
// getAuthenticationToken wraps the authentication token of the current connection
// in a NodeId. It panics when no authentication token has been set.
func (s *SecureChannel) getAuthenticationToken() readWriteModel.NodeId {
	if token := s.authenticationToken; token != nil {
		return readWriteModel.NewNodeId(token)
	}
	panic("authenticationToken should be set at this point")
}
// getChannelId reports the identifier currently stored for the secure channel.
func (s *SecureChannel) getChannelId() uint32 {
	currentChannelId := s.channelId.Load()
	return currentChannelId
}
// getTokenId reports the identifier currently stored for the security token.
func (s *SecureChannel) getTokenId() uint32 {
	currentTokenId := s.tokenId.Load()
	return currentTokenId
}
// selectEndpoint selects the endpoint to use based on the connection configuration.
//   - Every server endpoint from sessionResponse is checked with isEndpoint; when
//     discovery is disabled, isEndpoint adopts the host address returned by the server.
//   - The user identity tokens of each matching endpoint are passed to hasIdentity,
//     which records the policy id and token type to use.
//   - If no policy id or no token type was found, an error is logged. Nothing is
//     returned or raised; callers see the outcome through s.policyId / s.tokenType.
//
// Entries of an unexpected type in the response are skipped.
func (s *SecureChannel) selectEndpoint(sessionResponse readWriteModel.CreateSessionResponse) {
	// Get a list of the endpoints which match ours.
	var filteredEndpoints []readWriteModel.EndpointDescription
	for _, endpoint := range sessionResponse.GetServerEndpoints() {
		endpointDescription, ok := endpoint.(readWriteModel.EndpointDescription)
		if !ok {
			s.log.Debug().Type("type", endpoint).Msg("Not relevant")
			continue
		}
		if s.isEndpoint(endpointDescription) {
			filteredEndpoints = append(filteredEndpoints, endpointDescription)
		}
	}
	//Determine if the requested security policy is included in the endpoint
	for _, endpoint := range filteredEndpoints {
		definitions := endpoint.GetUserIdentityTokens()
		userIdentityTokens := make([]readWriteModel.UserTokenPolicy, 0, len(definitions))
		for _, definition := range definitions {
			userTokenPolicy, ok := definition.(readWriteModel.UserTokenPolicy)
			if !ok {
				s.log.Debug().Type("type", definition).Msg("Not relevant")
				continue
			}
			userIdentityTokens = append(userIdentityTokens, userTokenPolicy)
		}
		s.hasIdentity(userIdentityTokens)
	}
	// Only used for log output; guard against an empty endpoint list so the error
	// path itself cannot panic.
	configuredEndpoint := ""
	if len(s.endpoints) > 0 {
		configuredEndpoint = s.endpoints[0]
	}
	if s.policyId == nil {
		s.log.Error().Str("endpoint", configuredEndpoint).Msg("Unable to find endpoint")
		return
	}
	if s.tokenType == 0xffffffff { // TODO: what did we use as undefined
		s.log.Error().Str("endpoint", configuredEndpoint).Msg("Unable to find Security Policy for endpoint")
		return
	}
}
// isEndpoint checks each component of the returned endpoint description against the
// connection configuration.
//   - The endpoint URL must match URI_PATTERN; a missing (null) or malformed URL is
//     logged and yields false.
//   - With discovery enabled, the host must be one of s.endpoints.
//   - Port and transport endpoint must equal the configured values.
//   - With discovery disabled, a matching endpoint's host is stored as the
//     configured host.
//
// It returns true if the endpoint matches our configuration.
func (s *SecureChannel) isEndpoint(endpoint readWriteModel.EndpointDescription) bool {
	// A null string from the server carries no URL to inspect.
	endpointUrl := endpoint.GetEndpointUrl().GetStringValue()
	if endpointUrl == nil {
		s.log.Error().Stringer("endpoint", endpoint).Msg("Endpoint returned from the server doesn't match the format '{protocol-code}:({transport-code})?//{transport-host}(:{transport-port})(/{transport-endpoint})'")
		return false
	}
	// Split up the connection string into its individual segments.
	matches := utils.GetSubgroupMatches(URI_PATTERN, *endpointUrl)
	if len(matches) == 0 {
		s.log.Error().Stringer("endpoint", endpoint).Msg("Endpoint returned from the server doesn't match the format '{protocol-code}:({transport-code})?//{transport-host}(:{transport-port})(/{transport-endpoint})'")
		return false
	}
	s.log.Trace().
		Str("transportHost", matches["transportHost"]).
		Str("transportPort", matches["transportPort"]).
		Str("transportEndpoint", matches["transportEndpoint"]).
		Msg("Using Endpoint")
	if s.configuration.Discovery && !slices.Contains(s.endpoints, matches["transportHost"]) {
		return false
	}
	if s.configuration.Port != matches["transportPort"] {
		return false
	}
	if s.configuration.TransportEndpoint != matches["transportEndpoint"] {
		return false
	}
	if !s.configuration.Discovery {
		s.configuration.Host = matches["transportHost"]
	}
	return true
}
// hasIdentity looks through the policies returned with an endpoint description for
// one that fits the configured credentials: an anonymous policy when no username is
// set, a username policy otherwise. For every fitting policy it records the policy
// id and token type on s, so the last fitting policy wins.
func (s *SecureChannel) hasIdentity(policies []readWriteModel.UserTokenPolicy) {
	anonymousLogin := s.username == ""
	for _, policy := range policies {
		var fits bool
		switch policy.GetTokenType() {
		case readWriteModel.UserTokenType_userTokenTypeAnonymous:
			fits = anonymousLogin
		case readWriteModel.UserTokenType_userTokenTypeUserName:
			fits = !anonymousLogin
		}
		if !fits {
			continue
		}
		s.policyId = policy.GetPolicyId()
		s.tokenType = policy.GetTokenType()
	}
}
// getIdentityToken creates an IdentityToken to authenticate with a server.
//   - tokenType selects the kind of token: anonymous or username/password.
//   - policyId is the policy id placed into the token.
//
// For a username token the secret handed to the encryption handler is framed as a
// little-endian 32-bit length (password bytes plus sender nonce), followed by the
// password bytes and the sender nonce.
//
// It returns an ExtensionObject wrapping the IdentityToken, or nil when the token
// type is not supported or the password could not be encoded or encrypted.
func (s *SecureChannel) getIdentityToken(tokenType readWriteModel.UserTokenType, policyId *string) readWriteModel.ExtensionObject {
	switch tokenType {
	case readWriteModel.UserTokenType_userTokenTypeAnonymous:
		//If we aren't using authentication tell the server we would like to log in anonymously
		anonymousIdentityToken := readWriteModel.NewAnonymousIdentityToken(readWriteModel.NewPascalString(policyId))
		extExpandedNodeId := readWriteModel.NewExpandedNodeId(
			false, //Namespace Uri Specified
			false, //Server Index Specified
			readWriteModel.NewNodeIdFourByte(0, uint16(anonymousIdentityToken.GetExtensionId())),
			nil,
			nil,
		)
		return readWriteModel.NewBinaryExtensionObjectWithMask(
			extExpandedNodeId,
			BINARY_ENCODING_MASK,
			anonymousIdentityToken,
			anonymousIdentityToken.GetExtensionId(),
			false,
		)
	case readWriteModel.UserTokenType_userTokenTypeUserName:
		//Encrypt the password using the server nonce and server public key
		passwordBytes := []byte(s.password)
		// binary.Write only accepts fixed-size values, so the length prefix must be
		// an explicit 32-bit integer; a plain int is rejected and would be dropped.
		secretLength := uint32(len(passwordBytes) + len(s.senderNonce))
		encodeableBuffer := new(bytes.Buffer)
		for _, part := range []any{secretLength, passwordBytes, s.senderNonce} {
			if err := binary.Write(encodeableBuffer, binary.LittleEndian, part); err != nil {
				s.log.Error().Err(err).Msg("error encoding password")
				return nil
			}
		}
		encodeablePassword := encodeableBuffer.Bytes()
		encryptedPassword, err := s.encryptionHandler.encryptPassword(encodeablePassword)
		if err != nil {
			s.log.Error().Err(err).Msg("error encrypting password")
			return nil
		}
		userNameIdentityToken := readWriteModel.NewUserNameIdentityToken(
			readWriteModel.NewPascalString(policyId),
			readWriteModel.NewPascalString(&s.username),
			readWriteModel.NewPascalByteString(int32(len(encryptedPassword)), encryptedPassword),
			readWriteModel.NewPascalString(utils.ToPtr(PASSWORD_ENCRYPTION_ALGORITHM)),
		)
		extExpandedNodeId := readWriteModel.NewExpandedNodeId(
			false, //Namespace Uri Specified
			false, //Server Index Specified
			readWriteModel.NewNodeIdFourByte(0, uint16(userNameIdentityToken.GetExtensionId())),
			nil,
			nil)
		return readWriteModel.NewBinaryExtensionObjectWithMask(
			extExpandedNodeId,
			BINARY_ENCODING_MASK,
			userNameIdentityToken,
			userNameIdentityToken.GetExtensionId(),
			false,
		)
	}
	return nil
}
// getCurrentDateTime returns the current wall-clock time as an OPC UA timestamp:
// the Unix time in milliseconds scaled to 100-nanosecond ticks and shifted by
// EPOCH_OFFSET, the offset between the OPC UA epoch and the Unix epoch.
func (s *SecureChannel) getCurrentDateTime() int64 {
	const ticksPerMillisecond = 10000
	unixMillis := time.Now().UnixMilli()
	return unixMillis*ticksPerMillisecond + EPOCH_OFFSET
}