RaspberrPi project source code
Guo Wenxue
6 days ago f7889e2ceddbc3e15ea4b5377d831f4432169f76
commit | author | age
d6b4a7 1 /*********************************************************************************
G 2  *      Copyright:  (C) 2019 LingYun IoT System Studio
3  *                  All rights reserved.
4  *
5  *       Filename:  main.c
6  *    Description:  This file
7  *
8  *        Version:  1.0.0(29/01/19)
9  *         Author:  Guo Wenxue <guowenxue@gmail.com>
10  *      ChangeLog:  1, Release initial version on "29/01/19 15:34:41"
11  *
12  ********************************************************************************/
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <unistd.h>
16 #include <time.h>
17 #include <getopt.h>
18 #include <libgen.h>
19 #include <string.h>
20 #include <errno.h>
21
22 #include <mosquitto.h>
23 #include <cjson/cJSON.h>
24
25 #include "logger.h"
26 #include "util_proc.h"
27 #include "modules.h"
28 #include "conf.h"
29
30 #define PROG_VERSION               "v1.0.0"
31 #define DAEMON_PIDFILE             "/tmp/.mqtt.pid"
32
33 void *mqtt_sub_worker(void *args);
34 void *mqtt_pub_worker(void *args);
35
36 static void program_usage(char *progname)
37 {
38
39     printf("Usage: %s [OPTION]...\n", progname);
40     printf(" %s is LingYun studio MQTT daemon program running on RaspberryPi\n", progname);
41
42     printf("\nMandatory arguments to long options are mandatory for short options too:\n");
43     printf(" -d[debug   ]  Running in debug mode\n");
44     printf(" -c[conf    ]  Specify configure file\n");
45     printf(" -h[help    ]  Display this help information\n");
46     printf(" -v[version ]  Display the program version\n");
47
48     printf("\n%s version %s\n", progname, PROG_VERSION);
49     return;
50 }
51
52 int main (int argc, char **argv)
53 {
54     int                daemon = 1;
55     pthread_t          tid;
56     mqtt_ctx_t         ctx;
57     char               *conf_file=NULL;
58     int                debug = 0;
59     int                opt;
60     char              *progname=NULL;
61
62     struct option long_options[] = {
63         {"conf", required_argument, NULL, 'c'},
64         {"debug", no_argument, NULL, 'd'},
65         {"version", no_argument, NULL, 'v'},
66         {"help", no_argument, NULL, 'h'},
67         {NULL, 0, NULL, 0}
68     };
69
70     progname = (char *)basename(argv[0]);
71
72     /* Parser the command line parameters */
73     while ((opt = getopt_long(argc, argv, "c:dvh", long_options, NULL)) != -1)
74     {
75         switch (opt)
76         {
77             case 'c': /* Set configure file */
78                 conf_file = optarg;
79                 break;
80
81             case 'd': /* Set debug running */
82                 daemon = 0;
83                 debug = 1;
84                 break;
85
86             case 'v':  /* Get software version */
87                 printf("%s version %s\n", progname, PROG_VERSION);
88                 return 0;
89
90             case 'h':  /* Get help information */
91                 program_usage(progname);
92                 return 0;
93
94             default:
95                 break;
96         }
97
98     }
99
100     if( !conf_file )
101         debug = 1;
102
103     if( mqttd_parser_conf(conf_file, &ctx, debug)<0 )
104     {
105         fprintf(stderr, "Parser mqtted configure file failure\n");
106         return -2;
107     }
108
109     install_default_signal();
110
111     if( check_set_program_running(daemon, DAEMON_PIDFILE) < 0 )
112         goto cleanup;
113
114     mosquitto_lib_init();
115
116     if( thread_start(&tid, mqtt_sub_worker, &ctx ) < 0 )
117     {
118         log_error("Start MQTT subsciber worker thread failure\n");
119         goto cleanup;
120     }
121     log_info("Start MQTT subsciber worker thread ok\n");
122
123     if( thread_start(&tid, mqtt_pub_worker, &ctx) < 0 )
124     {
125         log_error("Start MQTT publisher worker thread failure\n");
126         goto cleanup;
127     }
128     log_info("Start MQTT publisher worker thread ok\n");
129
130     while( ! g_signal.stop )
131     {
132         msleep(1000);
133     }
134
135 cleanup:
136     mosquitto_lib_cleanup();
137     log_close();
138
139     return 0;
140 } /* ----- End of main() ----- */
141
142 void pub_connect_callback(struct mosquitto *mosq, void *userdata, int result)
143 {
144     mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
145     int                     rv = 0;
146     char                    msg[128];
147     float                   temp = 0.0; /* temperature */
148     float                   rh = 0.0;   /* relative humidity */
149     int                     retain = 0;
150
151     if( result )
152     {
153         log_error("Publisher connect to broker server[%s:%d] failed, rv=%d\n", ctx->host, ctx->port, result);
154         return ;
155     }
156
157     log_info("Publisher connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port);
158     log_debug("Publish topic '%s'\n", ctx->pubTopic);
159
160     if( ctx->hwconf.ds18b20 )
161     {
162         memset(msg, 0, sizeof(msg));
163
164         log_debug("DS18B20 temperature sensor enabled, start broadcast it\n");
165
166         if( ds18b20_get_temperature(&temp) < 0 )
167             snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"error\" }", ctx->devid);
168         else
169             snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"%.2f\" }", ctx->devid, temp);
170
171         rv = mosquitto_publish(mosq, NULL, ctx->pubTopic, strlen(msg), msg, ctx->pubQos, retain);
172         if( rv )
173         {
174             log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv);
175         }
176         else
177         {
178             log_info("Publisher broadcast message '%s' ok\n", msg);
179         }
180     }
181
182     if( ctx->hwconf.sht2x )
183     {
184         memset(msg, 0, sizeof(msg));
185
186         log_debug("SHT2X temperature and humidity sensor enabled, start broadcast it\n");
187
188         if( sht2x_get_temp_humidity(&temp, &rh) < 0 )
189             snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"error\", \"RH\":\"error\" }", ctx->devid);
190         else
191             snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"%.2f\", \"RH\":\"%.2f\" }", ctx->devid, temp, rh);
192
193         rv = mosquitto_publish(mosq, NULL, ctx->pubTopic, strlen(msg), msg, ctx->pubQos, retain);
194         if( rv )
195         {
196             log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv);
197         }
198         else
199         {
200             log_info("Publisher broadcast message '%s' ok\n", msg);
201         }
202     }
203
204     log_info("Publisher broadcast over and disconnect broker now\n", ctx->host, ctx->port);
205     mosquitto_disconnect(mosq);
206
207     return ;
208 }
209
210
211 void *mqtt_pub_worker(void *args)
212 {
213     mqtt_ctx_t             *ctx = (mqtt_ctx_t *)args;
214     struct mosquitto       *mosq;
215     bool                    session = true;
216
217
218     mosq = mosquitto_new(NULL, session, ctx);
219     if( !mosq )
220     {
221         log_error("mosquitto_new failure\n");
222         return NULL;
223     }
224
225     /* set connnect to broker username and password  */
226     if( strlen(ctx->uid)> 0  && strlen(ctx->pwd)> 0 )
227         mosquitto_username_pw_set(mosq, ctx->uid, ctx->pwd);
228
229     /* set callback functions */
230     mosquitto_connect_callback_set(mosq, pub_connect_callback);
231
232     while( !g_signal.stop )
233     {
234         /* connect to MQTT broker  */
235         if( mosquitto_connect(mosq, ctx->host, ctx->port, ctx->keepalive) )
236         {
237             log_error("Publisher connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno));
238             msleep(1000);
239             continue;
240         }
241
242         /* -1: use default timeout 1000ms  1: unused */
243         mosquitto_loop_forever(mosq, -1, 1);
244
245         /* Publisher broadcast sensors data message interval time, unit seconds */
246         sleep( ctx->interval );
247     }
248
249     mosquitto_destroy(mosq);
250     return NULL;
251 }
252
253 void sub_connect_callback(struct mosquitto *mosq, void *userdata, int result)
254 {
255     mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
256
257     if( result )
258     {
259         log_error("Subscriber connect to broker server failed, rv=%d\n", result);
260         return ;
261     }
262
263     log_info("Subscriber connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port);
264     mosquitto_subscribe(mosq, NULL, ctx->subTopic, ctx->subQos);
265 }
266
267 void sub_disconnect_callback(struct mosquitto *mosq, void *userdata, int result)
268 {
269     mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
270
271     log_warn("Subscriber disconnect to broker server[%s:%d], reason=%d\n", ctx->host, ctx->port, result);
272 }
273
274 static inline void mqtt_turn_led(int which, char *cmd)
275 {
276     if( strcasestr(cmd, "on") )
277         turn_led(which, ON);
278     else if( strcasestr(cmd, "off") )
279         turn_led(which, OFF);
280 }
281
282 void proc_json_items(cJSON *root, mqtt_ctx_t *ctx)
283 {
284     int                    i;
285     char                  *value;
286     cJSON                 *item;
287     cJSON                 *array;
288     hwconf_t              *hwconf = &ctx->hwconf;
289
290     if( !root )
291     {
292         log_error("Invalid input arguments $root\n");
293         return ;
294     }
295
296     for( i=0; i<cJSON_GetArraySize(root); i++ )
297     {
298         item = cJSON_GetArrayItem(root, i);
299         if( !item )
300             break;
301
302         /* if item is cJSON_Object, then recursive call proc_json */
303         if( cJSON_Object == item->type )
304         {
305             proc_json_items(item, ctx);
306         }
307         else if( cJSON_Array == item->type )
308         {
309             /* RGB colors led control: {"id":"RPi3B#01", "leds":[{"red":"on","green":"off","blue":"on"}]} */
310             if( hwconf->led && !strcasecmp(item->string, "leds") )
311             {
312                 array = cJSON_GetArrayItem(item, 0);
313                 if( NULL != array )
314                 {
315                     cJSON        *led_item;
316
317                     if( NULL != (led_item=cJSON_GetObjectItem(array , "red")) )
318                     {
319                         log_info("turn red led '%s'\n", led_item->valuestring);
320                         mqtt_turn_led(LED_R, led_item->valuestring);
321                     }
322
323                     if( NULL != (led_item=cJSON_GetObjectItem(array , "green")) )
324                     {
325                         log_info("turn green led '%s'\n", led_item->valuestring);
326                         mqtt_turn_led(LED_G, led_item->valuestring);
327                     }
328
329                     if( NULL != (led_item=cJSON_GetObjectItem(array , "blue")) )
330                     {
331                         log_info("turn blue led '%s'\n", led_item->valuestring);
332                         mqtt_turn_led(LED_B, led_item->valuestring);
333                     }
334                 }
335             }
336         }
337         else
338         {
339             value = cJSON_Print(item);
340
341             /* light controled by relay: {"id":"RPi3B#01", "light":"on"} */
342             if( hwconf->relay && !strcasecmp(item->string, "light") )
343             {
344                 if( strcasestr(value, "on") )
345                 {
346                     log_info("Turn light on\n");
347                     turn_relay(RELAY1, ON);
348                 }
349                 else if( strcasestr(value, "off") )
350                 {
351                     log_info("Turn light off\n");
352                     turn_relay(RELAY1, OFF);
353                 }
354             }
355
356             /* buzzer controled : {"id":"RPi3B#01", "buzzer":"on"} */
357             if( hwconf->beeper && !strcasecmp(item->string, "buzzer") )
358             {
359                 if( strcasestr(value, "on") )
360                 {
361                     turn_beep(3);
362                 }
363             }
364
365             free(value); /* must free it, or it will result memory leak */
366         }
367     }
368
369 }
370
371 void sub_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
372 {
373     mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
374
375     cJSON                  *root = NULL;
376     cJSON                  *item;
377     char                   *value;
378
379     if ( !message->payloadlen )
380     {
381         log_error("%s (null)\n", message->topic);
382         return ;
383     }
384
385     log_debug("Subscriber receive message: '%s'\n", message->payload);
386
387     root = cJSON_Parse(message->payload);
388     if( !root )
389     {
390         log_error("cJSON_Parse parser failure: %s\n", cJSON_GetErrorPtr());
391         return ;
392     }
393
394     item = cJSON_GetObjectItem(root, "id");
395     if( !item )
396     {
397         log_error("cJSON_Parse get ID failure: %s\n", cJSON_GetErrorPtr());
398         goto cleanup;
399     }
400
401     value = cJSON_PrintUnformatted(item);
402     if( strcasecmp(value, ctx->devid) )
403     {
404         free(value);
405         goto cleanup;
406     }
407
408     free(value);
409     log_info("Subscriber receive message: '%s'\n", message->payload);
410
411     proc_json_items(root, ctx);
412
413 cleanup:
414     cJSON_Delete(root); /* must delete it, or it will result memory leak */
415     return ;
416 }
417
418
419 void *mqtt_sub_worker(void *args)
420 {
421     mqtt_ctx_t             *ctx = (mqtt_ctx_t *)args;
422     struct mosquitto       *mosq;
423     bool                    session = true;
424
425     mosq = mosquitto_new(NULL, session, ctx);
426     if( !mosq )
427     {
428         log_error("mosquitto_new failure\n");
429         return NULL;
430     }
431
432     /* set connnect to broker username and password  */
433     if( strlen(ctx->uid)> 0  && strlen(ctx->pwd)> 0 )
434         mosquitto_username_pw_set(mosq, ctx->uid, ctx->pwd);
435
436     /* set callback functions */
437     mosquitto_connect_callback_set(mosq, sub_connect_callback);
438     mosquitto_disconnect_callback_set(mosq, sub_disconnect_callback);
439     mosquitto_message_callback_set(mosq, sub_message_callback);
440
441     while( !g_signal.stop )
442     {
443         /* connect to MQTT broker  */
444         if( mosquitto_connect(mosq, ctx->host, ctx->port, ctx->keepalive) )
445         {
446             log_error("Subscriber connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno));
447             msleep(1000);
448             continue;
449         }
450
451         /* -1: use default timeout 1000ms  1: unused */
452         mosquitto_loop_forever(mosq, -1, 1);
453     }
454
455     mosquitto_destroy(mosq);
456     return NULL;
457 }
458