c/experimental/raw_plus_tls2.c (511 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "thread.h"
#include <proton/annotations.h>
#include <proton/raw_connection.h>
#include <proton/tls.h>
#include <proton/listener.h>
#include <proton/netaddr.h>
#include <proton/proactor.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
/*
* Jabberwock raw connection example with and without TLS.
*
* One client and one server take turns sending a line of the poem.
* The simple "application" logic resides in line_of_jabber() and gobble_jabber().
* handle_outgoing() and handle_incoming() handle the application IO
* plumbing to use Proton raw connections, with or without TLS.
*
* See the "no_tls" option to contrast the approach compared to TLS usage.
*
* This example is frugal in the number of buffers it uses, giving at most one
* pn_raw_buffer_t at any time to the read or write sides of raw connections and
* the TLS engine.
*
* In this example orderly termination of the connection is initiated by one side.
* The initiator does not wait for any close handshake from the peer.
* The peer looks for confirmation of orderly closure from the initiator.
*
* Based on the broker and raw connection examples. Must be able to find
* the TLS certificates in the same location as the broker example.
*
* The astute reader will realize that the lack of any locking mechanisms
* works only by luck of the simple nature of this "taking turns" example
* and may not survive TSAN scrutiny.
*/
/* The ssl-certs subdir must be in the current directory for TLS configuration */
#define SSL_FILE(NAME) "ssl-certs/" NAME
#define SSL_PW "tserverpw"
/* Windows vs. OpenSSL certificates */
#if defined(_WIN32)
# define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.p12")
# define SET_CREDENTIALS(DOMAIN, NAME) \
pn_tls_config_set_credentials(DOMAIN, SSL_FILE(NAME "-full.p12"), "", SSL_PW)
#else
# define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.pem")
# define SET_CREDENTIALS(DOMAIN, NAME) \
pn_tls_config_set_credentials(DOMAIN, CERTIFICATE(NAME), SSL_FILE(NAME "-private-key.pem"), SSL_PW)
#endif
static void jfatal(const char *file, int line) {
fprintf(stderr, "epoll proactor failure in %s:%d\n", __FILE__, __LINE__);
abort(); \
}
#define jcheck( EXPR ) \
{ if (!(EXPR)) jfatal(__FILE__,__LINE__); }
static uintptr_t JBR_UNUSED = 0;
static uintptr_t JBR_INUSE = 1;
// A raw buffer "pool", in name only
static void rbuf_pool_get(pn_raw_buffer_t *buf) {
memset(buf, 0, sizeof(pn_raw_buffer_t));
buf->bytes = calloc(1, 4096);
buf->capacity = 4096;
buf->context = JBR_UNUSED;
}
static void rbuf_pool_return(pn_raw_buffer_t *rbuf) {
free(rbuf->bytes);
}
// Buffer is "in use" if ownership transferred to raw connection or TLS until a future event.
static void buf_set_in_use(pn_raw_buffer_t *rbuf, bool in_use) {
rbuf->context = in_use ? JBR_INUSE : JBR_UNUSED;
}
static bool buf_in_use(pn_raw_buffer_t *rbuf) {
return rbuf->context == JBR_INUSE;
}
static bool buf_unused(pn_raw_buffer_t *rbuf) {
return !buf_in_use(rbuf);
}
static void buf_reset(pn_raw_buffer_t *rbuf) {
rbuf->size = 0;
rbuf->offset = 0;
memset(rbuf->bytes, 0, rbuf->capacity);
buf_set_in_use(rbuf, false);
}
static size_t size_t_min(size_t a, size_t b) {
return (a < b) ? a : b;
}
typedef struct jabber_t {
pn_proactor_t *proactor;
size_t threads;
pn_tls_config_t *srv_domain;
pn_tls_config_t *cli_domain;
const char *host;
const char *port;
pn_listener_t *listener;
size_t current_jline;
size_t total_bytes_sent;
size_t total_bytes_recv;
bool alpn_enabled;
} jabber_t;
typedef struct jabber_connection_t {
jabber_t *parent;
pn_raw_connection_t *rawc;
pn_tls_t *tls;
// Orderly close:
// Non TLS: shutdown write side, read side detects EOF
// TLS: same shutdown, but also an encrypted TLS EOF using pn_tls_close_output()
// which sends the TLS protocol close_notify alert record.
bool need_rawc_write_close;
bool tls_has_output;
bool connecting;
bool tls_closing;
bool input_done;
bool orderly_close_initiated;
bool orderly_close_detected;
bool tls_error;
pn_raw_buffer_t out_wire_buf;
pn_raw_buffer_t in_wire_buf;
pn_raw_buffer_t out_app_buf;
pn_raw_buffer_t in_app_buf;
bool is_server;
bool jabber_turn;
char *alpn_protocol;
} jabber_connection_t;
static inline uint32_t room(pn_raw_buffer_t const *rb) {
if (rb)
return rb->capacity - (rb->offset + rb->size);
return 0;
}
static const char* jlines[] = {
"Twas brillig, and the slithy toves",
"Did gire and gymble in the wabe.",
"All mimsy were the borogroves,",
"And the mome raths outgrabe."
};
static size_t jlines_count = sizeof(jlines) / sizeof(jlines[0]);
// Provide one line of poem into rbuf.
static void line_of_jabber(jabber_connection_t *jc, pn_raw_buffer_t *rbufp) {
jabber_t *j = jc->parent;
const char *self = jc->is_server ? "server" : "client";
if (j->current_jline < jlines_count) {
size_t len = strlen(jlines[j->current_jline]);
jcheck( len < room(rbufp) );
memcpy(rbufp->bytes + rbufp->offset, jlines[j->current_jline++], len);
rbufp->size = len;
j->total_bytes_sent += len;
}
}
// Consume content of poem sent by peer.
static void gobble_jabber(jabber_connection_t* jc, pn_raw_buffer_t* rbuf) {
jabber_t *j = jc->parent;
const char *self = jc->is_server ? "server" : "client";
jcheck( rbuf->size != 0 );
printf("<-- %s received: %.4096s\n", self, rbuf->bytes + rbuf->offset);
j->total_bytes_recv += rbuf->size;
if (j->total_bytes_recv == j->total_bytes_sent) {
if (j->current_jline != jlines_count) {
// This connection sends the next set of lines.
jc->jabber_turn = true;
pn_raw_connection_wake(jc->rawc);
}
}
}
// Return false if TLS error encountered.
static bool jabber_tls_process(jabber_connection_t* jc) {
int err = pn_tls_process(jc->tls);
if (err && !jc->tls_error) {
jc->tls_error = true;
char buf[256];
pn_tls_get_session_error_string(jc->tls, buf, sizeof(buf));
fprintf(stderr, "TLS processing error: %s\n", buf);
fprintf(stderr, "Jabber %s connection terminated.\n", jc->is_server ? "server" : "client");
// Stop all application data processing.
// Close input. Continue non-application output in case we have a TLS protocol error to send to peer.
pn_raw_connection_read_close(jc->rawc);
jc->input_done = true;
jc->tls_has_output = pn_tls_is_encrypt_output_pending(jc->tls) > 0;
return false;
}
return true;
}
// TLS close requires best efforts to send final closure or error alert record.
// Peer is not obligated to wait for it.
static void jabber_tls_begin_close(jabber_connection_t* jc) {
if (!jc->tls_closing) {
jc->tls_closing = true;
pn_tls_close_output(jc->tls);
jc->need_rawc_write_close = true; // Remember to eventually close the raw connection.
pn_tls_process(jc->tls); // Best efforts. No error check.
pn_raw_connection_wake(jc->rawc); // Ensure handle_outgoing is called.
}
}
static void check_alpn(jabber_connection_t* jc) {
const char *self = jc->is_server ? "server" : "client";
if (!jc->alpn_protocol) {
const char *protocol_name;
size_t len;
if (pn_tls_get_alpn_protocol(jc->tls, &protocol_name, &len)) {
jc->alpn_protocol = (char *) malloc(len+1);
jcheck( jc->alpn_protocol );
memmove(jc->alpn_protocol, protocol_name, len);
jc->alpn_protocol[len] = 0;
printf("**%s: using ALPN protocol %s\n", self, jc->alpn_protocol);
} else {
printf("**%s: no available ALPN protocol\n", self);
}
}
}
static void load_alpn_strings(pn_tls_config_t *cli_domain, pn_tls_config_t *srv_domain) {
const char *protos[4];
// server...
protos[0] = "jibberjabber";
protos[1] = "jabber/v1"; // expected winner
protos[2] = "piglatin";
pn_tls_config_set_alpn_protocols(srv_domain, protos, 3);
//client
protos[0] = "jabber/v2";
protos[1] = "piglatin";
protos[2] = "jabber/v1"; // expected winner
protos[3] = "ghost";
pn_tls_config_set_alpn_protocols(cli_domain, protos, 4);
}
static void handle_outgoing(jabber_connection_t* jc) {
// For this simplified example, 1 buffer of application output is dealt
// with at a time.
jabber_t *j = jc->parent;
if (buf_in_use(&jc->out_wire_buf)) {
// See if raw connection done with the buffer
if (1 == pn_raw_connection_take_written_buffers(jc->rawc, &jc->out_wire_buf, 1)) {
buf_reset(&jc->out_wire_buf);
} else {
return; // Nothing to do until notified of future raw connection write completion
}
}
if (!jc->jabber_turn && !jc->tls_has_output && !jc->need_rawc_write_close)
return; // nothing to send at this time
if (!jc->tls) {
line_of_jabber(jc, &jc->out_wire_buf);
if (jc->out_wire_buf.size > 0) {
jc->jabber_turn = false; // Peer gets to do next jabber.
jcheck( pn_raw_connection_write_buffers(jc->rawc, &jc->out_wire_buf, 1) == 1 );
buf_set_in_use(&jc->out_wire_buf, true);
}
// If no more application data to write, start orderly close.
if (j->current_jline == jlines_count) {
pn_raw_connection_write_close(jc->rawc);
jc->orderly_close_initiated = true;
}
} else {
// TLS
// Reacquire encryption buffer if TLS library is done with it.
if (buf_in_use(&jc->out_app_buf) && pn_tls_take_encrypt_input_buffers(jc->tls, &jc->out_app_buf, 1) == 1)
buf_reset(&jc->out_app_buf);
if (pn_tls_is_secure(jc->tls) && jc->jabber_turn && !jc->tls_closing) {
jcheck( jc->alpn_protocol || !jc->parent->alpn_enabled );
// Add jabber data if there is room.
if (buf_unused(&jc->out_app_buf)) {
line_of_jabber(jc, &jc->out_app_buf);
if (jc->out_app_buf.size > 0) {
jc->jabber_turn = false; // Peer gets to do next jabber.
buf_set_in_use(&jc->out_app_buf, true);
size_t consumed = pn_tls_give_encrypt_input_buffers(jc->tls, &jc->out_app_buf, 1);
if (consumed != 1) abort();
// If no more application data to write, start orderly close.
// For TLS, indicate we are done writing to generate the protocol's EOS (closure alert).
if (j->current_jline == jlines_count) {
jc->orderly_close_initiated = true;
jabber_tls_begin_close(jc);
}
jabber_tls_process(jc);
}
}
}
// Even if no application output just queued for processing (i.e. no call to
// pn_tls_give_encrypt_input_buffers()), there may be straggling encrypted output.
// Also possible we are flushing TLS error information to peer.
if (buf_unused(&jc->out_wire_buf) && pn_tls_need_encrypt_output_buffers(jc->tls)) {
pn_tls_give_encrypt_output_buffers(jc->tls, &jc->out_wire_buf, 1);
jabber_tls_process(jc);
jcheck( pn_tls_take_encrypt_output_buffers(jc->tls, &jc->out_wire_buf, 1) == 1 );
// Write the application data we just encrypted.
jcheck( pn_raw_connection_write_buffers(jc->rawc, &jc->out_wire_buf, 1) == 1);
buf_set_in_use(&jc->out_wire_buf, true);
}
// Remember whether we missed some output buffered in the TLS library and need to come back.
jc->tls_has_output = pn_tls_is_encrypt_output_pending(jc->tls) > 0;
if (jc->need_rawc_write_close && !jc->tls_has_output) {
// We have written to the raw connection all the output the TLS library can ever give us.
pn_raw_connection_write_close(jc->rawc);
jc->need_rawc_write_close = false;
}
}
}
static void handle_incoming(jabber_connection_t* jc, bool rawc_input_closed_event) {
// There may be more received wire buffers than sent.
if (jc->input_done)
return;
bool wire_bytes = false;
const char *self = jc->is_server ? "server" : "client";
if (!rawc_input_closed_event) {
if (!buf_in_use(&jc->in_wire_buf) ||
pn_raw_connection_take_read_buffers(jc->rawc, &jc->in_wire_buf, 1) != 1)
return; // No wire buffer to process - come back later.
if (jc->in_wire_buf.size > 0)
wire_bytes = true;
}
if (!jc->tls) {
if (wire_bytes) {
// Use wire data directly
gobble_jabber(jc, &jc->in_wire_buf);
buf_reset(&jc->in_wire_buf);
} else {
if (!jc->orderly_close_initiated && jc->parent->current_jline == jlines_count)
jc->orderly_close_detected = true;
// Whether unexpected hangup or expected EOS, Jabber has no more data to send.
pn_raw_connection_close(jc->rawc);
jc->input_done = true;
}
} else {
// TLS case
if (wire_bytes) {
jcheck( pn_tls_get_decrypt_input_buffer_capacity(jc->tls) > 0 );
size_t consumed = pn_tls_give_decrypt_input_buffers(jc->tls, &jc->in_wire_buf, 1);
if (consumed == 0) {
if (pn_tls_is_input_closed(jc->tls))
// Library will not process anything after TLS EOS, and we are not expecting any trailing data.
printf("data after TLS EOF\n"); // Error by peer or garbage bytes from attacker.
else
printf("Unexpected TLS decrypt input buffer failure\n");
abort();
}
if (!jabber_tls_process(jc))
return;
if (jc->parent->alpn_enabled && !jc->alpn_protocol)
check_alpn(jc);
while (pn_tls_need_decrypt_output_buffers(jc->tls)) {
jcheck( pn_tls_give_decrypt_output_buffers(jc->tls, &jc->in_app_buf, 1) == 1 );
if (!jabber_tls_process(jc))
return;
jcheck( pn_tls_take_decrypt_output_buffers(jc->tls, &jc->in_app_buf, 1) == 1 );
jcheck( jc->in_app_buf.size > 0 );
gobble_jabber(jc, &jc->in_app_buf); // Application layer consumes all bytes.
buf_reset(&jc->in_app_buf);
}
// Reclaim and recycle the TLS encoded input buffer (from the wire) processed by the TLS library.
// If a partial TLS record remains, it is buffered in the TLS engine.
jcheck( pn_tls_take_decrypt_input_buffers(jc->tls, &jc->in_wire_buf, 1) == 1);
buf_reset(&jc->in_wire_buf);
if (pn_tls_is_input_closed(jc->tls) && !jc->orderly_close_initiated) {
printf("**%s: received TLS end of session notification from peer\n", self);
jc->orderly_close_detected = true;
jabber_tls_begin_close(jc);
}
} else {
// EOS
jc->input_done = true;
if (!pn_tls_is_input_closed(jc->tls) && !jc->orderly_close_initiated)
// Expecting graceful close with peer in this example.
printf("**%s: TLS session ended abruptly\n", self);
jabber_tls_begin_close(jc);
}
// TLS input can generate non-application output. Note that here.
jc->tls_has_output = pn_tls_is_encrypt_output_pending(jc->tls) > 0;
}
}
static void allocate_wire_buffers(jabber_connection_t *jc) {
rbuf_pool_get(&jc->out_wire_buf);
rbuf_pool_get(&jc->in_wire_buf);
}
static void allocate_tls_buffers(jabber_connection_t *jc) {
pn_tls_set_encrypt_input_buffer_max_capacity(jc->tls, 1);
pn_tls_set_decrypt_input_buffer_max_capacity(jc->tls, 1);
pn_tls_set_encrypt_output_buffer_max_capacity(jc->tls, 1);
pn_tls_set_decrypt_output_buffer_max_capacity(jc->tls, 1);
rbuf_pool_get(&jc->in_app_buf);
rbuf_pool_get(&jc->out_app_buf);
}
static void create_client_connection(jabber_t *j) {
char addr[PN_MAX_ADDR];
pn_raw_connection_t *c = pn_raw_connection();
jabber_connection_t *jc = calloc(sizeof(*jc), 1);
jc->rawc = c;
jc->is_server = false;
jc->jabber_turn = true; // Client goes first for purposes of this example. Arbitrary choice.
jc->parent = j;
pn_raw_connection_set_context(c, (void *) jc);
// Client side TLS can be set up anywhere between here and first write (TLS clienthello).
if (j->cli_domain) {
jc->tls = pn_tls(j->cli_domain);
pn_tls_set_peer_hostname(jc->tls, "test_server");
allocate_tls_buffers(jc);
pn_tls_start(jc->tls);
jc->tls_has_output = true; // always true for initial client side TLS.
}
allocate_wire_buffers(jc);
pn_proactor_addr(addr, sizeof(addr), j->host, j->port);
pn_proactor_raw_connect(j->proactor, c, addr);
}
static void create_server_connection(jabber_t *j, pn_listener_t *listener) {
pn_raw_connection_t *c = pn_raw_connection();
jabber_connection_t *jc = calloc(sizeof(*jc), 1);
jc->rawc = c;
jc->is_server = true;
jc->parent = j;
pn_raw_connection_set_context(c, (void *) jc);
// Server side TLS can be set up anywhere between here and first read (TLS clienthello).
if (j->srv_domain) {
jc->tls = pn_tls(j->srv_domain);
allocate_tls_buffers(jc);
pn_tls_start(jc->tls);
}
allocate_wire_buffers(jc);
pn_listener_raw_accept(listener, c);
printf("**listener accepted %p\n", (void *)jc);
pn_listener_close(j->listener); // Single client.
}
// Called on disconnect. No more raw connection IO in either direction.
static void tls_cleanup(jabber_connection_t* jc) {
if (jc->tls) {
pn_tls_stop(jc->tls);
pn_raw_buffer_t rb;
if (buf_in_use(&jc->out_app_buf))
jcheck( pn_tls_take_encrypt_input_buffers(jc->tls, &jc->out_app_buf, 1) == 1 );
jcheck( buf_unused(&jc->in_app_buf) );
rbuf_pool_return(&jc->in_app_buf);
rbuf_pool_return(&jc->out_app_buf);
free(jc->alpn_protocol);
pn_tls_free(jc->tls);
jc->tls = NULL;
}
}
static bool handle_raw_connection(jabber_connection_t* jc, pn_event_t* event) {
switch (pn_event_type(event)) {
case PN_RAW_CONNECTION_CONNECTED: {
printf("**raw connection established %p %s\n", (void *)jc, jc->is_server ? "server" : "client");
jc->connecting = false;
} break;
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
jcheck( buf_unused(&jc->in_wire_buf) );
pn_raw_connection_give_read_buffers(jc->rawc, &jc->in_wire_buf, 1);
buf_set_in_use(&jc->in_wire_buf, true);
} break;
case PN_RAW_CONNECTION_CLOSED_WRITE: {
pn_raw_connection_close(jc->rawc); // In case not fully closed
} break;
case PN_RAW_CONNECTION_WRITTEN:
case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS:
case PN_RAW_CONNECTION_WAKE: {
handle_outgoing(jc);
} break;
case PN_RAW_CONNECTION_CLOSED_READ:
case PN_RAW_CONNECTION_READ: {
handle_incoming(jc, pn_event_type(event) == PN_RAW_CONNECTION_CLOSED_READ);
if (jc->tls_has_output)
handle_outgoing(jc);
} break;
case PN_RAW_CONNECTION_DRAIN_BUFFERS: {
pn_raw_buffer_t rbuf;
while (1 == pn_raw_connection_take_read_buffers(jc->rawc, &rbuf, 1)) {
}
while (1 == pn_raw_connection_take_written_buffers(jc->rawc, &rbuf, 1)) {
}
} break;
case PN_RAW_CONNECTION_DISCONNECTED: {
const char *self = jc->is_server ? "Server" : "Client";
pn_condition_t *cond = pn_raw_connection_condition(jc->rawc);
if (jc->connecting)
printf("**Connection failed\n");
else {
if (!(jc->orderly_close_initiated || jc->orderly_close_detected) || pn_condition_is_set(cond))
printf("**%s connection terminated abnormally\n", self);
else if (jc->orderly_close_detected && jc->parent->current_jline != jlines_count)
printf("**Jabber completed successfully\n", self);
}
if (pn_condition_is_set(cond)) {
fprintf(stderr, "raw connection failure: %s\n", pn_condition_get_name(cond), pn_condition_get_description(cond));
}
if (jc->tls) {
if (jc->need_rawc_write_close)
printf("**%s connection terminated with unsent TLS data\n", self);
tls_cleanup(jc);
}
rbuf_pool_return(&jc->in_wire_buf);
rbuf_pool_return(&jc->out_wire_buf);
free(jc);
} break;
}
return true;
}
static bool handle(jabber_t* j, pn_event_t* event) {
switch (pn_event_type(event)) {
case PN_LISTENER_OPEN: {
create_client_connection(j);
break;
}
case PN_LISTENER_ACCEPT: {
pn_listener_t *listener = pn_event_listener(event);
create_server_connection(j, listener);
} break;
case PN_PROACTOR_INACTIVE:
printf("**proactor inactive: connections and listeners finalized\n");
PN_FALLTHROUGH;
case PN_PROACTOR_INTERRUPT: {
pn_proactor_t *proactor = pn_event_proactor(event);
pn_proactor_interrupt(proactor);
return false;
} break;
default: {
pn_raw_connection_t *c = pn_event_raw_connection(event);
if (c) {
jabber_connection_t *jc = (jabber_connection_t *) pn_raw_connection_get_context(c);
handle_raw_connection(jc, event);
}
}
}
return true;
}
static void* j_thread(void *void_j) {
jabber_t *j = (jabber_t*)void_j;
bool finished = false;
do {
pn_event_batch_t *events = pn_proactor_wait(j->proactor);
pn_event_t *e;
while ((e = pn_event_batch_next(events))) {
if (!handle(j, e)) finished = true;
}
pn_proactor_done(j->proactor, events);
} while(!finished);
return NULL;
}
// main is from broker.c but using raw connections instead of AMQP connections.
int main(int argc, char **argv) {
int err;
srand(time(NULL));
jabber_t j = {0};
j.host = (argc > 1) ? argv[1] : "";
j.port = (argc > 2) ? argv[2] : "15243"; // our default Jabberwock address
j.listener = pn_listener();
bool use_tls = true;
if (argc > 3 && strncmp(argv[3], "no_tls", 6) == 0)
use_tls = false;
j.alpn_enabled = use_tls; // make configurable
j.proactor = pn_proactor();
j.threads = 4;
if (use_tls) {
j.srv_domain = pn_tls_config(PN_TLS_MODE_SERVER);
if (SET_CREDENTIALS(j.srv_domain, "tserver") != 0) {
printf("Failed to set up server certificate: %s, private key: %s\n", CERTIFICATE("tserver"), SSL_FILE("tserver-private-key.pem"));
exit(1);
}
j.cli_domain = pn_tls_config(PN_TLS_MODE_CLIENT); // PN_TLS_VERIFY_PEER_NAME by default
if (pn_tls_config_set_trusted_certs(j.cli_domain, CERTIFICATE("tserver")) != 0) {
printf("CA failure\n");
exit(1);
}
if (j.alpn_enabled) {
// Set some ALPN values for each peer.
load_alpn_strings(j.cli_domain, j.srv_domain);
}
}
{
/* Listen on addr */
char addr[PN_MAX_ADDR];
pn_proactor_addr(addr, sizeof(addr), j.host, j.port);
pn_proactor_listen(j.proactor, j.listener, addr, 16);
}
{
/* Start n-1 threads */
pthread_t* threads = (pthread_t*)calloc(sizeof(pthread_t), j.threads);
size_t i;
for (i = 0; i < j.threads-1; ++i) {
pthread_create(&threads[i], NULL, j_thread, &j);
}
j_thread(&j); /* Use the main thread too. */
/* Join the other threads */
for (i = 0; i < j.threads-1; ++i) {
pthread_join(threads[i], NULL);
}
pn_proactor_free(j.proactor);
free(threads);
if (j.srv_domain)
pn_tls_config_free(j.srv_domain);
if (j.cli_domain)
pn_tls_config_free(j.cli_domain);
return 0;
}
}