/********************************************************************************* * Copyright: (C) 2019 LingYun IoT System Studio * All rights reserved. * * Filename: main.c * Description: This file * * Version: 1.0.0(29/01/19) * Author: Guo Wenxue * ChangeLog: 1, Release initial version on "29/01/19 15:34:41" * ********************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include "logger.h" #include "util_proc.h" #include "modules.h" #include "conf.h" #define PROG_VERSION "v1.0.0" #define DAEMON_PIDFILE "/tmp/.mqtt.pid" void *mqtt_sub_worker(void *args); void *mqtt_pub_worker(void *args); static void program_usage(char *progname) { printf("Usage: %s [OPTION]...\n", progname); printf(" %s is LingYun studio MQTT daemon program running on RaspberryPi\n", progname); printf("\nMandatory arguments to long options are mandatory for short options too:\n"); printf(" -d[debug ] Running in debug mode\n"); printf(" -c[conf ] Specify configure file\n"); printf(" -h[help ] Display this help information\n"); printf(" -v[version ] Display the program version\n"); printf("\n%s version %s\n", progname, PROG_VERSION); return; } int main (int argc, char **argv) { int daemon = 1; pthread_t tid; mqtt_ctx_t ctx; char *conf_file=NULL; int debug = 0; int opt; char *progname=NULL; struct option long_options[] = { {"conf", required_argument, NULL, 'c'}, {"debug", no_argument, NULL, 'd'}, {"version", no_argument, NULL, 'v'}, {"help", no_argument, NULL, 'h'}, {NULL, 0, NULL, 0} }; progname = (char *)basename(argv[0]); /* Parser the command line parameters */ while ((opt = getopt_long(argc, argv, "c:dvh", long_options, NULL)) != -1) { switch (opt) { case 'c': /* Set configure file */ conf_file = optarg; break; case 'd': /* Set debug running */ daemon = 0; debug = 1; break; case 'v': /* Get software version */ printf("%s version %s\n", progname, PROG_VERSION); return 0; case 'h': /* Get help information */ program_usage(progname); return 0; default: break; } } if( !conf_file ) debug = 1; if( mqttd_parser_conf(conf_file, &ctx, debug)<0 ) { fprintf(stderr, "Parser mqtted configure file failure\n"); return -2; } install_default_signal(); if( check_set_program_running(daemon, DAEMON_PIDFILE) < 0 ) goto cleanup; mosquitto_lib_init(); if( thread_start(&tid, mqtt_sub_worker, &ctx ) < 0 ) { log_error("Start MQTT subsciber worker thread failure\n"); goto cleanup; } log_info("Start MQTT subsciber worker thread ok\n"); if( thread_start(&tid, mqtt_pub_worker, &ctx) < 0 ) { log_error("Start MQTT publisher worker thread failure\n"); goto cleanup; } log_info("Start MQTT publisher worker thread ok\n"); while( ! g_signal.stop ) { msleep(1000); } cleanup: mosquitto_lib_cleanup(); log_close(); return 0; } /* ----- End of main() ----- */ void pub_connect_callback(struct mosquitto *mosq, void *userdata, int result) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; int rv = 0; char msg[128]; float temp = 0.0; /* temperature */ float rh = 0.0; /* relative humidity */ int retain = 0; if( result ) { log_error("Publisher connect to broker server[%s:%d] failed, rv=%d\n", ctx->host, ctx->port, result); return ; } log_info("Publisher connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port); log_debug("Publish topic '%s'\n", ctx->pubTopic); if( ctx->hwconf.ds18b20 ) { memset(msg, 0, sizeof(msg)); log_debug("DS18B20 temperature sensor enabled, start broadcast it\n"); if( ds18b20_get_temperature(&temp) < 0 ) snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"error\" }", ctx->devid); else snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"%.2f\" }", ctx->devid, temp); rv = mosquitto_publish(mosq, NULL, ctx->pubTopic, strlen(msg), msg, ctx->pubQos, retain); if( rv ) { log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv); } else { log_info("Publisher broadcast message '%s' ok\n", msg); } } if( ctx->hwconf.sht2x ) { memset(msg, 0, sizeof(msg)); log_debug("SHT2X temperature and humidity sensor enabled, start broadcast it\n"); if( sht2x_get_temp_humidity(&temp, &rh) < 0 ) snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"error\", \"RH\":\"error\" }", ctx->devid); else snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"%.2f\", \"RH\":\"%.2f\" }", ctx->devid, temp, rh); rv = mosquitto_publish(mosq, NULL, ctx->pubTopic, strlen(msg), msg, ctx->pubQos, retain); if( rv ) { log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv); } else { log_info("Publisher broadcast message '%s' ok\n", msg); } } log_info("Publisher broadcast over and disconnect broker now\n", ctx->host, ctx->port); mosquitto_disconnect(mosq); return ; } void *mqtt_pub_worker(void *args) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)args; struct mosquitto *mosq; bool session = true; mosq = mosquitto_new(NULL, session, ctx); if( !mosq ) { log_error("mosquitto_new failure\n"); return NULL; } /* set connnect to broker username and password */ if( strlen(ctx->uid)> 0 && strlen(ctx->pwd)> 0 ) mosquitto_username_pw_set(mosq, ctx->uid, ctx->pwd); /* set callback functions */ mosquitto_connect_callback_set(mosq, pub_connect_callback); while( !g_signal.stop ) { /* connect to MQTT broker */ if( mosquitto_connect(mosq, ctx->host, ctx->port, ctx->keepalive) ) { log_error("Publisher connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno)); msleep(1000); continue; } /* -1: use default timeout 1000ms 1: unused */ mosquitto_loop_forever(mosq, -1, 1); /* Publisher broadcast sensors data message interval time, unit seconds */ sleep( ctx->interval ); } mosquitto_destroy(mosq); return NULL; } void sub_connect_callback(struct mosquitto *mosq, void *userdata, int result) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; if( result ) { log_error("Subscriber connect to broker server failed, rv=%d\n", result); return ; } log_info("Subscriber connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port); mosquitto_subscribe(mosq, NULL, ctx->subTopic, ctx->subQos); } void sub_disconnect_callback(struct mosquitto *mosq, void *userdata, int result) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; log_warn("Subscriber disconnect to broker server[%s:%d], reason=%d\n", ctx->host, ctx->port, result); } static inline void mqtt_turn_led(int which, char *cmd) { if( strcasestr(cmd, "on") ) turn_led(which, ON); else if( strcasestr(cmd, "off") ) turn_led(which, OFF); } void proc_json_items(cJSON *root, mqtt_ctx_t *ctx) { int i; char *value; cJSON *item; cJSON *array; hwconf_t *hwconf = &ctx->hwconf; if( !root ) { log_error("Invalid input arguments $root\n"); return ; } for( i=0; itype ) { proc_json_items(item, ctx); } else if( cJSON_Array == item->type ) { /* RGB colors led control: {"id":"RPi3B#01", "leds":[{"red":"on","green":"off","blue":"on"}]} */ if( hwconf->led && !strcasecmp(item->string, "leds") ) { array = cJSON_GetArrayItem(item, 0); if( NULL != array ) { cJSON *led_item; if( NULL != (led_item=cJSON_GetObjectItem(array , "red")) ) { log_info("turn red led '%s'\n", led_item->valuestring); mqtt_turn_led(LED_R, led_item->valuestring); } if( NULL != (led_item=cJSON_GetObjectItem(array , "green")) ) { log_info("turn green led '%s'\n", led_item->valuestring); mqtt_turn_led(LED_G, led_item->valuestring); } if( NULL != (led_item=cJSON_GetObjectItem(array , "blue")) ) { log_info("turn blue led '%s'\n", led_item->valuestring); mqtt_turn_led(LED_B, led_item->valuestring); } } } } else { value = cJSON_Print(item); /* light controled by relay: {"id":"RPi3B#01", "light":"on"} */ if( hwconf->relay && !strcasecmp(item->string, "light") ) { if( strcasestr(value, "on") ) { log_info("Turn light on\n"); turn_relay(RELAY1, ON); } else if( strcasestr(value, "off") ) { log_info("Turn light off\n"); turn_relay(RELAY1, OFF); } } /* buzzer controled : {"id":"RPi3B#01", "buzzer":"on"} */ if( hwconf->beeper && !strcasecmp(item->string, "buzzer") ) { if( strcasestr(value, "on") ) { turn_beep(3); } } free(value); /* must free it, or it will result memory leak */ } } } void sub_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; cJSON *root = NULL; cJSON *item; char *value; if ( !message->payloadlen ) { log_error("%s (null)\n", message->topic); return ; } log_debug("Subscriber receive message: '%s'\n", message->payload); root = cJSON_Parse(message->payload); if( !root ) { log_error("cJSON_Parse parser failure: %s\n", cJSON_GetErrorPtr()); return ; } item = cJSON_GetObjectItem(root, "id"); if( !item ) { log_error("cJSON_Parse get ID failure: %s\n", cJSON_GetErrorPtr()); goto cleanup; } value = cJSON_PrintUnformatted(item); if( strcasecmp(value, ctx->devid) ) { free(value); goto cleanup; } free(value); log_info("Subscriber receive message: '%s'\n", message->payload); proc_json_items(root, ctx); cleanup: cJSON_Delete(root); /* must delete it, or it will result memory leak */ return ; } void *mqtt_sub_worker(void *args) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)args; struct mosquitto *mosq; bool session = true; mosq = mosquitto_new(NULL, session, ctx); if( !mosq ) { log_error("mosquitto_new failure\n"); return NULL; } /* set connnect to broker username and password */ if( strlen(ctx->uid)> 0 && strlen(ctx->pwd)> 0 ) mosquitto_username_pw_set(mosq, ctx->uid, ctx->pwd); /* set callback functions */ mosquitto_connect_callback_set(mosq, sub_connect_callback); mosquitto_disconnect_callback_set(mosq, sub_disconnect_callback); mosquitto_message_callback_set(mosq, sub_message_callback); while( !g_signal.stop ) { /* connect to MQTT broker */ if( mosquitto_connect(mosq, ctx->host, ctx->port, ctx->keepalive) ) { log_error("Subscriber connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno)); msleep(1000); continue; } /* -1: use default timeout 1000ms 1: unused */ mosquitto_loop_forever(mosq, -1, 1); } mosquitto_destroy(mosq); return NULL; }