in eventmesh-sdks/eventmesh-sdk-c/src/wemq_tcp.c [516:714]
int wemq_tcp_recv (int fd, void *msg, uint32_t * len, int iTimeOut, SSL * ssl)
{
int iRet;
uint32_t uiRemain, uiRecved;
if (NULL != ssl)
{
return wemq_ssl_recv (fd, msg, len, iTimeOut, ssl);
}
uiRemain = 4;
uiRecved = 0;
if (fd < 0 || msg == NULL || len == NULL)
{
return -1;
}
if (iTimeOut <= 0)
{
iTimeOut = 1000;
}
while (uiRemain > 0)
{
int iRecv = 0;
iRet = check_recv (fd, iTimeOut / 1000, iTimeOut % 1000);
if (iRet == 0)
{
return 0;
}
else if (iRet < 0)
{
LOGRMB (RMB_LOG_ERROR, "check_recv error, error=%d\n", errno);
return -2;
}
iRecv = recv (fd, msg + uiRecved, uiRemain, 0);
if (iRecv < 0 && (errno != EAGAIN))
{
LOGRMB (RMB_LOG_ERROR, "Recv error, fd close() error=%d, iRecv=%d\n",
errno, iRecv);
return -2;
}
if (iRecv == 0)
{
LOGRMB (RMB_LOG_ERROR, "Peer close connect, fd close() errno=%d\n",
errno);
return -2;
}
if (iRet > 0)
{
uiRecved += iRecv;
uiRemain -= iRecv;
}
}
if (uiRecved != 4 || strcmp (msg, "WEMQ") != 0)
{
LOGRMB (RMB_LOG_ERROR, "recv msg header error,%u:%s", uiRecved,
(char *) msg);
return -3;
}
uiRemain = 4;
uiRecved = 0;
if (fd < 0 || msg == NULL || len == NULL)
{
return -1;
}
if (iTimeOut <= 0)
{
iTimeOut = 1000;
}
while (uiRemain > 0)
{
int iRecv = 0;
iRet = check_recv (fd, iTimeOut / 1000, iTimeOut % 1000);
if (iRet == 0)
{
return 0;
}
else if (iRet < 0)
{
LOGRMB (RMB_LOG_ERROR, "check_recv error, error=%d\n", errno);
return -2;
}
iRecv = recv (fd, msg + uiRecved, uiRemain, 0);
if (iRecv < 0 && (errno != EAGAIN))
{
LOGRMB (RMB_LOG_ERROR, "Recv error, fd close() error=%d, iRecv=%d\n",
errno, iRecv);
return -2;
}
if (iRecv == 0)
{
LOGRMB (RMB_LOG_ERROR, "Peer close connect, fd close() errno=%d\n",
errno);
return -2;
}
if (iRet > 0)
{
uiRecved += iRecv;
uiRemain -= iRecv;
}
}
//LOGRMB(RMB_LOG_DEBUG, "Recv version = %s\n", (char *)msg);
if (uiRecved != 4)
{
LOGRMB (RMB_LOG_ERROR, "recv msg version error,%u:%s", uiRecved,
(char *) msg);
return -3;
}
// read msg length
uiRemain = 4;
uiRecved = 0;
while (uiRemain > 0)
{
int iRecv = 0;
iRet = check_recv (fd, iTimeOut / 1000, iTimeOut % 1000);
if (iRet == 0)
{
return 0;
}
else if (iRet < 0)
{
LOGRMB (RMB_LOG_ERROR, "check_recv error, error=%d\n", errno);
return -2;
}
iRecv = recv (fd, msg + uiRecved, uiRemain, 0);
if (iRecv < 0 && (errno != EAGAIN))
{
LOGRMB (RMB_LOG_ERROR, "Recv error, fd close() error=%d, iRecv=%d\n",
errno, iRecv);
return -2;
}
if (iRecv == 0)
{
LOGRMB (RMB_LOG_ERROR, "Peer close connect, fd close() errno=%d\n",
errno);
return -2;
}
if (iRet > 0)
{
uiRecved += iRecv;
uiRemain -= iRecv;
}
}
if (uiRecved != 4)
{
// can't read 4 bytes
LOGRMB (RMB_LOG_ERROR, "recv less than 4, recv %d bytes\n", uiRecved);
return -3;
}
*len = ntohl (*(uint32_t *) msg);
uiRemain = *len - 4;
while (uiRemain > 0)
{
int iRecv = 0;
iRet = check_recv (fd, iTimeOut / 1000, iTimeOut % 1000);
if (iRet == 0)
{
return 0;
}
else if (iRet < 0)
{
LOGRMB (RMB_LOG_ERROR, "check_recv error, error=%d\n", errno);
return -2;
}
iRecv = recv (fd, msg + uiRecved, uiRemain, 0);
if (iRecv < 0 && (errno != EAGAIN))
{
LOGRMB (RMB_LOG_ERROR, "Recv error, fd close() error=%d, iRecv=%d\n",
errno, iRecv);
return -2;
}
if (iRecv == 0)
{
LOGRMB (RMB_LOG_ERROR, "Peer close connect, fd close() errno=%d\n",
errno);
return -2;
}
if (iRet > 0)
{
uiRecved += iRecv;
uiRemain -= iRecv;
}
}
if (uiRecved != *len)
{
// can't recv whole msg
return -3;
}
return uiRecved;
}