std::shared_ptr<pulsar_message_t> Message::BuildMessage()

in src/Message.cc [150:226]


// Builds a native pulsar message from a JavaScript configuration object.
//
// Each option is applied only when its key is present on `conf` AND the value
// has the expected JavaScript type (Buffer, Object, Number, String, Array or
// Boolean, depending on the option); otherwise the option is skipped without
// reporting an error.
//
// @param conf  JavaScript object holding the message options.
// @return      A shared_ptr owning the newly created message; the message is
//              released with pulsar_message_free when the last owner is gone.
std::shared_ptr<pulsar_message_t> Message::BuildMessage(Napi::Object conf) {
  std::shared_ptr<pulsar_message_t> cMessage(pulsar_message_create(), pulsar_message_free);
  pulsar_message_t *rawMessage = cMessage.get();

  // Payload: only Buffer values are accepted.
  if (conf.Has(CFG_DATA) && conf.Get(CFG_DATA).IsBuffer()) {
    Napi::Buffer<char> payload = conf.Get(CFG_DATA).As<Napi::Buffer<char>>();
    pulsar_message_set_content(rawMessage, payload.Data(), payload.Length());
  }

  // Properties: every property name of the object is forwarded, with both the
  // key and the value coerced to strings.
  if (conf.Has(CFG_PROPS) && conf.Get(CFG_PROPS).IsObject()) {
    Napi::Object propObj = conf.Get(CFG_PROPS).ToObject();
    Napi::Array propNames = propObj.GetPropertyNames();
    const uint32_t propCount = propNames.Length();
    for (uint32_t i = 0; i < propCount; i++) {
      Napi::String key = propNames.Get(i).ToString();
      Napi::String value = propObj.Get(key).ToString();
      pulsar_message_set_property(rawMessage, key.Utf8Value().c_str(), value.Utf8Value().c_str());
    }
  }

  // Event time: negative timestamps are ignored.
  if (conf.Has(CFG_EVENT_TIME) && conf.Get(CFG_EVENT_TIME).IsNumber()) {
    const int64_t eventTimestamp = conf.Get(CFG_EVENT_TIME).ToNumber().Int64Value();
    if (eventTimestamp >= 0) {
      pulsar_message_set_event_timestamp(rawMessage, eventTimestamp);
    }
  }

  if (conf.Has(CFG_SEQUENCE_ID) && conf.Get(CFG_SEQUENCE_ID).IsNumber()) {
    Napi::Number sequenceId = conf.Get(CFG_SEQUENCE_ID).ToNumber();
    pulsar_message_set_sequence_id(rawMessage, sequenceId.Int64Value());
  }

  if (conf.Has(CFG_PARTITION_KEY) && conf.Get(CFG_PARTITION_KEY).IsString()) {
    Napi::String partitionKey = conf.Get(CFG_PARTITION_KEY).ToString();
    pulsar_message_set_partition_key(rawMessage, partitionKey.Utf8Value().c_str());
  }

  if (conf.Has(CFG_REPL_CLUSTERS) && conf.Get(CFG_REPL_CLUSTERS).IsArray()) {
    Napi::Array clusters = conf.Get(CFG_REPL_CLUSTERS).As<Napi::Array>();
    const int length = static_cast<int>(clusters.Length());
    if (length == 0) {
      // Empty list means to disable replication
      pulsar_message_disable_replication(rawMessage, 1);
    } else {
      char **names = NewStringArray(length);
      // Scope guard: frees the array when this block is left, including when a
      // ToString() conversion below throws (possible if node-addon-api is
      // built with C++ exceptions). Only the entries that were actually filled
      // are handed to FreeStringArray, so no unset slot is ever released.
      // NOTE(review): assumes FreeStringArray(arr, n) frees arr[0..n) plus the
      // array itself — confirm against its definition.
      int filled = 0;
      auto releaseNames = [&filled](char **p) { FreeStringArray(p, filled); };
      std::unique_ptr<char *, decltype(releaseNames)> namesGuard(names, releaseNames);
      for (int i = 0; i < length; i++) {
        SetString(names, clusters.Get(i).ToString().Utf8Value().c_str(), i);
        filled = i + 1;
      }
      pulsar_message_set_replication_clusters(rawMessage, const_cast<const char **>(names), length);
    }
  }

  if (conf.Has(CFG_DELIVER_AFTER) && conf.Get(CFG_DELIVER_AFTER).IsNumber()) {
    Napi::Number deliverAfter = conf.Get(CFG_DELIVER_AFTER).ToNumber();
    pulsar_message_set_deliver_after(rawMessage, deliverAfter.Int64Value());
  }

  if (conf.Has(CFG_DELIVER_AT) && conf.Get(CFG_DELIVER_AT).IsNumber()) {
    Napi::Number deliverAt = conf.Get(CFG_DELIVER_AT).ToNumber();
    pulsar_message_set_deliver_at(rawMessage, deliverAt.Int64Value());
  }

  // Replication is only ever switched off here; a `false` value is a no-op.
  if (conf.Has(CFG_DISABLE_REPLICATION) && conf.Get(CFG_DISABLE_REPLICATION).IsBoolean()) {
    Napi::Boolean disableReplication = conf.Get(CFG_DISABLE_REPLICATION).ToBoolean();
    if (disableReplication.Value()) {
      pulsar_message_disable_replication(rawMessage, 1);
    }
  }

  if (conf.Has(CFG_ORDERING_KEY) && conf.Get(CFG_ORDERING_KEY).IsString()) {
    Napi::String orderingKey = conf.Get(CFG_ORDERING_KEY).ToString();
    pulsar_message_set_ordering_key(rawMessage, orderingKey.Utf8Value().c_str());
  }

  return cMessage;
}