IndustrialDeviceController/Software/HighLevelApp/drivers/modbus/modbus_transport_tcp.c (264 lines of code) (raw):
/* Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT License. */
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
#include <init/applibs_versions.h>
#include <applibs/networking.h>
#include <init/device_hal.h>
#include <init/globals.h>
#include <utils/llog.h>
#include <utils/timer.h>
#include <utils/utils.h>
#include <driver/modbus.h>
#include <safeclib/safe_lib.h>
#include "modbus_transport.h"
#include "modbus_transport_tcp.h"
// NOTE: Modbus tcp support simulataneous transcations and transcation id
// must been used.
// Current implementation assume only one outgoing transcation. Possible improvement
// is to send multiple requests out and then wait them all to be completed.
#define MB_TCP_MAX_ADU_SIZE 260
// header not include unit_id
#define MBAP_HEADER_SIZE 7
typedef struct modbus_transport_tcp_t modbus_transport_tcp_t;
struct modbus_transport_tcp_t {
modbus_transport_t base;
int sock_fd;
int port;
char ip[16]; // only support ipv4
uint16_t transcation_id;
};
/// <summary>
/// try to send count bytes over tcp connection, timeout in MB_TCP_IO_TIMEOUT_MS
/// </summary>
/// <param name="sockfd">socket fd</param>
/// <param name="buf">buffer to send</param>
/// <param name="count">count of bytes to send</param>
/// <param name="timeout">the value of timer in ms for this operation</param>
/// <returns>error code</returns>
static err_code tcp_send_bytes(int sockfd, uint8_t *buf, int count, int timeout)
{
int32_t total = 0;
struct timespec poll_sw;
timer_stopwatch_start(&poll_sw);
struct pollfd fds[1];
fds[0].fd = sockfd;
fds[0].events = POLLOUT;
while (total < count) {
int32_t elapse_ms = timer_stopwatch_stop(&poll_sw);
if (elapse_ms >= timeout) {
return DEVICE_E_TIMEOUT;
}
int nevents = poll(fds, 1, timeout - elapse_ms);
if (nevents < 0) {
LOGE("socket I/O error");
return DEVICE_E_IO;
} else if (nevents == 0) {
LOGE("socket sending timeout");
return DEVICE_E_TIMEOUT;
} else {
if (fds[0].revents & POLLHUP) {
LOGE("socket connection broken");
return DEVICE_E_BROKEN;
} else if (fds[0].revents & POLLERR) {
LOGE("socket poll error");
return DEVICE_E_IO;
} else if (fds[0].revents & POLLOUT) {
int nsend = send(sockfd, buf + total, count - total, MSG_NOSIGNAL);
if (nsend <= 0) {
LOGE("socket I/O error");
return DEVICE_E_BROKEN;
}
total += nsend;
}
}
}
return DEVICE_OK;
}
/// <summary>
/// try to receied count bytes over tcp connection, timeout in MB_TCP_IO_TIMEOUT_MS
/// </summary>
/// <param name="sockfd">socket fd</param>
/// <param name="buf">buffer to receive data</param>
/// <param name="count">number of bytes to receive</param>
/// <param name="timeout">the value of timer in ms for this operation</param>
/// <returns>error code</returns>
static err_code tcp_recv_bytes(int sockfd, uint8_t *buf, int count, int timeout)
{
int total = 0;
struct pollfd fds[1];
fds[0].fd = sockfd;
fds[0].events = POLLIN;
struct timespec poll_sw;
timer_stopwatch_start(&poll_sw);
while (total < count) {
int32_t elapse_ms = timer_stopwatch_stop(&poll_sw);
if (elapse_ms >= timeout) {
return DEVICE_E_TIMEOUT;
}
int nevents = poll(fds, 1, timeout - elapse_ms);
if (nevents < 0) {
LOGE("socket I/O error");
return DEVICE_E_IO;
} else if (nevents == 0) {
LOGE("socket receiving timeout");
return DEVICE_E_TIMEOUT;
} else {
if (fds[0].revents & POLLHUP) {
LOGE("socket connection broken");
return DEVICE_E_BROKEN;
} else if (fds[0].revents & POLLERR) {
LOGE("socket poll error");
return DEVICE_E_IO;
} else if (fds[0].revents & (POLLIN | POLLPRI)) {
int nrecv = recv(sockfd, buf + total, count - total, MSG_DONTWAIT);
if (nrecv <= 0) {
LOGE("socket I/O error");
return DEVICE_E_BROKEN;
}
total += nrecv;
}
}
}
return DEVICE_OK;
}
/// <summary>
/// open tcp connection
/// </summary>
err_code tcp_open(modbus_transport_t *instance, int32_t timeout_ms)
{
modbus_transport_tcp_t *ctx = (modbus_transport_tcp_t *)instance;
LOGD("tcp_open %s:%d", ctx->ip, ctx->port);
// assume ipv4, create a tcp socket
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
LOGE("Failed to open socket");
return DEVICE_E_IO;
}
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_addr.s_addr = inet_addr(ctx->ip);
server.sin_port = htons(ctx->port);
struct timeval timeout;
timeout.tv_sec = timeout_ms / 1000;
timeout.tv_usec = timeout_ms % 1000 * 1000;
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout));
if (connect(sock, (struct sockaddr *)&server, sizeof(server)) != 0) {
LOGE("Failed to connect to %s:%d", ctx->ip, ctx->port);
close(sock);
return DEVICE_E_IO;
}
LOGD("Connected to modbus server %s:%d", ctx->ip, ctx->port);
ctx->sock_fd = sock;
return DEVICE_OK;
}
/// <summary>
/// close tcp connection
/// </summary>
/// <returns>0 on success, or -1 on failure</returns>
err_code tcp_close(modbus_transport_t *instance)
{
modbus_transport_tcp_t *ctx = (modbus_transport_tcp_t *)instance;
if (ctx->sock_fd >= 0) {
close(ctx->sock_fd);
ctx->sock_fd = -1;
}
return DEVICE_OK;
}
/// <summary>
/// prepare and send modbus tcp adu
/// </summary>
/// <param name="pdu">pdu to send</param>
/// <param name="pdu_len">pdu length to send</param>
/// <param name="timeout">the value of timer in ms for this operation</param>
/// <returns>error code</returns>
err_code tcp_send_request(modbus_transport_t *instance, uint8_t unit_id, const uint8_t *pdu, int32_t pdu_len,
int32_t timeout)
{
modbus_transport_tcp_t *ctx = (modbus_transport_tcp_t *)instance;
uint8_t adu[MB_TCP_MAX_ADU_SIZE];
if (ctx->sock_fd < 0) {
LOGE("Socket not open yet");
return DEVICE_E_INTERNAL;
}
ctx->transcation_id++;
adu[0] = (ctx->transcation_id >> 8) & 0xFF;
adu[1] = ctx->transcation_id & 0xFF;
adu[2] = 0; // protocol id hi
adu[3] = 0; // protocol id lo
adu[4] = 0; // length hi - always 0 as max is 254
adu[5] = 1 + pdu_len; // one byte unit_id + pdu length
adu[6] = unit_id;
memcpy_s(adu + MBAP_HEADER_SIZE, MB_TCP_MAX_ADU_SIZE - MBAP_HEADER_SIZE, pdu, pdu_len);
int32_t adu_len = MBAP_HEADER_SIZE + pdu_len;
int err = tcp_send_bytes(ctx->sock_fd, adu, adu_len, timeout);
if (err) {
LOGE("Failed to send request:%s", err_str(err));
return err;
}
#ifdef DEBUG_TRAFFIC
LOGD("ADU --> %s", hex(adu, adu_len));
#endif
return DEVICE_OK;
}
/// <summary>
/// recv response for previously sent request
/// </summary>
/// <param name="pdu">pdu buffer</param>
/// <param name="ppdu_len">pointer to variable that hold pdu len</param>
/// <param name="timeout">the value of timer in ms for this operation</param>
/// <returns>error code</returns>
err_code tcp_recv_response(modbus_transport_t *instance, uint8_t unit_id, uint8_t *pdu, int32_t *ppdu_len,
int32_t timeout)
{
modbus_transport_tcp_t *ctx = (modbus_transport_tcp_t *)instance;
uint8_t adu[MB_TCP_MAX_ADU_SIZE];
uint16_t transcation_id = 0;
int32_t pdu_len = 0;
if (ctx->sock_fd < 0) {
LOGE("Socket not open yet");
return DEVICE_E_INTERNAL;
}
struct timespec poll_sw;
timer_stopwatch_start(&poll_sw);
do {
// receive MBAP header first
int32_t elapse_ms = timer_stopwatch_stop(&poll_sw);
if (elapse_ms >= timeout) {
return DEVICE_E_TIMEOUT;
}
err_code err = tcp_recv_bytes(ctx->sock_fd, adu, MBAP_HEADER_SIZE, timeout - elapse_ms);
if (err) {
LOGE("Failed to receive MBAP header:%s", err_str(err));
return err;
}
transcation_id = (adu[0] << 8) + adu[1];
pdu_len = (adu[4] << 8) + adu[5] - 1; // exclude one byte unit id
if ((pdu_len < 0) || (pdu_len > MB_TCP_MAX_ADU_SIZE - MBAP_HEADER_SIZE)) {
LOGE("Invalid pdu len %d", pdu_len);
// clean any garbage data then bail out
uint8_t garbage;
do {
elapse_ms = timer_stopwatch_stop(&poll_sw);
if (elapse_ms >= timeout) {
return DEVICE_E_TIMEOUT;
}
} while (tcp_recv_bytes(ctx->sock_fd, &garbage, 1, timeout - elapse_ms) == 0);
return DEVICE_E_PROTOCOL;
}
// receive the pdu
elapse_ms = timer_stopwatch_stop(&poll_sw);
if (elapse_ms >= timeout) {
return DEVICE_E_TIMEOUT;
}
err = tcp_recv_bytes(ctx->sock_fd, adu + MBAP_HEADER_SIZE, pdu_len, timeout - elapse_ms);
if (err) {
LOGE("Failed to receive pdu:%s", err_str(err));
return err;
}
#ifdef DEBUG_TRAFFIC
LOGD("ADU<-- %s", hex(adu, MBAP_HEADER_SIZE + pdu_len));
#endif
// the response could be delayed, so wait until we received response
// for current transcation
} while (transcation_id < ctx->transcation_id);
if (transcation_id != ctx->transcation_id) {
LOGE("Expect response for transcation %d, got %d", ctx->transcation_id, transcation_id);
return DEVICE_E_PROTOCOL;
}
if (unit_id != adu[6]) {
LOGE("Expect unit_id %d, got %d", unit_id, adu[6]);
return DEVICE_E_PROTOCOL;
}
// Assume that the caller passes in the buffer with size of MODBUS_MAX_PDU_SIZE
memcpy_s(pdu, MODBUS_MAX_PDU_SIZE, adu + MBAP_HEADER_SIZE, pdu_len);
*ppdu_len = pdu_len;
return DEVICE_OK;
}
/// <summary>
/// destroy tcp transport instance, free resource
/// </summary>
void modbus_transport_tcp_destroy(modbus_transport_t *instance)
{
modbus_transport_tcp_t *ctx = (modbus_transport_tcp_t *)instance;
if (ctx->sock_fd >= 0) {
tcp_close(instance);
}
FREE(ctx);
}
/// <summary>
/// create modbus tcp transportation instance.
/// connection string format is "ip:port", null terminated
/// </summary>
modbus_transport_t *modbus_transport_tcp_create(const char *conn_str)
{
ASSERT(conn_str);
modbus_transport_tcp_t *tcp = (modbus_transport_tcp_t *)CALLOC(1, sizeof(modbus_transport_tcp_t));
tcp->base.transport_open = tcp_open;
tcp->base.transport_close = tcp_close;
tcp->base.send_request = tcp_send_request;
tcp->base.recv_response = tcp_recv_response;
char *colon = strchr(conn_str, ':');
if (colon) {
strncpy_s(tcp->ip, sizeof(tcp->ip), conn_str, colon - conn_str);
tcp->port = strtol(colon + 1, NULL, 10);
}
if ((strlen(tcp->ip) == 0) || (!tcp->port)) {
LOGE("Invalid connection string");
modbus_transport_tcp_destroy((modbus_transport_t *)tcp);
return NULL;
}
tcp->transcation_id = 0;
tcp->sock_fd = -1;
return (modbus_transport_t *)tcp;
}