static ncclResult_t ofi_listen()

in src/nccl_ofi_net.c [1225:1312]


/*
 * Prepare device `dev` to accept a connection.
 *
 * Makes sure the libfabric resources for `dev` exist, reserves a new tag on
 * that device, and fills the caller-provided connection handle and a freshly
 * allocated listen communicator.
 *
 * dev:        index into nccl_ofi_component; must be in [0, ofi_ndevices).
 * handle:     output buffer supplied by the caller. The first MAX_EP_ADDR
 *             bytes receive the local endpoint name, immediately followed by
 *             the reserved tag (sizeof(uint64_t) bytes).
 * listenComm: on success, receives a heap-allocated listenComm_t. It is left
 *             untouched on failure.
 *
 * Returns ncclSuccess on success, the error reported by get_nccl_ofi_comp()
 * if that call fails, and ncclSystemError for every other failure.
 *
 * NOTE: a tag reserved by this call is not handed back if a later step fails.
 */
static ncclResult_t ofi_listen(int dev, void *handle, void **listenComm)
{
	ncclResult_t ret = ncclSuccess;
	char ep_name[MAX_EP_ADDR] = {0};
	size_t namelen = sizeof(ep_name);
	fi_addr_t local_ep_addr;
	listenComm_t *lComm = NULL;
	uint64_t tag;
	int num_addrs;
	int rc;		/* raw libfabric return code, kept apart from ncclResult_t */

	if (OFI_UNLIKELY(dev < 0 || dev >= ofi_ndevices)) {
		NCCL_OFI_WARN("Incorrect device ID %d provided. Correct values are from 0 to %d",
			      dev, ofi_ndevices - 1);
		ret = ncclSystemError;
		goto exit;
	}

	if (OFI_UNLIKELY(nccl_ofi_component == NULL)) {
		NCCL_OFI_WARN("NCCL OFI component is not initialised.");
		ret = ncclSystemError;
		goto exit;
	}

	/*
	 * Create libfabric components for the given NIC, if not
	 * already created, else increase tag ID.
	 */
	pthread_mutex_lock(&nccl_ofi_lock);
	ret = get_nccl_ofi_comp(dev);
	if (ret)
		goto unlock;

	if (nccl_ofi_component[dev]->tag + 1 >=
	    nccl_ofi_component[dev]->max_tag) {
		NCCL_OFI_WARN("Cannot open more connection for device ID %d."
			      " Maximum is %ld",
			      dev, nccl_ofi_component[dev]->max_tag);
		ret = ncclSystemError;
		goto unlock;
	}
	tag = ++nccl_ofi_component[dev]->tag;
	pthread_mutex_unlock(&nccl_ofi_lock);

	/* Build handle: endpoint name followed by the tag */
	rc = fi_getname(&(nccl_ofi_component[dev]->ep->fid), (void *)ep_name,
			&namelen);
	if (rc != 0) {
		NCCL_OFI_WARN("Call to fi_getname() failed with RC: %d, ERROR: %s",
			      rc, fi_strerror(-rc));
		ret = ncclSystemError;
		goto exit;
	}

	memcpy(handle, ep_name, MAX_EP_ADDR);
	/* Byte-wise offset; arithmetic on void * is not standard C */
	memcpy((char *)handle + MAX_EP_ADDR, &tag, sizeof(tag));

	/* Insert local EP address to AV. This will be used to issue local read operations */
	num_addrs = fi_av_insert(nccl_ofi_component[dev]->av, (void *)ep_name, 1,
				 &local_ep_addr, 0, NULL);
	if (OFI_UNLIKELY(num_addrs != 1)) {	/* Only 1 address should be inserted into the AV */
		/* Report the value fi_av_insert() actually returned */
		NCCL_OFI_WARN("Unable to insert local endpoint address into address vector for device %d. RC: %d",
			      dev, num_addrs);
		ret = ncclSystemError;
		goto exit;
	}

	/* Build listenComm */
	lComm = calloc(1, sizeof(*lComm));
	if (OFI_UNLIKELY(lComm == NULL)) {
		NCCL_OFI_WARN("Unable to allocate listenComm for device %d", dev);
		ret = ncclSystemError;
		goto exit;
	}

	lComm->tag = tag;
	lComm->local_ep = nccl_ofi_component[dev]->ep;
	lComm->accepted = false;
	lComm->dev = dev;
	lComm->local_ep_addr = local_ep_addr;

	*listenComm = lComm;
	ret = ncclSuccess;

	goto exit;

unlock:
	pthread_mutex_unlock(&nccl_ofi_lock);
exit:
	return ret;
}