lib/c/c_Producer.cc (45 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include <pulsar/c/producer.h> #include "c_structs.h" const char *pulsar_producer_get_topic(pulsar_producer_t *producer) { return producer->producer.getTopic().c_str(); } const char *pulsar_producer_get_producer_name(pulsar_producer_t *producer) { return producer->producer.getProducerName().c_str(); } void pulsar_producer_free(pulsar_producer_t *producer) { delete producer; } pulsar_result pulsar_producer_send(pulsar_producer_t *producer, pulsar_message_t *msg) { msg->message = msg->builder.build(); return (pulsar_result)producer->producer.send(msg->message); } static void handle_producer_send(pulsar::Result result, pulsar::MessageId messageId, pulsar_send_callback callback, void *ctx) { if (result == pulsar::ResultOk) { pulsar_message_id_t *c_message_id = new pulsar_message_id_t; c_message_id->messageId = messageId; callback(pulsar_result_Ok, c_message_id, ctx); } else { callback((pulsar_result)result, NULL, ctx); } } void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg, pulsar_send_callback callback, void *ctx) { msg->message = msg->builder.build(); producer->producer.sendAsync(msg->message, std::bind(&handle_producer_send, std::placeholders::_1, std::placeholders::_2, callback, ctx)); } int64_t pulsar_producer_get_last_sequence_id(pulsar_producer_t *producer) { return producer->producer.getLastSequenceId(); } pulsar_result pulsar_producer_close(pulsar_producer_t *producer) { return (pulsar_result)producer->producer.close(); } void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx) { producer->producer.closeAsync(std::bind(handle_result_callback, std::placeholders::_1, callback, ctx)); } pulsar_result pulsar_producer_flush(pulsar_producer_t *producer) { return (pulsar_result)producer->producer.flush(); } void pulsar_producer_flush_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx) { producer->producer.flushAsync(std::bind(handle_result_callback, std::placeholders::_1, callback, ctx)); } int pulsar_producer_is_connected(pulsar_producer_t *producer) { return producer->producer.isConnected(); }