fabtests/common/shared.c (2,735 lines of code) (raw):
/*
* Copyright (c) 2013-2018 Intel Corporation. All rights reserved.
* Copyright (c) 2016 Cray Inc. All rights reserved.
* Copyright (c) 2014-2017, Cisco Systems, Inc. All rights reserved.
*
* This software is available to you under the BSD license below:
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include <assert.h>
#include <netdb.h>
#include <poll.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_errno.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_rma.h>
#include <rdma/fi_tagged.h>
#include <rdma/fi_atomic.h>
#include <rdma/fi_collective.h>
#include <shared.h>
struct fi_info *fi_pep, *fi, *hints;
struct fid_fabric *fabric;
struct fid_wait *waitset;
struct fid_domain *domain;
struct fid_poll *pollset;
struct fid_pep *pep;
struct fid_ep *ep, *alias_ep;
struct fid_cq *txcq, *rxcq;
struct fid_cntr *txcntr, *rxcntr;
struct fid_mr *mr;
void *mr_desc = NULL;
struct fid_av *av;
struct fid_eq *eq;
struct fid_mc *mc;
struct fid_mr no_mr;
struct fi_context tx_ctx, rx_ctx;
struct ft_context *tx_ctx_arr = NULL, *rx_ctx_arr = NULL;
uint64_t remote_cq_data = 0;
uint64_t tx_seq, rx_seq, tx_cq_cntr, rx_cq_cntr;
int (*ft_mr_alloc_func)(void);
uint64_t ft_tag = 0;
int ft_parent_proc = 0;
pid_t ft_child_pid = 0;
int ft_socket_pair[2];
fi_addr_t remote_fi_addr = FI_ADDR_UNSPEC;
char *buf, *tx_buf, *rx_buf;
char **tx_mr_bufs = NULL, **rx_mr_bufs = NULL;
size_t buf_size, tx_size, rx_size, tx_mr_size, rx_mr_size;
int rx_fd = -1, tx_fd = -1;
char default_port[8] = "9228";
static char default_oob_port[8] = "3000";
const char *greeting = "Hello from Client!";
char test_name[50] = "custom";
int timeout = -1;
struct timespec start, end;
int listen_sock = -1;
int sock = -1;
int oob_sock = -1;
struct fi_av_attr av_attr = {
.type = FI_AV_MAP,
.count = 1
};
struct fi_eq_attr eq_attr = {
.wait_obj = FI_WAIT_UNSPEC
};
struct fi_cq_attr cq_attr = {
.wait_obj = FI_WAIT_NONE
};
struct fi_cntr_attr cntr_attr = {
.events = FI_CNTR_EVENTS_COMP,
.wait_obj = FI_WAIT_NONE
};
struct fi_rma_iov remote;
struct ft_opts opts;
struct test_size_param test_size[] = {
{ 1 << 0, 0 },
{ 1 << 1, 0 }, { (1 << 1) + (1 << 0), 0 },
{ 1 << 2, 0 }, { (1 << 2) + (1 << 1), 0 },
{ 1 << 3, 0 }, { (1 << 3) + (1 << 2), 0 },
{ 1 << 4, 0 }, { (1 << 4) + (1 << 3), 0 },
{ 1 << 5, 0 }, { (1 << 5) + (1 << 4), 0 },
{ 1 << 6, FT_DEFAULT_SIZE }, { (1 << 6) + (1 << 5), 0 },
{ 1 << 7, 0 }, { (1 << 7) + (1 << 6), 0 },
{ 1 << 8, FT_DEFAULT_SIZE }, { (1 << 8) + (1 << 7), 0 },
{ 1 << 9, 0 }, { (1 << 9) + (1 << 8), 0 },
{ 1 << 10, FT_DEFAULT_SIZE }, { (1 << 10) + (1 << 9), 0 },
{ 1 << 11, 0 }, { (1 << 11) + (1 << 10), 0 },
{ 1 << 12, FT_DEFAULT_SIZE }, { (1 << 12) + (1 << 11), 0 },
{ 1 << 13, 0 }, { (1 << 13) + (1 << 12), 0 },
{ 1 << 14, 0 }, { (1 << 14) + (1 << 13), 0 },
{ 1 << 15, 0 }, { (1 << 15) + (1 << 14), 0 },
{ 1 << 16, FT_DEFAULT_SIZE }, { (1 << 16) + (1 << 15), 0 },
{ 1 << 17, 0 }, { (1 << 17) + (1 << 16), 0 },
{ 1 << 18, 0 }, { (1 << 18) + (1 << 17), 0 },
{ 1 << 19, 0 }, { (1 << 19) + (1 << 18), 0 },
{ 1 << 20, FT_DEFAULT_SIZE }, { (1 << 20) + (1 << 19), 0 },
{ 1 << 21, 0 }, { (1 << 21) + (1 << 20), 0 },
{ 1 << 22, 0 }, { (1 << 22) + (1 << 21), 0 },
{ 1 << 23, 0 },
};
const unsigned int test_cnt = (sizeof test_size / sizeof test_size[0]);
#define INTEG_SEED 7
static const char integ_alphabet[] = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
static const int integ_alphabet_length = (sizeof(integ_alphabet)/sizeof(*integ_alphabet)) - 1;
static int ft_poll_fd(int fd, int timeout)
{
struct pollfd fds;
int ret;
fds.fd = fd;
fds.events = POLLIN;
ret = poll(&fds, 1, timeout);
if (ret == -1) {
FT_PRINTERR("poll", -errno);
ret = -errno;
} else if (!ret) {
ret = -FI_EAGAIN;
} else {
ret = 0;
}
return ret;
}
size_t ft_tx_prefix_size(void)
{
return (fi->tx_attr->mode & FI_MSG_PREFIX) ?
fi->ep_attr->msg_prefix_size : 0;
}
size_t ft_rx_prefix_size(void)
{
return (fi->rx_attr->mode & FI_MSG_PREFIX) ?
fi->ep_attr->msg_prefix_size : 0;
}
int ft_check_opts(uint64_t flags)
{
return (opts.options & flags) == flags;
}
static void ft_cq_set_wait_attr(void)
{
switch (opts.comp_method) {
case FT_COMP_SREAD:
cq_attr.wait_obj = FI_WAIT_UNSPEC;
cq_attr.wait_cond = FI_CQ_COND_NONE;
break;
case FT_COMP_WAITSET:
assert(waitset);
cq_attr.wait_obj = FI_WAIT_SET;
cq_attr.wait_cond = FI_CQ_COND_NONE;
cq_attr.wait_set = waitset;
break;
case FT_COMP_WAIT_FD:
cq_attr.wait_obj = FI_WAIT_FD;
cq_attr.wait_cond = FI_CQ_COND_NONE;
break;
case FT_COMP_YIELD:
cq_attr.wait_obj = FI_WAIT_YIELD;
cq_attr.wait_cond = FI_CQ_COND_NONE;
break;
default:
cq_attr.wait_obj = FI_WAIT_NONE;
break;
}
}
static void ft_cntr_set_wait_attr(void)
{
switch (opts.comp_method) {
case FT_COMP_SREAD:
cntr_attr.wait_obj = FI_WAIT_UNSPEC;
break;
case FT_COMP_WAITSET:
assert(waitset);
cntr_attr.wait_obj = FI_WAIT_SET;
break;
case FT_COMP_WAIT_FD:
cntr_attr.wait_obj = FI_WAIT_FD;
break;
case FT_COMP_YIELD:
cntr_attr.wait_obj = FI_WAIT_YIELD;
break;
default:
cntr_attr.wait_obj = FI_WAIT_NONE;
break;
}
}
int ft_cntr_open(struct fid_cntr **cntr)
{
ft_cntr_set_wait_attr();
return fi_cntr_open(domain, &cntr_attr, cntr, cntr);
}
static inline int ft_rma_read_target_allowed(uint64_t caps)
{
if (caps & (FI_RMA | FI_ATOMIC)) {
if (caps & FI_REMOTE_READ)
return 1;
return !(caps & (FI_READ | FI_WRITE | FI_REMOTE_WRITE));
}
return 0;
}
static inline int ft_rma_write_target_allowed(uint64_t caps)
{
if (caps & (FI_RMA | FI_ATOMIC)) {
if (caps & FI_REMOTE_WRITE)
return 1;
return !(caps & (FI_READ | FI_WRITE | FI_REMOTE_WRITE));
}
return 0;
}
static inline int ft_check_mr_local_flag(struct fi_info *info)
{
return ((info->mode & FI_LOCAL_MR) ||
(info->domain_attr->mr_mode & FI_MR_LOCAL));
}
uint64_t ft_info_to_mr_access(struct fi_info *info)
{
uint64_t mr_access = 0;
if (ft_check_mr_local_flag(info)) {
if (info->caps & (FI_MSG | FI_TAGGED)) {
if (info->caps & FT_MSG_MR_ACCESS) {
mr_access |= info->caps & FT_MSG_MR_ACCESS;
} else {
mr_access |= FT_MSG_MR_ACCESS;
}
}
if (info->caps & (FI_RMA | FI_ATOMIC)) {
if (info->caps & FT_RMA_MR_ACCESS) {
mr_access |= info->caps & FT_RMA_MR_ACCESS;
} else {
mr_access |= FT_RMA_MR_ACCESS;
}
}
} else {
if (info->caps & (FI_RMA | FI_ATOMIC)) {
if (ft_rma_read_target_allowed(info->caps)) {
mr_access |= FI_REMOTE_READ;
}
if (ft_rma_write_target_allowed(info->caps)) {
mr_access |= FI_REMOTE_WRITE;
}
}
}
return mr_access;
}
#define bit_isset(x, i) (x >> i & 1)
#define for_each_bit(x, i) for (i = 0; i < (8 * sizeof(x)); i++)
static inline int bit_set_count(uint64_t val)
{
int cnt = 0;
while (val) {
cnt++;
val &= val - 1;
}
return cnt;
}
int ft_alloc_bit_combo(uint64_t fixed, uint64_t opt,
uint64_t **combos, int *len)
{
uint64_t *flags;
int i, num_flags;
uint64_t index;
int ret;
num_flags = bit_set_count(opt) + 1;
flags = calloc(num_flags, sizeof(fixed));
if (!flags) {
perror("calloc");
return -FI_ENOMEM;
}
*len = 1 << (num_flags - 1);
*combos = calloc(*len, sizeof(fixed));
if (!(*combos)) {
perror("calloc");
ret = -FI_ENOMEM;
goto clean;
}
num_flags = 0;
for_each_bit(opt, i) {
if (bit_isset(opt, i))
flags[num_flags++] = 1ULL << i;
}
for (index = 0; index < (*len); index++) {
(*combos)[index] = fixed;
for_each_bit(index, i) {
if (bit_isset(index, i))
(*combos)[index] |= flags[i];
}
}
ret = FI_SUCCESS;
clean:
free(flags);
return ret;
}
void ft_free_bit_combo(uint64_t *combo)
{
free(combo);
}
static int ft_alloc_ctx_array(struct ft_context **mr_array, char ***mr_bufs,
char *default_buf, size_t mr_size,
uint64_t start_key)
{
int i, ret;
uint64_t access = ft_info_to_mr_access(fi);
struct ft_context *context;
*mr_array = calloc(opts.window_size, sizeof(**mr_array));
if (!*mr_array)
return -FI_ENOMEM;
if (opts.options & FT_OPT_ALLOC_MULT_MR) {
*mr_bufs = calloc(opts.window_size, sizeof(**mr_bufs));
if (!mr_bufs)
return -FI_ENOMEM;
}
for (i = 0; i < opts.window_size; i++) {
context = &(*mr_array)[i];
if (!(opts.options & FT_OPT_ALLOC_MULT_MR)) {
context->buf = default_buf + mr_size * i;
continue;
}
(*mr_bufs)[i] = calloc(1, mr_size);
context->buf = (*mr_bufs)[i];
if (((fi->domain_attr->mr_mode & FI_MR_LOCAL) ||
(fi->caps & (FI_RMA | FI_ATOMIC)))) {
ret = fi_mr_reg(domain, context->buf,
mr_size, access, 0,
start_key + i, 0,
&context->mr, NULL);
if (ret)
return ret;
context->desc = fi_mr_desc(context->mr);
} else {
context->mr = NULL;
context->desc = NULL;
}
}
return 0;
}
static void ft_set_tx_rx_sizes(size_t *set_tx, size_t *set_rx)
{
*set_tx = opts.options & FT_OPT_SIZE ?
opts.transfer_size : test_size[TEST_CNT - 1].size;
if (*set_tx > fi->ep_attr->max_msg_size)
*set_tx = fi->ep_attr->max_msg_size;
*set_rx = *set_tx + ft_rx_prefix_size();
*set_tx += ft_tx_prefix_size();
}
/*
* Include FI_MSG_PREFIX space in the allocated buffer, and ensure that the
* buffer is large enough for a control message used to exchange addressing
* data.
*/
static int ft_alloc_msgs(void)
{
int ret;
long alignment = 1;
if (ft_check_opts(FT_OPT_SKIP_MSG_ALLOC))
return 0;
if (opts.options & FT_OPT_ALLOC_MULT_MR) {
ft_set_tx_rx_sizes(&tx_mr_size, &rx_mr_size);
rx_size = FT_MAX_CTRL_MSG + ft_rx_prefix_size();
tx_size = FT_MAX_CTRL_MSG + ft_tx_prefix_size();
buf_size = rx_size + tx_size;
} else {
ft_set_tx_rx_sizes(&tx_size, &rx_size);
tx_mr_size = 0;
rx_mr_size = 0;
buf_size = MAX(tx_size, FT_MAX_CTRL_MSG) * opts.window_size +
MAX(rx_size, FT_MAX_CTRL_MSG) * opts.window_size;
}
if (opts.options & FT_OPT_ALIGN) {
alignment = sysconf(_SC_PAGESIZE);
if (alignment < 0)
return -errno;
buf_size += alignment;
ret = posix_memalign((void **) &buf, (size_t) alignment,
buf_size);
if (ret) {
FT_PRINTERR("posix_memalign", ret);
return ret;
}
} else {
buf = malloc(buf_size);
if (!buf) {
perror("malloc");
return -FI_ENOMEM;
}
}
memset(buf, 0, buf_size);
rx_buf = buf;
if (opts.options & FT_OPT_ALLOC_MULT_MR)
tx_buf = (char *) buf + MAX(rx_size, FT_MAX_CTRL_MSG);
else
tx_buf = (char *) buf + MAX(rx_size, FT_MAX_CTRL_MSG) * opts.window_size;
remote_cq_data = ft_init_cq_data(fi);
if (!ft_mr_alloc_func && !ft_check_opts(FT_OPT_SKIP_REG_MR) &&
((fi->domain_attr->mr_mode & FI_MR_LOCAL) ||
(fi->caps & (FI_RMA | FI_ATOMIC)))) {
ret = fi_mr_reg(domain, buf, buf_size, ft_info_to_mr_access(fi),
0, FT_MR_KEY, 0, &mr, NULL);
if (ret) {
FT_PRINTERR("fi_mr_reg", ret);
return ret;
}
mr_desc = ft_check_mr_local_flag(fi) ? fi_mr_desc(mr) : NULL;
} else {
if (ft_mr_alloc_func) {
assert(!ft_check_opts(FT_OPT_SKIP_REG_MR));
ret = ft_mr_alloc_func();
if (ret)
return ret;
}
mr = &no_mr;
}
ret = ft_alloc_ctx_array(&tx_ctx_arr, &tx_mr_bufs, tx_buf,
tx_mr_size, FT_TX_MR_KEY);
if (ret)
return -FI_ENOMEM;
ret = ft_alloc_ctx_array(&rx_ctx_arr, &rx_mr_bufs, rx_buf,
rx_mr_size, FT_RX_MR_KEY);
if (ret)
return -FI_ENOMEM;
return 0;
}
int ft_open_fabric_res(void)
{
int ret;
ret = fi_fabric(fi->fabric_attr, &fabric, NULL);
if (ret) {
FT_PRINTERR("fi_fabric", ret);
return ret;
}
ret = fi_eq_open(fabric, &eq_attr, &eq, NULL);
if (ret) {
FT_PRINTERR("fi_eq_open", ret);
return ret;
}
ret = fi_domain(fabric, fi, &domain, NULL);
if (ret) {
FT_PRINTERR("fi_domain", ret);
return ret;
}
return 0;
}
int ft_alloc_ep_res(struct fi_info *fi)
{
int ret;
ret = ft_alloc_msgs();
if (ret)
return ret;
if (cq_attr.format == FI_CQ_FORMAT_UNSPEC) {
if (fi->caps & FI_TAGGED)
cq_attr.format = FI_CQ_FORMAT_TAGGED;
else
cq_attr.format = FI_CQ_FORMAT_CONTEXT;
}
if (opts.options & FT_OPT_CQ_SHARED) {
ft_cq_set_wait_attr();
cq_attr.size = 0;
if (opts.tx_cq_size)
cq_attr.size += opts.tx_cq_size;
else
cq_attr.size += fi->tx_attr->size;
if (opts.rx_cq_size)
cq_attr.size += opts.rx_cq_size;
else
cq_attr.size += fi->rx_attr->size;
ret = fi_cq_open(domain, &cq_attr, &txcq, &txcq);
if (ret) {
FT_PRINTERR("fi_cq_open", ret);
return ret;
}
rxcq = txcq;
}
if (!(opts.options & FT_OPT_CQ_SHARED)) {
ft_cq_set_wait_attr();
if (opts.tx_cq_size)
cq_attr.size = opts.tx_cq_size;
else
cq_attr.size = fi->tx_attr->size;
ret = fi_cq_open(domain, &cq_attr, &txcq, &txcq);
if (ret) {
FT_PRINTERR("fi_cq_open", ret);
return ret;
}
}
if (opts.options & FT_OPT_TX_CNTR) {
ret = ft_cntr_open(&txcntr);
if (ret) {
FT_PRINTERR("fi_cntr_open", ret);
return ret;
}
}
if (!(opts.options & FT_OPT_CQ_SHARED)) {
ft_cq_set_wait_attr();
if (opts.rx_cq_size)
cq_attr.size = opts.rx_cq_size;
else
cq_attr.size = fi->rx_attr->size;
ret = fi_cq_open(domain, &cq_attr, &rxcq, &rxcq);
if (ret) {
FT_PRINTERR("fi_cq_open", ret);
return ret;
}
}
if (opts.options & FT_OPT_RX_CNTR) {
ret = ft_cntr_open(&rxcntr);
if (ret) {
FT_PRINTERR("fi_cntr_open", ret);
return ret;
}
}
if (fi->ep_attr->type == FI_EP_RDM || fi->ep_attr->type == FI_EP_DGRAM) {
if (fi->domain_attr->av_type != FI_AV_UNSPEC)
av_attr.type = fi->domain_attr->av_type;
if (opts.av_name) {
av_attr.name = opts.av_name;
}
av_attr.count = opts.av_size;
ret = fi_av_open(domain, &av_attr, &av, NULL);
if (ret) {
FT_PRINTERR("fi_av_open", ret);
return ret;
}
}
return 0;
}
int ft_alloc_active_res(struct fi_info *fi)
{
int ret;
ret = ft_alloc_ep_res(fi);
if (ret)
return ret;
ret = fi_endpoint(domain, fi, &ep, NULL);
if (ret) {
FT_PRINTERR("fi_endpoint", ret);
return ret;
}
return 0;
}
static void ft_init(void)
{
tx_seq = 0;
rx_seq = 0;
tx_cq_cntr = 0;
rx_cq_cntr = 0;
}
int ft_init_oob(void)
{
int ret, op, err;
struct addrinfo *ai = NULL;
if (!(opts.options & FT_OPT_OOB_CTRL) || oob_sock != -1)
return 0;
if (!opts.oob_port)
opts.oob_port = default_oob_port;
if (!opts.dst_addr) {
ret = ft_sock_listen(opts.src_addr, opts.oob_port);
if (ret)
return ret;
oob_sock = accept(listen_sock, NULL, 0);
if (oob_sock < 0) {
perror("accept");
ret = oob_sock;
return ret;
}
close(listen_sock);
} else {
ret = getaddrinfo(opts.dst_addr, opts.oob_port, NULL, &ai);
if (ret) {
perror("getaddrinfo");
return ret;
}
oob_sock = socket(ai->ai_family, SOCK_STREAM, 0);
if (oob_sock < 0) {
perror("socket");
ret = oob_sock;
goto free;
}
ret = connect(oob_sock, ai->ai_addr, ai->ai_addrlen);
if (ret) {
perror("connect");
close(oob_sock);
goto free;
}
sleep(1);
}
op = 1;
err = setsockopt(oob_sock, IPPROTO_TCP, TCP_NODELAY,
(void *) &op, sizeof(op));
if (err)
perror("setsockopt"); /* non-fatal error */
free:
if (ai)
freeaddrinfo(ai);
return ret;
}
int ft_accept_next_client() {
int ret;
if (!ft_check_opts(FT_OPT_SKIP_MSG_ALLOC) && (fi->caps & (FI_MSG | FI_TAGGED))) {
/* Initial receive will get remote address for unconnected EPs */
ret = ft_post_rx(ep, MAX(rx_size, FT_MAX_CTRL_MSG), &rx_ctx);
if (ret)
return ret;
}
return ft_init_av();
}
int ft_getinfo(struct fi_info *hints, struct fi_info **info)
{
char *node, *service;
uint64_t flags = 0;
int ret;
ret = ft_read_addr_opts(&node, &service, hints, &flags, &opts);
if (ret)
return ret;
if (!hints->ep_attr->type)
hints->ep_attr->type = FI_EP_RDM;
ret = fi_getinfo(FT_FIVERSION, node, service, flags, hints, info);
if (ret) {
FT_PRINTERR("fi_getinfo", ret);
return ret;
}
if (!ft_check_prefix_forced(*info, &opts)) {
FT_ERR("Provider disabled requested prefix mode.");
return -FI_ENODATA;
}
return 0;
}
int ft_init_fabric_cm(void)
{
int ret;
if (!opts.dst_addr) {
ret = ft_start_server();
if (ret)
return ret;
}
ret = opts.dst_addr ? ft_client_connect() : ft_server_connect();
return ret;
}
int ft_start_server(void)
{
int ret;
ft_init();
ret = ft_init_oob();
if (ret)
return ret;
ret = ft_getinfo(hints, &fi_pep);
if (ret)
return ret;
ret = fi_fabric(fi_pep->fabric_attr, &fabric, NULL);
if (ret) {
FT_PRINTERR("fi_fabric", ret);
return ret;
}
ret = fi_eq_open(fabric, &eq_attr, &eq, NULL);
if (ret) {
FT_PRINTERR("fi_eq_open", ret);
return ret;
}
ret = fi_passive_ep(fabric, fi_pep, &pep, NULL);
if (ret) {
FT_PRINTERR("fi_passive_ep", ret);
return ret;
}
ret = fi_pep_bind(pep, &eq->fid, 0);
if (ret) {
FT_PRINTERR("fi_pep_bind", ret);
return ret;
}
ret = fi_listen(pep);
if (ret) {
FT_PRINTERR("fi_listen", ret);
return ret;
}
return 0;
}
int ft_complete_connect(struct fid_ep *ep, struct fid_eq *eq)
{
struct fi_eq_cm_entry entry;
uint32_t event;
ssize_t rd;
int ret;
rd = fi_eq_sread(eq, &event, &entry, sizeof(entry), -1, 0);
if (rd != sizeof(entry)) {
FT_PROCESS_EQ_ERR(rd, eq, "fi_eq_sread", "accept");
ret = (int) rd;
return ret;
}
if (event != FI_CONNECTED || entry.fid != &ep->fid) {
fprintf(stderr, "Unexpected CM event %d fid %p (ep %p)\n",
event, entry.fid, ep);
ret = -FI_EOTHER;
return ret;
}
return 0;
}
int ft_retrieve_conn_req(struct fid_eq *eq, struct fi_info **fi)
{
struct fi_eq_cm_entry entry;
uint32_t event;
ssize_t rd;
int ret;
rd = fi_eq_sread(eq, &event, &entry, sizeof(entry), -1, 0);
if (rd != sizeof entry) {
FT_PROCESS_EQ_ERR(rd, eq, "fi_eq_sread", "listen");
return (int) rd;
}
*fi = entry.info;
if (event != FI_CONNREQ) {
fprintf(stderr, "Unexpected CM event %d\n", event);
ret = -FI_EOTHER;
return ret;
}
return 0;
}
int ft_accept_connection(struct fid_ep *ep, struct fid_eq *eq)
{
int ret;
ret = fi_accept(ep, NULL, 0);
if (ret) {
FT_PRINTERR("fi_accept", ret);
return ret;
}
ret = ft_complete_connect(ep, eq);
if (ret)
return ret;
return 0;
}
int ft_server_connect(void)
{
int ret;
ret = ft_retrieve_conn_req(eq, &fi);
if (ret)
goto err;
ret = fi_domain(fabric, fi, &domain, NULL);
if (ret) {
FT_PRINTERR("fi_domain", ret);
goto err;
}
ret = ft_alloc_active_res(fi);
if (ret)
goto err;
ret = ft_enable_ep_recv();
if (ret)
goto err;
ret = ft_accept_connection(ep, eq);
if (ret)
goto err;
return 0;
err:
if (fi)
fi_reject(pep, fi->handle, NULL, 0);
return ret;
}
int ft_connect_ep(struct fid_ep *ep,
struct fid_eq *eq, fi_addr_t *remote_addr)
{
int ret;
ret = fi_connect(ep, remote_addr, NULL, 0);
if (ret) {
FT_PRINTERR("fi_connect", ret);
return ret;
}
ret = ft_complete_connect(ep, eq);
if (ret)
return ret;
return 0;
}
int ft_client_connect(void)
{
int ret;
ft_init();
ret = ft_init_oob();
if (ret)
return ret;
ret = ft_getinfo(hints, &fi);
if (ret)
return ret;
ret = ft_open_fabric_res();
if (ret)
return ret;
ret = ft_alloc_active_res(fi);
if (ret)
return ret;
ret = ft_enable_ep_recv();
if (ret)
return ret;
ret = ft_connect_ep(ep, eq, fi->dest_addr);
if (ret)
return ret;
return 0;
}
int ft_init_fabric(void)
{
int ret;
ft_init();
ret = ft_init_oob();
if (ret)
return ret;
ret = ft_getinfo(hints, &fi);
if (ret)
return ret;
ret = ft_open_fabric_res();
if (ret)
return ret;
ret = ft_alloc_active_res(fi);
if (ret)
return ret;
ret = ft_enable_ep_recv();
if (ret)
return ret;
ret = ft_init_av();
if (ret)
return ret;
return 0;
}
int ft_get_cq_fd(struct fid_cq *cq, int *fd)
{
int ret = FI_SUCCESS;
if (cq && opts.comp_method == FT_COMP_WAIT_FD) {
ret = fi_control(&cq->fid, FI_GETWAIT, fd);
if (ret)
FT_PRINTERR("fi_control(FI_GETWAIT)", ret);
}
return ret;
}
int ft_init_alias_ep(uint64_t flags)
{
int ret;
ret = fi_ep_alias(ep, &alias_ep, flags);
if (ret) {
FT_PRINTERR("fi_ep_alias", ret);
return ret;
}
return 0;
}
int ft_enable_ep(struct fid_ep *ep, struct fid_eq *eq, struct fid_av *av,
struct fid_cq *txcq, struct fid_cq *rxcq,
struct fid_cntr *txcntr, struct fid_cntr *rxcntr)
{
uint64_t flags;
int ret;
if (fi->ep_attr->type == FI_EP_MSG || fi->caps & FI_MULTICAST ||
fi->caps & FI_COLLECTIVE)
FT_EP_BIND(ep, eq, 0);
FT_EP_BIND(ep, av, 0);
flags = FI_TRANSMIT;
if (!(opts.options & FT_OPT_TX_CQ))
flags |= FI_SELECTIVE_COMPLETION;
FT_EP_BIND(ep, txcq, flags);
flags = FI_RECV;
if (!(opts.options & FT_OPT_RX_CQ))
flags |= FI_SELECTIVE_COMPLETION;
FT_EP_BIND(ep, rxcq, flags);
ret = ft_get_cq_fd(txcq, &tx_fd);
if (ret)
return ret;
ret = ft_get_cq_fd(rxcq, &rx_fd);
if (ret)
return ret;
/* TODO: use control structure to select counter bindings explicitly */
if (opts.options & FT_OPT_TX_CQ)
flags = 0;
else
flags = FI_SEND;
if (hints->caps & (FI_WRITE | FI_READ))
flags |= hints->caps & (FI_WRITE | FI_READ);
else if (hints->caps & FI_RMA)
flags |= FI_WRITE | FI_READ;
FT_EP_BIND(ep, txcntr, flags);
if (opts.options & FT_OPT_RX_CQ)
flags = 0;
else
flags = FI_RECV;
if (hints->caps & (FI_REMOTE_WRITE | FI_REMOTE_READ))
flags |= hints->caps & (FI_REMOTE_WRITE | FI_REMOTE_READ);
else if (hints->caps & FI_RMA)
flags |= FI_REMOTE_WRITE | FI_REMOTE_READ;
FT_EP_BIND(ep, rxcntr, flags);
ret = fi_enable(ep);
if (ret) {
FT_PRINTERR("fi_enable", ret);
return ret;
}
return 0;
}
int ft_enable_ep_recv(void)
{
int ret;
ret = ft_enable_ep(ep, eq, av, txcq, rxcq, txcntr, rxcntr);
if (ret)
return ret;
if (!ft_check_opts(FT_OPT_SKIP_MSG_ALLOC) &&
(fi->caps & (FI_MSG | FI_TAGGED))) {
/* Initial receive will get remote address for unconnected EPs */
ret = ft_post_rx(ep, MAX(rx_size, FT_MAX_CTRL_MSG), &rx_ctx);
if (ret)
return ret;
}
return 0;
}
int ft_join_mc(void)
{
struct fi_eq_entry entry;
uint32_t event;
ssize_t rd;
int ret;
ret = fi_join(ep, fi->dest_addr, 0, &mc, ep->fid.context);
if (ret) {
FT_PRINTERR("fi_join", ret);
return ret;
}
rd = fi_eq_sread(eq, &event, &entry, sizeof entry, -1, 0);
if (rd != sizeof entry) {
FT_PROCESS_EQ_ERR(rd, eq, "fi_eq_sread", "join");
ret = (int) rd;
return ret;
}
if (event != FI_JOIN_COMPLETE || entry.fid != &mc->fid) {
fprintf(stderr, "Unexpected join event %d fid %p (mc %p)\n",
event, entry.fid, ep);
ret = -FI_EOTHER;
return ret;
}
return 0;
}
int ft_av_insert(struct fid_av *av, void *addr, size_t count, fi_addr_t *fi_addr,
uint64_t flags, void *context)
{
int ret;
ret = fi_av_insert(av, addr, count, fi_addr, flags, context);
if (ret < 0) {
FT_PRINTERR("fi_av_insert", ret);
return ret;
} else if (ret != count) {
FT_ERR("fi_av_insert: number of addresses inserted = %d;"
" number of addresses given = %zd\n", ret, count);
return -EXIT_FAILURE;
}
return 0;
}
int ft_init_av(void)
{
return ft_init_av_dst_addr(av, ep, &remote_fi_addr);
}
int ft_exchange_addresses_oob(struct fid_av *av_ptr, struct fid_ep *ep_ptr,
fi_addr_t *remote_addr)
{
char buf[FT_MAX_CTRL_MSG];
int ret;
size_t addrlen = FT_MAX_CTRL_MSG;
ret = fi_getname(&ep_ptr->fid, buf, &addrlen);
if (ret) {
FT_PRINTERR("fi_getname", ret);
return ret;
}
ret = ft_sock_send(oob_sock, buf, FT_MAX_CTRL_MSG);
if (ret)
return ret;
ret = ft_sock_recv(oob_sock, buf, FT_MAX_CTRL_MSG);
if (ret)
return ret;
ret = ft_av_insert(av_ptr, buf, 1, remote_addr, 0, NULL);
if (ret)
return ret;
return 0;
}
/* TODO: retry send for unreliable endpoints */
int ft_init_av_dst_addr(struct fid_av *av_ptr, struct fid_ep *ep_ptr,
fi_addr_t *remote_addr)
{
size_t addrlen;
int ret;
if (opts.options & FT_OPT_OOB_ADDR_EXCH) {
ret = ft_exchange_addresses_oob(av_ptr, ep_ptr, remote_addr);
if (ret)
return ret;
else
goto set_rx_seq_close;
}
if (opts.dst_addr) {
ret = ft_av_insert(av_ptr, fi->dest_addr, 1, remote_addr, 0, NULL);
if (ret)
return ret;
addrlen = FT_MAX_CTRL_MSG;
ret = fi_getname(&ep_ptr->fid, (char *) tx_buf + ft_tx_prefix_size(),
&addrlen);
if (ret) {
FT_PRINTERR("fi_getname", ret);
return ret;
}
ret = (int) ft_tx(ep, *remote_addr, addrlen, &tx_ctx);
if (ret)
return ret;
ret = ft_rx(ep, 1);
if (ret)
return ret;
} else {
ret = (int) ft_rx(ep, FT_MAX_CTRL_MSG);
if (ret)
return ret;
/* Test passing NULL fi_addr on one of the sides (server) if
* AV type is FI_AV_TABLE */
ret = ft_av_insert(av_ptr, (char *) rx_buf + ft_rx_prefix_size(),
1, ((fi->domain_attr->av_type == FI_AV_TABLE) ?
NULL : remote_addr), 0, NULL);
if (ret)
return ret;
if (fi->domain_attr->av_type == FI_AV_TABLE)
*remote_addr = 0;
ret = (int) ft_tx(ep, *remote_addr, 1, &tx_ctx);
if (ret)
return ret;
}
set_rx_seq_close:
/*
* For a test which does not have MSG or TAGGED
* capabilities, but has RMA/Atomics and uses the OOB sync.
* If no recv is going to be posted,
* then the rx_seq needs to be incremented to wait on the first RMA/Atomic
* completion.
*/
if (!(fi->caps & FI_MSG) && !(fi->caps & FI_TAGGED) && opts.oob_port)
rx_seq++;
return 0;
}
/* TODO: retry send for unreliable endpoints */
int ft_init_av_addr(struct fid_av *av_ptr, struct fid_ep *ep_ptr,
fi_addr_t *remote_addr)
{
size_t addrlen;
int ret;
if (opts.options & FT_OPT_OOB_ADDR_EXCH)
return ft_exchange_addresses_oob(av_ptr, ep_ptr, remote_addr);
if (opts.dst_addr) {
addrlen = FT_MAX_CTRL_MSG;
ret = fi_getname(&ep_ptr->fid, (char *) tx_buf + ft_tx_prefix_size(),
&addrlen);
if (ret) {
FT_PRINTERR("fi_getname", ret);
return ret;
}
ret = (int) ft_tx(ep, remote_fi_addr, addrlen, &tx_ctx);
if (ret)
return ret;
ret = (int) ft_rx(ep, FT_MAX_CTRL_MSG);
if (ret)
return ret;
ret = ft_av_insert(av_ptr, (char *) rx_buf + ft_rx_prefix_size(),
1, remote_addr, 0, NULL);
if (ret)
return ret;
} else {
ret = (int) ft_rx(ep, FT_MAX_CTRL_MSG);
if (ret)
return ret;
ret = ft_av_insert(av_ptr, (char *) rx_buf + ft_rx_prefix_size(),
1, remote_addr, 0, NULL);
if (ret)
return ret;
addrlen = FT_MAX_CTRL_MSG;
ret = fi_getname(&ep_ptr->fid,
(char *) tx_buf + ft_tx_prefix_size(),
&addrlen);
if (ret) {
FT_PRINTERR("fi_getname", ret);
return ret;
}
ret = (int) ft_tx(ep, remote_fi_addr, addrlen, &tx_ctx);
if (ret)
return ret;
}
return 0;
}
int ft_exchange_raw_keys(struct fi_rma_iov *peer_iov)
{
struct fi_rma_iov *rma_iov;
size_t key_size;
size_t len;
uint64_t addr;
int ret;
/* Get key size */
key_size = 0;
ret = fi_mr_raw_attr(mr, &addr, NULL, &key_size, 0);
if (ret != -FI_ETOOSMALL) {
return ret;
}
len = sizeof(*rma_iov) + key_size - sizeof(rma_iov->key);
/* TODO: make sure this fits in tx_buf and rx_buf */
if (opts.dst_addr) {
rma_iov = (struct fi_rma_iov *) (tx_buf + ft_tx_prefix_size());
if ((fi->domain_attr->mr_mode == FI_MR_BASIC) ||
(fi->domain_attr->mr_mode & FI_MR_VIRT_ADDR)) {
rma_iov->addr = (uintptr_t) rx_buf + ft_rx_prefix_size();
} else {
rma_iov->addr = 0;
}
/* Get raw attributes */
ret = fi_mr_raw_attr(mr, &addr, (uint8_t *) &rma_iov->key,
&key_size, 0);
if (ret)
return ret;
ret = ft_tx(ep, remote_fi_addr, len, &tx_ctx);
if (ret)
return ret;
ret = ft_get_rx_comp(rx_seq);
if (ret)
return ret;
rma_iov = (struct fi_rma_iov *) (rx_buf + ft_rx_prefix_size());
peer_iov->addr = rma_iov->addr;
peer_iov->len = rma_iov->len;
/* Map remote mr raw locally */
ret = fi_mr_map_raw(domain, rma_iov->addr,
(uint8_t *) &rma_iov->key, key_size,
&peer_iov->key, 0);
if (ret)
return ret;
ret = ft_post_rx(ep, rx_size, &rx_ctx);
} else {
ret = ft_get_rx_comp(rx_seq);
if (ret)
return ret;
rma_iov = (struct fi_rma_iov *) (rx_buf + ft_rx_prefix_size());
peer_iov->addr = rma_iov->addr;
peer_iov->len = rma_iov->len;
/* Map remote mr raw locally */
ret = fi_mr_map_raw(domain, rma_iov->addr,
(uint8_t *) &rma_iov->key, key_size,
&peer_iov->key, 0);
if (ret)
return ret;
ret = ft_post_rx(ep, rx_size, &rx_ctx);
if (ret)
return ret;
rma_iov = (struct fi_rma_iov *) (tx_buf + ft_tx_prefix_size());
if ((fi->domain_attr->mr_mode == FI_MR_BASIC) ||
(fi->domain_attr->mr_mode & FI_MR_VIRT_ADDR)) {
rma_iov->addr = (uintptr_t) rx_buf + ft_rx_prefix_size();
} else {
rma_iov->addr = 0;
}
/* Get raw attributes */
ret = fi_mr_raw_attr(mr, &addr, (uint8_t *) &rma_iov->key,
&key_size, 0);
if (ret)
return ret;
ret = ft_tx(ep, remote_fi_addr, len, &tx_ctx);
}
return ret;
}
int ft_exchange_keys(struct fi_rma_iov *peer_iov)
{
struct fi_rma_iov *rma_iov;
int ret;
if (fi->domain_attr->mr_mode & FI_MR_RAW)
return ft_exchange_raw_keys(peer_iov);
if (opts.dst_addr) {
rma_iov = (struct fi_rma_iov *) (tx_buf + ft_tx_prefix_size());
if ((fi->domain_attr->mr_mode == FI_MR_BASIC) ||
(fi->domain_attr->mr_mode & FI_MR_VIRT_ADDR)) {
rma_iov->addr = (uintptr_t) rx_buf + ft_rx_prefix_size();
} else {
rma_iov->addr = 0;
}
rma_iov->key = fi_mr_key(mr);
ret = ft_tx(ep, remote_fi_addr, sizeof *rma_iov, &tx_ctx);
if (ret)
return ret;
ret = ft_get_rx_comp(rx_seq);
if (ret)
return ret;
rma_iov = (struct fi_rma_iov *) (rx_buf + ft_rx_prefix_size());
*peer_iov = *rma_iov;
ret = ft_post_rx(ep, rx_size, &rx_ctx);
} else {
ret = ft_get_rx_comp(rx_seq);
if (ret)
return ret;
rma_iov = (struct fi_rma_iov *) (rx_buf + ft_rx_prefix_size());
*peer_iov = *rma_iov;
ret = ft_post_rx(ep, rx_size, &rx_ctx);
if (ret)
return ret;
rma_iov = (struct fi_rma_iov *) (tx_buf + ft_tx_prefix_size());
if ((fi->domain_attr->mr_mode == FI_MR_BASIC) ||
(fi->domain_attr->mr_mode & FI_MR_VIRT_ADDR)) {
rma_iov->addr = (uintptr_t) rx_buf + ft_rx_prefix_size();
} else {
rma_iov->addr = 0;
}
rma_iov->key = fi_mr_key(mr);
ret = ft_tx(ep, remote_fi_addr, sizeof *rma_iov, &tx_ctx);
}
return ret;
}
static void ft_cleanup_mr_array(struct ft_context *ctx_arr, char **mr_bufs)
{
int i;
if (!mr_bufs)
return;
for (i = 0; i < opts.window_size; i++) {
FT_CLOSE_FID(ctx_arr[i].mr);
free(mr_bufs[i]);
}
}
static void ft_close_fids(void)
{
if (mr != &no_mr)
FT_CLOSE_FID(mr);
FT_CLOSE_FID(mc);
FT_CLOSE_FID(alias_ep);
FT_CLOSE_FID(ep);
FT_CLOSE_FID(pep);
if (opts.options & FT_OPT_CQ_SHARED) {
FT_CLOSE_FID(txcq);
} else {
FT_CLOSE_FID(rxcq);
FT_CLOSE_FID(txcq);
}
FT_CLOSE_FID(rxcntr);
FT_CLOSE_FID(txcntr);
FT_CLOSE_FID(pollset);
FT_CLOSE_FID(av);
FT_CLOSE_FID(eq);
FT_CLOSE_FID(domain);
FT_CLOSE_FID(waitset);
FT_CLOSE_FID(fabric);
}
void ft_free_res(void)
{
ft_cleanup_mr_array(tx_ctx_arr, tx_mr_bufs);
ft_cleanup_mr_array(rx_ctx_arr, rx_mr_bufs);
free(tx_ctx_arr);
free(rx_ctx_arr);
tx_ctx_arr = NULL;
rx_ctx_arr = NULL;
ft_close_fids();
if (buf) {
free(buf);
buf = rx_buf = tx_buf = NULL;
buf_size = rx_size = tx_size = tx_mr_size = rx_mr_size = 0;
}
if (fi_pep) {
fi_freeinfo(fi_pep);
fi_pep = NULL;
}
if (fi) {
fi_freeinfo(fi);
fi = NULL;
}
if (hints) {
fi_freeinfo(hints);
hints = NULL;
}
}
static int dupaddr(void **dst_addr, size_t *dst_addrlen,
void *src_addr, size_t src_addrlen)
{
*dst_addr = malloc(src_addrlen);
if (!*dst_addr) {
FT_ERR("address allocation failed");
return EAI_MEMORY;
}
*dst_addrlen = src_addrlen;
memcpy(*dst_addr, src_addr, src_addrlen);
return 0;
}
static int getaddr(char *node, char *service,
struct fi_info *hints, uint64_t flags)
{
int ret;
struct fi_info *fi;
if (!node && !service) {
if (flags & FI_SOURCE) {
hints->src_addr = NULL;
hints->src_addrlen = 0;
} else {
hints->dest_addr = NULL;
hints->dest_addrlen = 0;
}
return 0;
}
ret = fi_getinfo(FT_FIVERSION, node, service, flags, hints, &fi);
if (ret) {
FT_PRINTERR("fi_getinfo", ret);
return ret;
}
hints->addr_format = fi->addr_format;
if (flags & FI_SOURCE) {
ret = dupaddr(&hints->src_addr, &hints->src_addrlen,
fi->src_addr, fi->src_addrlen);
} else {
ret = dupaddr(&hints->dest_addr, &hints->dest_addrlen,
fi->dest_addr, fi->dest_addrlen);
}
fi_freeinfo(fi);
return ret;
}
int ft_getsrcaddr(char *node, char *service, struct fi_info *hints)
{
return getaddr(node, service, hints, FI_SOURCE);
}
int ft_read_addr_opts(char **node, char **service, struct fi_info *hints,
uint64_t *flags, struct ft_opts *opts)
{
int ret;
if (opts->dst_addr && (opts->src_addr || !opts->oob_port)){
if (!opts->dst_port)
opts->dst_port = default_port;
ret = ft_getsrcaddr(opts->src_addr, opts->src_port, hints);
if (ret)
return ret;
*node = opts->dst_addr;
*service = opts->dst_port;
} else {
if (!opts->src_port)
opts->src_port = default_port;
*node = opts->src_addr;
*service = opts->src_port;
*flags = FI_SOURCE;
}
return 0;
}
char *size_str(char str[FT_STR_LEN], long long size)
{
long long base, fraction = 0;
char mag;
memset(str, '\0', FT_STR_LEN);
if (size >= (1 << 30)) {
base = 1 << 30;
mag = 'g';
} else if (size >= (1 << 20)) {
base = 1 << 20;
mag = 'm';
} else if (size >= (1 << 10)) {
base = 1 << 10;
mag = 'k';
} else {
base = 1;
mag = '\0';
}
if (size / base < 10)
fraction = (size % base) * 10 / base;
if (fraction)
snprintf(str, FT_STR_LEN, "%lld.%lld%c", size / base, fraction, mag);
else
snprintf(str, FT_STR_LEN, "%lld%c", size / base, mag);
return str;
}
char *cnt_str(char str[FT_STR_LEN], long long cnt)
{
if (cnt >= 1000000000)
snprintf(str, FT_STR_LEN, "%lldb", cnt / 1000000000);
else if (cnt >= 1000000)
snprintf(str, FT_STR_LEN, "%lldm", cnt / 1000000);
else if (cnt >= 1000)
snprintf(str, FT_STR_LEN, "%lldk", cnt / 1000);
else
snprintf(str, FT_STR_LEN, "%lld", cnt);
return str;
}
int size_to_count(int size)
{
if (size >= (1 << 20))
return (opts.options & FT_OPT_BW) ? 200 : 100;
else if (size >= (1 << 16))
return (opts.options & FT_OPT_BW) ? 2000 : 1000;
else
return (opts.options & FT_OPT_BW) ? 20000: 10000;
}
static const size_t datatype_size_table[] = {
[FI_INT8] = sizeof(int8_t),
[FI_UINT8] = sizeof(uint8_t),
[FI_INT16] = sizeof(int16_t),
[FI_UINT16] = sizeof(uint16_t),
[FI_INT32] = sizeof(int32_t),
[FI_UINT32] = sizeof(uint32_t),
[FI_INT64] = sizeof(int64_t),
[FI_UINT64] = sizeof(uint64_t),
[FI_FLOAT] = sizeof(float),
[FI_DOUBLE] = sizeof(double),
[FI_FLOAT_COMPLEX] = sizeof(OFI_COMPLEX(float)),
[FI_DOUBLE_COMPLEX] = sizeof(OFI_COMPLEX(double)),
[FI_LONG_DOUBLE] = sizeof(long double),
[FI_LONG_DOUBLE_COMPLEX] = sizeof(OFI_COMPLEX(long_double)),
};
size_t datatype_to_size(enum fi_datatype datatype)
{
if (datatype >= FI_DATATYPE_LAST)
return 0;
return datatype_size_table[datatype];
}
void init_test(struct ft_opts *opts, char *test_name, size_t test_name_len)
{
char sstr[FT_STR_LEN];
size_str(sstr, opts->transfer_size);
if (!strcmp(test_name, "custom"))
snprintf(test_name, test_name_len, "%s_lat", sstr);
if (!(opts->options & FT_OPT_ITER))
opts->iterations = size_to_count(opts->transfer_size);
}
static int ft_progress(struct fid_cq *cq, uint64_t total, uint64_t *cq_cntr)
{
struct fi_cq_err_entry comp;
int ret;
ret = fi_cq_read(cq, &comp, 1);
if (ret > 0)
(*cq_cntr)++;
if (ret >= 0 || ret == -FI_EAGAIN)
return 0;
if (ret == -FI_EAVAIL) {
ret = ft_cq_readerr(cq);
(*cq_cntr)++;
} else {
FT_PRINTERR("fi_cq_read/sread", ret);
}
return ret;
}
#define FT_POST(post_fn, progress_fn, cq, seq, cq_cntr, op_str, ...) \
do { \
int timeout_save; \
int ret, rc; \
\
while (1) { \
ret = post_fn(__VA_ARGS__); \
if (!ret) \
break; \
\
if (ret != -FI_EAGAIN) { \
FT_PRINTERR(op_str, ret); \
return ret; \
} \
\
timeout_save = timeout; \
timeout = 0; \
rc = progress_fn(cq, seq, cq_cntr); \
if (rc && rc != -FI_EAGAIN) { \
FT_ERR("Failed to get " op_str " completion"); \
return rc; \
} \
timeout = timeout_save; \
} \
seq++; \
} while (0)
ssize_t ft_post_tx_buf(struct fid_ep *ep, fi_addr_t fi_addr, size_t size,
uint64_t data, void *ctx,
void *op_buf, void *op_mr_desc, uint64_t op_tag)
{
size += ft_tx_prefix_size();
if (hints->caps & FI_TAGGED) {
op_tag = op_tag ? op_tag : tx_seq;
if (data != NO_CQ_DATA) {
FT_POST(fi_tsenddata, ft_progress, txcq, tx_seq,
&tx_cq_cntr, "transmit", ep, op_buf, size,
op_mr_desc, data, fi_addr, op_tag, ctx);
} else {
FT_POST(fi_tsend, ft_progress, txcq, tx_seq,
&tx_cq_cntr, "transmit", ep, op_buf, size,
op_mr_desc, fi_addr, op_tag, ctx);
}
} else {
if (data != NO_CQ_DATA) {
FT_POST(fi_senddata, ft_progress, txcq, tx_seq,
&tx_cq_cntr, "transmit", ep, op_buf, size,
op_mr_desc, data, fi_addr, ctx);
} else {
FT_POST(fi_send, ft_progress, txcq, tx_seq,
&tx_cq_cntr, "transmit", ep, op_buf, size,
op_mr_desc, fi_addr, ctx);
}
}
return 0;
}
ssize_t ft_post_tx(struct fid_ep *ep, fi_addr_t fi_addr, size_t size,
uint64_t data, void *ctx)
{
return ft_post_tx_buf(ep, fi_addr, size, data,
ctx, tx_buf, mr_desc, ft_tag);
}
ssize_t ft_tx(struct fid_ep *ep, fi_addr_t fi_addr, size_t size, void *ctx)
{
ssize_t ret;
if (ft_check_opts(FT_OPT_VERIFY_DATA | FT_OPT_ACTIVE))
ft_fill_buf((char *) tx_buf + ft_tx_prefix_size(), size);
ret = ft_post_tx(ep, fi_addr, size, NO_CQ_DATA, ctx);
if (ret)
return ret;
ret = ft_get_tx_comp(tx_seq);
return ret;
}
ssize_t ft_post_inject(struct fid_ep *ep, fi_addr_t fi_addr, size_t size)
{
if (hints->caps & FI_TAGGED) {
FT_POST(fi_tinject, ft_progress, txcq, tx_seq, &tx_cq_cntr,
"inject", ep, tx_buf, size + ft_tx_prefix_size(),
fi_addr, tx_seq);
} else {
FT_POST(fi_inject, ft_progress, txcq, tx_seq, &tx_cq_cntr,
"inject", ep, tx_buf, size + ft_tx_prefix_size(),
fi_addr);
}
tx_cq_cntr++;
return 0;
}
ssize_t ft_inject(struct fid_ep *ep, fi_addr_t fi_addr, size_t size)
{
ssize_t ret;
if (ft_check_opts(FT_OPT_VERIFY_DATA | FT_OPT_ACTIVE))
ft_fill_buf((char *) tx_buf + ft_tx_prefix_size(), size);
ret = ft_post_inject(ep, fi_addr, size);
if (ret)
return ret;
return ret;
}
ssize_t ft_post_rma(enum ft_rma_opcodes op, struct fid_ep *ep, size_t size,
struct fi_rma_iov *remote, void *context)
{
switch (op) {
case FT_RMA_WRITE:
FT_POST(fi_write, ft_progress, txcq, tx_seq, &tx_cq_cntr,
"fi_write", ep, tx_buf, opts.transfer_size, mr_desc,
remote_fi_addr, remote->addr, remote->key, context);
break;
case FT_RMA_WRITEDATA:
FT_POST(fi_writedata, ft_progress, txcq, tx_seq, &tx_cq_cntr,
"fi_writedata", ep, tx_buf, opts.transfer_size, mr_desc,
remote_cq_data, remote_fi_addr, remote->addr,
remote->key, context);
break;
case FT_RMA_READ:
FT_POST(fi_read, ft_progress, txcq, tx_seq, &tx_cq_cntr,
"fi_read", ep, rx_buf, opts.transfer_size, mr_desc,
remote_fi_addr, remote->addr,remote->key, context);
break;
default:
FT_ERR("Unknown RMA op type\n");
return EXIT_FAILURE;
}
return 0;
}
ssize_t ft_rma(enum ft_rma_opcodes op, struct fid_ep *ep, size_t size,
struct fi_rma_iov *remote, void *context)
{
int ret;
ret = ft_post_rma(op, ep, size, remote, context);
if (ret)
return ret;
if (op == FT_RMA_WRITEDATA) {
if (fi->rx_attr->mode & FI_RX_CQ_DATA) {
ret = ft_rx(ep, 0);
} else {
ret = ft_get_rx_comp(rx_seq);
/* Just increment the seq # instead of posting recv so
* that we wait for remote write completion on the next
* iteration. */
rx_seq++;
}
if (ret)
return ret;
}
ret = ft_get_tx_comp(tx_seq);
if (ret)
return ret;
return 0;
}
ssize_t ft_post_rma_inject(enum ft_rma_opcodes op, struct fid_ep *ep, size_t size,
struct fi_rma_iov *remote)
{
switch (op) {
case FT_RMA_WRITE:
FT_POST(fi_inject_write, ft_progress, txcq, tx_seq, &tx_cq_cntr,
"fi_inject_write", ep, tx_buf, opts.transfer_size,
remote_fi_addr, remote->addr, remote->key);
break;
case FT_RMA_WRITEDATA:
FT_POST(fi_inject_writedata, ft_progress, txcq, tx_seq,
&tx_cq_cntr, "fi_inject_writedata", ep, tx_buf,
opts.transfer_size, remote_cq_data, remote_fi_addr,
remote->addr, remote->key);
break;
default:
FT_ERR("Unknown RMA inject op type\n");
return EXIT_FAILURE;
}
tx_cq_cntr++;
return 0;
}
ssize_t ft_post_atomic(enum ft_atomic_opcodes opcode, struct fid_ep *ep,
void *compare, void *compare_desc, void *result,
void *result_desc, struct fi_rma_iov *remote,
enum fi_datatype datatype, enum fi_op atomic_op,
void *context)
{
size_t size, count;
size = datatype_to_size(datatype);
if (!size) {
FT_ERR("Unknown datatype\n");
return EXIT_FAILURE;
}
count = opts.transfer_size / size;
switch (opcode) {
case FT_ATOMIC_BASE:
FT_POST(fi_atomic, ft_progress, txcq, tx_seq, &tx_cq_cntr,
"fi_atomic", ep, buf, count, mr_desc, remote_fi_addr,
remote->addr, remote->key, datatype, atomic_op, context);
break;
case FT_ATOMIC_FETCH:
FT_POST(fi_fetch_atomic, ft_progress, txcq, tx_seq, &tx_cq_cntr,
"fi_fetch_atomic", ep, buf, count, mr_desc, result,
result_desc, remote_fi_addr, remote->addr, remote->key,
datatype, atomic_op, context);
break;
case FT_ATOMIC_COMPARE:
FT_POST(fi_compare_atomic, ft_progress, txcq, tx_seq,
&tx_cq_cntr, "fi_compare_atomic", ep, buf, count,
mr_desc, compare, compare_desc, result, result_desc,
remote_fi_addr, remote->addr, remote->key, datatype,
atomic_op, context);
break;
default:
FT_ERR("Unknown atomic opcode\n");
return EXIT_FAILURE;
}
return 0;
}
static int check_atomic_attr(enum fi_op op, enum fi_datatype datatype,
uint64_t flags)
{
struct fi_atomic_attr attr;
int ret;
ret = fi_query_atomic(domain, datatype, op, &attr, flags);
if (ret) {
FT_PRINTERR("fi_query_atomic", ret);
return ret;
}
if (attr.size != datatype_to_size(datatype)) {
fprintf(stderr, "Provider atomic size mismatch\n");
return -FI_ENOSYS;
}
return 0;
}
int check_base_atomic_op(struct fid_ep *endpoint, enum fi_op op,
enum fi_datatype datatype, size_t *count)
{
int ret;
ret = fi_atomicvalid(endpoint, datatype, op, count);
if (ret)
return ret;
return check_atomic_attr(op, datatype, 0);
}
int check_fetch_atomic_op(struct fid_ep *endpoint, enum fi_op op,
enum fi_datatype datatype, size_t *count)
{
int ret;
ret = fi_fetch_atomicvalid(endpoint, datatype, op, count);
if (ret)
return ret;
return check_atomic_attr(op, datatype, FI_FETCH_ATOMIC);
}
int check_compare_atomic_op(struct fid_ep *endpoint, enum fi_op op,
enum fi_datatype datatype, size_t *count)
{
int ret;
ret = fi_compare_atomicvalid(endpoint, datatype, op, count);
if (ret)
return ret;
return check_atomic_attr(op, datatype, FI_COMPARE_ATOMIC);
}
ssize_t ft_post_rx_buf(struct fid_ep *ep, size_t size, void *ctx,
void *op_buf, void *op_mr_desc, uint64_t op_tag)
{
size = MAX(size, FT_MAX_CTRL_MSG) + ft_rx_prefix_size();
if (hints->caps & FI_TAGGED) {
op_tag = op_tag ? op_tag : rx_seq;
FT_POST(fi_trecv, ft_progress, rxcq, rx_seq, &rx_cq_cntr,
"receive", ep, op_buf, size, op_mr_desc, 0, op_tag,
0, ctx);
} else {
FT_POST(fi_recv, ft_progress, rxcq, rx_seq, &rx_cq_cntr,
"receive", ep, op_buf, size, op_mr_desc, 0, ctx);
}
return 0;
}
ssize_t ft_post_rx(struct fid_ep *ep, size_t size, void *ctx)
{
return ft_post_rx_buf(ep, size, ctx, rx_buf, mr_desc, ft_tag);
}
ssize_t ft_rx(struct fid_ep *ep, size_t size)
{
ssize_t ret;
ret = ft_get_rx_comp(rx_seq);
if (ret)
return ret;
if (ft_check_opts(FT_OPT_VERIFY_DATA | FT_OPT_ACTIVE)) {
ret = ft_check_buf((char *) rx_buf + ft_rx_prefix_size(), size);
if (ret)
return ret;
}
/* TODO: verify CQ data, if available */
/* Ignore the size arg. Post a buffer large enough to handle all message
* sizes. ft_sync() makes use of ft_rx() and gets called in tests just before
* message size is updated. The recvs posted are always for the next incoming
* message */
ret = ft_post_rx(ep, rx_size, &rx_ctx);
return ret;
}
/*
* Received messages match tagged buffers in order, but the completions can be
* reported out of order. A tag is valid if it's within the current window.
*/
static inline int
ft_tag_is_valid(struct fid_cq * cq, struct fi_cq_err_entry *comp, uint64_t tag)
{
int valid = 1;
if ((hints->caps & FI_TAGGED) && (cq == rxcq)) {
if (opts.options & FT_OPT_BW) {
/* valid: (tag - window) < comp->tag < (tag + window) */
valid = (tag < comp->tag + opts.window_size) &&
(comp->tag < tag + opts.window_size);
} else {
valid = (comp->tag == tag);
}
if (!valid) {
FT_ERR("Tag mismatch!. Expected: %"PRIu64", actual: %"
PRIu64, tag, comp->tag);
}
}
return valid;
}
/*
* fi_cq_err_entry can be cast to any CQ entry format.
*/
static int ft_spin_for_comp(struct fid_cq *cq, uint64_t *cur,
uint64_t total, int timeout)
{
struct fi_cq_err_entry comp;
struct timespec a, b;
int ret;
if (timeout >= 0)
clock_gettime(CLOCK_MONOTONIC, &a);
do {
ret = fi_cq_read(cq, &comp, 1);
if (ret > 0) {
if (timeout >= 0)
clock_gettime(CLOCK_MONOTONIC, &a);
if (!ft_tag_is_valid(cq, &comp, ft_tag ? ft_tag : rx_cq_cntr))
return -FI_EOTHER;
(*cur)++;
} else if (ret < 0 && ret != -FI_EAGAIN) {
return ret;
} else if (timeout >= 0) {
clock_gettime(CLOCK_MONOTONIC, &b);
if ((b.tv_sec - a.tv_sec) > timeout) {
fprintf(stderr, "%ds timeout expired\n", timeout);
return -FI_ENODATA;
}
}
} while (total - *cur > 0);
return 0;
}
/*
* fi_cq_err_entry can be cast to any CQ entry format.
*/
static int ft_wait_for_comp(struct fid_cq *cq, uint64_t *cur,
uint64_t total, int timeout)
{
struct fi_cq_err_entry comp;
int ret;
while (total - *cur > 0) {
ret = fi_cq_sread(cq, &comp, 1, NULL, timeout);
if (ret > 0) {
if (!ft_tag_is_valid(cq, &comp, ft_tag ? ft_tag : rx_cq_cntr))
return -FI_EOTHER;
(*cur)++;
} else if (ret < 0 && ret != -FI_EAGAIN) {
return ret;
}
}
return 0;
}
/*
* fi_cq_err_entry can be cast to any CQ entry format.
*/
static int ft_fdwait_for_comp(struct fid_cq *cq, uint64_t *cur,
uint64_t total, int timeout)
{
struct fi_cq_err_entry comp;
struct fid *fids[1];
int fd, ret;
fd = cq == txcq ? tx_fd : rx_fd;
fids[0] = &cq->fid;
while (total - *cur > 0) {
ret = fi_trywait(fabric, fids, 1);
if (ret == FI_SUCCESS) {
ret = ft_poll_fd(fd, timeout);
if (ret && ret != -FI_EAGAIN)
return ret;
}
ret = fi_cq_read(cq, &comp, 1);
if (ret > 0) {
if (!ft_tag_is_valid(cq, &comp, ft_tag ? ft_tag : rx_cq_cntr))
return -FI_EOTHER;
(*cur)++;
} else if (ret < 0 && ret != -FI_EAGAIN) {
return ret;
}
}
return 0;
}
static int ft_get_cq_comp(struct fid_cq *cq, uint64_t *cur,
uint64_t total, int timeout)
{
int ret;
switch (opts.comp_method) {
case FT_COMP_SREAD:
case FT_COMP_YIELD:
ret = ft_wait_for_comp(cq, cur, total, timeout);
break;
case FT_COMP_WAIT_FD:
ret = ft_fdwait_for_comp(cq, cur, total, timeout);
break;
default:
ret = ft_spin_for_comp(cq, cur, total, timeout);
break;
}
if (ret) {
if (ret == -FI_EAVAIL) {
ret = ft_cq_readerr(cq);
(*cur)++;
} else {
FT_PRINTERR("ft_get_cq_comp", ret);
}
}
return ret;
}
static int ft_spin_for_cntr(struct fid_cntr *cntr, uint64_t total, int timeout)
{
struct timespec a, b;
uint64_t cur;
if (timeout >= 0)
clock_gettime(CLOCK_MONOTONIC, &a);
for (;;) {
cur = fi_cntr_read(cntr);
if (cur >= total)
return 0;
if (timeout >= 0) {
clock_gettime(CLOCK_MONOTONIC, &b);
if ((b.tv_sec - a.tv_sec) > timeout)
break;
}
}
fprintf(stderr, "%ds timeout expired\n", timeout);
return -FI_ENODATA;
}
static int ft_wait_for_cntr(struct fid_cntr *cntr, uint64_t total, int timeout)
{
int ret;
while (fi_cntr_read(cntr) < total) {
ret = fi_cntr_wait(cntr, total, timeout);
if (ret)
FT_PRINTERR("fi_cntr_wait", ret);
else
break;
}
return 0;
}
static int ft_get_cntr_comp(struct fid_cntr *cntr, uint64_t total, int timeout)
{
int ret = 0;
switch (opts.comp_method) {
case FT_COMP_SREAD:
case FT_COMP_WAITSET:
case FT_COMP_WAIT_FD:
case FT_COMP_YIELD:
ret = ft_wait_for_cntr(cntr, total, timeout);
break;
default:
ret = ft_spin_for_cntr(cntr, total, timeout);
break;
}
if (ret)
FT_PRINTERR("fs_get_cntr_comp", ret);
return ret;
}
int ft_get_rx_comp(uint64_t total)
{
int ret = FI_SUCCESS;
if (opts.options & FT_OPT_RX_CQ) {
ret = ft_get_cq_comp(rxcq, &rx_cq_cntr, total, timeout);
} else if (rxcntr) {
ret = ft_get_cntr_comp(rxcntr, total, timeout);
} else {
FT_ERR("Trying to get a RX completion when no RX CQ or counter were opened");
ret = -FI_EOTHER;
}
return ret;
}
int ft_get_tx_comp(uint64_t total)
{
int ret;
if (opts.options & FT_OPT_TX_CQ) {
ret = ft_get_cq_comp(txcq, &tx_cq_cntr, total, -1);
} else if (txcntr) {
ret = ft_get_cntr_comp(txcntr, total, -1);
} else {
FT_ERR("Trying to get a TX completion when no TX CQ or counter were opened");
ret = -FI_EOTHER;
}
return ret;
}
int ft_sendmsg(struct fid_ep *ep, fi_addr_t fi_addr,
size_t size, void *ctx, int flags)
{
int ret;
struct fi_msg msg;
struct fi_msg_tagged tagged_msg;
struct iovec msg_iov;
msg_iov.iov_base = tx_buf;
msg_iov.iov_len = size;
if (hints->caps & FI_TAGGED) {
tagged_msg.msg_iov = &msg_iov;
tagged_msg.desc = &mr_desc;
tagged_msg.iov_count = 1;
tagged_msg.addr = fi_addr;
tagged_msg.data = NO_CQ_DATA;
tagged_msg.context = ctx;
tagged_msg.tag = ft_tag ? ft_tag : tx_seq;
tagged_msg.ignore = 0;
ret = fi_tsendmsg(ep, &tagged_msg, flags);
if (ret) {
FT_PRINTERR("fi_tsendmsg", ret);
return ret;
}
} else {
msg.msg_iov = &msg_iov;
msg.desc = &mr_desc;
msg.iov_count = 1;
msg.addr = fi_addr;
msg.data = NO_CQ_DATA;
msg.context = ctx;
ret = fi_sendmsg(ep, &msg, flags);
if (ret) {
FT_PRINTERR("fi_sendmsg", ret);
return ret;
}
}
return 0;
}
int ft_recvmsg(struct fid_ep *ep, fi_addr_t fi_addr,
size_t size, void *ctx, int flags)
{
int ret;
struct fi_msg msg;
struct fi_msg_tagged tagged_msg;
struct iovec msg_iov;
msg_iov.iov_base = rx_buf;
msg_iov.iov_len = size;
if (hints->caps & FI_TAGGED) {
tagged_msg.msg_iov = &msg_iov;
tagged_msg.desc = &mr_desc;
tagged_msg.iov_count = 1;
tagged_msg.addr = fi_addr;
tagged_msg.data = NO_CQ_DATA;
tagged_msg.context = ctx;
tagged_msg.tag = ft_tag ? ft_tag : tx_seq;
tagged_msg.ignore = 0;
ret = fi_trecvmsg(ep, &tagged_msg, flags);
if (ret) {
FT_PRINTERR("fi_trecvmsg", ret);
return ret;
}
} else {
msg.msg_iov = &msg_iov;
msg.desc = &mr_desc;
msg.iov_count = 1;
msg.addr = fi_addr;
msg.data = NO_CQ_DATA;
msg.context = ctx;
ret = fi_recvmsg(ep, &msg, flags);
if (ret) {
FT_PRINTERR("fi_recvmsg", ret);
return ret;
}
}
return 0;
}
int ft_cq_read_verify(struct fid_cq *cq, void *op_context)
{
int ret;
struct fi_cq_err_entry completion;
do {
/* read events from the completion queue */
ret = fi_cq_read(cq, (void *)&completion, 1);
if (ret > 0) {
if (op_context != completion.op_context) {
fprintf(stderr, "ERROR: op ctx=%p cq_ctx=%p\n",
op_context, completion.op_context);
return -FI_EOTHER;
}
if (!ft_tag_is_valid(cq, &completion,
ft_tag ? ft_tag : rx_cq_cntr))
return -FI_EOTHER;
} else if ((ret <= 0) && (ret != -FI_EAGAIN)) {
FT_PRINTERR("POLL: Error\n", ret);
if (ret == -FI_EAVAIL)
FT_PRINTERR("POLL: error available\n", ret);
return -FI_EOTHER;
}
} while (ret == -FI_EAGAIN);
return 0;
}
int ft_cq_readerr(struct fid_cq *cq)
{
struct fi_cq_err_entry cq_err;
int ret;
memset(&cq_err, 0, sizeof(cq_err));
ret = fi_cq_readerr(cq, &cq_err, 0);
if (ret < 0) {
FT_PRINTERR("fi_cq_readerr", ret);
} else {
FT_CQ_ERR(cq, cq_err, NULL, 0);
ret = -cq_err.err;
}
return ret;
}
void eq_readerr(struct fid_eq *eq, const char *eq_str)
{
struct fi_eq_err_entry eq_err;
int rd;
memset(&eq_err, 0, sizeof(eq_err));
rd = fi_eq_readerr(eq, &eq_err, 0);
if (rd != sizeof(eq_err)) {
FT_PRINTERR("fi_eq_readerr", rd);
} else {
FT_EQ_ERR(eq, eq_err, NULL, 0);
}
}
int ft_sync()
{
char buf;
int ret;
if (opts.dst_addr) {
if (!(opts.options & FT_OPT_OOB_SYNC)) {
ret = ft_tx(ep, remote_fi_addr, 1, &tx_ctx);
if (ret)
return ret;
ret = ft_rx(ep, 1);
} else {
ret = ft_sock_send(oob_sock, &buf, 1);
if (ret)
return ret;
ret = ft_sock_recv(oob_sock, &buf, 1);
if (ret)
return ret;
}
} else {
if (!(opts.options & FT_OPT_OOB_SYNC)) {
ret = ft_rx(ep, 1);
if (ret)
return ret;
ret = ft_tx(ep, remote_fi_addr, 1, &tx_ctx);
} else {
ret = ft_sock_recv(oob_sock, &buf, 1);
if (ret)
return ret;
ret = ft_sock_send(oob_sock, &buf, 1);
if (ret)
return ret;
}
}
return ret;
}
int ft_sync_pair(int status)
{
int ret;
int pair_status;
if (ft_parent_proc) {
ret = write(ft_socket_pair[1], &status, sizeof(int));
if (ret < 0) {
FT_PRINTERR("write", errno);
return ret;
}
ret = read(ft_socket_pair[1], &pair_status, sizeof(int));
if (ret < 0) {
FT_PRINTERR("read", errno);
return ret;
}
} else {
ret = read(ft_socket_pair[0], &pair_status, sizeof(int));
if (ret < 0) {
FT_PRINTERR("read", errno);
return ret;
}
ret = write(ft_socket_pair[0], &status, sizeof(int));
if (ret < 0) {
FT_PRINTERR("write", errno);
return ret;
}
}
/* check status reported the other guy */
if (pair_status != FI_SUCCESS)
return pair_status;
return 0;
}
int ft_fork_and_pair(void)
{
int ret;
ret = socketpair(AF_LOCAL, SOCK_STREAM, 0, ft_socket_pair);
if (ret) {
FT_PRINTERR("socketpair", errno);
return -errno;
}
ft_child_pid = fork();
if (ft_child_pid < 0) {
FT_PRINTERR("fork", ft_child_pid);
return -errno;
}
if (ft_child_pid)
ft_parent_proc = 1;
return 0;
}
int ft_wait_child(void)
{
int ret;
ret = close(ft_socket_pair[0]);
if (ret) {
FT_PRINTERR("close", errno);
return ret;
}
ret = close(ft_socket_pair[1]);
if (ret) {
FT_PRINTERR("close", errno);
return ret;
}
if (ft_parent_proc) {
ret = waitpid(ft_child_pid, NULL, WCONTINUED);
if (ret < 0) {
FT_PRINTERR("waitpid", errno);
return ret;
}
}
return 0;
}
int ft_finalize_ep(struct fid_ep *ep)
{
struct iovec iov;
int ret;
struct fi_context ctx;
strcpy(tx_buf + ft_tx_prefix_size(), "fin");
iov.iov_base = tx_buf;
iov.iov_len = 4 + ft_tx_prefix_size();
if (hints->caps & FI_TAGGED) {
struct fi_msg_tagged tmsg;
memset(&tmsg, 0, sizeof tmsg);
tmsg.msg_iov = &iov;
tmsg.desc = &mr_desc;
tmsg.iov_count = 1;
tmsg.addr = remote_fi_addr;
tmsg.tag = tx_seq;
tmsg.ignore = 0;
tmsg.context = &ctx;
FT_POST(fi_tsendmsg, ft_progress, txcq, tx_seq,
&tx_cq_cntr, "tsendmsg", ep, &tmsg,
FI_INJECT | FI_TRANSMIT_COMPLETE);
} else {
struct fi_msg msg;
memset(&msg, 0, sizeof msg);
msg.msg_iov = &iov;
msg.desc = &mr_desc;
msg.iov_count = 1;
msg.addr = remote_fi_addr;
msg.context = &ctx;
FT_POST(fi_sendmsg, ft_progress, txcq, tx_seq,
&tx_cq_cntr, "sendmsg", ep, &msg,
FI_INJECT | FI_TRANSMIT_COMPLETE);
}
ret = ft_get_tx_comp(tx_seq);
if (ret)
return ret;
ret = ft_get_rx_comp(rx_seq);
if (ret)
return ret;
return 0;
}
int ft_finalize(void)
{
int ret;
if (fi->domain_attr->mr_mode & FI_MR_RAW) {
ret = fi_mr_unmap_key(domain, remote.key);
if (ret)
return ret;
}
return ft_finalize_ep(ep);
}
int64_t get_elapsed(const struct timespec *b, const struct timespec *a,
enum precision p)
{
int64_t elapsed;
elapsed = difftime(a->tv_sec, b->tv_sec) * 1000 * 1000 * 1000;
elapsed += a->tv_nsec - b->tv_nsec;
return elapsed / p;
}
void show_perf(char *name, size_t tsize, int iters, struct timespec *start,
struct timespec *end, int xfers_per_iter)
{
static int header = 1;
char str[FT_STR_LEN];
int64_t elapsed = get_elapsed(start, end, MICRO);
long long bytes = (long long) iters * tsize * xfers_per_iter;
float usec_per_xfer;
if (name) {
if (header) {
printf("%-50s%-8s%-8s%-8s%8s %10s%13s%13s\n",
"name", "bytes", "iters",
"total", "time", "MB/sec",
"usec/xfer", "Mxfers/sec");
header = 0;
}
printf("%-50s", name);
} else {
if (header) {
printf("%-8s%-8s%-8s%8s %10s%13s%13s\n",
"bytes", "iters", "total",
"time", "MB/sec", "usec/xfer",
"Mxfers/sec");
header = 0;
}
}
printf("%-8s", size_str(str, tsize));
printf("%-8s", cnt_str(str, iters));
printf("%-8s", size_str(str, bytes));
usec_per_xfer = ((float)elapsed / iters / xfers_per_iter);
printf("%8.2fs%10.2f%11.2f%11.2f\n",
elapsed / 1000000.0, bytes / (1.0 * elapsed),
usec_per_xfer, 1.0/usec_per_xfer);
}
void show_perf_mr(size_t tsize, int iters, struct timespec *start,
struct timespec *end, int xfers_per_iter, int argc, char *argv[])
{
static int header = 1;
int64_t elapsed = get_elapsed(start, end, MICRO);
long long total = (long long) iters * tsize * xfers_per_iter;
int i;
float usec_per_xfer;
if (header) {
printf("---\n");
for (i = 0; i < argc; ++i)
printf("%s ", argv[i]);
printf(":\n");
header = 0;
}
usec_per_xfer = ((float)elapsed / iters / xfers_per_iter);
printf("- { ");
printf("xfer_size: %zu, ", tsize);
printf("iterations: %d, ", iters);
printf("total: %lld, ", total);
printf("time: %f, ", elapsed / 1000000.0);
printf("MB/sec: %f, ", (total) / (1.0 * elapsed));
printf("usec/xfer: %f, ", usec_per_xfer);
printf("Mxfers/sec: %f", 1.0/usec_per_xfer);
printf(" }\n");
}
void ft_addr_usage()
{
FT_PRINT_OPTS_USAGE("-B <src_port>", "non default source port number");
FT_PRINT_OPTS_USAGE("-P <dst_port>", "non default destination port number");
FT_PRINT_OPTS_USAGE("-s <address>", "source address");
FT_PRINT_OPTS_USAGE("-b[=<oob_port>]", "enable out-of-band address exchange and "
"synchronization over the, optional, port");
FT_PRINT_OPTS_USAGE("-E[=<oob_port>]", "enable out-of-band address exchange only "
"over the, optional, port");
FT_PRINT_OPTS_USAGE("-C <number>", "number of connections to accept before "
"cleaning up a server");
}
void ft_usage(char *name, char *desc)
{
fprintf(stderr, "Usage:\n");
fprintf(stderr, " %s [OPTIONS]\t\tstart server\n", name);
fprintf(stderr, " %s [OPTIONS] <host>\tconnect to server\n", name);
if (desc)
fprintf(stderr, "\n%s\n", desc);
fprintf(stderr, "\nOptions:\n");
ft_addr_usage();
FT_PRINT_OPTS_USAGE("-f <fabric>", "fabric name");
FT_PRINT_OPTS_USAGE("-d <domain>", "domain name");
FT_PRINT_OPTS_USAGE("-p <provider>", "specific provider name eg sockets, verbs");
FT_PRINT_OPTS_USAGE("-e <ep_type>", "Endpoint type: msg|rdm|dgram (default:rdm)");
FT_PRINT_OPTS_USAGE("", "Only the following tests support this option for now:");
FT_PRINT_OPTS_USAGE("", "fi_rma_bw");
FT_PRINT_OPTS_USAGE("", "fi_shared_ctx");
FT_PRINT_OPTS_USAGE("", "fi_multi_mr");
FT_PRINT_OPTS_USAGE("", "fi_multi_ep");
FT_PRINT_OPTS_USAGE("", "fi_recv_cancel");
FT_PRINT_OPTS_USAGE("", "fi_unexpected_msg");
FT_PRINT_OPTS_USAGE("", "fi_resmgmt_test");
FT_PRINT_OPTS_USAGE("", "fi_inj_complete");
FT_PRINT_OPTS_USAGE("", "fi_bw");
FT_PRINT_OPTS_USAGE("-M <mode>", "Disable mode bit from test");
FT_PRINT_OPTS_USAGE("", "mr_local");
FT_PRINT_OPTS_USAGE("-a <address vector name>", "name of address vector");
FT_PRINT_OPTS_USAGE("-h", "display this help output");
return;
}
void ft_mcusage(char *name, char *desc)
{
fprintf(stderr, "Usage:\n");
fprintf(stderr, " %s [OPTIONS] -M <mcast_addr>\tstart listener\n", name);
fprintf(stderr, " %s [OPTIONS] <mcast_addr>\tsend to group\n", name);
if (desc)
fprintf(stderr, "\n%s\n", desc);
fprintf(stderr, "\nOptions:\n");
ft_addr_usage();
FT_PRINT_OPTS_USAGE("-f <fabric>", "fabric name");
FT_PRINT_OPTS_USAGE("-d <domain>", "domain name");
FT_PRINT_OPTS_USAGE("-p <provider>", "specific provider name eg sockets, verbs");
FT_PRINT_OPTS_USAGE("-d <domain>", "domain name");
FT_PRINT_OPTS_USAGE("-p <provider>", "specific provider name eg sockets, verbs");
FT_PRINT_OPTS_USAGE("-h", "display this help output");
return;
}
void ft_csusage(char *name, char *desc)
{
ft_usage(name, desc);
FT_PRINT_OPTS_USAGE("-I <number>", "number of iterations");
FT_PRINT_OPTS_USAGE("-w <number>", "number of warmup iterations");
FT_PRINT_OPTS_USAGE("-S <size>", "specific transfer size or 'all'");
FT_PRINT_OPTS_USAGE("-l", "align transmit and receive buffers to page size");
FT_PRINT_OPTS_USAGE("-m", "machine readable output");
FT_PRINT_OPTS_USAGE("-t <type>", "completion type [queue, counter]");
FT_PRINT_OPTS_USAGE("-c <method>", "completion method [spin, sread, fd, yield]");
FT_PRINT_OPTS_USAGE("-h", "display this help output");
return;
}
void ft_parseinfo(int op, char *optarg, struct fi_info *hints,
struct ft_opts *opts)
{
switch (op) {
case 'f':
if (!hints->fabric_attr) {
hints->fabric_attr = malloc(sizeof *(hints->fabric_attr));
if (!hints->fabric_attr) {
perror("malloc");
exit(EXIT_FAILURE);
}
}
hints->fabric_attr->name = strdup(optarg);
break;
case 'd':
if (!hints->domain_attr) {
hints->domain_attr = malloc(sizeof *(hints->domain_attr));
if (!hints->domain_attr) {
perror("malloc");
exit(EXIT_FAILURE);
}
}
hints->domain_attr->name = strdup(optarg);
break;
case 'p':
if (!hints->fabric_attr) {
hints->fabric_attr = malloc(sizeof *(hints->fabric_attr));
if (!hints->fabric_attr) {
perror("malloc");
exit(EXIT_FAILURE);
}
}
hints->fabric_attr->prov_name = strdup(optarg);
break;
case 'e':
if (!strncasecmp("msg", optarg, 3))
hints->ep_attr->type = FI_EP_MSG;
if (!strncasecmp("rdm", optarg, 3))
hints->ep_attr->type = FI_EP_RDM;
if (!strncasecmp("dgram", optarg, 5))
hints->ep_attr->type = FI_EP_DGRAM;
break;
case 'M':
if (!strncasecmp("mr_local", optarg, 8))
opts->mr_mode &= ~FI_MR_LOCAL;
break;
default:
/* let getopt handle unknown opts*/
break;
}
}
void ft_parse_addr_opts(int op, char *optarg, struct ft_opts *opts)
{
switch (op) {
case 's':
opts->src_addr = optarg;
break;
case 'B':
opts->src_port = optarg;
break;
case 'P':
opts->dst_port = optarg;
break;
case 'b':
opts->options |= FT_OPT_OOB_SYNC;
/* fall through */
case 'E':
opts->options |= FT_OPT_OOB_ADDR_EXCH;
if (optarg && strlen(optarg) > 1)
opts->oob_port = optarg + 1;
else
opts->oob_port = default_oob_port;
break;
case 'C':
opts->options |= FT_OPT_SERVER_PERSIST;
opts->num_connections = atoi(optarg);
default:
/* let getopt handle unknown opts*/
break;
}
}
void ft_parsecsopts(int op, char *optarg, struct ft_opts *opts)
{
ft_parse_addr_opts(op, optarg, opts);
switch (op) {
case 'I':
opts->options |= FT_OPT_ITER;
opts->iterations = atoi(optarg);
break;
case 'S':
if (!strncasecmp("all", optarg, 3)) {
opts->sizes_enabled = FT_ENABLE_ALL;
} else {
opts->options |= FT_OPT_SIZE;
opts->transfer_size = atoi(optarg);
}
break;
case 'm':
opts->machr = 1;
break;
case 'c':
if (!strncasecmp("sread", optarg, 5))
opts->comp_method = FT_COMP_SREAD;
else if (!strncasecmp("fd", optarg, 2))
opts->comp_method = FT_COMP_WAIT_FD;
else if (!strncasecmp("yield", optarg, 5))
opts->comp_method = FT_COMP_YIELD;
break;
case 't':
if (!strncasecmp("counter", optarg, 7)) {
opts->options |= FT_OPT_RX_CNTR | FT_OPT_TX_CNTR;
opts->options &= ~(FT_OPT_RX_CQ | FT_OPT_TX_CQ);
}
break;
case 'a':
opts->av_name = optarg;
break;
case 'w':
opts->warmup_iterations = atoi(optarg);
break;
case 'l':
opts->options |= FT_OPT_ALIGN;
break;
default:
/* let getopt handle unknown opts*/
break;
}
}
int ft_parse_rma_opts(int op, char *optarg, struct fi_info *hints,
struct ft_opts *opts)
{
switch (op) {
case 'o':
if (!strcmp(optarg, "read")) {
hints->caps |= FI_READ | FI_REMOTE_READ;
opts->rma_op = FT_RMA_READ;
} else if (!strcmp(optarg, "writedata")) {
hints->caps |= FI_WRITE | FI_REMOTE_WRITE;
hints->mode |= FI_RX_CQ_DATA;
hints->domain_attr->cq_data_size = 4;
opts->rma_op = FT_RMA_WRITEDATA;
cq_attr.format = FI_CQ_FORMAT_DATA;
} else if (!strcmp(optarg, "write")) {
hints->caps |= FI_WRITE | FI_REMOTE_WRITE;
opts->rma_op = FT_RMA_WRITE;
} else {
fprintf(stderr, "Invalid operation type: \"%s\". Usage:\n"
"-o <op>\trma op type: read|write|writedata "
"(default:write)\n", optarg);
return EXIT_FAILURE;
}
break;
default:
/* let getopt handle unknown opts*/
break;
}
return 0;
}
void ft_fill_buf(void *buf, int size)
{
char *msg_buf;
int msg_index;
static unsigned int iter = 0;
int i;
msg_index = ((iter++)*INTEG_SEED) % integ_alphabet_length;
msg_buf = (char *)buf;
for (i = 0; i < size; i++) {
msg_buf[i] = integ_alphabet[msg_index++];
if (msg_index >= integ_alphabet_length)
msg_index = 0;
}
}
int ft_check_buf(void *buf, int size)
{
char *recv_data;
char c;
static unsigned int iter = 0;
int msg_index;
int i;
msg_index = ((iter++)*INTEG_SEED) % integ_alphabet_length;
recv_data = (char *)buf;
for (i = 0; i < size; i++) {
c = integ_alphabet[msg_index++];
if (msg_index >= integ_alphabet_length)
msg_index = 0;
if (c != recv_data[i])
break;
}
if (i != size) {
printf("Error at iteration=%d size=%d byte=%d\n",
iter, size, i);
return 1;
}
return 0;
}
uint64_t ft_init_cq_data(struct fi_info *info)
{
if (info->domain_attr->cq_data_size >= sizeof(uint64_t)) {
return 0x0123456789abcdefULL;
} else {
return 0x0123456789abcdef &
((0x1ULL << (info->domain_attr->cq_data_size * 8)) - 1);
}
}
int check_recv_msg(const char *message)
{
size_t recv_len;
size_t message_len = strlen(message) + 1;
/* Account for null terminated byte. */
recv_len = strlen(rx_buf) + 1;
if (recv_len != message_len) {
fprintf(stderr, "Received length does not match expected length.\n");
return -1;
}
if (strncmp(rx_buf, message, message_len)) {
fprintf(stderr, "Received message does not match expected message.\n");
return -1;
}
fprintf(stdout, "Data check OK\n");
return 0;
}
int ft_send_greeting(struct fid_ep *ep)
{
size_t message_len = strlen(greeting) + 1;
int ret;
fprintf(stdout, "Sending message...\n");
if (snprintf(tx_buf, tx_size, "%s", greeting) >= tx_size) {
fprintf(stderr, "Transmit buffer too small.\n");
return -FI_ETOOSMALL;
}
ret = ft_tx(ep, remote_fi_addr, message_len, &tx_ctx);
if (ret)
return ret;
fprintf(stdout, "Send completion received\n");
return 0;
}
int ft_recv_greeting(struct fid_ep *ep)
{
int ret;
fprintf(stdout, "Waiting for message from client...\n");
ret = ft_get_rx_comp(rx_seq);
if (ret)
return ret;
ret = check_recv_msg(greeting);
if (ret)
return ret;
fprintf(stdout, "Received data from client: %s\n", (char *) rx_buf);
return 0;
}
int ft_send_recv_greeting(struct fid_ep *ep)
{
return opts.dst_addr ? ft_send_greeting(ep) : ft_recv_greeting(ep);
}
int ft_sock_listen(char *node, char *service)
{
struct addrinfo *ai, hints;
int val, ret;
memset(&hints, 0, sizeof hints);
hints.ai_flags = AI_PASSIVE;
ret = getaddrinfo(node, service, &hints, &ai);
if (ret) {
fprintf(stderr, "getaddrinfo() %s\n", gai_strerror(ret));
return ret;
}
listen_sock = socket(ai->ai_family, SOCK_STREAM, 0);
if (listen_sock < 0) {
perror("socket");
ret = listen_sock;
goto out;
}
val = 1;
ret = setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR,
(void *) &val, sizeof val);
if (ret) {
perror("setsockopt SO_REUSEADDR");
goto out;
}
ret = bind(listen_sock, ai->ai_addr, ai->ai_addrlen);
if (ret) {
perror("bind");
goto out;
}
ret = listen(listen_sock, 0);
if (ret)
perror("listen");
out:
if (ret && listen_sock >= 0)
close(listen_sock);
freeaddrinfo(ai);
return ret;
}
int ft_sock_connect(char *node, char *service)
{
struct addrinfo *ai;
int ret;
ret = getaddrinfo(node, service, NULL, &ai);
if (ret) {
perror("getaddrinfo");
return ret;
}
sock = socket(ai->ai_family, SOCK_STREAM, 0);
if (sock < 0) {
perror("socket");
ret = sock;
goto free;
}
ret = 1;
ret = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *) &ret, sizeof(ret));
if (ret)
perror("setsockopt");
ret = connect(sock, ai->ai_addr, ai->ai_addrlen);
if (ret) {
perror("connect");
close(sock);
}
free:
freeaddrinfo(ai);
return ret;
}
int ft_sock_accept()
{
int ret, op;
sock = accept(listen_sock, NULL, 0);
if (sock < 0) {
ret = sock;
perror("accept");
return ret;
}
op = 1;
ret = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
(void *) &op, sizeof(op));
if (ret)
perror("setsockopt");
return 0;
}
int ft_sock_send(int fd, void *msg, size_t len)
{
int ret;
ret = send(fd, msg, len, 0);
if (ret == len) {
return 0;
} else if (ret < 0) {
perror("send");
return -errno;
} else {
perror("send aborted");
return -FI_ECONNABORTED;
}
}
int ft_sock_recv(int fd, void *msg, size_t len)
{
int ret;
ret = recv(fd, msg, len, MSG_WAITALL);
if (ret == len) {
return 0;
} else if (ret == 0) {
return -FI_ENOTCONN;
} else if (ret < 0) {
FT_PRINTERR("ft_sock_recv", -errno);
perror("recv");
return -errno;
} else {
perror("recv aborted");
return -FI_ECONNABORTED;
}
}
int ft_sock_sync(int value)
{
int result = -FI_EOTHER;
if (listen_sock < 0) {
ft_sock_send(sock, &value, sizeof value);
ft_sock_recv(sock, &result, sizeof result);
} else {
ft_sock_recv(sock, &result, sizeof result);
ft_sock_send(sock, &value, sizeof value);
}
return result;
}
void ft_sock_shutdown(int fd)
{
shutdown(fd, SHUT_RDWR);
close(fd);
}
static int ft_has_util_prefix(const char *str)
{
return !strncasecmp(str, OFI_UTIL_PREFIX, strlen(OFI_UTIL_PREFIX));
}
const char *ft_util_name(const char *str, size_t *len)
{
char *delim;
delim = strchr(str, OFI_NAME_DELIM);
if (delim) {
if (ft_has_util_prefix(delim + 1)) {
*len = strlen(delim + 1);
return delim + 1;
} else if (ft_has_util_prefix(str)) {
*len = delim - str;
return str;
}
} else if (ft_has_util_prefix(str)) {
*len = strlen(str);
return str;
}
*len = 0;
return NULL;
}
const char *ft_core_name(const char *str, size_t *len)
{
char *delim;
delim = strchr(str, OFI_NAME_DELIM);
if (delim) {
if (!ft_has_util_prefix(delim + 1)) {
*len = strlen(delim + 1);
return delim + 1;
} else if (!ft_has_util_prefix(str)) {
*len = delim - str;
return str;
}
} else if (!ft_has_util_prefix(str)) {
*len = strlen(str);
return str;
}
*len = 0;
return NULL;
}
/* Split the given string "s" using the specified delimiter(s) in the string
* "delim" and return an array of strings. The array is terminated with a NULL
* pointer. Returned array should be freed with ft_free_string_array().
*
* Returns NULL on failure.
*/
char **ft_split_and_alloc(const char *s, const char *delim, size_t *count)
{
int i, n;
char *tmp;
char *dup = NULL;
char **arr = NULL;
if (!s || !delim)
return NULL;
dup = strdup(s);
if (!dup)
return NULL;
/* compute the array size */
n = 1;
for (tmp = dup; *tmp != '\0'; ++tmp) {
for (i = 0; delim[i] != '\0'; ++i) {
if (*tmp == delim[i]) {
++n;
break;
}
}
}
/* +1 to leave space for NULL terminating pointer */
arr = calloc(n + 1, sizeof(*arr));
if (!arr)
goto cleanup;
/* set array elts to point inside the dup'ed string */
for (tmp = dup, i = 0; tmp != NULL; ++i) {
arr[i] = strsep(&tmp, delim);
}
assert(i == n);
if (count)
*count = n;
return arr;
cleanup:
free(dup);
free(arr);
return NULL;
}
/* see ft_split_and_alloc() */
void ft_free_string_array(char **s)
{
/* all strings are allocated from the same strdup'ed slab, so just free
* the first element */
if (s != NULL)
free(s[0]);
/* and then the actual array of pointers */
free(s);
}