IndustrialDeviceController/Software/HighLevelApp/init/adapter.c (858 lines of code) (raw):
/* Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT License. */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <time.h>
#include <math.h>
#include <stdbool.h>
#include <sys/types.h>
#include <unistd.h>
#include <applibs/log.h>
#include <applibs/storage.h>
#include <applibs/eventloop.h>
#include <init/adapter.h>
#include <init/device_hal.h>
#include <init/globals.h>
#include <iot/diag.h>
#include <iot/iot.h>
#include <utils/event_loop_timer.h>
#include <utils/llog.h>
#include <utils/memory.h>
#include <utils/network.h>
#include <utils/timer.h>
#include <utils/utils.h>
#include <utils/led.h>
#include <frozen/frozen.h>
#include <safeclib/safe_lib.h>
extern volatile bool g_app_running;
#define PIPE_READ_END 0
#define PIPE_WRITE_END 1
#define MAX_CONNECTION_STRING_SIZE 100
const char PROV_FILE_MAGIC[8] = {'P', 'R', 'O', 'V', ' ', 'V', '0', '1'};
struct prov_file_hdr_t {
char magic[8]; // "PROV V01"
uint32_t hashcode;
int32_t size;
};
typedef struct adapter_t adapter_t;
struct adapter_t {
int64_t provision_epoch;
char* name;
char* location;
char* source_id;
int32_t num_device;
int32_t num_schema;
link_t uplink;
link_t downlink;
ce_device_t* devices;
data_schema_t* schemas;
device_driver_t *driver;
uint32_t driver_state;
struct timespec last_provisioned;
pthread_t worker_tid;
// worker thread -> main thread RESULT queue
int result_pipe[2];
EventRegistration *result_io;
event_loop_timer_t *notify_timer;
EventLoop *eloop;
char *pending_provision;
pthread_mutex_t mutex;
pthread_cond_t cond;
ce_device_t *pending_device;
ce_device_t *ready_device;
};
static adapter_t s_adapter;
// load local provision, return hashcode and null terminated provision
// caller response to free provision
static int load_local_provision(char **p_provision)
{
ASSERT(p_provision);
struct prov_file_hdr_t hdr;
int fd = Storage_OpenMutableFile();
if (fd < 0) {
LOGE("Failed to open mutable storage");
return -1;
}
if (lseek(fd, PROVISION_FILE_OFFSET, SEEK_SET) != PROVISION_FILE_OFFSET) {
LOGW("Prvoision file not exit");
close(fd);
return -1;
}
if (read(fd, &hdr, sizeof(hdr)) != sizeof(hdr)) {
LOGE("Failed to read prov file header");
close(fd);
return -1;
}
if (memcmp(hdr.magic, PROV_FILE_MAGIC, sizeof(PROV_FILE_MAGIC)) != 0) {
LOGW("provision file magic mismatch");
close(fd);
return -1;
}
if (hdr.size < 0 || hdr.size > PROVISION_FILE_MAX_SIZE) {
LOGW("provision file size invalid");
close(fd);
return -1;
}
char *provision = (char *)MALLOC(hdr.size + 1);
if (read(fd, provision, hdr.size) != hdr.size) {
close(fd);
LOGW("Failed to read provision content");
FREE(provision);
return -1;
}
close(fd);
provision[hdr.size] = 0;
if (hash(provision, hdr.size) != hdr.hashcode) {
LOGW("provision hashcode not match");
FREE(provision);
return -1;
}
*p_provision = provision;
LOGI("load_local_provision: hash=%x", hdr.hashcode);
return 0;
}
static int save_local_provision(const char *provision, size_t provision_size)
{
ASSERT(provision);
int fd = Storage_OpenMutableFile();
if (fd < 0) {
LOGE("Failed to open mutable storage");
return -1;
}
struct prov_file_hdr_t hdr;
memcpy_s(hdr.magic, sizeof(hdr.magic), PROV_FILE_MAGIC, sizeof(PROV_FILE_MAGIC));
hdr.size = provision_size;
hdr.hashcode = hash(provision, provision_size);
lseek(fd, PROVISION_FILE_OFFSET, SEEK_SET);
if (write(fd, &hdr, sizeof(hdr)) != sizeof(hdr)) {
close(fd);
LOGE("Failed to write provision header");
return -1;
}
if (write(fd, provision, hdr.size) != hdr.size) {
close(fd);
LOGE("Failed to write provision content");
return -1;
}
fsync(fd);
close(fd);
LOGI("save_local_provision: hash=%x", hdr.hashcode);
return 0;
}
// double to str and trim tailing '0's
static char *double_to_str(double d)
{
static char buf[TELEMETRY_MAX_VALUE_SIZE];
int nchar = snprintf(buf, sizeof(buf), "%.3f", d);
for (int i = nchar - 1; i; i--) {
if (buf[i] == '0') {
buf[i] = 0;
} else if (buf[i] == '.') {
buf[i] = 0;
break;
} else {
break;
}
}
return buf;
}
static int printf_points(struct json_out *out, va_list *ap)
{
ce_device_t *device = va_arg(*ap, struct ce_device_t *);
int force = va_arg(*ap, int);
int len = json_printf(out, "[[%Q,\"%d\"]", "ERROR_CODE", device->err);
for (int i = 0; i < device->schema->num_point; i++) {
// skip unchanged value if COV flag set
if (!force && !IS_COV(device->telemetry, i)) {
continue;
}
if (IS_STR_VALUE(device->telemetry, i)) {
len += json_printf(out, ",[%Q,%Q]", device->schema->points[i].key, device->telemetry->values[i].str);
} else {
if (isnan(device->telemetry->values[i].num)) {
len += json_printf(out, ",[%Q,%s]", device->schema->points[i].key, "null");
} else {
len += json_printf(out, ",[%Q,%Q]", device->schema->points[i].key, double_to_str(device->telemetry->values[i].num));
}
}
}
len += json_printf(out, "]");
return len;
}
static struct timespec calc_telemetry_timestamp(const ce_device_t *device)
{
struct timespec ts = now();
if (device->schema->flags & FLAG_CE_TIMESTAMP) {
for (int i = 0; i < device->schema->num_point; i++) {
if (IS_NUM_VALUE(device->telemetry, i) &&
(strcasecmp(device->schema->points[i].key, "timestamp") == 0) &&
(!isnan(device->telemetry->values[i].num))) {
ts.tv_sec = device->telemetry->values[i].num;
ts.tv_nsec = 0;
}
}
}
return ts;
}
static char *build_telemetry_message(const ce_device_t *device, bool force)
{
ASSERT(device);
return json_asprintf("{timestamp:%Q,name:%Q,location:%Q,point:%M}",
timespec2str(calc_telemetry_timestamp(device)),
device->name,
device->location,
printf_points, device, force);
}
static void telemetry_message_delivered(bool delivered, void *context)
{
if (delivered) {
LOGI("Telemetry delivered");
} else {
LOGW("Telemetry delivery failed");
diag_log_event(EVENT_TELEMETRY_FAILED);
}
}
static void send_telemetry_message(const ce_device_t *device, bool force)
{
ASSERT(device);
LOGI("[%s] Send telemetry to iothub, status=%s", err_str(device->err));
char *message = build_telemetry_message(device, force);
const char *message_type = IOT_MESSAGE_TYPE_TELEMETRY;
int err = iot_send_message_async(message, message_type, telemetry_message_delivered, NULL);
if (err != 0) {
LOGW("Failed to send telemetry message");
diag_log_event(EVENT_TELEMETRY_FAILED);
}
FREE(message);
}
// schema name format: <name>[:<offset>][:<channel>]
static data_schema_t *parse_schema(data_schema_t *schemas, const char *schema_name)
{
char *colon = strchr(schema_name, ':');
size_t len = 0;
if (colon) {
len = colon - schema_name;
} else {
len = strlen(schema_name);
}
for (data_schema_t *schema = schemas; schema; schema = schema->next) {
if ((len == strlen(schema->name)) && strncmp(schema->name, schema_name, len) == 0) {
return schema;
}
}
return NULL;
}
// schema name format: <name>[:<offset>][:<channel>]
static int32_t parse_schema_offset(const char *schema_name)
{
if (!schema_name) {
return 0;
}
char *offset_str = strchr(schema_name, ':');
if (offset_str && offset_str[1]) {
return strtol(offset_str + 1, NULL, 10);
}
return 0;
}
// schema name format: <name>[:<offset>][:<channel>]
// this will be deprecated as we now get channel from device->id section
static uint32_t parse_schema_channel(const char *schema_name)
{
if (!schema_name) {
return 0;
}
char *offset_str = strchr(schema_name, ':');
if (offset_str && offset_str[1]) {
char *channel_str = strchr(offset_str + 1, ':');
if (channel_str && channel_str[1]) {
return strtol(channel_str + 1, NULL, 10);
}
}
return 0u;
}
static uint32_t parse_flag(const char *flag_str, int len)
{
if (!flag_str || len <= 0) {
return FLAG_NONE;
}
if ((strlen(FLAG_NO_BATCH_STR) == len) && (strncasecmp(flag_str, FLAG_NO_BATCH_STR, len) == 0)) {
return FLAG_NO_BATCH;
}
if ((strlen(FLAG_CE_TIMESTAMP_STR) == len) && (strncasecmp(flag_str, FLAG_CE_TIMESTAMP_STR, len) == 0)) {
return FLAG_CE_TIMESTAMP;
}
if ((strlen(FLAG_COV_STR) == len) && (strncasecmp(flag_str, FLAG_COV_STR, len) == 0)) {
return FLAG_COV;
}
return 0u;
}
static void destroy_device_telemetry(telemetry_t *telemetry)
{
if (!telemetry) {
return;
}
// free string value
for (int i = 0; i < telemetry->num_values; i++) {
if (IS_STR_VALUE(telemetry, i)) {
FREE(telemetry->values[i].str);
}
}
FREE(telemetry->values);
FREE(telemetry->cov_mask);
FREE(telemetry->str_mask);
FREE(telemetry);
}
static telemetry_t* create_empty_device_telemetry(int num_values)
{
telemetry_t *telemetry = CALLOC(1, sizeof(telemetry_t));
telemetry->cov_mask = CALLOC(1, (num_values + 7) / 8);
telemetry->str_mask = CALLOC(1, (num_values + 7) / 8);
telemetry->values = CALLOC(num_values, sizeof(telemetry_value_t));
telemetry->num_values = num_values;
for (int i=0; i<num_values; i++) {
telemetry->values[i].num = NAN;
}
return telemetry;
}
static void destroy_schema(data_schema_t *schema)
{
ASSERT(schema);
FREE(schema->name);
if (schema->points) {
destroy_point_table(schema->protocol, schema->points, schema->num_point);
}
FREE(schema);
}
static void destroy_device(ce_device_t *device)
{
ASSERT(device);
FREE(device->name);
FREE(device->connection);
FREE(device->location);
destroy_device_telemetry(device->telemetry);
device->telemetry = NULL;
FREE(device);
}
static ce_device_t *get_next_device_to_run(ce_device_t *current)
{
// nothing to schedule
if (!s_adapter.devices) {
return NULL;
}
// schedule first device
if (!current) {
return s_adapter.devices;
}
// if only one device, schedule that one again
if (s_adapter.devices->next == NULL) {
return s_adapter.devices;
}
// schedule next device with nearest schedule time, start from next device in line
// so when some device won't starving when not able to catch up
ce_device_t *start = current->next ? current->next : s_adapter.devices;
ce_device_t *device = start;
ce_device_t *next = start;
do {
if (timespec_compare(&device->ts_schedule, &next->ts_schedule) < 0) {
next = device;
}
device = device->next ? device->next : s_adapter.devices;
} while (device != start);
return next;
}
static void query_device(ce_device_t *device)
{
ASSERT(device);
ASSERT(device->schema);
struct timespec poll_sw;
timer_stopwatch_start(&poll_sw);
// since we reuse schema for multiple devices, set schema offest from current device
// before we query device with that schema
device->schema->offset = device->schema_offset;
if (!device->telemetry) {
device->telemetry = create_empty_device_telemetry(device->schema->num_point);
}
if (s_adapter.driver_state == DRIVER_STATE_INIT) {
LOGI("Open device driver");
if (s_adapter.driver->driver_open(s_adapter.driver, device->id, device->timeout) != DEVICE_OK) {
LOGE("Failed to open driver");
return;
}
}
device->err = s_adapter.driver->get_point_list(s_adapter.driver, device->id, device->schema, device->telemetry, device->timeout);
if (device->err) {
LOGE("[%s] Read points failed: %s", device->name, err_str(device->err));
return;
}
device->poll_duration = timer_stopwatch_stop(&poll_sw);
LOGI("[%s] Read points in %d ms", device->name, device->poll_duration);
}
static void reset_adapter(adapter_t *adapter)
{
ASSERT(adapter);
FREE(adapter->name);
FREE(adapter->location);
FREE(adapter->source_id);
FREE(adapter->uplink.if_name);
FREE(adapter->uplink.if_data);
FREE(adapter->downlink.if_name);
FREE(adapter->downlink.if_data);
while (adapter->devices) {
ce_device_t *device = adapter->devices;
adapter->devices = adapter->devices->next;
destroy_device(device);
}
while (adapter->schemas) {
data_schema_t *schema = adapter->schemas;
adapter->schemas = adapter->schemas->next;
destroy_schema(schema);
}
if (adapter->driver) {
if (adapter->driver_state & DRIVER_STATE_OPENED) {
adapter->driver->driver_close(adapter->driver);
}
destroy_driver(adapter->driver);
adapter->driver = NULL;
}
adapter->provision_epoch = 0;
adapter->num_device = 0;
adapter->num_schema = 0;
adapter->pending_device = NULL;
adapter->ready_device = NULL;
adapter->driver_state = DRIVER_STATE_INIT;
}
static void scan_link(const char *str, int len, void *user_data)
{
link_t *link = (link_t *)user_data;
json_scanf(str, len, "{interface:%Q, data:%Q}", &link->if_name, &link->if_data);
}
static void scan_protocol(const char *str, int len, void *user_data)
{
data_schema_t *schema = (data_schema_t*)user_data;
char *str_protocol = STRNDUP(str, len);
schema->protocol = str2protocol(str_protocol);
FREE(str_protocol);
}
static void scan_flags(const char *str, int len, void *user_data)
{
data_schema_t *schema = (data_schema_t *)user_data;
struct json_token t_flag;
for (int i = 0; json_scanf_array_elem(str, len, "", i, &t_flag) > 0; i++) {
schema->flags |= parse_flag(t_flag.ptr, t_flag.len);
}
}
static void scan_schema_array(const char *str, int len, void *user_data)
{
if (!str || len <= 0 || !user_data) {
return;
}
struct json_token t = {.ptr=NULL, .len=0, .type=JSON_TYPE_INVALID};
adapter_t *adapter = (adapter_t *)user_data;
// parse schemas array
for (int i = 0; json_scanf_array_elem(str, len, "", i, &t) > 0; i++) {
struct json_token t_points_def = {.ptr=NULL, .len=0, .type=JSON_TYPE_INVALID};
data_schema_t *schema = (data_schema_t *)CALLOC(1, sizeof(data_schema_t));
json_scanf(t.ptr, t.len, "{name:%Q, protocol:%M, interval:%d, timeout:%d, flags:%M, points:%T}",
&schema->name,
scan_protocol, schema,
&schema->interval,
&schema->timeout,
scan_flags, schema,
&t_points_def);
if (! schema->name) {
LOGE("missing schema name");
destroy_schema(schema);
return;
}
if (schema->protocol == DEVICE_PROTOCOL_INVALID) {
LOGE("invalid schema protocol");
destroy_schema(schema);
return;
}
if (schema->interval <= 0) {
LOGE("invalid schema interval");
destroy_schema(schema);
return;
}
if (schema->timeout <= 0) {
LOGE("invalid schema timeout");
destroy_schema(schema);
return;
}
if (create_point_table(schema->protocol, &t_points_def, &schema->num_point, &schema->points) != DEVICE_OK) {
LOGE("invalid points defintions");
destroy_schema(schema);
return;
}
schema->integrity_period_ms = DEFAULT_INTEGRITY_PERIOD_MS;
schema->next = adapter->schemas;
adapter->schemas = schema;
adapter->num_schema++;
LOGI("Add schema [name=%s, protocol=%s, interval=%ld, timeout=%ld]", schema->name,
protocol2str(schema->protocol), schema->interval, schema->timeout);
}
}
static const char *get_connection_string(ce_device_t *device)
{
static char conn_str[MAX_CONNECTION_STRING_SIZE];
if (device->connection && s_adapter.downlink.if_data) {
snprintf(conn_str, sizeof(conn_str), "%s,%s", device->connection, s_adapter.downlink.if_data);
return conn_str;
} else if (s_adapter.downlink.if_data) {
return s_adapter.downlink.if_data;
} else if (device->connection) {
return device->connection;
} else {
return NULL;
}
}
static bool is_compatible_driver(device_protocol_t protocol, device_driver_t *driver)
{
device_protocol_t driver_protocol = driver->get_protocol(driver);
return (protocol == driver_protocol);
}
static int find_or_create_driver(adapter_t *adapter, ce_device_t *device)
{
if (!adapter->driver) {
const char* conn_str = get_connection_string(device);
if ((conn_str == NULL) || (adapter->driver = create_driver(device->protocol, conn_str)) == NULL) {
LOGE("failed to create driver");
return -1;
}
} else {
if (!is_compatible_driver(device->protocol, adapter->driver)) {
return -1;
}
}
return 0;
}
static void scan_device_array(const char *str, int len, void *user_data)
{
if (!str || len <= 0 || !user_data) {
return;
}
adapter_t *adapter = (adapter_t *)user_data;
struct json_token t;
for (int i = 0; json_scanf_array_elem(str, len, "", i, &t) > 0; i++) {
char *schema_name = NULL;
char *device_id = NULL;
ce_device_t *device = (ce_device_t *)CALLOC(1, sizeof(ce_device_t));
json_scanf(t.ptr, t.len, "{name:%Q, schema:%Q, id:%Q, connection:%Q, location:%Q, interval:%d, timeout:%d}",
&device->name,
&schema_name,
&device_id,
&device->connection,
&device->location,
&device->interval,
&device->timeout);
// dealling with schema_name and device id first, so we don't bother to
// worry about free them in error case
if (schema_name) {
device->schema = parse_schema(adapter->schemas, schema_name);
device->schema_offset = parse_schema_offset(schema_name);
device->id = parse_schema_channel(schema_name);
FREE(schema_name);
}
if (device_id) {
device->id = strtol(device_id, NULL, 10);
FREE(device_id);
}
if (!device->name) {
LOGE("missing device name");
destroy_device(device);
return;
}
if (!device->schema) {
LOGE("invalid device schema");
destroy_device(device);
return;
}
if (!device->location && adapter->location) {
device->location = STRDUP(adapter->location);
}
if (device->interval <= 0) {
device->interval = device->schema->interval;
}
if (device->timeout <= 0) {
device->timeout = device->schema->timeout;
}
device->protocol = device->schema->protocol;
device->telemetry = NULL;
if (find_or_create_driver(adapter, device) != 0) {
LOGE("failed to find or create device driver");
destroy_device(device);
return;
}
device->err = DEVICE_E_INVALID;
device->next = adapter->devices;
adapter->devices = device;
adapter->num_device++;
LOGI("Add device [name=%s, schema=%s, interval=%ld, timeout=%ld]", device->name, device->schema->name,
device->interval, device->timeout);
}
}
static void scan_provision(const char *str, int len, void *user_data)
{
if (!str || !len || !user_data) {
LOGE("invalid input");
return;
}
adapter_t *adapter = (adapter_t *)user_data;
json_scanf(str, len, "{name:%Q,location:%Q,sourceId:%Q,uplink:%M,downlink:%M}",
&adapter->name,
&adapter->location,
&adapter->source_id,
scan_link, &adapter->uplink,
scan_link, &adapter->downlink);
// some of the schema, device field depend on adapter properties, so make
// sure they been scan first
json_scanf(str, len, "{schemas:%M,devices:%M}",
scan_schema_array, adapter,
scan_device_array, adapter);
}
static bool is_adapter_valid(adapter_t *adapter)
{
if (!adapter) {
return false;
}
if (!adapter->name) {
LOGE("missing adapter name");
return false;
}
if (!adapter->location) {
LOGE("missing adapter location");
return false;
}
if (!adapter->source_id) {
LOGE("missing adapter source id");
return false;
}
return true;
}
static void notify_worker_locked(void *device)
{
s_adapter.pending_device = device;
pthread_cond_signal(&s_adapter.cond);
}
static void notify_worker_callback(void *device)
{
pthread_mutex_lock(&s_adapter.mutex);
notify_worker_locked(device);
pthread_mutex_unlock(&s_adapter.mutex);
}
static void distribute_device_query_time(void)
{
struct timespec ts_start;
clock_gettime(CLOCK_MONOTONIC, &ts_start);
for (ce_device_t *device = s_adapter.devices; device; device = device->next) {
device->ts_schedule = ts_start;
device->last_flush_ts.tv_nsec = device->last_flush_ts.tv_sec = 0;
struct timespec ts_timeout = MS2SPEC(200);
timespec_add(&ts_start, &ts_timeout);
}
}
static int64_t consume_epoch_from_result_pipe(int fd)
{
int64_t epoch = 0;
if (read(fd, &epoch, sizeof(epoch)) < 0) {
LOGE("Failed to retrive device result");
return -1;
}
return epoch;
}
static void report_device_telemetry(ce_device_t *device)
{
bool force = false;
// force flush all data points if reach integrity period or don't support COV
if (device->schema->flags & FLAG_COV) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
int64_t ms_now = SPEC2MS(ts);
int64_t ms_last = SPEC2MS(device->last_flush_ts);
if (!ms_last || (ms_now - ms_last > device->schema->integrity_period_ms)) {
force = true;
device->last_flush_ts = ts;
}
} else {
force = true;
}
send_telemetry_message(device, force);
// if we don't need to support COV for this device, free up all telemetry
if (!(device->schema->flags & FLAG_COV)) {
destroy_device_telemetry(device->telemetry);
device->telemetry = NULL;
}
}
static void infer_driver_state(void)
{
int ndevice_in_state[DEVICE_E_LAST];
memset(&ndevice_in_state, 0, sizeof(ndevice_in_state));
for (ce_device_t *dev = s_adapter.devices; dev; dev = dev->next) {
ndevice_in_state[dev->err]++;
}
if (ndevice_in_state[DEVICE_E_INVALID] == s_adapter.num_device) {
s_adapter.driver_state = DRIVER_STATE_INIT;
} else if (ndevice_in_state[DEVICE_OK] == s_adapter.num_device) {
s_adapter.driver_state = DRIVER_STATE_NORMAL;
} else if (ndevice_in_state[DEVICE_E_TIMEOUT] == s_adapter.num_device || ndevice_in_state[DEVICE_E_BROKEN] > 0) {
s_adapter.driver_state = DRIVER_STATE_OPENED;
} else {
s_adapter.driver_state = DRIVER_STATE_PARTIAL;
}
}
static void schedule_next_device(ce_device_t *current)
{
ce_device_t *next = get_next_device_to_run(current);
if (next) {
struct timespec ts_next_task, ts_now;
clock_gettime(CLOCK_MONOTONIC, &ts_now);
if (timespec_compare(&next->ts_schedule, &ts_now) > 0) {
// when schedule task, data pointed by param will not been copy over, maintain a static copy
// so it's still avaiable when actual posting
ts_next_task = next->ts_schedule;
timespec_subtract(&ts_next_task, &ts_now);
event_loop_set_timer_and_context(s_adapter.notify_timer, &ts_next_task, NULL, next);
} else {
// when timer running late, reset next run to current time and schedule immediately
// no need to catch up, this to ensure next poll happens with specified interval
LOGW("Timer for %s running late", next->name);
next->ts_schedule = ts_now;
notify_worker_locked(next);
}
}
}
static void handle_device_result(EventLoop *eloop, int fd, EventLoop_IoEvents events, void *context)
{
if (consume_epoch_from_result_pipe(fd) != s_adapter.provision_epoch) {
LOGW("Stale result, ignore");
return;
}
pthread_mutex_lock(&s_adapter.mutex);
if (s_adapter.ready_device) {
report_device_telemetry(s_adapter.ready_device);
infer_driver_state();
if (s_adapter.ready_device->err == DEVICE_OK) {
diag_log_value(s_adapter.ready_device->name, s_adapter.ready_device->poll_duration);
}
}
schedule_next_device(s_adapter.ready_device);
s_adapter.ready_device = NULL;
pthread_mutex_unlock(&s_adapter.mutex);
MEMORY_REPORT(0);
}
static void post_epoch_to_result_pipe(void)
{
if (write(s_adapter.result_pipe[PIPE_WRITE_END],
&s_adapter.provision_epoch, sizeof(s_adapter.provision_epoch)) <= 0) {
LOGE("Failed to post result");
}
}
static void *device_worker_thread(void *vargp)
{
while (g_app_running) {
pthread_mutex_lock(&s_adapter.mutex);
if (!s_adapter.pending_device) {
pthread_cond_wait(&s_adapter.cond, &s_adapter.mutex);
}
if (s_adapter.pending_device) {
// use device->schedule instead of now() to avoid drifting
struct timespec ts_interval = MS2SPEC(s_adapter.pending_device->interval);
timespec_add(&s_adapter.pending_device->ts_schedule, &ts_interval);
query_device(s_adapter.pending_device);
s_adapter.ready_device = s_adapter.pending_device;
s_adapter.pending_device = NULL;
post_epoch_to_result_pipe();
}
pthread_mutex_unlock(&s_adapter.mutex);
}
return NULL;
}
static void apply_local_provision(void)
{
s_adapter.provision_epoch = 0;
char *local_provision = NULL;
if (load_local_provision(&local_provision) == 0) {
adapter_provision(local_provision, strlen(local_provision), false);
FREE(local_provision);
}
}
// ---------------------------- public interface ------------------------------
int adapter_init(EventLoop *eloop)
{
LOGI("adapter init");
s_adapter.eloop = eloop;
if (pipe(s_adapter.result_pipe) == -1) {
LOGE("Failed to create device result queue");
return -1;
}
s_adapter.result_io = EventLoop_RegisterIo(eloop, s_adapter.result_pipe[PIPE_READ_END], EventLoop_Input, handle_device_result, NULL);
if (s_adapter.result_io < 0) {
LOGE("Failed to register event loop for device result");
return -1;
}
s_adapter.notify_timer = event_loop_register_timer(eloop, NULL, NULL, notify_worker_callback, NULL);
if (!s_adapter.notify_timer) {
LOGE("Failed to register notify timer for device");
return -1;
}
if (pthread_mutex_init(&s_adapter.mutex, NULL) != 0) {
LOGE("Failed to create mutex");
return -1;
}
if (pthread_cond_init(&s_adapter.cond, NULL) != 0) {
perror("pthread_cond_init() error");
return -1;
}
// thread need to be created after pipe open
if (pthread_create(&s_adapter.worker_tid, NULL, device_worker_thread, &s_adapter) != 0) {
LOGE("Failed to create worker thread");
return -1;
}
apply_local_provision();
return 0;
}
void adapter_deinit()
{
pthread_cond_signal(&s_adapter.cond);
pthread_join(s_adapter.worker_tid, NULL);
pthread_mutex_destroy(&s_adapter.mutex);
pthread_cond_destroy(&s_adapter.cond);
EventLoop_UnregisterIo(s_adapter.eloop, s_adapter.result_io);
close(s_adapter.result_pipe[PIPE_READ_END]);
close(s_adapter.result_pipe[PIPE_WRITE_END]);
event_loop_unregister_timer(s_adapter.eloop, s_adapter.notify_timer);
reset_adapter(&s_adapter);
}
void adapter_provision(const char *provision, size_t provision_size, bool flush)
{
ASSERT(provision);
int64_t epoch;
json_scanf(provision, provision_size, "{epoch:%lld}", &epoch);
LOGD("adapter_provision: epoch=%lld", epoch);
iot_report_device_twin_async("{\"provision\":null}", NULL, NULL);
pthread_mutex_lock(&s_adapter.mutex);
if (epoch == s_adapter.provision_epoch) {
diag_log_event(EVENT_PROVISION);
LOGI("provision is not changed");
}
else {
reset_adapter(&s_adapter);
json_scanf(provision, provision_size, "{data:%M}", scan_provision, &s_adapter);
event_loop_cancel_timer(s_adapter.notify_timer);
if (is_adapter_valid(&s_adapter)) {
network_config(&s_adapter.uplink, &s_adapter.downlink);
if (flush) {
save_local_provision(provision, provision_size);
}
s_adapter.provision_epoch = epoch;
if (s_adapter.num_device > 0) {
distribute_device_query_time();
notify_worker_locked(s_adapter.devices);
}
diag_log_event(EVENT_PROVISION);
LOGI("provision succeed");
}
else {
reset_adapter(&s_adapter);
diag_log_event(EVENT_PROVISION_FAILED);
LOGE("provision failed");
}
}
clock_gettime(CLOCK_BOOTTIME, &s_adapter.last_provisioned);
pthread_mutex_unlock(&s_adapter.mutex);
MEMORY_REPORT(0);
}
const char *adapter_get_name(void)
{
return s_adapter.name;
}
const char* adapter_get_location(void)
{
return s_adapter.location;
}
const char* adapter_get_source_id(void)
{
return s_adapter.source_id;
}
ce_device_t *adapter_get_devices(void)
{
return s_adapter.devices;
}
struct timespec adapter_last_provisioned(void)
{
return s_adapter.last_provisioned;
}
uint32_t adapter_get_driver_state(void)
{
return s_adapter.driver_state;
}