in eventmesh-sdks/eventmesh-sdk-c/src/rmb_pub.c [1088:1324]
/**
 * Resolve the GSL status of a service, answering from the local
 * service-status cache (pRmbStConfig->serviceStatusList) when a non-expired
 * entry exists, and otherwise querying GSL via rmb_pub_send_gsl() and
 * recording the outcome in the cache.
 *
 * Cache hit (entry's ulInvalidTime >= pRmbStConfig->ulNowTtime):
 *   - low nibble of cResult == 1: logs "service=NULL", sets rmb_errno to
 *     RMB_ERROR_GSL_SERVICE_ID_NULL and returns 1;
 *   - cResult == 0: returns 0. When cRouteFlag is neither 0 nor 1 and the
 *     cached target DCN differs from pStMsg->strTargetDcn, the message's
 *     target DCN is overwritten with the cached one first;
 *   - any other cResult: treated as a miss and re-queried.
 *
 * Cache miss / expiry: the return value of rmb_pub_send_gsl() is returned.
 *   - 2 or 3: an existing entry gets the new cResult/cRouteFlag, but no new
 *     entry is inserted and no expiry time is changed;
 *   - 4: an existing entry whose cResult is 0 or has low nibble 1 gets its
 *     expiry moved to now + iCacheFailedTimeoutTime * 1000; nothing is
 *     inserted;
 *   - anything else: the existing entry is refreshed, or a new entry is
 *     inserted (replacing the entry with the smallest ulGetTimes when the
 *     cache is full) and the list is re-sorted. Results 0 and 1 use
 *     iCacheSuccTimeoutTime, others use iCacheFailedTimeoutTime.
 *
 * NOTE(review): the pubMutex lock/unlock around this function was commented
 * out in the original; the shared cache is modified here without any locking
 * visible in this function - confirm that callers serialize access.
 *
 * @param pStPub            publisher context, passed to rmb_pub_send_gsl()
 *                          and rmb_printf_service_status().
 * @param pStMsg            message being sent; supplies the sequence numbers
 *                          for the GSL query and may have strTargetDcn
 *                          rewritten on a cache hit.
 * @param pTmpService       lookup key and scratch entry; presumably
 *                          rmb_pub_send_gsl() fills cResult, cRouteFlag and
 *                          strTargetDcn - verify against that function.
 * @param hasCachedService  cache entry already located by the caller, or NULL
 *                          to have this function search the cache itself.
 * @return 0 or 1 from a valid cache entry, otherwise the result of
 *         rmb_pub_send_gsl().
 */
int rmb_pub_send_gsl_and_insert_cache (StRmbPub * pStPub, StRmbMsg * pStMsg,
                                       StServiceStatus * pTmpService,
                                       StServiceStatus * hasCachedService)
{
  // The caller may already hold the cache entry; search only when it does not.
  if (hasCachedService == NULL)
    {
      hasCachedService =
        bsearch (pTmpService, pRmbStConfig->serviceStatusList,
                 pRmbStConfig->iCacheServiceNums, sizeof (StServiceStatus),
                 cmpServiceStatusStr);
    }

  // Answer from the cache while the entry has not expired.
  if (hasCachedService != NULL
      && hasCachedService->ulInvalidTime >= pRmbStConfig->ulNowTtime)
    {
      if ((hasCachedService->cResult & 0x0F) == 0x01)
        {
          LOGRMB (RMB_LOG_ERROR, "Cache:[%s-%s-%s-%d] service=NULL",
                  pTmpService->strServiceId,
                  pTmpService->strScenarioId,
                  pTmpService->strTargetOrgId,
                  (int) pTmpService->cFlagForOrgId);
          rmb_errno = RMB_ERROR_GSL_SERVICE_ID_NULL;
          return 1;
        }
      if (hasCachedService->cResult == 0)
        {
          // For route flags other than 0/1 the cached DCN overrides the
          // caller-supplied one.
          if (hasCachedService->cRouteFlag != 0
              && hasCachedService->cRouteFlag != 1
              && strcmp (pStMsg->strTargetDcn,
                         hasCachedService->strTargetDcn) != 0)
            {
              LOGRMB (RMB_LOG_INFO, "GSL DCN=%s is diffrent with input DCN=%s",
                      hasCachedService->strTargetDcn, pStMsg->strTargetDcn);
              strncpy (pStMsg->strTargetDcn, hasCachedService->strTargetDcn,
                       sizeof (pStMsg->strTargetDcn) - 1);
              // strncpy does not write the final byte; terminate explicitly.
              pStMsg->strTargetDcn[sizeof (pStMsg->strTargetDcn) - 1] = '\0';
            }
          return 0;
        }
      // Any other cached result falls through to a fresh GSL query.
    }

  int iRet =
    rmb_pub_send_gsl (pStPub, pTmpService, pStMsg->sysHeader.cBizSeqNo,
                      pStMsg->sysHeader.cConsumerSeqNo);

  // Results 2 and 3 are never inserted; an existing entry only receives the
  // new result/route flag and keeps its expiry time.
  if (iRet == 2 || iRet == 3)
    {
      pTmpService->ulGetTimes = pRmbStConfig->ulNowTtime;
      if (hasCachedService != NULL)
        {
          hasCachedService->cResult = pTmpService->cResult;
          hasCachedService->cRouteFlag = pTmpService->cRouteFlag;
        }
      return iRet;
    }

  // Result 4 keeps the existing cached answer but extends its lifetime by
  // the failure timeout; nothing new is inserted.
  if (iRet == 4)
    {
      pTmpService->ulGetTimes = pRmbStConfig->ulNowTtime;
      pTmpService->ulInvalidTime =
        pRmbStConfig->ulNowTtime + pRmbStConfig->iCacheFailedTimeoutTime * 1000;
      if (hasCachedService != NULL
          && (hasCachedService->cResult == 0
              || (hasCachedService->cResult & 0x0F) == 0x01))
        {
          hasCachedService->ulInvalidTime =
            pRmbStConfig->ulNowTtime +
            pRmbStConfig->iCacheFailedTimeoutTime * 1000;
        }
      return 4;
    }

  if (hasCachedService != NULL)
    {
      // The entry exists locally but had expired: refresh it in place.
      pTmpService->ulGetTimes = pRmbStConfig->ulNowTtime;
      hasCachedService->cResult = pTmpService->cResult;
      hasCachedService->cRouteFlag = pTmpService->cRouteFlag;
      hasCachedService->ulGetTimes = pRmbStConfig->ulNowTtime;
      if (hasCachedService->cResult == 0)
        {
          memcpy (hasCachedService->strTargetDcn, pTmpService->strTargetDcn,
                  sizeof (hasCachedService->strTargetDcn));
        }
      if (iRet == 0 || iRet == 1)
        {
          hasCachedService->ulInvalidTime =
            pRmbStConfig->ulNowTtime + pRmbStConfig->iCacheSuccTimeoutTime * 1000;
        }
      else
        {
          hasCachedService->ulInvalidTime =
            pRmbStConfig->ulNowTtime +
            pRmbStConfig->iCacheFailedTimeoutTime * 1000;
        }
    }
  else
    {
      // No local entry: stamp the scratch entry and insert it.
      // (Results 2, 3 and 4 have already returned above.)
      pTmpService->ulGetTimes = pRmbStConfig->ulNowTtime;
      if (iRet == 0 || iRet == 1)
        {
          pTmpService->ulInvalidTime =
            pRmbStConfig->ulNowTtime + pRmbStConfig->iCacheSuccTimeoutTime * 1000;
        }
      else
        {
          pTmpService->ulInvalidTime =
            pRmbStConfig->ulNowTtime +
            pRmbStConfig->iCacheFailedTimeoutTime * 1000;
        }

      StServiceStatus *pSlot = NULL;
      if (pRmbStConfig->iCacheServiceNums >= MAX_SERVICE_STATUS_CACHE_NUMS)
        {
          // Cache full: overwrite the entry with the smallest ulGetTimes.
          // Start from slot 0 so the victim is valid even when slot 0 is
          // itself the oldest (previously this left the pointer NULL).
          int iIdx;
          pSlot = &pRmbStConfig->serviceStatusList[0];
          for (iIdx = 1; iIdx < MAX_SERVICE_STATUS_CACHE_NUMS; iIdx++)
            {
              if (pSlot->ulGetTimes >
                  pRmbStConfig->serviceStatusList[iIdx].ulGetTimes)
                {
                  pSlot = &pRmbStConfig->serviceStatusList[iIdx];
                }
            }
          memcpy (pSlot, pTmpService, sizeof (StServiceStatus));
          // Entry replaced in place: the element count is unchanged.
        }
      else
        {
          pSlot =
            &pRmbStConfig->serviceStatusList[pRmbStConfig->iCacheServiceNums];
          memcpy (pSlot, pTmpService, sizeof (StServiceStatus));
          pRmbStConfig->iCacheServiceNums += 1;
        }

      // Keep the list sorted so bsearch() keeps working on later lookups.
      qsort (pRmbStConfig->serviceStatusList, pRmbStConfig->iCacheServiceNums,
             sizeof (StServiceStatus), cmpServiceStatusStr);
    }

  // Dump the whole cache after every refresh or insertion.
  int i = 0;
  for (; i < pRmbStConfig->iCacheServiceNums; i++)
    {
      LOGRMB (RMB_LOG_INFO, "cache%dth:[%s]", i,
              rmb_printf_service_status (pStPub,
                                         &pRmbStConfig->serviceStatusList[i]));
    }
  return iRet;
}