int16_t sendPacket()

in nanofi/src/sitetosite/CRawSocketProtocol.c [691:794]


  /**
   * Writes one data packet (attribute map followed by optional content) to
   * the peer as part of an existing SEND transaction.
   *
   * If the client is not in the READY state, bootstrap() is attempted first.
   * The transaction must be in TRANSACTION_STARTED or DATA_EXCHANGED state
   * and its direction must be SEND. When at least one transfer has already
   * happened in this transaction, a CONTINUE_TRANSACTION response is written
   * before the packet.
   *
   * Content source:
   *  - ff != NULL: the content is fetched with get_content() into a temporary
   *    heap buffer (only when ff->size > 0 and ff->crp != NULL); the buffer
   *    is released before returning on every path.
   *  - ff == NULL: packet->payload_ is sent if it is a non-empty C string.
   *
   * On success the transaction counters, state (DATA_EXCHANGED) and byte
   * total are updated.
   *
   * @param client        site-to-site client; its _peer_state is checked
   * @param transactionID id passed to findTransaction() and used in logs
   * @param packet        packet whose _attributes (and, if ff is NULL, payload_) are sent
   * @param ff            optional flow file record supplying the content; may be NULL
   * @return 0 on success;
   *         -2 if get_content() yields no data (returns <= 0);
   *         -1 on any other failure (peer not ready, unknown transaction,
   *            wrong state/direction, allocation failure, failed/short write)
   */
  int16_t sendPacket(struct CRawSiteToSiteClient * client, const char * transactionID, CDataPacket *packet, flow_file_record * ff) {
    if (client->_peer_state != READY) {
      bootstrap(client);
    }

    if (client->_peer_state != READY) {
      return -1;
    }
    CTransaction* transaction = findTransaction(client, transactionID);

    if (!transaction) {
      return -1;
    }

    if (getState(transaction) != TRANSACTION_STARTED && getState(transaction) != DATA_EXCHANGED) {
      logc(warn, "Site2Site transaction %s is not at started or exchanged state", transactionID);
      return -1;
    }

    if (getDirection(transaction) != SEND) {
      logc(warn, "Site2Site transaction %s direction is wrong", transactionID);
      return -1;
    }

    int ret;

    // Every packet after the first one is preceded by CONTINUE_TRANSACTION.
    if (transaction->current_transfers_ > 0) {
      ret = writeResponse(client, CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION");
      if (ret <= 0) {
        return -1;
      }
    }

    // Start writing the packet: attribute count first, then key/value pairs.
    uint32_t numAttributes = packet->_attributes->size;
    ret = write_uint32t(transaction, numAttributes);
    if (ret != 4) {
      return -1;
    }

    size_t i;
    for (i = 0; i < packet->_attributes->size; ++i) {
      const char *key = packet->_attributes->attributes[i].key;

      ret = write_UTF(transaction, key, True);

      if (ret <= 0) {
        return -1;
      }

      const char *value = (const char *) packet->_attributes->attributes[i].value;

      ret = write_UTF_len(transaction, value, packet->_attributes->attributes[i].value_size, True);
      if (ret <= 0) {
        return -1;
      }
    }

    uint64_t len = 0;

    if (ff != NULL) {
      int content_size = (int) ff->size;

      if (content_size > 0 && ff->crp != NULL) {
        uint8_t * content_buf = (uint8_t*)malloc(content_size*sizeof(uint8_t));
        if (content_buf == NULL) {
          return -1;
        }

        int len_as_int = get_content(ff, content_buf, content_size);
        if (len_as_int <= 0) {
          free(content_buf);
          return -2;
        }
        len = len_as_int;
        ret = write_uint64t(transaction, len);
        if (ret != 8) {
          logc(debug, "ret != 8");
          free(content_buf);
          return -1;
        }

        ret = writeData(transaction, content_buf, len_as_int);
        // NOTE(review): assumes writeData() does not keep a reference to the
        // buffer after it returns - confirm against its implementation.
        free(content_buf);
        // Same success criterion as the payload branch below: the call must
        // report exactly the number of bytes requested.
        if (ret != len_as_int) {
          logc(debug, "ret != len");
          return -1;
        }
      }

    } else if (packet->payload_ != NULL && strlen(packet->payload_) > 0) {
      len = strlen(packet->payload_);

      ret = write_uint64t(transaction, len);
      if (ret != 8) {
        return -1;
      }

      ret = writeData(transaction, (uint8_t *)(packet->payload_), (int) len);
      if (ret != (int64_t)len) {
        logc(debug, "ret != len");
        return -1;
      }
    }

    transaction->current_transfers_++;
    transaction->total_transfers_++;
    transaction->_state = DATA_EXCHANGED;
    transaction->_bytes += len;

    logc(info, "Site to Site transaction %s sent flow %d flow records, with total size %"PRIu64, transactionID,
        transaction->total_transfers_, transaction->_bytes);

    return 0;
  }