int wemq_proxy_load_servers()

in eventmesh-sdks/eventmesh-sdk-c/src/rmb_access_config.c [416:528]


int wemq_proxy_load_servers (const char *url, long timeout)
{
  int ret = 0;
  int tol = 0;
  int i = 0;
  int j = 0;
  if (pRmbStConfig->iWemqUseHttpCfg != 1)
  {
    return 0;
  }

  if (_wemq_proxy_inited == 0)
  {
    _wemq_proxy_inited = 1;
    pthread_mutex_init (&__wemq_proxy_rmutex, NULL);
  }

  int tmp_wemq_proxy_list_num = 0;
  struct StWemqProxy tmp_wemq_proxy_list[WEMQ_PROXY_MAX_SIZE];

  memset (tmp_wemq_proxy_list, 0, sizeof (StWemqProxy) * WEMQ_PROXY_MAX_SIZE);

  //1. 通过HTTP请求配置中心
  ret =
    _wemq_proxy_load_by_http (tmp_wemq_proxy_list, WEMQ_PROXY_MAX_SIZE, url,
                              timeout);
  if (ret <= 0 && (strstr (url, "wemqAccessServer") == NULL))
  {
    LOGRMB (RMB_LOG_WARN, "get wemq proxy departMent ip list failed,url=%s",
            url);
    return -1;
  }
  if (ret <= 0)
  {
    LOGRMB (RMB_LOG_ERROR, "get wemq proxy ip list failed,url=%s", url);
    return -1;
  }
  tmp_wemq_proxy_list_num = ret;

  //2. 排序
  qsort (tmp_wemq_proxy_list, tmp_wemq_proxy_list_num, sizeof (StWemqProxy),
         _wemq_proxy_cmp);

  //3. 权重相加
  for (i = 0; i < tmp_wemq_proxy_list_num; i++)
  {
    tol += tmp_wemq_proxy_list[i].weight;
  }
  _wemq_proxy_weight_tol = tol;
  int equal_flag = 0;

  if (tmp_wemq_proxy_list_num == _wemq_proxy_list_num)
  {
    for (i = 0; i < _wemq_proxy_list_num; i++)
    {
      if (strcmp (_wemq_proxy_list[i].host, tmp_wemq_proxy_list[i].host) != 0
          || _wemq_proxy_list[i].port != tmp_wemq_proxy_list[i].port
          || _wemq_proxy_list[i].weight != tmp_wemq_proxy_list[i].weight)
      {
        equal_flag = 1;
        break;
      }
    }
    //配置中心和本地的配置相同
    if (equal_flag == 0)
      return 0;
  }
  //打印配置中心和本地的配置
  for (i = 0; i < _wemq_proxy_list_num; i++)
  {
    LOGRMB (RMB_LOG_DEBUG,
            "[local address %d: host,%s|port,%u|idc:%s|weight,%u|index,%u|failed_time,%ld|flag,%d",
            i, _wemq_proxy_list[i].host, _wemq_proxy_list[i].port,
            _wemq_proxy_list[i].idc, _wemq_proxy_list[i].weight,
            _wemq_proxy_list[i].index, _wemq_proxy_list[i].failed_time,
            _wemq_proxy_list[i].flag);
  }
  for (j = 0; j < tmp_wemq_proxy_list_num; j++)
  {
    LOGRMB (RMB_LOG_DEBUG,
            "[configcenter address %d: host,%s|port,%u|idc:%s|weight,%u|index,%u|failed_time,%ld|flag,%d",
            j, tmp_wemq_proxy_list[j].host, tmp_wemq_proxy_list[j].port,
            tmp_wemq_proxy_list[j].idc, tmp_wemq_proxy_list[j].weight,
            tmp_wemq_proxy_list[j].index, tmp_wemq_proxy_list[j].failed_time,
            tmp_wemq_proxy_list[j].flag);
  }

  pthread_mutex_lock (&__wemq_proxy_rmutex);
  //比较缓存数据和配置中心获取的数据,保留缓存数据中的flag和failedtime
  for (i = 0; i < _wemq_proxy_list_num; i++)
  {
    if (_wemq_proxy_list[i].failed_time == 0 && _wemq_proxy_list[i].flag == 0)
      continue;
    for (j = 0; j < tmp_wemq_proxy_list_num; j++)
    {
      if (strcmp (_wemq_proxy_list[i].host, tmp_wemq_proxy_list[j].host) == 0
          && _wemq_proxy_list[i].port == tmp_wemq_proxy_list[j].port)
      {
        tmp_wemq_proxy_list[j].failed_time = _wemq_proxy_list[i].failed_time;
        tmp_wemq_proxy_list[j].flag = _wemq_proxy_list[i].flag;
      }
    }
  }
  //更新缓存数据
  memset (_wemq_proxy_list, 0, sizeof (StWemqProxy) * WEMQ_PROXY_MAX_SIZE);
  memcpy (_wemq_proxy_list, tmp_wemq_proxy_list,
          sizeof (StWemqProxy) * tmp_wemq_proxy_list_num);
  _wemq_proxy_list_num = tmp_wemq_proxy_list_num;
  _wemq_proxy_list_used = -1;
  pthread_mutex_unlock (&__wemq_proxy_rmutex);

  return 0;
}