RaspberrPi project source code
Guo Wenxue
2024-12-29 e30a4c8103e221201e5bfc1e3f9b19e7a86f68d4
commit | author | age
e30a4c 1 /********************************************************************************
GW 2  *      Copyright:  (C) 2021 LingYun IoT System Studio
3  *                  All rights reserved.
4  *
5  *       Filename:  mqtt.h
6  *    Description:  This head file is MQTT subscriber and publisher thread code
7  *
8  *        Version:  1.0.0(20/04/21)
9  *         Author:  Guo Wenxue <guowenxue@gmail.com>
10  *      ChangeLog:  1, Release initial version on "20/04/21 15:46:42"
11  *
12  ********************************************************************************/
13
14 #include <string.h>
15 #include <math.h>
16 #include <cjson/cJSON.h>
17 #include <mosquitto.h>
18 #include <errno.h>
19
20 #include "util_proc.h"
21 #include "logger.h"
22 #include "hal.h"
23 #include "conf.h"
24
25
26 /*+-------------------------------------------+
27  *|                                           |
28  *|    MQTT publisher thread worker code      |
29  *|                                           |
30  *+-------------------------------------------+*/
31
32
33 void pub_connect_callback(struct mosquitto *mosq, void *userdata, int result)
34 {
35     iotd_ctx_t             *ctx = (iotd_ctx_t *)userdata;
36     mqtt_ctx_t             *mqtt_ctx;
37     hal_ctx_t              *hal_ctx;
38     int                     rv = 0;
39     char                    msg[128];
40     float                   temp = 0.0; /* temperature */
41     float                   rh = 0.0;   /* relative humidity */
42     int                     retain = 0;
43
44     mqtt_ctx = &ctx->mqtt_ctx;
45     hal_ctx = &ctx->hal_ctx;
46
47
48     if( result )
49     {
50         log_error("Publisher connect to broker server[%s:%d] failed, rv=%d\n", mqtt_ctx->host, mqtt_ctx->port, result);
51         return ;
52     }
53     log_info("Publisher connect to broker server[%s:%d] successfully\n", mqtt_ctx->host, mqtt_ctx->port);
54
55
56     if( hal_ctx->ds18b20_enable )
57     {
58         memset(msg, 0, sizeof(msg));
59
60         if( ds18b20_get_temperature(&temp) < 0 )
61             snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"error\" }", mqtt_ctx->id);
62         else
63             snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"%.2f\" }", mqtt_ctx->id, temp);
64
65         rv = mosquitto_publish(mosq, NULL, mqtt_ctx->pubTopic, strlen(msg), msg, mqtt_ctx->pubQos, retain);
66         if( rv )
67         {
68             log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv);
69         }
70         else
71         {
72             log_info("Publisher broadcast message '%s' ok\n", msg);
73         }
74     }
75
76     if( hal_ctx->sht2x_enable )
77     {
78         memset(msg, 0, sizeof(msg));
79
80         if( sht2x_get_temp_humidity(&temp, &rh) < 0 )
81             snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"error\", \"RH\":\"error\" }", mqtt_ctx->id);
82         else
83             snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"%.2f\", \"RH\":\"%.2f\" }", mqtt_ctx->id, temp, rh);
84
85         rv = mosquitto_publish(mosq, NULL, mqtt_ctx->pubTopic, strlen(msg), msg, mqtt_ctx->pubQos, retain);
86         if( rv )
87         {
88             log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv);
89         }
90         else
91         {
92             log_info("Publisher broadcast message '%s' ok\n", msg);
93         }
94     }
95
96
97     log_info("Publisher broadcast over and disconnect broker now\n", mqtt_ctx->host, mqtt_ctx->port);
98     mosquitto_disconnect(mosq);
99
100     return ;
101 }
102
103
104 void *mqtt_pub_worker(void *args)
105 {
106     struct mosquitto       *mosq;
107     bool                    session = true;
108
109     iotd_ctx_t             *ctx = (iotd_ctx_t *)args;
110     mqtt_ctx_t             *mqtt_ctx;
111
112     if( !ctx )
113     {
114         log_error("Invalid input arguments\n");
115         return NULL;
116     }
117
118     mqtt_ctx = &ctx->mqtt_ctx;
119
120
121     mosq = mosquitto_new(NULL, session, ctx);
122     if( !mosq )
123     {
124         log_error("mosquitto_new failure\n");
125         return NULL;
126     }
127
128     /* set connnect to broker username and password  */
129     if( strlen(mqtt_ctx->uid)> 0  && strlen(mqtt_ctx->pwd)> 0 )
130         mosquitto_username_pw_set(mosq, mqtt_ctx->uid, mqtt_ctx->pwd);
131
132     /* set callback functions */
133     mosquitto_connect_callback_set(mosq, pub_connect_callback);
134
135     while( !g_signal.stop )
136     {
137         /* connect to MQTT broker  */
138         if( mosquitto_connect(mosq, mqtt_ctx->host, mqtt_ctx->port, mqtt_ctx->keepalive) )
139         {
140             log_error("Publisher connect to broker[%s:%d] failure: %s\n", mqtt_ctx->host, mqtt_ctx->port, strerror(errno));
141             msleep(1000);
142             continue;
143         }
144
145         /* -1: use default timeout 1000ms  1: unused */
146         mosquitto_loop_forever(mosq, -1, 1);
147
148         /* Publisher broadcast sensors data message interval time, unit seconds */
149         sleep( mqtt_ctx->interval );
150     }
151
152     mosquitto_destroy(mosq);
153     return NULL;
154 }
155
156 /*+-------------------------------------------+
157  *|                                           |
158  *|    MQTT Subscriber thread worker code     |
159  *|                                           |
160  *+-------------------------------------------+*/
161
162 void sub_connect_callback(struct mosquitto *mosq, void *userdata, int result)
163 {
164     iotd_ctx_t             *ctx = (iotd_ctx_t *)userdata;
165
166     if( result )
167     {
168         log_error("Subscriber connect to broker server failed, rv=%d\n", result);
169         return ;
170     }
171
172     log_info("Subscriber connect to broker server[%s:%d] successfully\n", ctx->mqtt_ctx.host, ctx->mqtt_ctx.port);
173
174     log_info("Subscriber subTopic '%s' with Qos[%d]\n", ctx->mqtt_ctx.subTopic, ctx->mqtt_ctx.subQos);
175     mosquitto_subscribe(mosq, NULL, ctx->mqtt_ctx.subTopic, ctx->mqtt_ctx.subQos);
176 }
177
178 void sub_disconnect_callback(struct mosquitto *mosq, void *userdata, int result)
179 {
180     iotd_ctx_t             *ctx = (iotd_ctx_t *)userdata;
181
182     log_warn("Subscriber disconnect to broker server[%s:%d], reason=%d\n",
183             ctx->mqtt_ctx.host, ctx->mqtt_ctx.port, result);
184 }
185
186 void proc_json_items(cJSON *root)
187 {
188     int                    i;
189     char                  *value;
190     cJSON                 *item;
191
192     if( !root )
193     {
194         log_error("Invalid input arguments $root\n");
195         return ;
196     }
197
198     for( i=0; i<cJSON_GetArraySize(root); i++ )
199     {
200         item = cJSON_GetArrayItem(root, i);
201         if( !item )
202             break;
203
204         /* if item is cJSON_Object, then recursive call proc_json */
205         if( cJSON_Object == item->type )
206         {
207             proc_json_items(item);
208         }
209         else if( cJSON_Array != item->type )
210         {
211             value = cJSON_Print(item);
212
213             log_debug("JSON Parser key: %s value: %s\n", item->string, value);
214             if( strstr(item->string, "light") || strstr(item->string, "led") || strstr(item->string, "relay"))
215             {
216                 if( strstr(value, "on") || strstr(value, "off") )
217                 {
218                     log_info("parser get turn '%s' %s from JSON string\n", item->string, value);
219                     gpio_out(item->string, value);
220                 }
221             }
222
223             free(value); /* must free it, or it will result memory leak */
224         }
225     }
226 }
227
228 void sub_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
229 {
230     iotd_ctx_t             *ctx = (iotd_ctx_t *)userdata;
231
232     cJSON                  *root = NULL;
233     cJSON                  *item;
234     char                   *value;
235
236
237     if ( !message->payloadlen )
238     {
239         log_error("%s (null)\n", message->topic);
240         return ;
241     }
242
243     log_debug("Subscriber receive message: '%s'\n", message->payload);
244
245     root = cJSON_Parse(message->payload);
246     if( !root )
247     {
248         log_error("cJSON_Parse parser failure: %s\n", cJSON_GetErrorPtr());
249         return ;
250     }
251
252     /* check ID matched or not */
253     item = cJSON_GetObjectItem(root, "id");
254     if( !item )
255     {
256         log_error("cJSON_Parse get ID failure: %s\n", cJSON_GetErrorPtr());
257         goto OUT;
258     }
259
260     value = cJSON_PrintUnformatted(item);
261     if( strcasecmp(value, ctx->mqtt_ctx.id) )
262     {
263         log_warn("cJSON_Parse get ID not matchs [%s<->%s], drop this message!\n", value, ctx->mqtt_ctx.id);
264         free(value);
265         goto OUT;
266     }
267
268     free(value);
269
270     /* proc JSON mesage */
271     proc_json_items(root);
272
273 OUT:
274     cJSON_Delete(root); /* must delete it, or it will result memory leak */
275     return ;
276 }
277
278
279 void *mqtt_sub_worker(void *args)
280 {
281     struct mosquitto       *mosq;
282     bool                    session = true;
283
284     iotd_ctx_t             *ctx = (iotd_ctx_t *)args;
285     mqtt_ctx_t             *mqtt_ctx;
286
287     if( !ctx )
288     {
289         log_error("Invalid input arguments\n");
290         return NULL;
291     }
292
293     mqtt_ctx = &ctx->mqtt_ctx;
294
295
296     mosq = mosquitto_new(NULL, session, ctx);
297     if( !mosq )
298     {
299         log_error("mosquitto_new failure\n");
300         return NULL;
301     }
302
303     /* set connnect to broker username and password  */
304     if( strlen(mqtt_ctx->uid)> 0  && strlen(mqtt_ctx->pwd)> 0 )
305         mosquitto_username_pw_set(mosq, mqtt_ctx->uid, mqtt_ctx->pwd);
306
307     /* set callback functions */
308     mosquitto_connect_callback_set(mosq, sub_connect_callback);
309     mosquitto_disconnect_callback_set(mosq, sub_disconnect_callback);
310     mosquitto_message_callback_set(mosq, sub_message_callback);
311
312     while( !g_signal.stop )
313     {
314         /* connect to MQTT broker  */
315         if( mosquitto_connect(mosq, mqtt_ctx->host, mqtt_ctx->port, mqtt_ctx->keepalive) )
316         {
317             log_error("Subscriber connect to broker[%s:%d] failure: %s\n", mqtt_ctx->host, mqtt_ctx->port, strerror(errno));
318             msleep(1000);
319             continue;
320         }
321
322         /* -1: use default timeout 1000ms  1: unused */
323         mosquitto_loop_forever(mosq, -1, 1);
324     }
325
326     mosquitto_destroy(mosq);
327     return NULL;
328 }
329