in nanofi/src/sitetosite/CRawSocketProtocol.c [691:794]
int16_t sendPacket(struct CRawSiteToSiteClient * client, const char * transactionID, CDataPacket *packet, flow_file_record * ff) {
if (client->_peer_state != READY) {
bootstrap(client);
}
if (client->_peer_state != READY) {
return -1;
}
CTransaction* transaction = findTransaction(client, transactionID);
if (!transaction) {
return -1;
}
if (getState(transaction) != TRANSACTION_STARTED && getState(transaction) != DATA_EXCHANGED) {
logc(warn, "Site2Site transaction %s is not at started or exchanged state", transactionID);
return -1;
}
if (getDirection(transaction) != SEND) {
logc(warn, "Site2Site transaction %s direction is wrong", transactionID);
return -1;
}
int ret;
if (transaction->current_transfers_ > 0) {
ret = writeResponse(client, CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION");
if (ret <= 0) {
return -1;
}
}
// start to read the packet
uint32_t numAttributes = packet->_attributes->size;
ret = write_uint32t(transaction, numAttributes);
if (ret != 4) {
return -1;
}
size_t i;
for (i = 0; i < packet->_attributes->size; ++i) {
const char *key = packet->_attributes->attributes[i].key;
ret = write_UTF(transaction, key, True);
if (ret <= 0) {
return -1;
}
const char *value = (const char *) packet->_attributes->attributes[i].value;
ret = write_UTF_len(transaction, value, packet->_attributes->attributes[i].value_size, True);
if (ret <= 0) {
return -1;
}
}
uint64_t len = 0;
if (ff != NULL) {
int content_size = (int) ff->size;
uint8_t * content_buf = NULL;
if (content_size > 0 && ff->crp != NULL) {
content_buf = (uint8_t*)malloc(content_size*sizeof(uint8_t));
int len_as_int = get_content(ff, content_buf, content_size);
if (len_as_int <= 0) {
return -2;
}
len = len_as_int;
ret = write_uint64t(transaction, len);
if (ret != 8) {
logc(debug, "ret != 8");
return -1;
}
writeData(transaction, content_buf, len_as_int);
}
} else if (packet->payload_ != NULL && strlen(packet->payload_) > 0) {
len = strlen(packet->payload_);
ret = write_uint64t(transaction, len);
if (ret != 8) {
return -1;
}
ret = writeData(transaction, (uint8_t *)(packet->payload_), (int) len);
if (ret != (int64_t)len) {
logc(debug, "ret != len");
return -1;
}
}
transaction->current_transfers_++;
transaction->total_transfers_++;
transaction->_state = DATA_EXCHANGED;
transaction->_bytes += len;
logc(info, "Site to Site transaction %s sent flow %d flow records, with total size %"PRIu64, transactionID,
transaction->total_transfers_, transaction->_bytes);
return 0;
}