RaspberrPi project source code
Guo Wenxue
6 days ago f7889e2ceddbc3e15ea4b5377d831f4432169f76
commit | author | age
d6b4a7 1 /*
G 2  * coreMQTT v2.1.1
3  * Copyright (C) 2022 Amazon.com, Inc. or its affiliates.  All Rights Reserved.
4  *
5  * SPDX-License-Identifier: MIT
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy of
8  * this software and associated documentation files (the "Software"), to deal in
9  * the Software without restriction, including without limitation the rights to
10  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
11  * the Software, and to permit persons to whom the Software is furnished to do so,
12  * subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in all
15  * copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
19  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
20  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
21  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
22  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23  */
24
25 /**
26  * @file core_mqtt.c
27  * @brief Implements the user-facing functions in core_mqtt.h.
28  */
29 #include <string.h>
30 #include <assert.h>
31
32 #include "core_mqtt.h"
33 #include "core_mqtt_state.h"
34
35 /* Include config defaults header to get default values of configs. */
36 #include "core_mqtt_config_defaults.h"
37
38 #include "core_mqtt_default_logging.h"
39
40 #ifndef MQTT_PRE_SEND_HOOK
41
42 /**
43  * @brief Hook called before a 'send' operation is executed.
44  */
45     #define MQTT_PRE_SEND_HOOK( pContext )
46 #endif /* !MQTT_PRE_SEND_HOOK */
47
48 #ifndef MQTT_POST_SEND_HOOK
49
50 /**
51  * @brief Hook called after the 'send' operation is complete.
52  */
53     #define MQTT_POST_SEND_HOOK( pContext )
54 #endif /* !MQTT_POST_SEND_HOOK */
55
56 #ifndef MQTT_PRE_STATE_UPDATE_HOOK
57
58 /**
59  * @brief Hook called just before an update to the MQTT state is made.
60  */
61     #define MQTT_PRE_STATE_UPDATE_HOOK( pContext )
62 #endif /* !MQTT_PRE_STATE_UPDATE_HOOK */
63
64 #ifndef MQTT_POST_STATE_UPDATE_HOOK
65
66 /**
67  * @brief Hook called just after an update to the MQTT state has
68  * been made.
69  */
70     #define MQTT_POST_STATE_UPDATE_HOOK( pContext )
71 #endif /* !MQTT_POST_STATE_UPDATE_HOOK */
72
73 /*-----------------------------------------------------------*/
74
75 /**
76  * @brief Sends provided buffer to network using transport send.
77  *
78  * @brief param[in] pContext Initialized MQTT context.
79  * @brief param[in] pBufferToSend Buffer to be sent to network.
80  * @brief param[in] bytesToSend Number of bytes to be sent.
81  *
82  * @note This operation may call the transport send function
83  * repeatedly to send bytes over the network until either:
84  * 1. The requested number of bytes @a bytesToSend have been sent.
85  *                    OR
86  * 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
87  * function.
88  *                    OR
89  * 3. There is an error in sending data over the network.
90  *
91  * @return Total number of bytes sent, or negative value on network error.
92  */
93 static int32_t sendBuffer( MQTTContext_t * pContext,
94                            const uint8_t * pBufferToSend,
95                            size_t bytesToSend );
96
97 /**
98  * @brief Sends MQTT connect without copying the users data into any buffer.
99  *
100  * @brief param[in] pContext Initialized MQTT context.
101  * @brief param[in] pConnectInfo MQTT CONNECT packet information.
102  * @brief param[in] pWillInfo Last Will and Testament. Pass NULL if Last Will and
103  * Testament is not used.
104  * @brief param[in] remainingLength the length of the connect packet.
105  *
106  * @note This operation may call the transport send function
107  * repeatedly to send bytes over the network until either:
108  * 1. The requested number of bytes @a remainingLength have been sent.
109  *                    OR
110  * 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
111  * function.
112  *                    OR
113  * 3. There is an error in sending data over the network.
114  *
115  * @return #MQTTSendFailed or #MQTTSuccess.
116  */
117 static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
118                                             const MQTTConnectInfo_t * pConnectInfo,
119                                             const MQTTPublishInfo_t * pWillInfo,
120                                             size_t remainingLength );
121
122 /**
123  * @brief Sends the vector array passed through the parameters over the network.
124  *
125  * @note The preference is given to 'writev' function if it is present in the
126  * transport interface. Otherwise, a send call is made repeatedly to achieve the
127  * result.
128  *
129  * @param[in] pContext Initialized MQTT context.
130  * @param[in] pIoVec The vector array to be sent.
131  * @param[in] ioVecCount The number of elements in the array.
132  *
133  * @note This operation may call the transport send or writev functions
134  * repeatedly to send bytes over the network until either:
135  * 1. The requested number of bytes have been sent.
136  *                    OR
137  * 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
138  * function.
139  *                    OR
140  * 3. There is an error in sending data over the network.
141  *
142  * @return The total number of bytes sent or the error code as received from the
143  * transport interface.
144  */
145 static int32_t sendMessageVector( MQTTContext_t * pContext,
146                                   TransportOutVector_t * pIoVec,
147                                   size_t ioVecCount );
148
149 /**
150  * @brief Add a string and its length after serializing it in a manner outlined by
151  * the MQTT specification.
152  *
153  * @param[in] serailizedLength Array of two bytes to which the vector will point.
154  * The array must remain in scope until the message has been sent.
155  * @param[in] string The string to be serialized.
156  * @param[in] length The length of the string to be serialized.
157  * @param[in] iterator The iterator pointing to the first element in the
158  * transport interface IO array.
159  * @param[out] updatedLength This parameter will be added to with the number of
160  * bytes added to the vector.
161  *
162  * @return The number of vectors added.
163  */
164 static size_t addEncodedStringToVector( uint8_t serailizedLength[ 2 ],
165                                         const char * const string,
166                                         uint16_t length,
167                                         TransportOutVector_t * iterator,
168                                         size_t * updatedLength );
169
170 /**
171  * @brief Send MQTT SUBSCRIBE message without copying the user data into a buffer and
172  * directly sending it.
173  *
174  * @param[in] pContext Initialized MQTT context.
175  * @param[in] pSubscriptionList List of MQTT subscription info.
176  * @param[in] subscriptionCount The count of elements in the list.
177  * @param[in] packetId The packet ID of the subscribe packet
178  * @param[in] remainingLength The remaining length of the subscribe packet.
179  *
180  * @return #MQTTSuccess or #MQTTSendFailed.
181  */
182 static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext,
183                                               const MQTTSubscribeInfo_t * pSubscriptionList,
184                                               size_t subscriptionCount,
185                                               uint16_t packetId,
186                                               size_t remainingLength );
187
188 /**
189  * @brief Send MQTT UNSUBSCRIBE message without copying the user data into a buffer and
190  * directly sending it.
191  *
192  * @param[in] pContext Initialized MQTT context.
193  * @param[in] pSubscriptionList MQTT subscription info.
194  * @param[in] subscriptionCount The count of elements in the list.
195  * @param[in] packetId The packet ID of the unsubscribe packet.
196  * @param[in] remainingLength The remaining length of the unsubscribe packet.
197  *
198  * @return #MQTTSuccess or #MQTTSendFailed.
199  */
200 static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,
201                                                 const MQTTSubscribeInfo_t * pSubscriptionList,
202                                                 size_t subscriptionCount,
203                                                 uint16_t packetId,
204                                                 size_t remainingLength );
205
206 /**
207  * @brief Calculate the interval between two millisecond timestamps, including
208  * when the later value has overflowed.
209  *
210  * @note In C, the operands are promoted to signed integers in subtraction.
211  * Using this function avoids the need to cast the result of subtractions back
212  * to uint32_t.
213  *
214  * @param[in] later The later time stamp, in milliseconds.
215  * @param[in] start The earlier time stamp, in milliseconds.
216  *
217  * @return later - start.
218  */
219 static uint32_t calculateElapsedTime( uint32_t later,
220                                       uint32_t start );
221
222 /**
223  * @brief Convert a byte indicating a publish ack type to an #MQTTPubAckType_t.
224  *
225  * @param[in] packetType First byte of fixed header.
226  *
227  * @return Type of ack.
228  */
229 static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType );
230
231 /**
232  * @brief Receive bytes into the network buffer.
233  *
234  * @param[in] pContext Initialized MQTT Context.
235  * @param[in] bytesToRecv Number of bytes to receive.
236  *
237  * @note This operation calls the transport receive function
238  * repeatedly to read bytes from the network until either:
239  * 1. The requested number of bytes @a bytesToRecv are read.
240  *                    OR
241  * 2. No data is received from the network for MQTT_RECV_POLLING_TIMEOUT_MS duration.
242  *
243  *                    OR
244  * 3. There is an error in reading from the network.
245  *
246  *
247  * @return Number of bytes received, or negative number on network error.
248  */
249 static int32_t recvExact( const MQTTContext_t * pContext,
250                           size_t bytesToRecv );
251
252 /**
253  * @brief Discard a packet from the transport interface.
254  *
255  * @param[in] pContext MQTT Connection context.
256  * @param[in] remainingLength Remaining length of the packet to dump.
257  * @param[in] timeoutMs Time remaining to discard the packet.
258  *
259  * @return #MQTTRecvFailed or #MQTTNoDataAvailable.
260  */
261 static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,
262                                    size_t remainingLength,
263                                    uint32_t timeoutMs );
264
265 /**
266  * @brief Discard a packet from the MQTT buffer and the transport interface.
267  *
268  * @param[in] pContext MQTT Connection context.
269  * @param[in] pPacketInfo Information struct of the packet to be discarded.
270  *
271  * @return #MQTTRecvFailed or #MQTTNoDataAvailable.
272  */
273 static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext,
274                                          const MQTTPacketInfo_t * pPacketInfo );
275
276 /**
277  * @brief Receive a packet from the transport interface.
278  *
279  * @param[in] pContext MQTT Connection context.
280  * @param[in] incomingPacket packet struct with remaining length.
281  * @param[in] remainingTimeMs Time remaining to receive the packet.
282  *
283  * @return #MQTTSuccess or #MQTTRecvFailed.
284  */
285 static MQTTStatus_t receivePacket( const MQTTContext_t * pContext,
286                                    MQTTPacketInfo_t incomingPacket,
287                                    uint32_t remainingTimeMs );
288
289 /**
290  * @brief Get the correct ack type to send.
291  *
292  * @param[in] state Current state of publish.
293  *
294  * @return Packet Type byte of PUBACK, PUBREC, PUBREL, or PUBCOMP if one of
295  * those should be sent, else 0.
296  */
297 static uint8_t getAckTypeToSend( MQTTPublishState_t state );
298
299 /**
300  * @brief Send acks for received QoS 1/2 publishes.
301  *
302  * @param[in] pContext MQTT Connection context.
303  * @param[in] packetId packet ID of original PUBLISH.
304  * @param[in] publishState Current publish state in record.
305  *
306  * @return #MQTTSuccess, #MQTTIllegalState or #MQTTSendFailed.
307  */
308 static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
309                                      uint16_t packetId,
310                                      MQTTPublishState_t publishState );
311
312 /**
313  * @brief Send a keep alive PINGREQ if the keep alive interval has elapsed.
314  *
315  * @param[in] pContext Initialized MQTT Context.
316  *
317  * @return #MQTTKeepAliveTimeout if a PINGRESP is not received in time,
318  * #MQTTSendFailed if the PINGREQ cannot be sent, or #MQTTSuccess.
319  */
320 static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext );
321
322 /**
323  * @brief Handle received MQTT PUBLISH packet.
324  *
325  * @param[in] pContext MQTT Connection context.
326  * @param[in] pIncomingPacket Incoming packet.
327  *
328  * @return MQTTSuccess, MQTTIllegalState or deserialization error.
329  */
330 static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
331                                            MQTTPacketInfo_t * pIncomingPacket );
332
333 /**
334  * @brief Handle received MQTT publish acks.
335  *
336  * @param[in] pContext MQTT Connection context.
337  * @param[in] pIncomingPacket Incoming packet.
338  *
339  * @return MQTTSuccess, MQTTIllegalState, or deserialization error.
340  */
341 static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
342                                        MQTTPacketInfo_t * pIncomingPacket );
343
344 /**
345  * @brief Handle received MQTT ack.
346  *
347  * @param[in] pContext MQTT Connection context.
348  * @param[in] pIncomingPacket Incoming packet.
349  * @param[in] manageKeepAlive Flag indicating if PINGRESPs should not be given
350  * to the application
351  *
352  * @return MQTTSuccess, MQTTIllegalState, or deserialization error.
353  */
354 static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
355                                        MQTTPacketInfo_t * pIncomingPacket,
356                                        bool manageKeepAlive );
357
358 /**
359  * @brief Run a single iteration of the receive loop.
360  *
361  * @param[in] pContext MQTT Connection context.
362  * @param[in] manageKeepAlive Flag indicating if keep alive should be handled.
363  *
364  * @return #MQTTRecvFailed if a network error occurs during reception;
365  * #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ;
366  * #MQTTBadResponse if an invalid packet is received;
367  * #MQTTKeepAliveTimeout if the server has not sent a PINGRESP before
368  * #MQTT_PINGRESP_TIMEOUT_MS milliseconds;
369  * #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an
370  * invalid transition for the internal state machine;
371  * #MQTTSuccess on success.
372  */
373 static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
374                                             bool manageKeepAlive );
375
376 /**
377  * @brief Validates parameters of #MQTT_Subscribe or #MQTT_Unsubscribe.
378  *
379  * @param[in] pContext Initialized MQTT context.
380  * @param[in] pSubscriptionList List of MQTT subscription info.
381  * @param[in] subscriptionCount The number of elements in pSubscriptionList.
382  * @param[in] packetId Packet identifier.
383  *
384  * @return #MQTTBadParameter if invalid parameters are passed;
385  * #MQTTSuccess otherwise.
386  */
387 static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext,
388                                                         const MQTTSubscribeInfo_t * pSubscriptionList,
389                                                         size_t subscriptionCount,
390                                                         uint16_t packetId );
391
392 /**
393  * @brief Receives a CONNACK MQTT packet.
394  *
395  * @param[in] pContext Initialized MQTT context.
396  * @param[in] timeoutMs Timeout for waiting for CONNACK packet.
397  * @param[in] cleanSession Clean session flag set by application.
398  * @param[out] pIncomingPacket List of MQTT subscription info.
399  * @param[out] pSessionPresent Whether a previous session was present.
400  * Only relevant if not establishing a clean session.
401  *
402  * @return #MQTTBadResponse if a bad response is received;
403  * #MQTTNoDataAvailable if no data available for transport recv;
404  * ##MQTTRecvFailed if transport recv failed;
405  * #MQTTSuccess otherwise.
406  */
407 static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext,
408                                     uint32_t timeoutMs,
409                                     bool cleanSession,
410                                     MQTTPacketInfo_t * pIncomingPacket,
411                                     bool * pSessionPresent );
412
413 /**
414  * @brief Resends pending acks for a re-established MQTT session, or
415  * clears existing state records for a clean session.
416  *
417  * @param[in] pContext Initialized MQTT context.
418  * @param[in] sessionPresent Session present flag received from the MQTT broker.
419  *
420  * @return #MQTTSendFailed if transport send during resend failed;
421  * #MQTTSuccess otherwise.
422  */
423 static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
424                                              bool sessionPresent );
425
426
427 /**
428  * @brief Send the publish packet without copying the topic string and payload in
429  * the buffer.
430  *
431  * @brief param[in] pContext Initialized MQTT context.
432  * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
433  * @brief param[in] pMqttHeader the serialized MQTT header with the header byte;
434  * the encoded length of the packet; and the encoded length of the topic string.
435  * @brief param[in] headerSize Size of the serialized PUBLISH header.
436  * @brief param[in] packetId Packet Id of the publish packet.
437  *
438  * @return #MQTTSendFailed if transport send during resend failed;
439  * #MQTTSuccess otherwise.
440  */
441 static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
442                                             const MQTTPublishInfo_t * pPublishInfo,
443                                             const uint8_t * pMqttHeader,
444                                             size_t headerSize,
445                                             uint16_t packetId );
446
447 /**
448  * @brief Function to validate #MQTT_Publish parameters.
449  *
450  * @brief param[in] pContext Initialized MQTT context.
451  * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
452  * @brief param[in] packetId Packet Id for the MQTT PUBLISH packet.
453  *
454  * @return #MQTTBadParameter if invalid parameters are passed;
455  * #MQTTSuccess otherwise.
456  */
457 static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
458                                            const MQTTPublishInfo_t * pPublishInfo,
459                                            uint16_t packetId );
460
461 /**
462  * @brief Performs matching for special cases when a topic filter ends
463  * with a wildcard character.
464  *
465  * When the topic name has been consumed but there are remaining characters to
466  * to match in topic filter, this function handles the following 2 cases:
467  * - When the topic filter ends with "/+" or "/#" characters, but the topic
468  * name only ends with '/'.
469  * - When the topic filter ends with "/#" characters, but the topic name
470  * ends at the parent level.
471  *
472  * @note This function ASSUMES that the topic name been consumed in linear
473  * matching with the topic filer, but the topic filter has remaining characters
474  * to be matched.
475  *
476  * @param[in] pTopicFilter The topic filter containing the wildcard.
477  * @param[in] topicFilterLength Length of the topic filter being examined.
478  * @param[in] filterIndex Index of the topic filter being examined.
479  *
480  * @return Returns whether the topic filter and the topic name match.
481  */
482 static bool matchEndWildcardsSpecialCases( const char * pTopicFilter,
483                                            uint16_t topicFilterLength,
484                                            uint16_t filterIndex );
485
486 /**
487  * @brief Attempt to match topic name with a topic filter starting with a wildcard.
488  *
489  * If the topic filter starts with a '+' (single-level) wildcard, the function
490  * advances the @a pNameIndex by a level in the topic name.
491  * If the topic filter starts with a '#' (multi-level) wildcard, the function
492  * concludes that both the topic name and topic filter match.
493  *
494  * @param[in] pTopicName The topic name to match.
495  * @param[in] topicNameLength Length of the topic name.
496  * @param[in] pTopicFilter The topic filter to match.
497  * @param[in] topicFilterLength Length of the topic filter.
498  * @param[in,out] pNameIndex Current index in the topic name being examined. It is
499  * advanced by one level for `+` wildcards.
500  * @param[in, out] pFilterIndex Current index in the topic filter being examined.
501  * It is advanced to position of '/' level separator for '+' wildcard.
502  * @param[out] pMatch Whether the topic filter and topic name match.
503  *
504  * @return `true` if the caller of this function should exit; `false` if the
505  * caller should continue parsing the topics.
506  */
507 static bool matchWildcards( const char * pTopicName,
508                             uint16_t topicNameLength,
509                             const char * pTopicFilter,
510                             uint16_t topicFilterLength,
511                             uint16_t * pNameIndex,
512                             uint16_t * pFilterIndex,
513                             bool * pMatch );
514
515 /**
516  * @brief Match a topic name and topic filter allowing the use of wildcards.
517  *
518  * @param[in] pTopicName The topic name to check.
519  * @param[in] topicNameLength Length of the topic name.
520  * @param[in] pTopicFilter The topic filter to check.
521  * @param[in] topicFilterLength Length of topic filter.
522  *
523  * @return `true` if the topic name and topic filter match; `false` otherwise.
524  */
525 static bool matchTopicFilter( const char * pTopicName,
526                               uint16_t topicNameLength,
527                               const char * pTopicFilter,
528                               uint16_t topicFilterLength );
529
530 /*-----------------------------------------------------------*/
531
532 static bool matchEndWildcardsSpecialCases( const char * pTopicFilter,
533                                            uint16_t topicFilterLength,
534                                            uint16_t filterIndex )
535 {
536     bool matchFound = false;
537
538     assert( pTopicFilter != NULL );
539     assert( topicFilterLength != 0U );
540
541     /* Check if the topic filter has 2 remaining characters and it ends in
542      * "/#". This check handles the case to match filter "sport/#" with topic
543      * "sport". The reason is that the '#' wildcard represents the parent and
544      * any number of child levels in the topic name.*/
545     if( ( topicFilterLength >= 3U ) &&
546         ( filterIndex == ( topicFilterLength - 3U ) ) &&
547         ( pTopicFilter[ filterIndex + 1U ] == '/' ) &&
548         ( pTopicFilter[ filterIndex + 2U ] == '#' ) )
549
550     {
551         matchFound = true;
552     }
553
554     /* Check if the next character is "#" or "+" and the topic filter ends in
555      * "/#" or "/+". This check handles the cases to match:
556      *
557      * - Topic filter "sport/+" with topic "sport/".
558      * - Topic filter "sport/#" with topic "sport/".
559      */
560     if( ( filterIndex == ( topicFilterLength - 2U ) ) &&
561         ( pTopicFilter[ filterIndex ] == '/' ) )
562     {
563         /* Check that the last character is a wildcard. */
564         matchFound = ( pTopicFilter[ filterIndex + 1U ] == '+' ) ||
565                      ( pTopicFilter[ filterIndex + 1U ] == '#' );
566     }
567
568     return matchFound;
569 }
570
571 /*-----------------------------------------------------------*/
572
573 static bool matchWildcards( const char * pTopicName,
574                             uint16_t topicNameLength,
575                             const char * pTopicFilter,
576                             uint16_t topicFilterLength,
577                             uint16_t * pNameIndex,
578                             uint16_t * pFilterIndex,
579                             bool * pMatch )
580 {
581     bool shouldStopMatching = false;
582     bool locationIsValidForWildcard;
583
584     assert( pTopicName != NULL );
585     assert( topicNameLength != 0U );
586     assert( pTopicFilter != NULL );
587     assert( topicFilterLength != 0U );
588     assert( pNameIndex != NULL );
589     assert( pFilterIndex != NULL );
590     assert( pMatch != NULL );
591
592     /* Wild card in a topic filter is only valid either at the starting position
593      * or when it is preceded by a '/'.*/
594     locationIsValidForWildcard = ( *pFilterIndex == 0u ) ||
595                                  ( pTopicFilter[ *pFilterIndex - 1U ] == '/' );
596
597     if( ( pTopicFilter[ *pFilterIndex ] == '+' ) && ( locationIsValidForWildcard == true ) )
598     {
599         bool nextLevelExistsInTopicName = false;
600         bool nextLevelExistsinTopicFilter = false;
601
602         /* Move topic name index to the end of the current level. The end of the
603          * current level is identified by the last character before the next level
604          * separator '/'. */
605         while( *pNameIndex < topicNameLength )
606         {
607             /* Exit the loop if we hit the level separator. */
608             if( pTopicName[ *pNameIndex ] == '/' )
609             {
610                 nextLevelExistsInTopicName = true;
611                 break;
612             }
613
614             ( *pNameIndex )++;
615         }
616
617         /* Determine if the topic filter contains a child level after the current level
618          * represented by the '+' wildcard. */
619         if( ( *pFilterIndex < ( topicFilterLength - 1U ) ) &&
620             ( pTopicFilter[ *pFilterIndex + 1U ] == '/' ) )
621         {
622             nextLevelExistsinTopicFilter = true;
623         }
624
625         /* If the topic name contains a child level but the topic filter ends at
626          * the current level, then there does not exist a match. */
627         if( ( nextLevelExistsInTopicName == true ) &&
628             ( nextLevelExistsinTopicFilter == false ) )
629         {
630             *pMatch = false;
631             shouldStopMatching = true;
632         }
633
634         /* If the topic name and topic filter have child levels, then advance the
635          * filter index to the level separator in the topic filter, so that match
636          * can be performed in the next level.
637          * Note: The name index already points to the level separator in the topic
638          * name. */
639         else if( nextLevelExistsInTopicName == true )
640         {
641             ( *pFilterIndex )++;
642         }
643         else
644         {
645             /* If we have reached here, the the loop terminated on the
646              * ( *pNameIndex < topicNameLength) condition, which means that have
647              * reached past the end of the topic name, and thus, we decrement the
648              * index to the last character in the topic name.*/
649             ( *pNameIndex )--;
650         }
651     }
652
653     /* '#' matches everything remaining in the topic name. It must be the
654      * last character in a topic filter. */
655     else if( ( pTopicFilter[ *pFilterIndex ] == '#' ) &&
656              ( *pFilterIndex == ( topicFilterLength - 1U ) ) &&
657              ( locationIsValidForWildcard == true ) )
658     {
659         /* Subsequent characters don't need to be checked for the
660          * multi-level wildcard. */
661         *pMatch = true;
662         shouldStopMatching = true;
663     }
664     else
665     {
666         /* Any character mismatch other than '+' or '#' means the topic
667          * name does not match the topic filter. */
668         *pMatch = false;
669         shouldStopMatching = true;
670     }
671
672     return shouldStopMatching;
673 }
674
675 /*-----------------------------------------------------------*/
676
677 static bool matchTopicFilter( const char * pTopicName,
678                               uint16_t topicNameLength,
679                               const char * pTopicFilter,
680                               uint16_t topicFilterLength )
681 {
682     bool matchFound = false, shouldStopMatching = false;
683     uint16_t nameIndex = 0, filterIndex = 0;
684
685     assert( pTopicName != NULL );
686     assert( topicNameLength != 0 );
687     assert( pTopicFilter != NULL );
688     assert( topicFilterLength != 0 );
689
690     while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) )
691     {
692         /* Check if the character in the topic name matches the corresponding
693          * character in the topic filter string. */
694         if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] )
695         {
696             /* If the topic name has been consumed but the topic filter has not
697              * been consumed, match for special cases when the topic filter ends
698              * with wildcard character. */
699             if( nameIndex == ( topicNameLength - 1U ) )
700             {
701                 matchFound = matchEndWildcardsSpecialCases( pTopicFilter,
702                                                             topicFilterLength,
703                                                             filterIndex );
704             }
705         }
706         else
707         {
708             /* Check for matching wildcards. */
709             shouldStopMatching = matchWildcards( pTopicName,
710                                                  topicNameLength,
711                                                  pTopicFilter,
712                                                  topicFilterLength,
713                                                  &nameIndex,
714                                                  &filterIndex,
715                                                  &matchFound );
716         }
717
718         if( ( matchFound == true ) || ( shouldStopMatching == true ) )
719         {
720             break;
721         }
722
723         /* Increment indexes. */
724         nameIndex++;
725         filterIndex++;
726     }
727
728     if( matchFound == false )
729     {
730         /* If the end of both strings has been reached, they match. This represents the
731          * case when the topic filter contains the '+' wildcard at a non-starting position.
732          * For example, when matching either of "sport/+/player" OR "sport/hockey/+" topic
733          * filters with "sport/hockey/player" topic name. */
734         matchFound = ( nameIndex == topicNameLength ) &&
735                      ( filterIndex == topicFilterLength );
736     }
737
738     return matchFound;
739 }
740
741 /*-----------------------------------------------------------*/
742
743 static int32_t sendMessageVector( MQTTContext_t * pContext,
744                                   TransportOutVector_t * pIoVec,
745                                   size_t ioVecCount )
746 {
747     int32_t sendResult;
748     uint32_t timeoutMs;
749     TransportOutVector_t * pIoVectIterator;
750     size_t vectorsToBeSent = ioVecCount;
751     size_t bytesToSend = 0U;
752     int32_t bytesSentOrError = 0;
753
754     assert( pContext != NULL );
755     assert( pIoVec != NULL );
756     assert( pContext->getTime != NULL );
757     /* Send must always be defined */
758     assert( pContext->transportInterface.send != NULL );
759
760     /* Count the total number of bytes to be sent as outlined in the vector. */
761     for( pIoVectIterator = pIoVec; pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ); pIoVectIterator++ )
762     {
763         bytesToSend += pIoVectIterator->iov_len;
764     }
765
766     /* Reset the iterator to point to the first entry in the array. */
767     pIoVectIterator = pIoVec;
768
769     /* Set the timeout. */
770     timeoutMs = pContext->getTime() + MQTT_SEND_TIMEOUT_MS;
771
772     while( ( bytesSentOrError < ( int32_t ) bytesToSend ) && ( bytesSentOrError >= 0 ) )
773     {
774         if( pContext->transportInterface.writev != NULL )
775         {
776             sendResult = pContext->transportInterface.writev( pContext->transportInterface.pNetworkContext,
777                                                               pIoVectIterator,
778                                                               vectorsToBeSent );
779         }
780         else
781         {
782             sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
783                                                             pIoVectIterator->iov_base,
784                                                             pIoVectIterator->iov_len );
785         }
786
787         if( sendResult > 0 )
788         {
789             /* It is a bug in the application's transport send implementation if
790              * more bytes than expected are sent. */
791             assert( sendResult <= ( ( int32_t ) bytesToSend - bytesSentOrError ) );
792
793             bytesSentOrError += sendResult;
794
795             /* Set last transmission time. */
796             pContext->lastPacketTxTime = pContext->getTime();
797
798             LogDebug( ( "sendMessageVector: Bytes Sent=%ld, Bytes Remaining=%lu",
799                         ( long int ) sendResult,
800                         ( unsigned long ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) );
801         }
802         else if( sendResult < 0 )
803         {
804             bytesSentOrError = sendResult;
805             LogError( ( "sendMessageVector: Unable to send packet: Network Error." ) );
806         }
807         else
808         {
809             /* MISRA Empty body */
810         }
811
812         /* Check for timeout. */
813         if( pContext->getTime() >= timeoutMs )
814         {
815             LogError( ( "sendMessageVector: Unable to send packet: Timed out." ) );
816             break;
817         }
818
819         /* Update the send pointer to the correct vector and offset. */
820         while( ( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) &&
821                ( sendResult >= ( int32_t ) pIoVectIterator->iov_len ) )
822         {
823             sendResult -= ( int32_t ) pIoVectIterator->iov_len;
824             pIoVectIterator++;
825             /* Update the number of vector which are yet to be sent. */
826             vectorsToBeSent--;
827         }
828
829         /* Some of the bytes from this vector were sent as well, update the length
830          * and the pointer to data in this vector. */
831         if( ( sendResult > 0 ) &&
832             ( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) )
833         {
834             pIoVectIterator->iov_base = ( const void * ) &( ( ( const uint8_t * ) pIoVectIterator->iov_base )[ sendResult ] );
835             pIoVectIterator->iov_len -= ( size_t ) sendResult;
836         }
837     }
838
839     return bytesSentOrError;
840 }
841
842 static int32_t sendBuffer( MQTTContext_t * pContext,
843                            const uint8_t * pBufferToSend,
844                            size_t bytesToSend )
845 {
846     int32_t sendResult;
847     uint32_t timeoutMs;
848     int32_t bytesSentOrError = 0;
849     const uint8_t * pIndex = pBufferToSend;
850
851     assert( pContext != NULL );
852     assert( pContext->getTime != NULL );
853     assert( pContext->transportInterface.send != NULL );
854     assert( pIndex != NULL );
855
856     /* Set the timeout. */
857     timeoutMs = pContext->getTime() + MQTT_SEND_TIMEOUT_MS;
858
859     while( ( bytesSentOrError < ( int32_t ) bytesToSend ) && ( bytesSentOrError >= 0 ) )
860     {
861         sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
862                                                         pIndex,
863                                                         bytesToSend - ( size_t ) bytesSentOrError );
864
865         if( sendResult > 0 )
866         {
867             /* It is a bug in the application's transport send implementation if
868              * more bytes than expected are sent. */
869             assert( sendResult <= ( ( int32_t ) bytesToSend - bytesSentOrError ) );
870
871             bytesSentOrError += sendResult;
872             pIndex = &pIndex[ sendResult ];
873
874             /* Set last transmission time. */
875             pContext->lastPacketTxTime = pContext->getTime();
876
877             LogDebug( ( "sendBuffer: Bytes Sent=%ld, Bytes Remaining=%lu",
878                         ( long int ) sendResult,
879                         ( unsigned long ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) );
880         }
881         else if( sendResult < 0 )
882         {
883             bytesSentOrError = sendResult;
884             LogError( ( "sendBuffer: Unable to send packet: Network Error." ) );
885         }
886         else
887         {
888             /* MISRA Empty body */
889         }
890
891         /* Check for timeout. */
892         if( pContext->getTime() >= timeoutMs )
893         {
894             LogError( ( "sendBuffer: Unable to send packet: Timed out." ) );
895             break;
896         }
897     }
898
899     return bytesSentOrError;
900 }
901
902 /*-----------------------------------------------------------*/
903
904 static uint32_t calculateElapsedTime( uint32_t later,
905                                       uint32_t start )
906 {
907     return later - start;
908 }
909
910 /*-----------------------------------------------------------*/
911
912 static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType )
913 {
914     MQTTPubAckType_t ackType = MQTTPuback;
915
916     switch( packetType )
917     {
918         case MQTT_PACKET_TYPE_PUBACK:
919             ackType = MQTTPuback;
920             break;
921
922         case MQTT_PACKET_TYPE_PUBREC:
923             ackType = MQTTPubrec;
924             break;
925
926         case MQTT_PACKET_TYPE_PUBREL:
927             ackType = MQTTPubrel;
928             break;
929
930         case MQTT_PACKET_TYPE_PUBCOMP:
931         default:
932
933             /* This function is only called after checking the type is one of
934              * the above four values, so packet type must be PUBCOMP here. */
935             assert( packetType == MQTT_PACKET_TYPE_PUBCOMP );
936             ackType = MQTTPubcomp;
937             break;
938     }
939
940     return ackType;
941 }
942
943 /*-----------------------------------------------------------*/
944
945 static int32_t recvExact( const MQTTContext_t * pContext,
946                           size_t bytesToRecv )
947 {
948     uint8_t * pIndex = NULL;
949     size_t bytesRemaining = bytesToRecv;
950     int32_t totalBytesRecvd = 0, bytesRecvd;
951     uint32_t lastDataRecvTimeMs = 0U, timeSinceLastRecvMs = 0U;
952     TransportRecv_t recvFunc = NULL;
953     MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
954     bool receiveError = false;
955
956     assert( pContext != NULL );
957     assert( bytesToRecv <= pContext->networkBuffer.size );
958     assert( pContext->getTime != NULL );
959     assert( pContext->transportInterface.recv != NULL );
960     assert( pContext->networkBuffer.pBuffer != NULL );
961
962     pIndex = pContext->networkBuffer.pBuffer;
963     recvFunc = pContext->transportInterface.recv;
964     getTimeStampMs = pContext->getTime;
965
966     /* Part of the MQTT packet has been read before calling this function. */
967     lastDataRecvTimeMs = getTimeStampMs();
968
969     while( ( bytesRemaining > 0U ) && ( receiveError == false ) )
970     {
971         bytesRecvd = recvFunc( pContext->transportInterface.pNetworkContext,
972                                pIndex,
973                                bytesRemaining );
974
975         if( bytesRecvd < 0 )
976         {
977             LogError( ( "Network error while receiving packet: ReturnCode=%ld.",
978                         ( long int ) bytesRecvd ) );
979             totalBytesRecvd = bytesRecvd;
980             receiveError = true;
981         }
982         else if( bytesRecvd > 0 )
983         {
984             /* Reset the starting time as we have received some data from the network. */
985             lastDataRecvTimeMs = getTimeStampMs();
986
987             /* It is a bug in the application's transport receive implementation
988              * if more bytes than expected are received. To avoid a possible
989              * overflow in converting bytesRemaining from unsigned to signed,
990              * this assert must exist after the check for bytesRecvd being
991              * negative. */
992             assert( ( size_t ) bytesRecvd <= bytesRemaining );
993
994             bytesRemaining -= ( size_t ) bytesRecvd;
995             totalBytesRecvd += ( int32_t ) bytesRecvd;
996             /* Increment the index. */
997             pIndex = &pIndex[ bytesRecvd ];
998             LogDebug( ( "BytesReceived=%ld, BytesRemaining=%lu, TotalBytesReceived=%ld.",
999                         ( long int ) bytesRecvd,
1000                         ( unsigned long ) bytesRemaining,
1001                         ( long int ) totalBytesRecvd ) );
1002         }
1003         else
1004         {
1005             /* No bytes were read from the network. */
1006             timeSinceLastRecvMs = calculateElapsedTime( getTimeStampMs(), lastDataRecvTimeMs );
1007
1008             /* Check for timeout if we have been waiting to receive any byte on the network. */
1009             if( timeSinceLastRecvMs >= MQTT_RECV_POLLING_TIMEOUT_MS )
1010             {
1011                 LogError( ( "Unable to receive packet: Timed out in transport recv." ) );
1012                 receiveError = true;
1013             }
1014         }
1015     }
1016
1017     return totalBytesRecvd;
1018 }
1019
1020 /*-----------------------------------------------------------*/
1021
1022 static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,
1023                                    size_t remainingLength,
1024                                    uint32_t timeoutMs )
1025 {
1026     MQTTStatus_t status = MQTTRecvFailed;
1027     int32_t bytesReceived = 0;
1028     size_t bytesToReceive = 0U;
1029     uint32_t totalBytesReceived = 0U;
1030     uint32_t entryTimeMs = 0U;
1031     uint32_t elapsedTimeMs = 0U;
1032     MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
1033     bool receiveError = false;
1034
1035     assert( pContext != NULL );
1036     assert( pContext->getTime != NULL );
1037
1038     bytesToReceive = pContext->networkBuffer.size;
1039     getTimeStampMs = pContext->getTime;
1040
1041     entryTimeMs = getTimeStampMs();
1042
1043     while( ( totalBytesReceived < remainingLength ) && ( receiveError == false ) )
1044     {
1045         if( ( remainingLength - totalBytesReceived ) < bytesToReceive )
1046         {
1047             bytesToReceive = remainingLength - totalBytesReceived;
1048         }
1049
1050         bytesReceived = recvExact( pContext, bytesToReceive );
1051
1052         if( bytesReceived != ( int32_t ) bytesToReceive )
1053         {
1054             LogError( ( "Receive error while discarding packet."
1055                         "ReceivedBytes=%ld, ExpectedBytes=%lu.",
1056                         ( long int ) bytesReceived,
1057                         ( unsigned long ) bytesToReceive ) );
1058             receiveError = true;
1059         }
1060         else
1061         {
1062             totalBytesReceived += ( uint32_t ) bytesReceived;
1063
1064             elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs );
1065
1066             /* Check for timeout. */
1067             if( elapsedTimeMs >= timeoutMs )
1068             {
1069                 LogError( ( "Time expired while discarding packet." ) );
1070                 receiveError = true;
1071             }
1072         }
1073     }
1074
1075     if( totalBytesReceived == remainingLength )
1076     {
1077         LogError( ( "Dumped packet. DumpedBytes=%lu.",
1078                     ( unsigned long ) totalBytesReceived ) );
1079         /* Packet dumped, so no data is available. */
1080         status = MQTTNoDataAvailable;
1081     }
1082
1083     return status;
1084 }
1085
1086 /*-----------------------------------------------------------*/
1087
1088 static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext,
1089                                          const MQTTPacketInfo_t * pPacketInfo )
1090 {
1091     MQTTStatus_t status = MQTTRecvFailed;
1092     int32_t bytesReceived = 0;
1093     size_t bytesToReceive = 0U;
1094     uint32_t totalBytesReceived = 0U;
1095     bool receiveError = false;
1096     size_t mqttPacketSize = 0;
1097     size_t remainingLength;
1098
1099     assert( pContext != NULL );
1100     assert( pPacketInfo != NULL );
1101
1102     mqttPacketSize = pPacketInfo->remainingLength + pPacketInfo->headerLength;
1103
1104     /* Assert that the packet being discarded is bigger than the
1105      * receive buffer. */
1106     assert( mqttPacketSize > pContext->networkBuffer.size );
1107
1108     /* Discard these many bytes at a time. */
1109     bytesToReceive = pContext->networkBuffer.size;
1110
1111     /* Number of bytes depicted by 'index' have already been received. */
1112     remainingLength = mqttPacketSize - pContext->index;
1113
1114     while( ( totalBytesReceived < remainingLength ) && ( receiveError == false ) )
1115     {
1116         if( ( remainingLength - totalBytesReceived ) < bytesToReceive )
1117         {
1118             bytesToReceive = remainingLength - totalBytesReceived;
1119         }
1120
1121         bytesReceived = recvExact( pContext, bytesToReceive );
1122
1123         if( bytesReceived != ( int32_t ) bytesToReceive )
1124         {
1125             LogError( ( "Receive error while discarding packet."
1126                         "ReceivedBytes=%ld, ExpectedBytes=%lu.",
1127                         ( long int ) bytesReceived,
1128                         ( unsigned long ) bytesToReceive ) );
1129             receiveError = true;
1130         }
1131         else
1132         {
1133             totalBytesReceived += ( uint32_t ) bytesReceived;
1134         }
1135     }
1136
1137     if( totalBytesReceived == remainingLength )
1138     {
1139         LogError( ( "Dumped packet. DumpedBytes=%lu.",
1140                     ( unsigned long ) totalBytesReceived ) );
1141         /* Packet dumped, so no data is available. */
1142         status = MQTTNoDataAvailable;
1143     }
1144
1145     /* Clear the buffer */
1146     ( void ) memset( pContext->networkBuffer.pBuffer,
1147                      0,
1148                      pContext->networkBuffer.size );
1149
1150     /* Reset the index. */
1151     pContext->index = 0;
1152
1153     return status;
1154 }
1155
1156 /*-----------------------------------------------------------*/
1157
1158 static MQTTStatus_t receivePacket( const MQTTContext_t * pContext,
1159                                    MQTTPacketInfo_t incomingPacket,
1160                                    uint32_t remainingTimeMs )
1161 {
1162     MQTTStatus_t status = MQTTSuccess;
1163     int32_t bytesReceived = 0;
1164     size_t bytesToReceive = 0U;
1165
1166     assert( pContext != NULL );
1167     assert( pContext->networkBuffer.pBuffer != NULL );
1168
1169     if( incomingPacket.remainingLength > pContext->networkBuffer.size )
1170     {
1171         LogError( ( "Incoming packet will be dumped: "
1172                     "Packet length exceeds network buffer size."
1173                     "PacketSize=%lu, NetworkBufferSize=%lu.",
1174                     ( unsigned long ) incomingPacket.remainingLength,
1175                     ( unsigned long ) pContext->networkBuffer.size ) );
1176         status = discardPacket( pContext,
1177                                 incomingPacket.remainingLength,
1178                                 remainingTimeMs );
1179     }
1180     else
1181     {
1182         bytesToReceive = incomingPacket.remainingLength;
1183         bytesReceived = recvExact( pContext, bytesToReceive );
1184
1185         if( bytesReceived == ( int32_t ) bytesToReceive )
1186         {
1187             /* Receive successful, bytesReceived == bytesToReceive. */
1188             LogDebug( ( "Packet received. ReceivedBytes=%ld.",
1189                         ( long int ) bytesReceived ) );
1190         }
1191         else
1192         {
1193             LogError( ( "Packet reception failed. ReceivedBytes=%ld, "
1194                         "ExpectedBytes=%lu.",
1195                         ( long int ) bytesReceived,
1196                         ( unsigned long ) bytesToReceive ) );
1197             status = MQTTRecvFailed;
1198         }
1199     }
1200
1201     return status;
1202 }
1203
1204 /*-----------------------------------------------------------*/
1205
1206 static uint8_t getAckTypeToSend( MQTTPublishState_t state )
1207 {
1208     uint8_t packetTypeByte = 0U;
1209
1210     switch( state )
1211     {
1212         case MQTTPubAckSend:
1213             packetTypeByte = MQTT_PACKET_TYPE_PUBACK;
1214             break;
1215
1216         case MQTTPubRecSend:
1217             packetTypeByte = MQTT_PACKET_TYPE_PUBREC;
1218             break;
1219
1220         case MQTTPubRelSend:
1221             packetTypeByte = MQTT_PACKET_TYPE_PUBREL;
1222             break;
1223
1224         case MQTTPubCompSend:
1225             packetTypeByte = MQTT_PACKET_TYPE_PUBCOMP;
1226             break;
1227
1228         case MQTTPubAckPending:
1229         case MQTTPubCompPending:
1230         case MQTTPubRecPending:
1231         case MQTTPubRelPending:
1232         case MQTTPublishDone:
1233         case MQTTPublishSend:
1234         case MQTTStateNull:
1235         default:
1236             /* Take no action for states that do not require sending an ack. */
1237             break;
1238     }
1239
1240     return packetTypeByte;
1241 }
1242
1243 /*-----------------------------------------------------------*/
1244
1245 static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
1246                                      uint16_t packetId,
1247                                      MQTTPublishState_t publishState )
1248 {
1249     MQTTStatus_t status = MQTTSuccess;
1250     MQTTPublishState_t newState = MQTTStateNull;
1251     int32_t sendResult = 0;
1252     uint8_t packetTypeByte = 0U;
1253     MQTTPubAckType_t packetType;
1254     MQTTFixedBuffer_t localBuffer;
1255     uint8_t pubAckPacket[ MQTT_PUBLISH_ACK_PACKET_SIZE ];
1256
1257     localBuffer.pBuffer = pubAckPacket;
1258     localBuffer.size = MQTT_PUBLISH_ACK_PACKET_SIZE;
1259
1260     assert( pContext != NULL );
1261
1262     packetTypeByte = getAckTypeToSend( publishState );
1263
1264     if( packetTypeByte != 0U )
1265     {
1266         packetType = getAckFromPacketType( packetTypeByte );
1267
1268         status = MQTT_SerializeAck( &localBuffer,
1269                                     packetTypeByte,
1270                                     packetId );
1271
1272         if( status == MQTTSuccess )
1273         {
1274             MQTT_PRE_SEND_HOOK( pContext );
1275
1276             /* Here, we are not using the vector approach for efficiency. There is just one buffer
1277              * to be sent which can be achieved with a normal send call. */
1278             sendResult = sendBuffer( pContext,
1279                                      localBuffer.pBuffer,
1280                                      MQTT_PUBLISH_ACK_PACKET_SIZE );
1281
1282             MQTT_POST_SEND_HOOK( pContext );
1283         }
1284
1285         if( sendResult == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
1286         {
1287             pContext->controlPacketSent = true;
1288
1289             MQTT_PRE_STATE_UPDATE_HOOK( pContext );
1290
1291             status = MQTT_UpdateStateAck( pContext,
1292                                           packetId,
1293                                           packetType,
1294                                           MQTT_SEND,
1295                                           &newState );
1296
1297             MQTT_POST_STATE_UPDATE_HOOK( pContext );
1298
1299             if( status != MQTTSuccess )
1300             {
1301                 LogError( ( "Failed to update state of publish %hu.",
1302                             ( unsigned short ) packetId ) );
1303             }
1304         }
1305         else
1306         {
1307             LogError( ( "Failed to send ACK packet: PacketType=%02x, SentBytes=%ld, "
1308                         "PacketSize=%lu.",
1309                         ( unsigned int ) packetTypeByte, ( long int ) sendResult,
1310                         MQTT_PUBLISH_ACK_PACKET_SIZE ) );
1311             status = MQTTSendFailed;
1312         }
1313     }
1314
1315     return status;
1316 }
1317
1318 /*-----------------------------------------------------------*/
1319
1320 static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext )
1321 {
1322     MQTTStatus_t status = MQTTSuccess;
1323     uint32_t now = 0U;
1324     uint32_t packetTxTimeoutMs = 0U;
1325
1326     assert( pContext != NULL );
1327     assert( pContext->getTime != NULL );
1328
1329     now = pContext->getTime();
1330
1331     packetTxTimeoutMs = 1000U * ( uint32_t ) pContext->keepAliveIntervalSec;
1332
1333     if( PACKET_TX_TIMEOUT_MS < packetTxTimeoutMs )
1334     {
1335         packetTxTimeoutMs = PACKET_TX_TIMEOUT_MS;
1336     }
1337
1338     /* If keep alive interval is 0, it is disabled. */
1339     if( pContext->waitingForPingResp == true )
1340     {
1341         /* Has time expired? */
1342         if( calculateElapsedTime( now, pContext->pingReqSendTimeMs ) >
1343             MQTT_PINGRESP_TIMEOUT_MS )
1344         {
1345             status = MQTTKeepAliveTimeout;
1346         }
1347     }
1348     else
1349     {
1350         if( ( packetTxTimeoutMs != 0U ) && ( calculateElapsedTime( now, pContext->lastPacketTxTime ) >= packetTxTimeoutMs ) )
1351         {
1352             status = MQTT_Ping( pContext );
1353         }
1354         else
1355         {
1356             const uint32_t timeElapsed = calculateElapsedTime( now, pContext->lastPacketRxTime );
1357
1358             if( ( timeElapsed != 0U ) && ( timeElapsed >= PACKET_RX_TIMEOUT_MS ) )
1359             {
1360                 status = MQTT_Ping( pContext );
1361             }
1362         }
1363     }
1364
1365     return status;
1366 }
1367
1368 /*-----------------------------------------------------------*/
1369
1370 static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
1371                                            MQTTPacketInfo_t * pIncomingPacket )
1372 {
1373     MQTTStatus_t status = MQTTBadParameter;
1374     MQTTPublishState_t publishRecordState = MQTTStateNull;
1375     uint16_t packetIdentifier = 0U;
1376     MQTTPublishInfo_t publishInfo;
1377     MQTTDeserializedInfo_t deserializedInfo;
1378     bool duplicatePublish = false;
1379
1380     assert( pContext != NULL );
1381     assert( pIncomingPacket != NULL );
1382     assert( pContext->appCallback != NULL );
1383
1384     status = MQTT_DeserializePublish( pIncomingPacket, &packetIdentifier, &publishInfo );
1385     LogInfo( ( "De-serialized incoming PUBLISH packet: DeserializerResult=%s.",
1386                MQTT_Status_strerror( status ) ) );
1387
1388     if( ( status == MQTTSuccess ) &&
1389         ( pContext->incomingPublishRecords == NULL ) &&
1390         ( publishInfo.qos > MQTTQoS0 ) )
1391     {
1392         LogError( ( "Incoming publish has QoS > MQTTQoS0 but incoming "
1393                     "publish records have not been initialized. Dropping the "
1394                     "incoming publish. Please call MQTT_InitStatefulQoS to enable "
1395                     "use of QoS1 and QoS2 publishes." ) );
1396         status = MQTTRecvFailed;
1397     }
1398
1399     if( status == MQTTSuccess )
1400     {
1401         MQTT_PRE_STATE_UPDATE_HOOK( pContext );
1402
1403         status = MQTT_UpdateStatePublish( pContext,
1404                                           packetIdentifier,
1405                                           MQTT_RECEIVE,
1406                                           publishInfo.qos,
1407                                           &publishRecordState );
1408
1409         MQTT_POST_STATE_UPDATE_HOOK( pContext );
1410
1411         if( status == MQTTSuccess )
1412         {
1413             LogInfo( ( "State record updated. New state=%s.",
1414                        MQTT_State_strerror( publishRecordState ) ) );
1415         }
1416
1417         /* Different cases in which an incoming publish with duplicate flag is
1418          * handled are as listed below.
1419          * 1. No collision - This is the first instance of the incoming publish
1420          *    packet received or an earlier received packet state is lost. This
1421          *    will be handled as a new incoming publish for both QoS1 and QoS2
1422          *    publishes.
1423          * 2. Collision - The incoming packet was received before and a state
1424          *    record is present in the state engine. For QoS1 and QoS2 publishes
1425          *    this case can happen at 2 different cases and handling is
1426          *    different.
1427          *    a. QoS1 - If a PUBACK is not successfully sent for the incoming
1428          *       publish due to a connection issue, it can result in broker
1429          *       sending out a duplicate publish with dup flag set, when a
1430          *       session is reestablished. It can result in a collision in
1431          *       state engine. This will be handled by processing the incoming
1432          *       publish as a new publish ignoring the
1433          *       #MQTTStateCollision status from the state engine. The publish
1434          *       data is not passed to the application.
1435          *    b. QoS2 - If a PUBREC is not successfully sent for the incoming
1436          *       publish or the PUBREC sent is not successfully received by the
1437          *       broker due to a connection issue, it can result in broker
1438          *       sending out a duplicate publish with dup flag set, when a
1439          *       session is reestablished. It can result in a collision in
1440          *       state engine. This will be handled by ignoring the
1441          *       #MQTTStateCollision status from the state engine. The publish
1442          *       data is not passed to the application. */
1443         else if( status == MQTTStateCollision )
1444         {
1445             status = MQTTSuccess;
1446             duplicatePublish = true;
1447
1448             /* Calculate the state for the ack packet that needs to be sent out
1449              * for the duplicate incoming publish. */
1450             publishRecordState = MQTT_CalculateStatePublish( MQTT_RECEIVE,
1451                                                              publishInfo.qos );
1452
1453             LogDebug( ( "Incoming publish packet with packet id %hu already exists.",
1454                         ( unsigned short ) packetIdentifier ) );
1455
1456             if( publishInfo.dup == false )
1457             {
1458                 LogError( ( "DUP flag is 0 for duplicate packet (MQTT-3.3.1.-1)." ) );
1459             }
1460         }
1461         else
1462         {
1463             LogError( ( "Error in updating publish state for incoming publish with packet id %hu."
1464                         " Error is %s",
1465                         ( unsigned short ) packetIdentifier,
1466                         MQTT_Status_strerror( status ) ) );
1467         }
1468     }
1469
1470     if( status == MQTTSuccess )
1471     {
1472         /* Set fields of deserialized struct. */
1473         deserializedInfo.packetIdentifier = packetIdentifier;
1474         deserializedInfo.pPublishInfo = &publishInfo;
1475         deserializedInfo.deserializationResult = status;
1476
1477         /* Invoke application callback to hand the buffer over to application
1478          * before sending acks.
1479          * Application callback will be invoked for all publishes, except for
1480          * duplicate incoming publishes. */
1481         if( duplicatePublish == false )
1482         {
1483             pContext->appCallback( pContext,
1484                                    pIncomingPacket,
1485                                    &deserializedInfo );
1486         }
1487
1488         /* Send PUBACK or PUBREC if necessary. */
1489         status = sendPublishAcks( pContext,
1490                                   packetIdentifier,
1491                                   publishRecordState );
1492     }
1493
1494     return status;
1495 }
1496
1497 /*-----------------------------------------------------------*/
1498
1499 static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
1500                                        MQTTPacketInfo_t * pIncomingPacket )
1501 {
1502     MQTTStatus_t status = MQTTBadResponse;
1503     MQTTPublishState_t publishRecordState = MQTTStateNull;
1504     uint16_t packetIdentifier;
1505     MQTTPubAckType_t ackType;
1506     MQTTEventCallback_t appCallback;
1507     MQTTDeserializedInfo_t deserializedInfo;
1508
1509     assert( pContext != NULL );
1510     assert( pIncomingPacket != NULL );
1511     assert( pContext->appCallback != NULL );
1512
1513     appCallback = pContext->appCallback;
1514
1515     ackType = getAckFromPacketType( pIncomingPacket->type );
1516     status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL );
1517     LogInfo( ( "Ack packet deserialized with result: %s.",
1518                MQTT_Status_strerror( status ) ) );
1519
1520     if( status == MQTTSuccess )
1521     {
1522         MQTT_PRE_STATE_UPDATE_HOOK( pContext );
1523
1524         status = MQTT_UpdateStateAck( pContext,
1525                                       packetIdentifier,
1526                                       ackType,
1527                                       MQTT_RECEIVE,
1528                                       &publishRecordState );
1529
1530         MQTT_POST_STATE_UPDATE_HOOK( pContext );
1531
1532         if( status == MQTTSuccess )
1533         {
1534             LogInfo( ( "State record updated. New state=%s.",
1535                        MQTT_State_strerror( publishRecordState ) ) );
1536         }
1537         else
1538         {
1539             LogError( ( "Updating the state engine for packet id %hu"
1540                         " failed with error %s.",
1541                         ( unsigned short ) packetIdentifier,
1542                         MQTT_Status_strerror( status ) ) );
1543         }
1544     }
1545
1546     if( status == MQTTSuccess )
1547     {
1548         /* Set fields of deserialized struct. */
1549         deserializedInfo.packetIdentifier = packetIdentifier;
1550         deserializedInfo.deserializationResult = status;
1551         deserializedInfo.pPublishInfo = NULL;
1552
1553         /* Invoke application callback to hand the buffer over to application
1554          * before sending acks. */
1555         appCallback( pContext, pIncomingPacket, &deserializedInfo );
1556
1557         /* Send PUBREL or PUBCOMP if necessary. */
1558         status = sendPublishAcks( pContext,
1559                                   packetIdentifier,
1560                                   publishRecordState );
1561     }
1562
1563     return status;
1564 }
1565
1566 /*-----------------------------------------------------------*/
1567
1568 static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
1569                                        MQTTPacketInfo_t * pIncomingPacket,
1570                                        bool manageKeepAlive )
1571 {
1572     MQTTStatus_t status = MQTTBadResponse;
1573     uint16_t packetIdentifier = MQTT_PACKET_ID_INVALID;
1574     MQTTDeserializedInfo_t deserializedInfo;
1575
1576     /* We should always invoke the app callback unless we receive a PINGRESP
1577      * and are managing keep alive, or if we receive an unknown packet. We
1578      * initialize this to false since the callback must be invoked before
1579      * sending any PUBREL or PUBCOMP. However, for other cases, we invoke it
1580      * at the end to reduce the complexity of this function. */
1581     bool invokeAppCallback = false;
1582     MQTTEventCallback_t appCallback = NULL;
1583
1584     assert( pContext != NULL );
1585     assert( pIncomingPacket != NULL );
1586     assert( pContext->appCallback != NULL );
1587
1588     appCallback = pContext->appCallback;
1589
1590     LogDebug( ( "Received packet of type %02x.",
1591                 ( unsigned int ) pIncomingPacket->type ) );
1592
1593     switch( pIncomingPacket->type )
1594     {
1595         case MQTT_PACKET_TYPE_PUBACK:
1596         case MQTT_PACKET_TYPE_PUBREC:
1597         case MQTT_PACKET_TYPE_PUBREL:
1598         case MQTT_PACKET_TYPE_PUBCOMP:
1599
1600             /* Handle all the publish acks. The app callback is invoked here. */
1601             status = handlePublishAcks( pContext, pIncomingPacket );
1602
1603             break;
1604
1605         case MQTT_PACKET_TYPE_PINGRESP:
1606             status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL );
1607             invokeAppCallback = ( status == MQTTSuccess ) && !manageKeepAlive;
1608
1609             if( ( status == MQTTSuccess ) && ( manageKeepAlive == true ) )
1610             {
1611                 pContext->waitingForPingResp = false;
1612             }
1613
1614             break;
1615
1616         case MQTT_PACKET_TYPE_SUBACK:
1617         case MQTT_PACKET_TYPE_UNSUBACK:
1618             /* Deserialize and give these to the app provided callback. */
1619             status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL );
1620             invokeAppCallback = ( status == MQTTSuccess ) || ( status == MQTTServerRefused );
1621             break;
1622
1623         default:
1624             /* Bad response from the server. */
1625             LogError( ( "Unexpected packet type from server: PacketType=%02x.",
1626                         ( unsigned int ) pIncomingPacket->type ) );
1627             status = MQTTBadResponse;
1628             break;
1629     }
1630
1631     if( invokeAppCallback == true )
1632     {
1633         /* Set fields of deserialized struct. */
1634         deserializedInfo.packetIdentifier = packetIdentifier;
1635         deserializedInfo.deserializationResult = status;
1636         deserializedInfo.pPublishInfo = NULL;
1637         appCallback( pContext, pIncomingPacket, &deserializedInfo );
1638         /* In case a SUBACK indicated refusal, reset the status to continue the loop. */
1639         status = MQTTSuccess;
1640     }
1641
1642     return status;
1643 }
1644 /*-----------------------------------------------------------*/
1645
1646 static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
1647                                             bool manageKeepAlive )
1648 {
1649     MQTTStatus_t status = MQTTSuccess;
1650     MQTTPacketInfo_t incomingPacket = { 0 };
1651     int32_t recvBytes;
1652     size_t totalMQTTPacketLength = 0;
1653
1654     assert( pContext != NULL );
1655     assert( pContext->networkBuffer.pBuffer != NULL );
1656
1657     /* Read as many bytes as possible into the network buffer. */
1658     recvBytes = pContext->transportInterface.recv( pContext->transportInterface.pNetworkContext,
1659                                                    &( pContext->networkBuffer.pBuffer[ pContext->index ] ),
1660                                                    pContext->networkBuffer.size - pContext->index );
1661
1662     if( recvBytes < 0 )
1663     {
1664         /* The receive function has failed. Bubble up the error up to the user. */
1665         status = MQTTRecvFailed;
1666     }
1667     else if( ( recvBytes == 0 ) && ( pContext->index == 0U ) )
1668     {
1669         /* No more bytes available since the last read and neither is anything in
1670          * the buffer. */
1671         status = MQTTNoDataAvailable;
1672     }
1673
1674     /* Either something was received, or there is still data to be processed in the
1675      * buffer, or both. */
1676     else
1677     {
1678         /* Update the number of bytes in the MQTT fixed buffer. */
1679         pContext->index += ( size_t ) recvBytes;
1680
1681         status = MQTT_ProcessIncomingPacketTypeAndLength( pContext->networkBuffer.pBuffer,
1682                                                           &pContext->index,
1683                                                           &incomingPacket );
1684
1685         totalMQTTPacketLength = incomingPacket.remainingLength + incomingPacket.headerLength;
1686     }
1687
1688     /* No data was received, check for keep alive timeout. */
1689     if( recvBytes == 0 )
1690     {
1691         if( manageKeepAlive == true )
1692         {
1693             /* Keep the copy of the status to be reset later. */
1694             MQTTStatus_t statusCopy = status;
1695
1696             /* Assign status so an error can be bubbled up to application,
1697              * but reset it on success. */
1698             status = handleKeepAlive( pContext );
1699
1700             if( status == MQTTSuccess )
1701             {
1702                 /* Reset the status. */
1703                 status = statusCopy;
1704             }
1705             else
1706             {
1707                 LogError( ( "Handling of keep alive failed. Status=%s",
1708                             MQTT_Status_strerror( status ) ) );
1709             }
1710         }
1711     }
1712
1713     /* Check whether there is data available before processing the packet further. */
1714     if( ( status == MQTTNeedMoreBytes ) || ( status == MQTTNoDataAvailable ) )
1715     {
1716         /* Do nothing as there is nothing to be processed right now. The proper
1717          * error code will be bubbled up to the user. */
1718     }
1719     /* Any other error code. */
1720     else if( status != MQTTSuccess )
1721     {
1722         LogError( ( "Call to receiveSingleIteration failed. Status=%s",
1723                     MQTT_Status_strerror( status ) ) );
1724     }
1725     /* If the MQTT Packet size is bigger than the buffer itself. */
1726     else if( totalMQTTPacketLength > pContext->networkBuffer.size )
1727     {
1728         /* Discard the packet from the receive buffer and drain the pending
1729          * data from the socket buffer. */
1730         status = discardStoredPacket( pContext,
1731                                       &incomingPacket );
1732     }
1733     /* If the total packet is of more length than the bytes we have available. */
1734     else if( totalMQTTPacketLength > pContext->index )
1735     {
1736         status = MQTTNeedMoreBytes;
1737     }
1738     else
1739     {
1740         /* MISRA else. */
1741     }
1742
1743     /* Handle received packet. If incomplete data was read then this will not execute. */
1744     if( status == MQTTSuccess )
1745     {
1746         incomingPacket.pRemainingData = &pContext->networkBuffer.pBuffer[ incomingPacket.headerLength ];
1747
1748         /* PUBLISH packets allow flags in the lower four bits. For other
1749          * packet types, they are reserved. */
1750         if( ( incomingPacket.type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
1751         {
1752             status = handleIncomingPublish( pContext, &incomingPacket );
1753         }
1754         else
1755         {
1756             status = handleIncomingAck( pContext, &incomingPacket, manageKeepAlive );
1757         }
1758
1759         /* Update the index to reflect the remaining bytes in the buffer.  */
1760         pContext->index -= totalMQTTPacketLength;
1761
1762         /* Move the remaining bytes to the front of the buffer. */
1763         ( void ) memmove( pContext->networkBuffer.pBuffer,
1764                           &( pContext->networkBuffer.pBuffer[ totalMQTTPacketLength ] ),
1765                           pContext->index );
1766     }
1767
1768     if( status == MQTTNoDataAvailable )
1769     {
1770         /* No data available is not an error. Reset to MQTTSuccess so the
1771          * return code will indicate success. */
1772         status = MQTTSuccess;
1773     }
1774
1775     return status;
1776 }
1777
1778 /*-----------------------------------------------------------*/
1779
1780 static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext,
1781                                                         const MQTTSubscribeInfo_t * pSubscriptionList,
1782                                                         size_t subscriptionCount,
1783                                                         uint16_t packetId )
1784 {
1785     MQTTStatus_t status = MQTTSuccess;
1786     size_t iterator;
1787
1788     /* Validate all the parameters. */
1789     if( ( pContext == NULL ) || ( pSubscriptionList == NULL ) )
1790     {
1791         LogError( ( "Argument cannot be NULL: pContext=%p, "
1792                     "pSubscriptionList=%p.",
1793                     ( void * ) pContext,
1794                     ( void * ) pSubscriptionList ) );
1795         status = MQTTBadParameter;
1796     }
1797     else if( subscriptionCount == 0UL )
1798     {
1799         LogError( ( "Subscription count is 0." ) );
1800         status = MQTTBadParameter;
1801     }
1802     else if( packetId == 0U )
1803     {
1804         LogError( ( "Packet Id for subscription packet is 0." ) );
1805         status = MQTTBadParameter;
1806     }
1807     else
1808     {
1809         if( pContext->incomingPublishRecords == NULL )
1810         {
1811             for( iterator = 0; iterator < subscriptionCount; iterator++ )
1812             {
1813                 if( pSubscriptionList->qos > MQTTQoS0 )
1814                 {
1815                     LogError( ( "The incoming publish record list is not "
1816                                 "initialised for QoS1/QoS2 records. Please call "
1817                                 " MQTT_InitStatefulQoS to enable use of QoS1 and "
1818                                 " QoS2 packets." ) );
1819                     status = MQTTBadParameter;
1820                     break;
1821                 }
1822             }
1823         }
1824     }
1825
1826     return status;
1827 }
1828
1829 /*-----------------------------------------------------------*/
1830
1831 static size_t addEncodedStringToVector( uint8_t serailizedLength[ 2 ],
1832                                         const char * const string,
1833                                         uint16_t length,
1834                                         TransportOutVector_t * iterator,
1835                                         size_t * updatedLength )
1836 {
1837     size_t packetLength = 0U;
1838     const size_t seralizedLengthFieldSize = 2U;
1839     TransportOutVector_t * pLocalIterator = iterator;
1840     /* This function always adds 2 vectors. */
1841     size_t vectorsAdded = 0U;
1842
1843     /* When length is non-zero, the string must be non-NULL. */
1844     assert( ( length != 0U ) == ( string != NULL ) );
1845
1846     serailizedLength[ 0 ] = ( ( uint8_t ) ( ( length ) >> 8 ) );
1847     serailizedLength[ 1 ] = ( ( uint8_t ) ( ( length ) & 0x00ffU ) );
1848
1849     /* Add the serialized length of the string first. */
1850     pLocalIterator[ 0 ].iov_base = serailizedLength;
1851     pLocalIterator[ 0 ].iov_len = seralizedLengthFieldSize;
1852     vectorsAdded++;
1853     packetLength = seralizedLengthFieldSize;
1854
1855     /* Sometimes the string can be NULL that is, of 0 length. In that case,
1856      * only the length field should be encoded in the vector. */
1857     if( ( string != NULL ) && ( length != 0U ) )
1858     {
1859         /* Then add the pointer to the string itself. */
1860         pLocalIterator[ 1 ].iov_base = string;
1861         pLocalIterator[ 1 ].iov_len = length;
1862         vectorsAdded++;
1863         packetLength += length;
1864     }
1865
1866     ( *updatedLength ) = ( *updatedLength ) + packetLength;
1867
1868     return vectorsAdded;
1869 }
1870
1871 /*-----------------------------------------------------------*/
1872
1873 static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext,
1874                                               const MQTTSubscribeInfo_t * pSubscriptionList,
1875                                               size_t subscriptionCount,
1876                                               uint16_t packetId,
1877                                               size_t remainingLength )
1878 {
1879     MQTTStatus_t status = MQTTSuccess;
1880     uint8_t subscribeheader[ 7 ];
1881     uint8_t * pIndex;
1882     TransportOutVector_t pIoVector[ MQTT_SUB_UNSUB_MAX_VECTORS ];
1883     TransportOutVector_t * pIterator;
1884     uint8_t serializedTopicFieldLength[ MQTT_SUB_UNSUB_MAX_VECTORS ][ 2 ];
1885     size_t totalPacketLength = 0U;
1886     size_t ioVectorLength = 0U;
1887     size_t subscriptionsSent = 0U;
1888     /* For subscribe, only three vector slots are required per topic string. */
1889     const size_t subscriptionStringVectorSlots = 3U;
1890     size_t vectorsAdded;
1891     size_t topicFieldLengthIndex;
1892
1893     /* The vector array should be at least three element long as the topic
1894      * string needs these many vector elements to be stored. */
1895     assert( MQTT_SUB_UNSUB_MAX_VECTORS >= subscriptionStringVectorSlots );
1896
1897     pIndex = subscribeheader;
1898     pIterator = pIoVector;
1899
1900     pIndex = MQTT_SerializeSubscribeHeader( remainingLength,
1901                                             pIndex,
1902                                             packetId );
1903
1904     /* The header is to be sent first. */
1905     pIterator->iov_base = subscribeheader;
1906     /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */
1907     /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */
1908     /* coverity[misra_c_2012_rule_18_2_violation] */
1909     /* coverity[misra_c_2012_rule_10_8_violation] */
1910     pIterator->iov_len = ( size_t ) ( pIndex - subscribeheader );
1911     totalPacketLength += pIterator->iov_len;
1912     pIterator++;
1913     ioVectorLength++;
1914
1915     while( ( status == MQTTSuccess ) && ( subscriptionsSent < subscriptionCount ) )
1916     {
1917         /* Reset the index for next iteration. */
1918         topicFieldLengthIndex = 0;
1919
1920         /* Check whether the subscription topic (with QoS) will fit in the
1921          * given vector. */
1922         while( ( ioVectorLength <= ( MQTT_SUB_UNSUB_MAX_VECTORS - subscriptionStringVectorSlots ) ) &&
1923                ( subscriptionsSent < subscriptionCount ) )
1924         {
1925             /* The topic filter gets sent next. */
1926             vectorsAdded = addEncodedStringToVector( serializedTopicFieldLength[ topicFieldLengthIndex ],
1927                                                      pSubscriptionList[ subscriptionsSent ].pTopicFilter,
1928                                                      pSubscriptionList[ subscriptionsSent ].topicFilterLength,
1929                                                      pIterator,
1930                                                      &totalPacketLength );
1931
1932             /* Update the pointer after the above operation. */
1933             pIterator = &pIterator[ vectorsAdded ];
1934
1935             /* Lastly, the QoS gets sent. */
1936             pIterator->iov_base = &( pSubscriptionList[ subscriptionsSent ].qos );
1937             pIterator->iov_len = 1U;
1938             totalPacketLength += pIterator->iov_len;
1939
1940             /* Increment the pointer. */
1941             pIterator++;
1942
1943             /* Two slots get used by the topic string length and topic string.
1944              * One slot gets used by the quality of service. */
1945             ioVectorLength += vectorsAdded + 1U;
1946
1947             subscriptionsSent++;
1948
1949             /* The index needs to be updated for next iteration. */
1950             topicFieldLengthIndex++;
1951         }
1952
1953         if( sendMessageVector( pContext,
1954                                pIoVector,
1955                                ioVectorLength ) != ( int32_t ) totalPacketLength )
1956         {
1957             status = MQTTSendFailed;
1958         }
1959
1960         /* Update the iterator for the next potential loop iteration. */
1961         pIterator = pIoVector;
1962         /* Reset the vector length for the next potential loop iteration. */
1963         ioVectorLength = 0U;
1964         /* Reset the packet length for the next potential loop iteration. */
1965         totalPacketLength = 0U;
1966     }
1967
1968     return status;
1969 }
1970
1971 /*-----------------------------------------------------------*/
1972
1973 static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,
1974                                                 const MQTTSubscribeInfo_t * pSubscriptionList,
1975                                                 size_t subscriptionCount,
1976                                                 uint16_t packetId,
1977                                                 size_t remainingLength )
1978 {
1979     MQTTStatus_t status = MQTTSuccess;
1980     uint8_t unsubscribeheader[ 7 ];
1981     uint8_t * pIndex;
1982     TransportOutVector_t pIoVector[ MQTT_SUB_UNSUB_MAX_VECTORS ];
1983     TransportOutVector_t * pIterator;
1984     uint8_t serializedTopicFieldLength[ MQTT_SUB_UNSUB_MAX_VECTORS ][ 2 ];
1985     size_t totalPacketLength = 0U;
1986     size_t unsubscriptionsSent = 0U;
1987     size_t ioVectorLength = 0U;
1988     /* For unsubscribe, only two vector slots are required per topic string. */
1989     const size_t unsubscribeStringVectorSlots = 2U;
1990     size_t vectorsAdded;
1991     size_t topicFieldLengthIndex;
1992
1993     /* The vector array should be at least three element long as the topic
1994      * string needs these many vector elements to be stored. */
1995     assert( MQTT_SUB_UNSUB_MAX_VECTORS >= unsubscribeStringVectorSlots );
1996
1997     pIndex = unsubscribeheader;
1998     pIterator = pIoVector;
1999
2000     pIndex = MQTT_SerializeUnsubscribeHeader( remainingLength,
2001                                               pIndex,
2002                                               packetId );
2003
2004     /* The header is to be sent first. */
2005     pIterator->iov_base = unsubscribeheader;
2006     /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */
2007     /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */
2008     /* coverity[misra_c_2012_rule_18_2_violation] */
2009     /* coverity[misra_c_2012_rule_10_8_violation] */
2010     pIterator->iov_len = ( size_t ) ( pIndex - unsubscribeheader );
2011     totalPacketLength += pIterator->iov_len;
2012     pIterator++;
2013     ioVectorLength++;
2014
2015     while( ( status == MQTTSuccess ) && ( unsubscriptionsSent < subscriptionCount ) )
2016     {
2017         /* Reset the index for next iteration. */
2018         topicFieldLengthIndex = 0;
2019
2020         /* Check whether the subscription topic will fit in the given vector. */
2021         while( ( ioVectorLength <= ( MQTT_SUB_UNSUB_MAX_VECTORS - unsubscribeStringVectorSlots ) ) &&
2022                ( unsubscriptionsSent < subscriptionCount ) )
2023         {
2024             /* The topic filter gets sent next. */
2025             vectorsAdded = addEncodedStringToVector( serializedTopicFieldLength[ topicFieldLengthIndex ],
2026                                                      pSubscriptionList[ unsubscriptionsSent ].pTopicFilter,
2027                                                      pSubscriptionList[ unsubscriptionsSent ].topicFilterLength,
2028                                                      pIterator,
2029                                                      &totalPacketLength );
2030
2031             /* Update the iterator to point to the next empty location. */
2032             pIterator = &pIterator[ vectorsAdded ];
2033             /* Update the total count based on how many vectors were added. */
2034             ioVectorLength += vectorsAdded;
2035
2036             unsubscriptionsSent++;
2037
2038             /* Update the index for next iteration. */
2039             topicFieldLengthIndex++;
2040         }
2041
2042         if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalPacketLength )
2043         {
2044             status = MQTTSendFailed;
2045         }
2046
2047         /* Update the iterator for the next potential loop iteration. */
2048         pIterator = pIoVector;
2049         /* Reset the vector length for the next potential loop iteration. */
2050         ioVectorLength = 0U;
2051         /* Reset the packet length for the next potential loop iteration. */
2052         totalPacketLength = 0U;
2053     }
2054
2055     return status;
2056 }
2057
2058 /*-----------------------------------------------------------*/
2059
2060 static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
2061                                             const MQTTPublishInfo_t * pPublishInfo,
2062                                             const uint8_t * pMqttHeader,
2063                                             size_t headerSize,
2064                                             uint16_t packetId )
2065 {
2066     MQTTStatus_t status = MQTTSuccess;
2067     uint8_t serializedPacketID[ 2 ];
2068     TransportOutVector_t pIoVector[ 4 ];
2069     size_t ioVectorLength;
2070     size_t totalMessageLength;
2071     const size_t packetIDLength = 2U;
2072
2073     /* The header is sent first. */
2074     pIoVector[ 0U ].iov_base = pMqttHeader;
2075     pIoVector[ 0U ].iov_len = headerSize;
2076     totalMessageLength = headerSize;
2077
2078     /* Then the topic name has to be sent. */
2079     pIoVector[ 1U ].iov_base = pPublishInfo->pTopicName;
2080     pIoVector[ 1U ].iov_len = pPublishInfo->topicNameLength;
2081     totalMessageLength += pPublishInfo->topicNameLength;
2082
2083     /* The next field's index should be 2 as the first two fields
2084      * have been filled in. */
2085     ioVectorLength = 2U;
2086
2087     if( pPublishInfo->qos > MQTTQoS0 )
2088     {
2089         /* Encode the packet ID. */
2090         serializedPacketID[ 0 ] = ( ( uint8_t ) ( ( packetId ) >> 8 ) );
2091         serializedPacketID[ 1 ] = ( ( uint8_t ) ( ( packetId ) & 0x00ffU ) );
2092
2093         pIoVector[ ioVectorLength ].iov_base = serializedPacketID;
2094         pIoVector[ ioVectorLength ].iov_len = packetIDLength;
2095
2096         ioVectorLength++;
2097         totalMessageLength += packetIDLength;
2098     }
2099
2100     /* Publish packets are allowed to contain no payload. */
2101     if( pPublishInfo->payloadLength > 0U )
2102     {
2103         pIoVector[ ioVectorLength ].iov_base = pPublishInfo->pPayload;
2104         pIoVector[ ioVectorLength ].iov_len = pPublishInfo->payloadLength;
2105
2106         ioVectorLength++;
2107         totalMessageLength += pPublishInfo->payloadLength;
2108     }
2109
2110     if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength )
2111     {
2112         status = MQTTSendFailed;
2113     }
2114
2115     return status;
2116 }
2117
2118 /*-----------------------------------------------------------*/
2119
2120 static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
2121                                             const MQTTConnectInfo_t * pConnectInfo,
2122                                             const MQTTPublishInfo_t * pWillInfo,
2123                                             size_t remainingLength )
2124 {
2125     MQTTStatus_t status = MQTTSuccess;
2126     TransportOutVector_t * iterator;
2127     size_t ioVectorLength = 0U;
2128     size_t totalMessageLength = 0U;
2129     int32_t bytesSentOrError;
2130
2131     /* Connect packet header can be of maximum 15 bytes. */
2132     uint8_t connectPacketHeader[ 15 ];
2133     uint8_t * pIndex = connectPacketHeader;
2134     TransportOutVector_t pIoVector[ 11 ];
2135     uint8_t serializedClientIDLength[ 2 ];
2136     uint8_t serializedTopicLength[ 2 ];
2137     uint8_t serializedPayloadLength[ 2 ];
2138     uint8_t serializedUsernameLength[ 2 ];
2139     uint8_t serializedPasswordLength[ 2 ];
2140     size_t vectorsAdded;
2141
2142     iterator = pIoVector;
2143
2144     /* Validate arguments. */
2145     if( ( pWillInfo != NULL ) && ( pWillInfo->pTopicName == NULL ) )
2146     {
2147         LogError( ( "pWillInfo->pTopicName cannot be NULL if Will is present." ) );
2148         status = MQTTBadParameter;
2149     }
2150     else
2151     {
2152         pIndex = MQTT_SerializeConnectFixedHeader( pIndex,
2153                                                    pConnectInfo,
2154                                                    pWillInfo,
2155                                                    remainingLength );
2156
2157         assert( ( pIndex - connectPacketHeader ) <= 15 );
2158
2159         /* The header gets sent first. */
2160         iterator->iov_base = connectPacketHeader;
2161         /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */
2162         /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */
2163         /* coverity[misra_c_2012_rule_18_2_violation] */
2164         /* coverity[misra_c_2012_rule_10_8_violation] */
2165         iterator->iov_len = ( size_t ) ( pIndex - connectPacketHeader );
2166         totalMessageLength += iterator->iov_len;
2167         iterator++;
2168         ioVectorLength++;
2169
2170         /* Serialize the client ID. */
2171         vectorsAdded = addEncodedStringToVector( serializedClientIDLength,
2172                                                  pConnectInfo->pClientIdentifier,
2173                                                  pConnectInfo->clientIdentifierLength,
2174                                                  iterator,
2175                                                  &totalMessageLength );
2176
2177         /* Update the iterator to point to the next empty slot. */
2178         iterator = &iterator[ vectorsAdded ];
2179         ioVectorLength += vectorsAdded;
2180
2181         if( pWillInfo != NULL )
2182         {
2183             /* Serialize the topic. */
2184             vectorsAdded = addEncodedStringToVector( serializedTopicLength,
2185                                                      pWillInfo->pTopicName,
2186                                                      pWillInfo->topicNameLength,
2187                                                      iterator,
2188                                                      &totalMessageLength );
2189
2190             /* Update the iterator to point to the next empty slot. */
2191             iterator = &iterator[ vectorsAdded ];
2192             ioVectorLength += vectorsAdded;
2193
2194
2195             /* Serialize the payload. Payload of last will and testament can be NULL. */
2196             vectorsAdded = addEncodedStringToVector( serializedPayloadLength,
2197                                                      pWillInfo->pPayload,
2198                                                      ( uint16_t ) pWillInfo->payloadLength,
2199                                                      iterator,
2200                                                      &totalMessageLength );
2201
2202             /* Update the iterator to point to the next empty slot. */
2203             iterator = &iterator[ vectorsAdded ];
2204             ioVectorLength += vectorsAdded;
2205         }
2206
2207         /* Encode the user name if provided. */
2208         if( pConnectInfo->pUserName != NULL )
2209         {
2210             /* Serialize the user name string. */
2211             vectorsAdded = addEncodedStringToVector( serializedUsernameLength,
2212                                                      pConnectInfo->pUserName,
2213                                                      pConnectInfo->userNameLength,
2214                                                      iterator,
2215                                                      &totalMessageLength );
2216
2217             /* Update the iterator to point to the next empty slot. */
2218             iterator = &iterator[ vectorsAdded ];
2219             ioVectorLength += vectorsAdded;
2220         }
2221
2222         /* Encode the password if provided. */
2223         if( pConnectInfo->pPassword != NULL )
2224         {
2225             /* Serialize the user name string. */
2226             vectorsAdded = addEncodedStringToVector( serializedPasswordLength,
2227                                                      pConnectInfo->pPassword,
2228                                                      pConnectInfo->passwordLength,
2229                                                      iterator,
2230                                                      &totalMessageLength );
2231             /* Update the iterator to point to the next empty slot. */
2232             iterator = &iterator[ vectorsAdded ];
2233             ioVectorLength += vectorsAdded;
2234         }
2235
2236         bytesSentOrError = sendMessageVector( pContext, pIoVector, ioVectorLength );
2237
2238         if( bytesSentOrError != ( int32_t ) totalMessageLength )
2239         {
2240             status = MQTTSendFailed;
2241         }
2242     }
2243
2244     return status;
2245 }
2246
2247 /*-----------------------------------------------------------*/
2248
2249 static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext,
2250                                     uint32_t timeoutMs,
2251                                     bool cleanSession,
2252                                     MQTTPacketInfo_t * pIncomingPacket,
2253                                     bool * pSessionPresent )
2254 {
2255     MQTTStatus_t status = MQTTSuccess;
2256     MQTTGetCurrentTimeFunc_t getTimeStamp = NULL;
2257     uint32_t entryTimeMs = 0U, remainingTimeMs = 0U, timeTakenMs = 0U;
2258     bool breakFromLoop = false;
2259     uint16_t loopCount = 0U;
2260
2261     assert( pContext != NULL );
2262     assert( pIncomingPacket != NULL );
2263     assert( pContext->getTime != NULL );
2264
2265     getTimeStamp = pContext->getTime;
2266
2267     /* Get the entry time for the function. */
2268     entryTimeMs = getTimeStamp();
2269
2270     do
2271     {
2272         /* Transport read for incoming CONNACK packet type and length.
2273          * MQTT_GetIncomingPacketTypeAndLength is a blocking call and it is
2274          * returned after a transport receive timeout, an error, or a successful
2275          * receive of packet type and length. */
2276         status = MQTT_GetIncomingPacketTypeAndLength( pContext->transportInterface.recv,
2277                                                       pContext->transportInterface.pNetworkContext,
2278                                                       pIncomingPacket );
2279
2280         /* The loop times out based on 2 conditions.
2281          * 1. If timeoutMs is greater than 0:
2282          *    Loop times out based on the timeout calculated by getTime()
2283          *    function.
2284          * 2. If timeoutMs is 0:
2285          *    Loop times out based on the maximum number of retries config
2286          *    MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT. This config will control
2287          *    maximum the number of retry attempts to read the CONNACK packet.
2288          *    A value of 0 for the config will try once to read CONNACK. */
2289         if( timeoutMs > 0U )
2290         {
2291             breakFromLoop = calculateElapsedTime( getTimeStamp(), entryTimeMs ) >= timeoutMs;
2292         }
2293         else
2294         {
2295             breakFromLoop = loopCount >= MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT;
2296             loopCount++;
2297         }
2298
2299         /* Loop until there is data to read or if we have exceeded the timeout/retries. */
2300     } while( ( status == MQTTNoDataAvailable ) && ( breakFromLoop == false ) );
2301
2302     if( status == MQTTSuccess )
2303     {
2304         /* Time taken in this function so far. */
2305         timeTakenMs = calculateElapsedTime( getTimeStamp(), entryTimeMs );
2306
2307         if( timeTakenMs < timeoutMs )
2308         {
2309             /* Calculate remaining time for receiving the remainder of
2310              * the packet. */
2311             remainingTimeMs = timeoutMs - timeTakenMs;
2312         }
2313
2314         /* Reading the remainder of the packet by transport recv.
2315          * Attempt to read once even if the timeout has expired.
2316          * Invoking receivePacket with remainingTime as 0 would attempt to
2317          * recv from network once. If using retries, the remainder of the
2318          * CONNACK packet is tried to be read only once. Reading once would be
2319          * good as the packet type and remaining length was already read. Hence,
2320          * the probability of the remaining 2 bytes available to read is very high. */
2321         if( pIncomingPacket->type == MQTT_PACKET_TYPE_CONNACK )
2322         {
2323             status = receivePacket( pContext,
2324                                     *pIncomingPacket,
2325                                     remainingTimeMs );
2326         }
2327         else
2328         {
2329             LogError( ( "Incorrect packet type %X received while expecting"
2330                         " CONNACK(%X).",
2331                         ( unsigned int ) pIncomingPacket->type,
2332                         MQTT_PACKET_TYPE_CONNACK ) );
2333             status = MQTTBadResponse;
2334         }
2335     }
2336
2337     if( status == MQTTSuccess )
2338     {
2339         /* Update the packet info pointer to the buffer read. */
2340         pIncomingPacket->pRemainingData = pContext->networkBuffer.pBuffer;
2341
2342         /* Deserialize CONNACK. */
2343         status = MQTT_DeserializeAck( pIncomingPacket, NULL, pSessionPresent );
2344     }
2345
2346     /* If a clean session is requested, a session present should not be set by
2347      * broker. */
2348     if( status == MQTTSuccess )
2349     {
2350         if( ( cleanSession == true ) && ( *pSessionPresent == true ) )
2351         {
2352             LogError( ( "Unexpected session present flag in CONNACK response from broker."
2353                         " CONNECT request with clean session was made with broker." ) );
2354             status = MQTTBadResponse;
2355         }
2356     }
2357
2358     if( status == MQTTSuccess )
2359     {
2360         LogDebug( ( "Received MQTT CONNACK successfully from broker." ) );
2361     }
2362     else
2363     {
2364         LogError( ( "CONNACK recv failed with status = %s.",
2365                     MQTT_Status_strerror( status ) ) );
2366     }
2367
2368     return status;
2369 }
2370
2371 /*-----------------------------------------------------------*/
2372
2373 static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
2374                                              bool sessionPresent )
2375 {
2376     MQTTStatus_t status = MQTTSuccess;
2377     MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
2378     uint16_t packetId = MQTT_PACKET_ID_INVALID;
2379     MQTTPublishState_t state = MQTTStateNull;
2380
2381     assert( pContext != NULL );
2382
2383     /* Reset the index and clear the buffer when a new session is established. */
2384     pContext->index = 0;
2385     ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );
2386
2387     if( sessionPresent == true )
2388     {
2389         /* Get the next packet ID for which a PUBREL need to be resent. */
2390         packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
2391
2392         /* Resend all the PUBREL acks after session is reestablished. */
2393         while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
2394                ( status == MQTTSuccess ) )
2395         {
2396             status = sendPublishAcks( pContext, packetId, state );
2397
2398             packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
2399         }
2400     }
2401     else
2402     {
2403         /* Clear any existing records if a new session is established. */
2404         if( pContext->outgoingPublishRecordMaxCount > 0U )
2405         {
2406             ( void ) memset( pContext->outgoingPublishRecords,
2407                              0x00,
2408                              pContext->outgoingPublishRecordMaxCount * sizeof( *pContext->outgoingPublishRecords ) );
2409         }
2410
2411         if( pContext->incomingPublishRecordMaxCount > 0U )
2412         {
2413             ( void ) memset( pContext->incomingPublishRecords,
2414                              0x00,
2415                              pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) );
2416         }
2417     }
2418
2419     return status;
2420 }
2421
2422 static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
2423                                            const MQTTPublishInfo_t * pPublishInfo,
2424                                            uint16_t packetId )
2425 {
2426     MQTTStatus_t status = MQTTSuccess;
2427
2428     /* Validate arguments. */
2429     if( ( pContext == NULL ) || ( pPublishInfo == NULL ) )
2430     {
2431         LogError( ( "Argument cannot be NULL: pContext=%p, "
2432                     "pPublishInfo=%p.",
2433                     ( void * ) pContext,
2434                     ( void * ) pPublishInfo ) );
2435         status = MQTTBadParameter;
2436     }
2437     else if( ( pPublishInfo->qos != MQTTQoS0 ) && ( packetId == 0U ) )
2438     {
2439         LogError( ( "Packet Id is 0 for PUBLISH with QoS=%u.",
2440                     ( unsigned int ) pPublishInfo->qos ) );
2441         status = MQTTBadParameter;
2442     }
2443     else if( ( pPublishInfo->payloadLength > 0U ) && ( pPublishInfo->pPayload == NULL ) )
2444     {
2445         LogError( ( "A nonzero payload length requires a non-NULL payload: "
2446                     "payloadLength=%lu, pPayload=%p.",
2447                     ( unsigned long ) pPublishInfo->payloadLength,
2448                     pPublishInfo->pPayload ) );
2449         status = MQTTBadParameter;
2450     }
2451     else if( ( pContext->outgoingPublishRecords == NULL ) && ( pPublishInfo->qos > MQTTQoS0 ) )
2452     {
2453         LogError( ( "Trying to publish a QoS > MQTTQoS0 packet when outgoing publishes "
2454                     "for QoS1/QoS2 have not been enabled. Please, call MQTT_InitStatefulQoS "
2455                     "to initialize and enable the use of QoS1/QoS2 publishes." ) );
2456         status = MQTTBadParameter;
2457     }
2458     else
2459     {
2460         /* MISRA else */
2461     }
2462
2463     return status;
2464 }
2465
2466 /*-----------------------------------------------------------*/
2467
2468 MQTTStatus_t MQTT_Init( MQTTContext_t * pContext,
2469                         const TransportInterface_t * pTransportInterface,
2470                         MQTTGetCurrentTimeFunc_t getTimeFunction,
2471                         MQTTEventCallback_t userCallback,
2472                         const MQTTFixedBuffer_t * pNetworkBuffer )
2473 {
2474     MQTTStatus_t status = MQTTSuccess;
2475
2476     /* Validate arguments. */
2477     if( ( pContext == NULL ) || ( pTransportInterface == NULL ) ||
2478         ( pNetworkBuffer == NULL ) )
2479     {
2480         LogError( ( "Argument cannot be NULL: pContext=%p, "
2481                     "pTransportInterface=%p, "
2482                     "pNetworkBuffer=%p",
2483                     ( void * ) pContext,
2484                     ( void * ) pTransportInterface,
2485                     ( void * ) pNetworkBuffer ) );
2486         status = MQTTBadParameter;
2487     }
2488     else if( getTimeFunction == NULL )
2489     {
2490         LogError( ( "Invalid parameter: getTimeFunction is NULL" ) );
2491         status = MQTTBadParameter;
2492     }
2493     else if( userCallback == NULL )
2494     {
2495         LogError( ( "Invalid parameter: userCallback is NULL" ) );
2496         status = MQTTBadParameter;
2497     }
2498     else if( pTransportInterface->recv == NULL )
2499     {
2500         LogError( ( "Invalid parameter: pTransportInterface->recv is NULL" ) );
2501         status = MQTTBadParameter;
2502     }
2503     else if( pTransportInterface->send == NULL )
2504     {
2505         LogError( ( "Invalid parameter: pTransportInterface->send is NULL" ) );
2506         status = MQTTBadParameter;
2507     }
2508     else
2509     {
2510         ( void ) memset( pContext, 0x00, sizeof( MQTTContext_t ) );
2511
2512         pContext->connectStatus = MQTTNotConnected;
2513         pContext->transportInterface = *pTransportInterface;
2514         pContext->getTime = getTimeFunction;
2515         pContext->appCallback = userCallback;
2516         pContext->networkBuffer = *pNetworkBuffer;
2517
2518         /* Zero is not a valid packet ID per MQTT spec. Start from 1. */
2519         pContext->nextPacketId = 1;
2520     }
2521
2522     return status;
2523 }
2524
2525 /*-----------------------------------------------------------*/
2526
2527 MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext,
2528                                    MQTTPubAckInfo_t * pOutgoingPublishRecords,
2529                                    size_t outgoingPublishCount,
2530                                    MQTTPubAckInfo_t * pIncomingPublishRecords,
2531                                    size_t incomingPublishCount )
2532 {
2533     MQTTStatus_t status = MQTTSuccess;
2534
2535     if( pContext == NULL )
2536     {
2537         LogError( ( "Argument cannot be NULL: pContext=%p\n",
2538                     ( void * ) pContext ) );
2539         status = MQTTBadParameter;
2540     }
2541
2542     /* Check whether the arguments make sense. Not equal here behaves
2543      * like an exclusive-or operator for boolean values. */
2544     else if( ( outgoingPublishCount == 0U ) !=
2545              ( pOutgoingPublishRecords == NULL ) )
2546     {
2547         LogError( ( "Arguments do not match: pOutgoingPublishRecords=%p, "
2548                     "outgoingPublishCount=%lu",
2549                     ( void * ) pOutgoingPublishRecords,
2550                     outgoingPublishCount ) );
2551         status = MQTTBadParameter;
2552     }
2553
2554     /* Check whether the arguments make sense. Not equal here behaves
2555      * like an exclusive-or operator for boolean values. */
2556     else if( ( incomingPublishCount == 0U ) !=
2557              ( pIncomingPublishRecords == NULL ) )
2558     {
2559         LogError( ( "Arguments do not match: pIncomingPublishRecords=%p, "
2560                     "incomingPublishCount=%lu",
2561                     ( void * ) pIncomingPublishRecords,
2562                     incomingPublishCount ) );
2563         status = MQTTBadParameter;
2564     }
2565     else if( pContext->appCallback == NULL )
2566     {
2567         LogError( ( "MQTT_InitStatefulQoS must be called only after MQTT_Init has"
2568                     " been called succesfully.\n" ) );
2569         status = MQTTBadParameter;
2570     }
2571     else
2572     {
2573         pContext->incomingPublishRecordMaxCount = incomingPublishCount;
2574         pContext->incomingPublishRecords = pIncomingPublishRecords;
2575         pContext->outgoingPublishRecordMaxCount = outgoingPublishCount;
2576         pContext->outgoingPublishRecords = pOutgoingPublishRecords;
2577     }
2578
2579     return status;
2580 }
2581
2582 /*-----------------------------------------------------------*/
2583
2584 MQTTStatus_t MQTT_CancelCallback( const MQTTContext_t * pContext,
2585                                   uint16_t packetId )
2586 {
2587     MQTTStatus_t status = MQTTSuccess;
2588
2589     if( pContext == NULL )
2590     {
2591         LogWarn( ( "pContext is NULL\n" ) );
2592         status = MQTTBadParameter;
2593     }
2594     else if( pContext->outgoingPublishRecords == NULL )
2595     {
2596         LogError( ( "QoS1/QoS2 is not initialized for use. Please, "
2597                     "call MQTT_InitStatefulQoS to enable QoS1 and QoS2 "
2598                     "publishes.\n" ) );
2599         status = MQTTBadParameter;
2600     }
2601     else
2602     {
2603         MQTT_PRE_STATE_UPDATE_HOOK( pContext );
2604
2605         status = MQTT_RemoveStateRecord( pContext,
2606                                          packetId );
2607
2608         MQTT_POST_STATE_UPDATE_HOOK( pContext );
2609     }
2610
2611     return status;
2612 }
2613
2614 /*-----------------------------------------------------------*/
2615
2616 MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,
2617                            const MQTTConnectInfo_t * pConnectInfo,
2618                            const MQTTPublishInfo_t * pWillInfo,
2619                            uint32_t timeoutMs,
2620                            bool * pSessionPresent )
2621 {
2622     size_t remainingLength = 0UL, packetSize = 0UL;
2623     MQTTStatus_t status = MQTTSuccess;
2624     MQTTPacketInfo_t incomingPacket = { 0 };
2625
2626     incomingPacket.type = ( uint8_t ) 0;
2627
2628     if( ( pContext == NULL ) || ( pConnectInfo == NULL ) || ( pSessionPresent == NULL ) )
2629     {
2630         LogError( ( "Argument cannot be NULL: pContext=%p, "
2631                     "pConnectInfo=%p, pSessionPresent=%p.",
2632                     ( void * ) pContext,
2633                     ( void * ) pConnectInfo,
2634                     ( void * ) pSessionPresent ) );
2635         status = MQTTBadParameter;
2636     }
2637
2638     if( status == MQTTSuccess )
2639     {
2640         /* Get MQTT connect packet size and remaining length. */
2641         status = MQTT_GetConnectPacketSize( pConnectInfo,
2642                                             pWillInfo,
2643                                             &remainingLength,
2644                                             &packetSize );
2645         LogDebug( ( "CONNECT packet size is %lu and remaining length is %lu.",
2646                     ( unsigned long ) packetSize,
2647                     ( unsigned long ) remainingLength ) );
2648     }
2649
2650     if( status == MQTTSuccess )
2651     {
2652         MQTT_PRE_SEND_HOOK( pContext );
2653
2654         status = sendConnectWithoutCopy( pContext,
2655                                          pConnectInfo,
2656                                          pWillInfo,
2657                                          remainingLength );
2658
2659         MQTT_POST_SEND_HOOK( pContext );
2660     }
2661
2662     /* Read CONNACK from transport layer. */
2663     if( status == MQTTSuccess )
2664     {
2665         status = receiveConnack( pContext,
2666                                  timeoutMs,
2667                                  pConnectInfo->cleanSession,
2668                                  &incomingPacket,
2669                                  pSessionPresent );
2670     }
2671
2672     if( status == MQTTSuccess )
2673     {
2674         /* Resend PUBRELs when reestablishing a session, or clear records for new sessions. */
2675         status = handleSessionResumption( pContext, *pSessionPresent );
2676     }
2677
2678     if( status == MQTTSuccess )
2679     {
2680         LogInfo( ( "MQTT connection established with the broker." ) );
2681         pContext->connectStatus = MQTTConnected;
2682         /* Initialize keep-alive fields after a successful connection. */
2683         pContext->keepAliveIntervalSec = pConnectInfo->keepAliveSeconds;
2684         pContext->waitingForPingResp = false;
2685         pContext->pingReqSendTimeMs = 0U;
2686     }
2687     else
2688     {
2689         LogError( ( "MQTT connection failed with status = %s.",
2690                     MQTT_Status_strerror( status ) ) );
2691     }
2692
2693     return status;
2694 }
2695
2696 /*-----------------------------------------------------------*/
2697
2698 MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,
2699                              const MQTTSubscribeInfo_t * pSubscriptionList,
2700                              size_t subscriptionCount,
2701                              uint16_t packetId )
2702 {
2703     size_t remainingLength = 0UL, packetSize = 0UL;
2704
2705     /* Validate arguments. */
2706     MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext,
2707                                                               pSubscriptionList,
2708                                                               subscriptionCount,
2709                                                               packetId );
2710
2711     if( status == MQTTSuccess )
2712     {
2713         /* Get the remaining length and packet size.*/
2714         status = MQTT_GetSubscribePacketSize( pSubscriptionList,
2715                                               subscriptionCount,
2716                                               &remainingLength,
2717                                               &packetSize );
2718         LogDebug( ( "SUBSCRIBE packet size is %lu and remaining length is %lu.",
2719                     ( unsigned long ) packetSize,
2720                     ( unsigned long ) remainingLength ) );
2721     }
2722
2723     if( status == MQTTSuccess )
2724     {
2725         MQTT_PRE_SEND_HOOK( pContext );
2726
2727         /* Send MQTT SUBSCRIBE packet. */
2728         status = sendSubscribeWithoutCopy( pContext,
2729                                            pSubscriptionList,
2730                                            subscriptionCount,
2731                                            packetId,
2732                                            remainingLength );
2733
2734         MQTT_POST_SEND_HOOK( pContext );
2735     }
2736
2737     return status;
2738 }
2739
2740 /*-----------------------------------------------------------*/
2741
2742 MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
2743                            const MQTTPublishInfo_t * pPublishInfo,
2744                            uint16_t packetId )
2745 {
2746     size_t headerSize = 0UL;
2747     size_t remainingLength = 0UL;
2748     size_t packetSize = 0UL;
2749     MQTTPublishState_t publishStatus = MQTTStateNull;
2750     bool stateUpdateHookExecuted = false;
2751
2752     /* 1 header byte + 4 bytes (maximum) required for encoding the length +
2753      * 2 bytes for topic string. */
2754     uint8_t mqttHeader[ 7 ];
2755
2756     /* Validate arguments. */
2757     MQTTStatus_t status = validatePublishParams( pContext, pPublishInfo, packetId );
2758
2759     if( status == MQTTSuccess )
2760     {
2761         /* Get the remaining length and packet size.*/
2762         status = MQTT_GetPublishPacketSize( pPublishInfo,
2763                                             &remainingLength,
2764                                             &packetSize );
2765     }
2766
2767     if( status == MQTTSuccess )
2768     {
2769         status = MQTT_SerializePublishHeaderWithoutTopic( pPublishInfo,
2770                                                           remainingLength,
2771                                                           mqttHeader,
2772                                                           &headerSize );
2773     }
2774
2775     if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
2776     {
2777         MQTT_PRE_STATE_UPDATE_HOOK( pContext );
2778
2779         /* Set the flag so that the corresponding hook can be called later. */
2780         stateUpdateHookExecuted = true;
2781
2782         status = MQTT_ReserveState( pContext,
2783                                     packetId,
2784                                     pPublishInfo->qos );
2785
2786         /* State already exists for a duplicate packet.
2787          * If a state doesn't exist, it will be handled as a new publish in
2788          * state engine. */
2789         if( ( status == MQTTStateCollision ) && ( pPublishInfo->dup == true ) )
2790         {
2791             status = MQTTSuccess;
2792         }
2793     }
2794
2795     if( status == MQTTSuccess )
2796     {
2797         /* Take the mutex as multiple send calls are required for sending this
2798          * packet. */
2799         MQTT_PRE_SEND_HOOK( pContext );
2800
2801         status = sendPublishWithoutCopy( pContext,
2802                                          pPublishInfo,
2803                                          mqttHeader,
2804                                          headerSize,
2805                                          packetId );
2806
2807         /* Give the mutex away for the next taker. */
2808         MQTT_POST_SEND_HOOK( pContext );
2809     }
2810
2811     if( ( status == MQTTSuccess ) &&
2812         ( pPublishInfo->qos > MQTTQoS0 ) )
2813     {
2814         /* Update state machine after PUBLISH is sent.
2815          * Only to be done for QoS1 or QoS2. */
2816         status = MQTT_UpdateStatePublish( pContext,
2817                                           packetId,
2818                                           MQTT_SEND,
2819                                           pPublishInfo->qos,
2820                                           &publishStatus );
2821
2822         if( status != MQTTSuccess )
2823         {
2824             LogError( ( "Update state for publish failed with status %s."
2825                         " However PUBLISH packet was sent to the broker."
2826                         " Any further handling of ACKs for the packet Id"
2827                         " will fail.",
2828                         MQTT_Status_strerror( status ) ) );
2829         }
2830     }
2831
2832     if( stateUpdateHookExecuted == true )
2833     {
2834         /* Regardless of the status, if the mutex was taken due to the
2835          * packet being of QoS > QoS0, then it should be relinquished. */
2836         MQTT_POST_STATE_UPDATE_HOOK( pContext );
2837     }
2838
2839     if( status != MQTTSuccess )
2840     {
2841         LogError( ( "MQTT PUBLISH failed with status %s.",
2842                     MQTT_Status_strerror( status ) ) );
2843     }
2844
2845     return status;
2846 }
2847
2848 /*-----------------------------------------------------------*/
2849
2850 MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
2851 {
2852     int32_t sendResult = 0;
2853     MQTTStatus_t status = MQTTSuccess;
2854     size_t packetSize = 0U;
2855     /* MQTT ping packets are of fixed length. */
2856     uint8_t pingreqPacket[ 2U ];
2857     MQTTFixedBuffer_t localBuffer;
2858
2859     localBuffer.pBuffer = pingreqPacket;
2860     localBuffer.size = 2U;
2861
2862     if( pContext == NULL )
2863     {
2864         LogError( ( "pContext is NULL." ) );
2865         status = MQTTBadParameter;
2866     }
2867
2868     if( status == MQTTSuccess )
2869     {
2870         /* Get MQTT PINGREQ packet size. */
2871         status = MQTT_GetPingreqPacketSize( &packetSize );
2872
2873         if( status == MQTTSuccess )
2874         {
2875             LogDebug( ( "MQTT PINGREQ packet size is %lu.",
2876                         ( unsigned long ) packetSize ) );
2877         }
2878         else
2879         {
2880             LogError( ( "Failed to get the PINGREQ packet size." ) );
2881         }
2882     }
2883
2884     if( status == MQTTSuccess )
2885     {
2886         /* Serialize MQTT PINGREQ. */
2887         status = MQTT_SerializePingreq( &localBuffer );
2888     }
2889
2890     if( status == MQTTSuccess )
2891     {
2892         /* Take the mutex as the send call should not be interrupted in
2893          * between. */
2894         MQTT_PRE_SEND_HOOK( pContext );
2895
2896         /* Send the serialized PINGREQ packet to transport layer.
2897          * Here, we do not use the vectored IO approach for efficiency as the
2898          * Ping packet does not have numerous fields which need to be copied
2899          * from the user provided buffers. Thus it can be sent directly. */
2900         sendResult = sendBuffer( pContext,
2901                                  localBuffer.pBuffer,
2902                                  2U );
2903
2904         /* Give the mutex away. */
2905         MQTT_POST_SEND_HOOK( pContext );
2906
2907         /* It is an error to not send the entire PINGREQ packet. */
2908         if( sendResult < ( int32_t ) packetSize )
2909         {
2910             LogError( ( "Transport send failed for PINGREQ packet." ) );
2911             status = MQTTSendFailed;
2912         }
2913         else
2914         {
2915             pContext->pingReqSendTimeMs = pContext->lastPacketTxTime;
2916             pContext->waitingForPingResp = true;
2917             LogDebug( ( "Sent %ld bytes of PINGREQ packet.",
2918                         ( long int ) sendResult ) );
2919         }
2920     }
2921
2922     return status;
2923 }
2924
2925 /*-----------------------------------------------------------*/
2926
2927 MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
2928                                const MQTTSubscribeInfo_t * pSubscriptionList,
2929                                size_t subscriptionCount,
2930                                uint16_t packetId )
2931 {
2932     size_t remainingLength = 0UL, packetSize = 0UL;
2933
2934     /* Validate arguments. */
2935     MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext,
2936                                                               pSubscriptionList,
2937                                                               subscriptionCount,
2938                                                               packetId );
2939
2940     if( status == MQTTSuccess )
2941     {
2942         /* Get the remaining length and packet size.*/
2943         status = MQTT_GetUnsubscribePacketSize( pSubscriptionList,
2944                                                 subscriptionCount,
2945                                                 &remainingLength,
2946                                                 &packetSize );
2947         LogDebug( ( "UNSUBSCRIBE packet size is %lu and remaining length is %lu.",
2948                     ( unsigned long ) packetSize,
2949                     ( unsigned long ) remainingLength ) );
2950     }
2951
2952     if( status == MQTTSuccess )
2953     {
2954         /* Take the mutex because the below call should not be interrupted. */
2955         MQTT_PRE_SEND_HOOK( pContext );
2956
2957         status = sendUnsubscribeWithoutCopy( pContext,
2958                                              pSubscriptionList,
2959                                              subscriptionCount,
2960                                              packetId,
2961                                              remainingLength );
2962
2963         /* Give the mutex away. */
2964         MQTT_POST_SEND_HOOK( pContext );
2965     }
2966
2967     return status;
2968 }
2969
2970 /*-----------------------------------------------------------*/
2971
2972 MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
2973 {
2974     size_t packetSize = 0U;
2975     int32_t sendResult = 0;
2976     MQTTStatus_t status = MQTTSuccess;
2977     MQTTFixedBuffer_t localBuffer;
2978     uint8_t disconnectPacket[ 2U ];
2979
2980     localBuffer.pBuffer = disconnectPacket;
2981     localBuffer.size = 2U;
2982
2983     /* Validate arguments. */
2984     if( pContext == NULL )
2985     {
2986         LogError( ( "pContext cannot be NULL." ) );
2987         status = MQTTBadParameter;
2988     }
2989
2990     if( status == MQTTSuccess )
2991     {
2992         /* Get MQTT DISCONNECT packet size. */
2993         status = MQTT_GetDisconnectPacketSize( &packetSize );
2994         LogDebug( ( "MQTT DISCONNECT packet size is %lu.",
2995                     ( unsigned long ) packetSize ) );
2996     }
2997
2998     if( status == MQTTSuccess )
2999     {
3000         /* Serialize MQTT DISCONNECT packet. */
3001         status = MQTT_SerializeDisconnect( &localBuffer );
3002     }
3003
3004     if( status == MQTTSuccess )
3005     {
3006         /* Take the mutex because the below call should not be interrupted. */
3007         MQTT_PRE_SEND_HOOK( pContext );
3008
3009         /* Here we do not use vectors as the disconnect packet has fixed fields
3010          * which do not reside in user provided buffers. Thus, it can be sent
3011          * using a simple send call. */
3012         sendResult = sendBuffer( pContext,
3013                                  localBuffer.pBuffer,
3014                                  packetSize );
3015
3016         /* Give the mutex away. */
3017         MQTT_POST_SEND_HOOK( pContext );
3018
3019         if( sendResult < ( int32_t ) packetSize )
3020         {
3021             LogError( ( "Transport send failed for DISCONNECT packet." ) );
3022             status = MQTTSendFailed;
3023         }
3024         else
3025         {
3026             LogDebug( ( "Sent %ld bytes of DISCONNECT packet.",
3027                         ( long int ) sendResult ) );
3028         }
3029     }
3030
3031     if( status == MQTTSuccess )
3032     {
3033         LogInfo( ( "Disconnected from the broker." ) );
3034         pContext->connectStatus = MQTTNotConnected;
3035
3036         /* Reset the index and clean the buffer on a successful disconnect. */
3037         pContext->index = 0;
3038         ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );
3039     }
3040
3041     return status;
3042 }
3043
3044 /*-----------------------------------------------------------*/
3045
3046 MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext )
3047 {
3048     MQTTStatus_t status = MQTTBadParameter;
3049
3050     if( pContext == NULL )
3051     {
3052         LogError( ( "Invalid input parameter: MQTT Context cannot be NULL." ) );
3053     }
3054     else if( pContext->getTime == NULL )
3055     {
3056         LogError( ( "Invalid input parameter: MQTT Context must have valid getTime." ) );
3057     }
3058     else if( pContext->networkBuffer.pBuffer == NULL )
3059     {
3060         LogError( ( "Invalid input parameter: The MQTT context's networkBuffer must not be NULL." ) );
3061     }
3062     else
3063     {
3064         pContext->controlPacketSent = false;
3065         status = receiveSingleIteration( pContext, true );
3066     }
3067
3068     return status;
3069 }
3070
3071 /*-----------------------------------------------------------*/
3072
3073 MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext )
3074 {
3075     MQTTStatus_t status = MQTTBadParameter;
3076
3077     if( pContext == NULL )
3078     {
3079         LogError( ( "Invalid input parameter: MQTT Context cannot be NULL." ) );
3080     }
3081     else if( pContext->getTime == NULL )
3082     {
3083         LogError( ( "Invalid input parameter: MQTT Context must have a valid getTime function." ) );
3084     }
3085     else if( pContext->networkBuffer.pBuffer == NULL )
3086     {
3087         LogError( ( "Invalid input parameter: MQTT context's networkBuffer must not be NULL." ) );
3088     }
3089     else
3090     {
3091         status = receiveSingleIteration( pContext, false );
3092     }
3093
3094     return status;
3095 }
3096
3097 /*-----------------------------------------------------------*/
3098
3099 uint16_t MQTT_GetPacketId( MQTTContext_t * pContext )
3100 {
3101     uint16_t packetId = 0U;
3102
3103     if( pContext != NULL )
3104     {
3105         MQTT_PRE_STATE_UPDATE_HOOK( pContext );
3106
3107         packetId = pContext->nextPacketId;
3108
3109         /* A packet ID of zero is not a valid packet ID. When the max ID
3110          * is reached the next one should start at 1. */
3111         if( pContext->nextPacketId == ( uint16_t ) UINT16_MAX )
3112         {
3113             pContext->nextPacketId = 1;
3114         }
3115         else
3116         {
3117             pContext->nextPacketId++;
3118         }
3119
3120         MQTT_POST_STATE_UPDATE_HOOK( pContext );
3121     }
3122
3123     return packetId;
3124 }
3125
3126 /*-----------------------------------------------------------*/
3127
3128 MQTTStatus_t MQTT_MatchTopic( const char * pTopicName,
3129                               const uint16_t topicNameLength,
3130                               const char * pTopicFilter,
3131                               const uint16_t topicFilterLength,
3132                               bool * pIsMatch )
3133 {
3134     MQTTStatus_t status = MQTTSuccess;
3135     bool topicFilterStartsWithWildcard = false;
3136     bool matchStatus = false;
3137
3138     if( ( pTopicName == NULL ) || ( topicNameLength == 0u ) )
3139     {
3140         LogError( ( "Invalid paramater: Topic name should be non-NULL and its "
3141                     "length should be > 0: TopicName=%p, TopicNameLength=%hu",
3142                     ( void * ) pTopicName,
3143                     ( unsigned short ) topicNameLength ) );
3144
3145         status = MQTTBadParameter;
3146     }
3147     else if( ( pTopicFilter == NULL ) || ( topicFilterLength == 0u ) )
3148     {
3149         LogError( ( "Invalid paramater: Topic filter should be non-NULL and "
3150                     "its length should be > 0: TopicName=%p, TopicFilterLength=%hu",
3151                     ( void * ) pTopicFilter,
3152                     ( unsigned short ) topicFilterLength ) );
3153         status = MQTTBadParameter;
3154     }
3155     else if( pIsMatch == NULL )
3156     {
3157         LogError( ( "Invalid paramater: Output parameter, pIsMatch, is NULL" ) );
3158         status = MQTTBadParameter;
3159     }
3160     else
3161     {
3162         /* Check for an exact match if the incoming topic name and the registered
3163          * topic filter length match. */
3164         if( topicNameLength == topicFilterLength )
3165         {
3166             matchStatus = strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0;
3167         }
3168
3169         if( matchStatus == false )
3170         {
3171             /* If an exact match was not found, match against wildcard characters in
3172              * topic filter.*/
3173
3174             /* Determine if topic filter starts with a wildcard. */
3175             topicFilterStartsWithWildcard = ( pTopicFilter[ 0 ] == '+' ) ||
3176                                             ( pTopicFilter[ 0 ] == '#' );
3177
3178             /* Note: According to the MQTT 3.1.1 specification, incoming PUBLISH topic names
3179              * starting with "$" character cannot be matched against topic filter starting with
3180              * a wildcard, i.e. for example, "$SYS/sport" cannot be matched with "#" or
3181              * "+/sport" topic filters. */
3182             if( !( ( pTopicName[ 0 ] == '$' ) && ( topicFilterStartsWithWildcard == true ) ) )
3183             {
3184                 matchStatus = matchTopicFilter( pTopicName, topicNameLength, pTopicFilter, topicFilterLength );
3185             }
3186         }
3187
3188         /* Update the output parameter with the match result. */
3189         *pIsMatch = matchStatus;
3190     }
3191
3192     return status;
3193 }
3194
3195 /*-----------------------------------------------------------*/
3196
3197 MQTTStatus_t MQTT_GetSubAckStatusCodes( const MQTTPacketInfo_t * pSubackPacket,
3198                                         uint8_t ** pPayloadStart,
3199                                         size_t * pPayloadSize )
3200 {
3201     MQTTStatus_t status = MQTTSuccess;
3202
3203     if( pSubackPacket == NULL )
3204     {
3205         LogError( ( "Invalid parameter: pSubackPacket is NULL." ) );
3206         status = MQTTBadParameter;
3207     }
3208     else if( pPayloadStart == NULL )
3209     {
3210         LogError( ( "Invalid parameter: pPayloadStart is NULL." ) );
3211         status = MQTTBadParameter;
3212     }
3213     else if( pPayloadSize == NULL )
3214     {
3215         LogError( ( "Invalid parameter: pPayloadSize is NULL." ) );
3216         status = MQTTBadParameter;
3217     }
3218     else if( pSubackPacket->type != MQTT_PACKET_TYPE_SUBACK )
3219     {
3220         LogError( ( "Invalid parameter: Input packet is not a SUBACK packet: "
3221                     "ExpectedType=%02x, InputType=%02x",
3222                     ( int ) MQTT_PACKET_TYPE_SUBACK,
3223                     ( int ) pSubackPacket->type ) );
3224         status = MQTTBadParameter;
3225     }
3226     else if( pSubackPacket->pRemainingData == NULL )
3227     {
3228         LogError( ( "Invalid parameter: pSubackPacket->pRemainingData is NULL" ) );
3229         status = MQTTBadParameter;
3230     }
3231
3232     /* A SUBACK must have a remaining length of at least 3 to accommodate the
3233      * packet identifier and at least 1 return code. */
3234     else if( pSubackPacket->remainingLength < 3U )
3235     {
3236         LogError( ( "Invalid parameter: Packet remaining length is invalid: "
3237                     "Should be greater than 2 for SUBACK packet: InputRemainingLength=%lu",
3238                     ( unsigned long ) pSubackPacket->remainingLength ) );
3239         status = MQTTBadParameter;
3240     }
3241     else
3242     {
3243         /* According to the MQTT 3.1.1 protocol specification, the "Remaining Length" field is a
3244          * length of the variable header (2 bytes) plus the length of the payload.
3245          * Therefore, we add 2 positions for the starting address of the payload, and
3246          * subtract 2 bytes from the remaining length for the length of the payload.*/
3247         *pPayloadStart = &pSubackPacket->pRemainingData[ sizeof( uint16_t ) ];
3248         *pPayloadSize = pSubackPacket->remainingLength - sizeof( uint16_t );
3249     }
3250
3251     return status;
3252 }
3253
3254 /*-----------------------------------------------------------*/
3255
3256 const char * MQTT_Status_strerror( MQTTStatus_t status )
3257 {
3258     const char * str = NULL;
3259
3260     switch( status )
3261     {
3262         case MQTTSuccess:
3263             str = "MQTTSuccess";
3264             break;
3265
3266         case MQTTBadParameter:
3267             str = "MQTTBadParameter";
3268             break;
3269
3270         case MQTTNoMemory:
3271             str = "MQTTNoMemory";
3272             break;
3273
3274         case MQTTSendFailed:
3275             str = "MQTTSendFailed";
3276             break;
3277
3278         case MQTTRecvFailed:
3279             str = "MQTTRecvFailed";
3280             break;
3281
3282         case MQTTBadResponse:
3283             str = "MQTTBadResponse";
3284             break;
3285
3286         case MQTTServerRefused:
3287             str = "MQTTServerRefused";
3288             break;
3289
3290         case MQTTNoDataAvailable:
3291             str = "MQTTNoDataAvailable";
3292             break;
3293
3294         case MQTTIllegalState:
3295             str = "MQTTIllegalState";
3296             break;
3297
3298         case MQTTStateCollision:
3299             str = "MQTTStateCollision";
3300             break;
3301
3302         case MQTTKeepAliveTimeout:
3303             str = "MQTTKeepAliveTimeout";
3304             break;
3305
3306         default:
3307             str = "Invalid MQTT Status code";
3308             break;
3309     }
3310
3311     return str;
3312 }
3313
3314 /*-----------------------------------------------------------*/