* Copyright: (C) 2021 LingYun IoT System Studio
* All rights reserved.
* Filename: mqtt.h
* Description: This head file is MQTT subscriber and publisher thread code
* Version: 1.0.0(20/04/21)
* Author: Guo Wenxue <guowenxue@gmail.com>
* ChangeLog: 1, Release initial version on "20/04/21 15:46:42"
#include <string.h>
#include <math.h>
#include <cjson/cJSON.h>
#include <mosquitto.h>
#include <errno.h>
#include "util_proc.h"
#include "logger.h"
#include "hal.h"
#include "conf.h"
*| |
*| MQTT publisher thread worker code |
*| |
void pub_connect_callback(struct mosquitto *mosq, void *userdata, int result)
iotd_ctx_t *ctx = (iotd_ctx_t *)userdata;
mqtt_ctx_t *mqtt_ctx;
hal_ctx_t *hal_ctx;
int rv = 0;
char msg[128];
float temp = 0.0; /* temperature */
float rh = 0.0; /* relative humidity */
int retain = 0;
mqtt_ctx = &ctx->mqtt_ctx;
hal_ctx = &ctx->hal_ctx;
if( result )
log_error("Publisher connect to broker server[%s:%d] failed, rv=%d\n", mqtt_ctx->host, mqtt_ctx->port, result);
return ;
log_info("Publisher connect to broker server[%s:%d] successfully\n", mqtt_ctx->host, mqtt_ctx->port);
if( hal_ctx->ds18b20_enable )
memset(msg, 0, sizeof(msg));
if( ds18b20_get_temperature(&temp) < 0 )
snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"error\" }", mqtt_ctx->id);
snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"%.2f\" }", mqtt_ctx->id, temp);
rv = mosquitto_publish(mosq, NULL, mqtt_ctx->pubTopic, strlen(msg), msg, mqtt_ctx->pubQos, retain);
if( rv )
log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv);
log_info("Publisher broadcast message '%s' ok\n", msg);
if( hal_ctx->sht2x_enable )
memset(msg, 0, sizeof(msg));
if( sht2x_get_temp_humidity(&temp, &rh) < 0 )
snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"error\", \"RH\":\"error\" }", mqtt_ctx->id);
snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"%.2f\", \"RH\":\"%.2f\" }", mqtt_ctx->id, temp, rh);
rv = mosquitto_publish(mosq, NULL, mqtt_ctx->pubTopic, strlen(msg), msg, mqtt_ctx->pubQos, retain);
if( rv )
log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv);
log_info("Publisher broadcast message '%s' ok\n", msg);
log_info("Publisher broadcast over and disconnect broker now\n", mqtt_ctx->host, mqtt_ctx->port);
return ;
void *mqtt_pub_worker(void *args)
struct mosquitto *mosq;
bool session = true;
iotd_ctx_t *ctx = (iotd_ctx_t *)args;
mqtt_ctx_t *mqtt_ctx;
if( !ctx )
log_error("Invalid input arguments\n");
return NULL;
mqtt_ctx = &ctx->mqtt_ctx;
mosq = mosquitto_new(NULL, session, ctx);
if( !mosq )
log_error("mosquitto_new failure\n");
return NULL;
/* set connnect to broker username and password */
if( strlen(mqtt_ctx->uid)> 0 && strlen(mqtt_ctx->pwd)> 0 )
mosquitto_username_pw_set(mosq, mqtt_ctx->uid, mqtt_ctx->pwd);
/* set callback functions */
mosquitto_connect_callback_set(mosq, pub_connect_callback);
while( !g_signal.stop )
/* connect to MQTT broker */
if( mosquitto_connect(mosq, mqtt_ctx->host, mqtt_ctx->port, mqtt_ctx->keepalive) )
log_error("Publisher connect to broker[%s:%d] failure: %s\n", mqtt_ctx->host, mqtt_ctx->port, strerror(errno));
/* -1: use default timeout 1000ms 1: unused */
mosquitto_loop_forever(mosq, -1, 1);
/* Publisher broadcast sensors data message interval time, unit seconds */
sleep( mqtt_ctx->interval );
return NULL;
*| |
*| MQTT Subscriber thread worker code |
*| |
void sub_connect_callback(struct mosquitto *mosq, void *userdata, int result)
iotd_ctx_t *ctx = (iotd_ctx_t *)userdata;
if( result )
log_error("Subscriber connect to broker server failed, rv=%d\n", result);
return ;
log_info("Subscriber connect to broker server[%s:%d] successfully\n", ctx->mqtt_ctx.host, ctx->mqtt_ctx.port);
log_info("Subscriber subTopic '%s' with Qos[%d]\n", ctx->mqtt_ctx.subTopic, ctx->mqtt_ctx.subQos);
mosquitto_subscribe(mosq, NULL, ctx->mqtt_ctx.subTopic, ctx->mqtt_ctx.subQos);
void sub_disconnect_callback(struct mosquitto *mosq, void *userdata, int result)
iotd_ctx_t *ctx = (iotd_ctx_t *)userdata;
log_warn("Subscriber disconnect to broker server[%s:%d], reason=%d\n",
ctx->mqtt_ctx.host, ctx->mqtt_ctx.port, result);
void proc_json_items(cJSON *root)
int i;
char *value;
cJSON *item;
if( !root )
log_error("Invalid input arguments $root\n");
return ;
for( i=0; i<cJSON_GetArraySize(root); i++ )
item = cJSON_GetArrayItem(root, i);
if( !item )
/* if item is cJSON_Object, then recursive call proc_json */
if( cJSON_Object == item->type )
else if( cJSON_Array != item->type )
value = cJSON_Print(item);
log_debug("JSON Parser key: %s value: %s\n", item->string, value);
if( strstr(item->string, "light") || strstr(item->string, "led") || strstr(item->string, "relay"))
if( strstr(value, "on") || strstr(value, "off") )
log_info("parser get turn '%s' %s from JSON string\n", item->string, value);
gpio_out(item->string, value);
free(value); /* must free it, or it will result memory leak */
void sub_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
iotd_ctx_t *ctx = (iotd_ctx_t *)userdata;
cJSON *root = NULL;
cJSON *item;
char *value;
if ( !message->payloadlen )
log_error("%s (null)\n", message->topic);
return ;
log_debug("Subscriber receive message: '%s'\n", message->payload);
root = cJSON_Parse(message->payload);
if( !root )
log_error("cJSON_Parse parser failure: %s\n", cJSON_GetErrorPtr());
return ;
/* check ID matched or not */
item = cJSON_GetObjectItem(root, "id");
if( !item )
log_error("cJSON_Parse get ID failure: %s\n", cJSON_GetErrorPtr());
goto OUT;
value = cJSON_PrintUnformatted(item);
if( strcasecmp(value, ctx->mqtt_ctx.id) )
log_warn("cJSON_Parse get ID not matchs [%s<->%s], drop this message!\n", value, ctx->mqtt_ctx.id);
goto OUT;
/* proc JSON mesage */
cJSON_Delete(root); /* must delete it, or it will result memory leak */
return ;
void *mqtt_sub_worker(void *args)
struct mosquitto *mosq;
bool session = true;
iotd_ctx_t *ctx = (iotd_ctx_t *)args;
mqtt_ctx_t *mqtt_ctx;
if( !ctx )
log_error("Invalid input arguments\n");
return NULL;
mqtt_ctx = &ctx->mqtt_ctx;
mosq = mosquitto_new(NULL, session, ctx);
if( !mosq )
log_error("mosquitto_new failure\n");
return NULL;
/* set connnect to broker username and password */
if( strlen(mqtt_ctx->uid)> 0 && strlen(mqtt_ctx->pwd)> 0 )
mosquitto_username_pw_set(mosq, mqtt_ctx->uid, mqtt_ctx->pwd);
/* set callback functions */
mosquitto_connect_callback_set(mosq, sub_connect_callback);
mosquitto_disconnect_callback_set(mosq, sub_disconnect_callback);
mosquitto_message_callback_set(mosq, sub_message_callback);
while( !g_signal.stop )
/* connect to MQTT broker */
if( mosquitto_connect(mosq, mqtt_ctx->host, mqtt_ctx->port, mqtt_ctx->keepalive) )
log_error("Subscriber connect to broker[%s:%d] failure: %s\n", mqtt_ctx->host, mqtt_ctx->port, strerror(errno));
/* -1: use default timeout 1000ms 1: unused */
mosquitto_loop_forever(mosq, -1, 1);
return NULL;