RaspberrPi project source code
guowenxue
2024-05-27 2c971f2fcf6c6322a0ea584b2af4c3cef20d3d63
commit | author | age
d6b4a7 1 /*
G 2  * coreMQTT v2.1.1
3  * Copyright (C) 2022 Amazon.com, Inc. or its affiliates.  All Rights Reserved.
4  *
5  * SPDX-License-Identifier: MIT
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy of
8  * this software and associated documentation files (the "Software"), to deal in
9  * the Software without restriction, including without limitation the rights to
10  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
11  * the Software, and to permit persons to whom the Software is furnished to do so,
12  * subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in all
15  * copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
19  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
20  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
21  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
22  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23  */
24
25 /**
26  * @file core_mqtt_state.c
27  * @brief Implements the functions in core_mqtt_state.h.
28  */
29 #include <assert.h>
30 #include <string.h>
31 #include "core_mqtt_state.h"
32
33 /* Include config defaults header to get default values of configs. */
34 #include "core_mqtt_config_defaults.h"
35
36 #include "core_mqtt_default_logging.h"
37
38 /*-----------------------------------------------------------*/
39
40 /**
41  * @brief A global static variable used to generate the macro
42  * #MQTT_INVALID_STATE_COUNT of size_t length.
43  */
44 static const size_t ZERO_SIZE_T = 0U;
45
46 /**
47  * @brief This macro depicts the invalid value for the state publishes.
48  */
49 #define MQTT_INVALID_STATE_COUNT    ( ~ZERO_SIZE_T )
50
51 /**
52  * @brief Create a 16-bit bitmap with bit set at specified position.
53  *
54  * @param[in] position The position at which the bit need to be set.
55  */
56 #define UINT16_BITMAP_BIT_SET_AT( position )    ( ( uint16_t ) 0x01U << ( ( uint16_t ) position ) )
57
58 /**
59  * @brief Set a bit in an 16-bit unsigned integer.
60  *
61  * @param[in] x The 16-bit unsigned integer to set a bit.
62  * @param[in] position The position at which the bit need to be set.
63  */
64 #define UINT16_SET_BIT( x, position )           ( ( x ) = ( uint16_t ) ( ( x ) | ( UINT16_BITMAP_BIT_SET_AT( position ) ) ) )
65
66 /**
67  * @brief Macro for checking if a bit is set in a 16-bit unsigned integer.
68  *
69  * @param[in] x The unsigned 16-bit integer to check.
70  * @param[in] position Which bit to check.
71  */
72 #define UINT16_CHECK_BIT( x, position )         ( ( ( x ) & ( UINT16_BITMAP_BIT_SET_AT( position ) ) ) == ( UINT16_BITMAP_BIT_SET_AT( position ) ) )
73
74 /*-----------------------------------------------------------*/
75
76 /**
77  * @brief Test if a transition to new state is possible, when dealing with PUBLISHes.
78  *
79  * @param[in] currentState The current state.
80  * @param[in] newState State to transition to.
81  * @param[in] opType Reserve, Send, or Receive.
82  * @param[in] qos 0, 1, or 2.
83  *
84  * @note This function does not validate the current state, or the new state
85  * based on either the operation type or QoS. It assumes the new state is valid
86  * given the opType and QoS, which will be the case if calculated by
87  * MQTT_CalculateStatePublish().
88  *
89  * @return `true` if transition is possible, else `false`
90  */
91 static bool validateTransitionPublish( MQTTPublishState_t currentState,
92                                        MQTTPublishState_t newState,
93                                        MQTTStateOperation_t opType,
94                                        MQTTQoS_t qos );
95
96 /**
97  * @brief Test if a transition to a new state is possible, when dealing with acks.
98  *
99  * @param[in] currentState The current state.
100  * @param[in] newState State to transition to.
101  *
102  * @return `true` if transition is possible, else `false`.
103  */
104 static bool validateTransitionAck( MQTTPublishState_t currentState,
105                                    MQTTPublishState_t newState );
106
107 /**
108  * @brief Test if the publish corresponding to an ack is outgoing or incoming.
109  *
110  * @param[in] packetType PUBACK, PUBREC, PUBREL, or PUBCOMP.
111  * @param[in] opType Send, or Receive.
112  *
113  * @return `true` if corresponds to outgoing publish, else `false`.
114  */
115 static bool isPublishOutgoing( MQTTPubAckType_t packetType,
116                                MQTTStateOperation_t opType );
117
118 /**
119  * @brief Find a packet ID in the state record.
120  *
121  * @param[in] records State record array.
122  * @param[in] recordCount Length of record array.
123  * @param[in] packetId packet ID to search for.
124  * @param[out] pQos QoS retrieved from record.
125  * @param[out] pCurrentState state retrieved from record.
126  *
127  * @return index of the packet id in the record if it exists, else the record length.
128  */
129 static size_t findInRecord( const MQTTPubAckInfo_t * records,
130                             size_t recordCount,
131                             uint16_t packetId,
132                             MQTTQoS_t * pQos,
133                             MQTTPublishState_t * pCurrentState );
134
135 /**
136  * @brief Compact records.
137  *
138  * Records are arranged in the relative order to maintain message ordering.
139  * This will lead to fragmentation and this function will help in defragmenting
140  * the records array.
141  *
142  * @param[in] records State record array.
143  * @param[in] recordCount Length of record array.
144  */
145 static void compactRecords( MQTTPubAckInfo_t * records,
146                             size_t recordCount );
147
148 /**
149  * @brief Store a new entry in the state record.
150  *
151  * @param[in] records State record array.
152  * @param[in] recordCount Length of record array.
153  * @param[in] packetId Packet ID of new entry.
154  * @param[in] qos QoS of new entry.
155  * @param[in] publishState State of new entry.
156  *
157  * @return #MQTTSuccess, #MQTTNoMemory, or #MQTTStateCollision.
158  */
159 static MQTTStatus_t addRecord( MQTTPubAckInfo_t * records,
160                                size_t recordCount,
161                                uint16_t packetId,
162                                MQTTQoS_t qos,
163                                MQTTPublishState_t publishState );
164
165 /**
166  * @brief Update and possibly delete an entry in the state record.
167  *
168  * @param[in] records State record array.
169  * @param[in] recordIndex index of record to update.
170  * @param[in] newState New state to update.
171  * @param[in] shouldDelete Whether an existing entry should be deleted.
172  */
173 static void updateRecord( MQTTPubAckInfo_t * records,
174                           size_t recordIndex,
175                           MQTTPublishState_t newState,
176                           bool shouldDelete );
177
178 /**
179  * @brief Get the packet ID and index of an outgoing publish in specified
180  * states.
181  *
182  * @param[in] pMqttContext Initialized MQTT context.
183  * @param[in] searchStates The states to search for in 2-byte bit map.
184  * @param[in,out] pCursor Index at which to start searching.
185  *
186  * @return Packet ID of the outgoing publish.
187  */
188 static uint16_t stateSelect( const MQTTContext_t * pMqttContext,
189                              uint16_t searchStates,
190                              MQTTStateCursor_t * pCursor );
191
192 /**
193  * @brief Update the state records for an ACK after state transition
194  * validations.
195  *
196  * @param[in] records State records pointer.
197  * @param[in] maxRecordCount The maximum number of records.
198  * @param[in] recordIndex Index at which the record is stored.
199  * @param[in] packetId Packet id of the packet.
200  * @param[in] currentState Current state of the publish record.
201  * @param[in] newState New state of the publish.
202  *
203  * @return #MQTTIllegalState, or #MQTTSuccess.
204  */
205 static MQTTStatus_t updateStateAck( MQTTPubAckInfo_t * records,
206                                     size_t maxRecordCount,
207                                     size_t recordIndex,
208                                     uint16_t packetId,
209                                     MQTTPublishState_t currentState,
210                                     MQTTPublishState_t newState );
211
212 /**
213  * @brief Update the state record for a PUBLISH packet after validating
214  * the state transitions.
215  *
216  * @param[in] pMqttContext Initialized MQTT context.
217  * @param[in] recordIndex Index in state records at which publish record exists.
218  * @param[in] packetId ID of the PUBLISH packet.
219  * @param[in] opType Send or Receive.
220  * @param[in] qos 0, 1, or 2.
221  * @param[in] currentState Current state of the publish record.
222  * @param[in] newState New state of the publish record.
223  *
224  * @return #MQTTIllegalState, #MQTTStateCollision or #MQTTSuccess.
225  */
226 static MQTTStatus_t updateStatePublish( const MQTTContext_t * pMqttContext,
227                                         size_t recordIndex,
228                                         uint16_t packetId,
229                                         MQTTStateOperation_t opType,
230                                         MQTTQoS_t qos,
231                                         MQTTPublishState_t currentState,
232                                         MQTTPublishState_t newState );
233
234 /*-----------------------------------------------------------*/
235
236 static bool validateTransitionPublish( MQTTPublishState_t currentState,
237                                        MQTTPublishState_t newState,
238                                        MQTTStateOperation_t opType,
239                                        MQTTQoS_t qos )
240 {
241     bool isValid = false;
242
243     switch( currentState )
244     {
245         case MQTTStateNull:
246
247             /* Transitions from null occur when storing a new entry into the record. */
248             if( opType == MQTT_RECEIVE )
249             {
250                 isValid = ( newState == MQTTPubAckSend ) || ( newState == MQTTPubRecSend );
251             }
252
253             break;
254
255         case MQTTPublishSend:
256
257             /* Outgoing publish. All such publishes start in this state due to
258              * the reserve operation. */
259             switch( qos )
260             {
261                 case MQTTQoS1:
262                     isValid = newState == MQTTPubAckPending;
263                     break;
264
265                 case MQTTQoS2:
266                     isValid = newState == MQTTPubRecPending;
267                     break;
268
269                 case MQTTQoS0:
270                 default:
271                     /* QoS 0 is checked before calling this function. */
272                     break;
273             }
274
275             break;
276
277         /* Below cases are for validating the resends of publish when a session is
278          * reestablished. */
279         case MQTTPubAckPending:
280
281             /* When a session is reestablished, outgoing QoS1 publishes in state
282              * #MQTTPubAckPending can be resent. The state remains the same. */
283             isValid = newState == MQTTPubAckPending;
284
285             break;
286
287         case MQTTPubRecPending:
288
289             /* When a session is reestablished, outgoing QoS2 publishes in state
290              * #MQTTPubRecPending can be resent. The state remains the same. */
291             isValid = newState == MQTTPubRecPending;
292
293             break;
294
295         case MQTTPubAckSend:
296         case MQTTPubCompPending:
297         case MQTTPubCompSend:
298         case MQTTPubRecSend:
299         case MQTTPubRelPending:
300         case MQTTPubRelSend:
301         case MQTTPublishDone:
302         default:
303             /* For a PUBLISH, we should not start from any other state. */
304             break;
305     }
306
307     return isValid;
308 }
309
310 /*-----------------------------------------------------------*/
311
312 static bool validateTransitionAck( MQTTPublishState_t currentState,
313                                    MQTTPublishState_t newState )
314 {
315     bool isValid = false;
316
317     switch( currentState )
318     {
319         case MQTTPubAckSend:
320         /* Incoming publish, QoS 1. */
321         case MQTTPubAckPending:
322             /* Outgoing publish, QoS 1. */
323             isValid = newState == MQTTPublishDone;
324             break;
325
326         case MQTTPubRecSend:
327             /* Incoming publish, QoS 2. */
328             isValid = newState == MQTTPubRelPending;
329             break;
330
331         case MQTTPubRelPending:
332
333             /* Incoming publish, QoS 2.
334              * There are 2 valid transitions possible.
335              * 1. MQTTPubRelPending -> MQTTPubCompSend : A PUBREL ack is received
336              *    when publish record state is MQTTPubRelPending. This is the
337              *    normal state transition without any connection interruptions.
338              * 2. MQTTPubRelPending -> MQTTPubRelPending : Receiving a duplicate
339              *    QoS2 publish can result in a transition to the same state.
340              *    This can happen in the below state transition.
341              *    1. Incoming publish received.
342              *    2. PUBREC ack sent and state is now MQTTPubRelPending.
343              *    3. TCP connection failure and broker didn't receive the PUBREC.
344              *    4. Reestablished MQTT session.
345              *    5. MQTT broker resent the un-acked publish.
346              *    6. Publish is received when publish record state is in
347              *       MQTTPubRelPending.
348              *    7. Sending out a PUBREC will result in this transition
349              *       to the same state. */
350             isValid = ( newState == MQTTPubCompSend ) ||
351                       ( newState == MQTTPubRelPending );
352             break;
353
354         case MQTTPubCompSend:
355
356             /* Incoming publish, QoS 2.
357              * There are 2 valid transitions possible.
358              * 1. MQTTPubCompSend -> MQTTPublishDone : A PUBCOMP ack is sent
359              *    after receiving a PUBREL from broker. This is the
360              *    normal state transition without any connection interruptions.
361              * 2. MQTTPubCompSend -> MQTTPubCompSend : Receiving a duplicate PUBREL
362              *    can result in a transition to the same state.
363              *    This can happen in the below state transition.
364              *    1. A TCP connection failure happened before sending a PUBCOMP
365              *       for an incoming PUBREL.
366              *    2. Reestablished an MQTT session.
367              *    3. MQTT broker resent the un-acked PUBREL.
368              *    4. Receiving the PUBREL again will result in this transition
369              *       to the same state. */
370             isValid = ( newState == MQTTPublishDone ) ||
371                       ( newState == MQTTPubCompSend );
372             break;
373
374         case MQTTPubRecPending:
375             /* Outgoing publish, Qos 2. */
376             isValid = newState == MQTTPubRelSend;
377             break;
378
379         case MQTTPubRelSend:
380             /* Outgoing publish, Qos 2. */
381             isValid = newState == MQTTPubCompPending;
382             break;
383
384         case MQTTPubCompPending:
385
386             /* Outgoing publish, Qos 2.
387              * There are 2 valid transitions possible.
388              * 1. MQTTPubCompPending -> MQTTPublishDone : A PUBCOMP is received.
389              *    This marks the complete state transition for the publish packet.
390              *    This is the normal state transition without any connection
391              *    interruptions.
392              * 2. MQTTPubCompPending -> MQTTPubCompPending : Resending a PUBREL for
393              *    packets in state #MQTTPubCompPending can result in this
394              *    transition to the same state.
395              *    This can happen in the below state transition.
396              *    1. A TCP connection failure happened before receiving a PUBCOMP
397              *       for an outgoing PUBREL.
398              *    2. An MQTT session is reestablished.
399              *    3. Resending the un-acked PUBREL results in this transition
400              *       to the same state. */
401             isValid = ( newState == MQTTPublishDone ) ||
402                       ( newState == MQTTPubCompPending );
403             break;
404
405         case MQTTPublishDone:
406         /* Done state should transition to invalid since it will be removed from the record. */
407         case MQTTPublishSend:
408         /* If an ack was sent/received we shouldn't have been in this state. */
409         case MQTTStateNull:
410         /* If an ack was sent/received the record should exist. */
411         default:
412             /* Invalid. */
413             break;
414     }
415
416     return isValid;
417 }
418
419 /*-----------------------------------------------------------*/
420
421 static bool isPublishOutgoing( MQTTPubAckType_t packetType,
422                                MQTTStateOperation_t opType )
423 {
424     bool isOutgoing = false;
425
426     switch( packetType )
427     {
428         case MQTTPuback:
429         case MQTTPubrec:
430         case MQTTPubcomp:
431             isOutgoing = opType == MQTT_RECEIVE;
432             break;
433
434         case MQTTPubrel:
435             isOutgoing = opType == MQTT_SEND;
436             break;
437
438         default:
439             /* No other ack type. */
440             break;
441     }
442
443     return isOutgoing;
444 }
445
446 /*-----------------------------------------------------------*/
447
448 static size_t findInRecord( const MQTTPubAckInfo_t * records,
449                             size_t recordCount,
450                             uint16_t packetId,
451                             MQTTQoS_t * pQos,
452                             MQTTPublishState_t * pCurrentState )
453 {
454     size_t index = 0;
455
456     assert( packetId != MQTT_PACKET_ID_INVALID );
457
458     *pCurrentState = MQTTStateNull;
459
460     for( index = 0; index < recordCount; index++ )
461     {
462         if( records[ index ].packetId == packetId )
463         {
464             *pQos = records[ index ].qos;
465             *pCurrentState = records[ index ].publishState;
466             break;
467         }
468     }
469
470     if( index == recordCount )
471     {
472         index = MQTT_INVALID_STATE_COUNT;
473     }
474
475     return index;
476 }
477
478 /*-----------------------------------------------------------*/
479
480 static void compactRecords( MQTTPubAckInfo_t * records,
481                             size_t recordCount )
482 {
483     size_t index = 0;
484     size_t emptyIndex = MQTT_INVALID_STATE_COUNT;
485
486     assert( records != NULL );
487
488     /* Find the empty spots and fill those with non empty values. */
489     for( ; index < recordCount; index++ )
490     {
491         /* Find the first empty spot. */
492         if( records[ index ].packetId == MQTT_PACKET_ID_INVALID )
493         {
494             if( emptyIndex == MQTT_INVALID_STATE_COUNT )
495             {
496                 emptyIndex = index;
497             }
498         }
499         else
500         {
501             if( emptyIndex != MQTT_INVALID_STATE_COUNT )
502             {
503                 /* Copy over the contents at non empty index to empty index. */
504                 records[ emptyIndex ].packetId = records[ index ].packetId;
505                 records[ emptyIndex ].qos = records[ index ].qos;
506                 records[ emptyIndex ].publishState = records[ index ].publishState;
507
508                 /* Mark the record at current non empty index as invalid. */
509                 records[ index ].packetId = MQTT_PACKET_ID_INVALID;
510                 records[ index ].qos = MQTTQoS0;
511                 records[ index ].publishState = MQTTStateNull;
512
513                 /* Advance the emptyIndex. */
514                 emptyIndex++;
515             }
516         }
517     }
518 }
519
520 /*-----------------------------------------------------------*/
521
522 static MQTTStatus_t addRecord( MQTTPubAckInfo_t * records,
523                                size_t recordCount,
524                                uint16_t packetId,
525                                MQTTQoS_t qos,
526                                MQTTPublishState_t publishState )
527 {
528     MQTTStatus_t status = MQTTNoMemory;
529     int32_t index = 0;
530     size_t availableIndex = recordCount;
531     bool validEntryFound = false;
532
533     assert( packetId != MQTT_PACKET_ID_INVALID );
534     assert( qos != MQTTQoS0 );
535
536     /* Check if we have to compact the records. This is known by checking if
537      * the last spot in the array is filled. */
538     if( records[ recordCount - 1U ].packetId != MQTT_PACKET_ID_INVALID )
539     {
540         compactRecords( records, recordCount );
541     }
542
543     /* Start from end so first available index will be populated.
544      * Available index is always found after the last element in the records.
545      * This is to make sure the relative order of the records in order to meet
546      * the message ordering requirement of MQTT spec 3.1.1. */
547     for( index = ( ( int32_t ) recordCount - 1 ); index >= 0; index-- )
548     {
549         /* Available index is only found after packet at the highest index. */
550         if( records[ index ].packetId == MQTT_PACKET_ID_INVALID )
551         {
552             if( validEntryFound == false )
553             {
554                 availableIndex = ( size_t ) index;
555             }
556         }
557         else
558         {
559             /* A non-empty spot found in the records. */
560             validEntryFound = true;
561
562             if( records[ index ].packetId == packetId )
563             {
564                 /* Collision. */
565                 LogError( ( "Collision when adding PacketID=%u at index=%d.",
566                             ( unsigned int ) packetId,
567                             ( int ) index ) );
568
569                 status = MQTTStateCollision;
570                 availableIndex = recordCount;
571                 break;
572             }
573         }
574     }
575
576     if( availableIndex < recordCount )
577     {
578         records[ availableIndex ].packetId = packetId;
579         records[ availableIndex ].qos = qos;
580         records[ availableIndex ].publishState = publishState;
581         status = MQTTSuccess;
582     }
583
584     return status;
585 }
586
587 /*-----------------------------------------------------------*/
588
589 static void updateRecord( MQTTPubAckInfo_t * records,
590                           size_t recordIndex,
591                           MQTTPublishState_t newState,
592                           bool shouldDelete )
593 {
594     assert( records != NULL );
595
596     if( shouldDelete == true )
597     {
598         /* Mark the record as invalid. */
599         records[ recordIndex ].packetId = MQTT_PACKET_ID_INVALID;
600         records[ recordIndex ].qos = MQTTQoS0;
601         records[ recordIndex ].publishState = MQTTStateNull;
602     }
603     else
604     {
605         records[ recordIndex ].publishState = newState;
606     }
607 }
608
609 /*-----------------------------------------------------------*/
610
611 static uint16_t stateSelect( const MQTTContext_t * pMqttContext,
612                              uint16_t searchStates,
613                              MQTTStateCursor_t * pCursor )
614 {
615     uint16_t packetId = MQTT_PACKET_ID_INVALID;
616     uint16_t outgoingStates = 0U;
617     const MQTTPubAckInfo_t * records = NULL;
618     size_t maxCount;
619     bool stateCheck = false;
620
621     assert( pMqttContext != NULL );
622     assert( searchStates != 0U );
623     assert( pCursor != NULL );
624
625     /* Create a bit map with all the outgoing publish states. */
626     UINT16_SET_BIT( outgoingStates, MQTTPublishSend );
627     UINT16_SET_BIT( outgoingStates, MQTTPubAckPending );
628     UINT16_SET_BIT( outgoingStates, MQTTPubRecPending );
629     UINT16_SET_BIT( outgoingStates, MQTTPubRelSend );
630     UINT16_SET_BIT( outgoingStates, MQTTPubCompPending );
631
632     /* Only outgoing publish records need to be searched. */
633     assert( ( outgoingStates & searchStates ) > 0U );
634     assert( ( ~outgoingStates & searchStates ) == 0U );
635
636     records = pMqttContext->outgoingPublishRecords;
637     maxCount = pMqttContext->outgoingPublishRecordMaxCount;
638
639     while( *pCursor < maxCount )
640     {
641         /* Check if any of the search states are present. */
642         stateCheck = UINT16_CHECK_BIT( searchStates, records[ *pCursor ].publishState );
643
644         if( stateCheck == true )
645         {
646             packetId = records[ *pCursor ].packetId;
647             ( *pCursor )++;
648             break;
649         }
650
651         ( *pCursor )++;
652     }
653
654     return packetId;
655 }
656
657 /*-----------------------------------------------------------*/
658
659 MQTTPublishState_t MQTT_CalculateStateAck( MQTTPubAckType_t packetType,
660                                            MQTTStateOperation_t opType,
661                                            MQTTQoS_t qos )
662 {
663     MQTTPublishState_t calculatedState = MQTTStateNull;
664     /* There are more QoS2 cases than QoS1, so initialize to that. */
665     bool qosValid = qos == MQTTQoS2;
666
667     switch( packetType )
668     {
669         case MQTTPuback:
670             qosValid = qos == MQTTQoS1;
671             calculatedState = MQTTPublishDone;
672             break;
673
674         case MQTTPubrec:
675
676             /* Incoming publish: send PUBREC, PUBREL pending.
677              * Outgoing publish: receive PUBREC, send PUBREL. */
678             calculatedState = ( opType == MQTT_SEND ) ? MQTTPubRelPending : MQTTPubRelSend;
679             break;
680
681         case MQTTPubrel:
682
683             /* Incoming publish: receive PUBREL, send PUBCOMP.
684              * Outgoing publish: send PUBREL, PUBCOMP pending. */
685             calculatedState = ( opType == MQTT_SEND ) ? MQTTPubCompPending : MQTTPubCompSend;
686             break;
687
688         case MQTTPubcomp:
689             calculatedState = MQTTPublishDone;
690             break;
691
692         default:
693             /* No other ack type. */
694             break;
695     }
696
697     /* Sanity check, make sure ack and QoS agree. */
698     if( qosValid == false )
699     {
700         calculatedState = MQTTStateNull;
701     }
702
703     return calculatedState;
704 }
705
706 /*-----------------------------------------------------------*/
707
708 static MQTTStatus_t updateStateAck( MQTTPubAckInfo_t * records,
709                                     size_t maxRecordCount,
710                                     size_t recordIndex,
711                                     uint16_t packetId,
712                                     MQTTPublishState_t currentState,
713                                     MQTTPublishState_t newState )
714 {
715     MQTTStatus_t status = MQTTIllegalState;
716     bool shouldDeleteRecord = false;
717     bool isTransitionValid = false;
718
719     assert( records != NULL );
720
721     /* Record to be deleted if the state transition is completed or if a PUBREC
722      * is received for an outgoing QoS2 publish. When a PUBREC is received,
723      * record is deleted and added back to the end of the records to maintain
724      * ordering for PUBRELs. */
725     shouldDeleteRecord = ( newState == MQTTPublishDone ) || ( newState == MQTTPubRelSend );
726     isTransitionValid = validateTransitionAck( currentState, newState );
727
728     if( isTransitionValid == true )
729     {
730         status = MQTTSuccess;
731
732         /* Update record for acks. When sending or receiving acks for packets that
733          * are resent during a session reestablishment, the new state and
734          * current state can be the same. No update of record required in that case. */
735         if( currentState != newState )
736         {
737             updateRecord( records,
738                           recordIndex,
739                           newState,
740                           shouldDeleteRecord );
741
742             /* For QoS2 messages, in order to preserve the message ordering, when
743              * a PUBREC is received for an outgoing publish, the record should be
744              * moved to the last. This move will help preserve the order in which
745              * a PUBREL needs to be resent in case of a session reestablishment. */
746             if( newState == MQTTPubRelSend )
747             {
748                 status = addRecord( records,
749                                     maxRecordCount,
750                                     packetId,
751                                     MQTTQoS2,
752                                     MQTTPubRelSend );
753             }
754         }
755     }
756     else
757     {
758         /* Invalid state transition. */
759         LogError( ( "Invalid transition from state %s to state %s.",
760                     MQTT_State_strerror( currentState ),
761                     MQTT_State_strerror( newState ) ) );
762     }
763
764     return status;
765 }
766
767 /*-----------------------------------------------------------*/
768
769 static MQTTStatus_t updateStatePublish( const MQTTContext_t * pMqttContext,
770                                         size_t recordIndex,
771                                         uint16_t packetId,
772                                         MQTTStateOperation_t opType,
773                                         MQTTQoS_t qos,
774                                         MQTTPublishState_t currentState,
775                                         MQTTPublishState_t newState )
776 {
777     MQTTStatus_t status = MQTTSuccess;
778     bool isTransitionValid = false;
779
780     assert( pMqttContext != NULL );
781     assert( packetId != MQTT_PACKET_ID_INVALID );
782     assert( qos != MQTTQoS0 );
783
784     /* This will always succeed for an incoming publish. This is due to the fact
785      * that the passed in currentState must be MQTTStateNull, since
786      * #MQTT_UpdateStatePublish does not perform a lookup for receives. */
787     isTransitionValid = validateTransitionPublish( currentState, newState, opType, qos );
788
789     if( isTransitionValid == true )
790     {
791         /* addRecord will check for collisions. */
792         if( opType == MQTT_RECEIVE )
793         {
794             status = addRecord( pMqttContext->incomingPublishRecords,
795                                 pMqttContext->incomingPublishRecordMaxCount,
796                                 packetId,
797                                 qos,
798                                 newState );
799         }
800         /* Send operation. */
801         else
802         {
803             /* Skip updating record when publish is resend and no state
804              * update is required. */
805             if( currentState != newState )
806             {
807                 updateRecord( pMqttContext->outgoingPublishRecords,
808                               recordIndex,
809                               newState,
810                               false );
811             }
812         }
813     }
814     else
815     {
816         status = MQTTIllegalState;
817         LogError( ( "Invalid transition from state %s to state %s.",
818                     MQTT_State_strerror( currentState ),
819                     MQTT_State_strerror( newState ) ) );
820     }
821
822     return status;
823 }
824
825 /*-----------------------------------------------------------*/
826
827 MQTTStatus_t MQTT_ReserveState( const MQTTContext_t * pMqttContext,
828                                 uint16_t packetId,
829                                 MQTTQoS_t qos )
830 {
831     MQTTStatus_t status = MQTTSuccess;
832
833     if( qos == MQTTQoS0 )
834     {
835         status = MQTTSuccess;
836     }
837     else if( ( packetId == MQTT_PACKET_ID_INVALID ) || ( pMqttContext == NULL ) )
838     {
839         status = MQTTBadParameter;
840     }
841     else
842     {
843         /* Collisions are detected when adding the record. */
844         status = addRecord( pMqttContext->outgoingPublishRecords,
845                             pMqttContext->outgoingPublishRecordMaxCount,
846                             packetId,
847                             qos,
848                             MQTTPublishSend );
849     }
850
851     return status;
852 }
853
854 /*-----------------------------------------------------------*/
855
856 MQTTPublishState_t MQTT_CalculateStatePublish( MQTTStateOperation_t opType,
857                                                MQTTQoS_t qos )
858 {
859     MQTTPublishState_t calculatedState = MQTTStateNull;
860
861     switch( qos )
862     {
863         case MQTTQoS0:
864             calculatedState = MQTTPublishDone;
865             break;
866
867         case MQTTQoS1:
868             calculatedState = ( opType == MQTT_SEND ) ? MQTTPubAckPending : MQTTPubAckSend;
869             break;
870
871         case MQTTQoS2:
872             calculatedState = ( opType == MQTT_SEND ) ? MQTTPubRecPending : MQTTPubRecSend;
873             break;
874
875         default:
876             /* No other QoS values. */
877             break;
878     }
879
880     return calculatedState;
881 }
882
883 /*-----------------------------------------------------------*/
884
885 MQTTStatus_t MQTT_UpdateStatePublish( const MQTTContext_t * pMqttContext,
886                                       uint16_t packetId,
887                                       MQTTStateOperation_t opType,
888                                       MQTTQoS_t qos,
889                                       MQTTPublishState_t * pNewState )
890 {
891     MQTTPublishState_t newState = MQTTStateNull;
892     MQTTPublishState_t currentState = MQTTStateNull;
893     MQTTStatus_t mqttStatus = MQTTSuccess;
894     size_t recordIndex = MQTT_INVALID_STATE_COUNT;
895     MQTTQoS_t foundQoS = MQTTQoS0;
896
897     if( ( pMqttContext == NULL ) || ( pNewState == NULL ) )
898     {
899         LogError( ( "Argument cannot be NULL: pMqttContext=%p, pNewState=%p",
900                     ( void * ) pMqttContext,
901                     ( void * ) pNewState ) );
902
903         mqttStatus = MQTTBadParameter;
904     }
905     else if( qos == MQTTQoS0 )
906     {
907         /* QoS 0 publish. Do nothing. */
908         *pNewState = MQTTPublishDone;
909     }
910     else if( packetId == MQTT_PACKET_ID_INVALID )
911     {
912         /* Publishes > QoS 0 need a valid packet ID. */
913         mqttStatus = MQTTBadParameter;
914     }
915     else if( opType == MQTT_SEND )
916     {
917         /* Search record for entry so we can check QoS. */
918         recordIndex = findInRecord( pMqttContext->outgoingPublishRecords,
919                                     pMqttContext->outgoingPublishRecordMaxCount,
920                                     packetId,
921                                     &foundQoS,
922                                     &currentState );
923
924         if( ( recordIndex == MQTT_INVALID_STATE_COUNT ) || ( foundQoS != qos ) )
925         {
926             /* Entry should match with supplied QoS. */
927             mqttStatus = MQTTBadParameter;
928         }
929     }
930     else
931     {
932         /* QoS 1 or 2 receive. Nothing to be done. */
933     }
934
935     if( ( qos != MQTTQoS0 ) && ( mqttStatus == MQTTSuccess ) )
936     {
937         newState = MQTT_CalculateStatePublish( opType, qos );
938         /* Validate state transition and update state records. */
939         mqttStatus = updateStatePublish( pMqttContext,
940                                          recordIndex,
941                                          packetId,
942                                          opType,
943                                          qos,
944                                          currentState,
945                                          newState );
946
947         /* Update output parameter on success. */
948         if( mqttStatus == MQTTSuccess )
949         {
950             *pNewState = newState;
951         }
952     }
953
954     return mqttStatus;
955 }
956
957 /*-----------------------------------------------------------*/
958
959 MQTTStatus_t MQTT_RemoveStateRecord( const MQTTContext_t * pMqttContext,
960                                      uint16_t packetId )
961 {
962     MQTTStatus_t status = MQTTSuccess;
963     MQTTPubAckInfo_t * records;
964     size_t recordIndex;
965     /* Current state is updated by the findInRecord function. */
966     MQTTPublishState_t currentState;
967     MQTTQoS_t qos = MQTTQoS0;
968
969
970     if( ( pMqttContext == NULL ) || ( ( pMqttContext->outgoingPublishRecords == NULL ) ) )
971     {
972         status = MQTTBadParameter;
973     }
974     else
975     {
976         records = pMqttContext->outgoingPublishRecords;
977
978         recordIndex = findInRecord( records,
979                                     pMqttContext->outgoingPublishRecordMaxCount,
980                                     packetId,
981                                     &qos,
982                                     &currentState );
983
984         if( currentState == MQTTStateNull )
985         {
986             status = MQTTBadParameter;
987         }
988         else if( ( qos != MQTTQoS1 ) && ( qos != MQTTQoS2 ) )
989         {
990             status = MQTTBadParameter;
991         }
992         else
993         {
994             /* Delete the record. */
995             updateRecord( records,
996                           recordIndex,
997                           MQTTStateNull,
998                           true );
999         }
1000     }
1001
1002     return status;
1003 }
1004
1005 /*-----------------------------------------------------------*/
1006
1007 MQTTStatus_t MQTT_UpdateStateAck( const MQTTContext_t * pMqttContext,
1008                                   uint16_t packetId,
1009                                   MQTTPubAckType_t packetType,
1010                                   MQTTStateOperation_t opType,
1011                                   MQTTPublishState_t * pNewState )
1012 {
1013     MQTTPublishState_t newState = MQTTStateNull;
1014     MQTTPublishState_t currentState = MQTTStateNull;
1015     bool isOutgoingPublish = isPublishOutgoing( packetType, opType );
1016     MQTTQoS_t qos = MQTTQoS0;
1017     size_t maxRecordCount = MQTT_INVALID_STATE_COUNT;
1018     size_t recordIndex = MQTT_INVALID_STATE_COUNT;
1019
1020     MQTTPubAckInfo_t * records = NULL;
1021     MQTTStatus_t status = MQTTBadResponse;
1022
1023     if( ( pMqttContext == NULL ) || ( pNewState == NULL ) )
1024     {
1025         LogError( ( "Argument cannot be NULL: pMqttContext=%p, pNewState=%p.",
1026                     ( void * ) pMqttContext,
1027                     ( void * ) pNewState ) );
1028         status = MQTTBadParameter;
1029     }
1030     else if( packetId == MQTT_PACKET_ID_INVALID )
1031     {
1032         LogError( ( "Packet ID must be nonzero." ) );
1033         status = MQTTBadParameter;
1034     }
1035     else if( packetType > MQTTPubcomp )
1036     {
1037         LogError( ( "Invalid packet type %u.", ( unsigned int ) packetType ) );
1038         status = MQTTBadParameter;
1039     }
1040     else
1041     {
1042         if( isOutgoingPublish == true )
1043         {
1044             records = pMqttContext->outgoingPublishRecords;
1045             maxRecordCount = pMqttContext->outgoingPublishRecordMaxCount;
1046         }
1047         else
1048         {
1049             records = pMqttContext->incomingPublishRecords;
1050             maxRecordCount = pMqttContext->incomingPublishRecordMaxCount;
1051         }
1052
1053         recordIndex = findInRecord( records,
1054                                     maxRecordCount,
1055                                     packetId,
1056                                     &qos,
1057                                     &currentState );
1058     }
1059
1060     if( recordIndex != MQTT_INVALID_STATE_COUNT )
1061     {
1062         newState = MQTT_CalculateStateAck( packetType, opType, qos );
1063
1064         /* Validate state transition and update state record. */
1065         status = updateStateAck( records,
1066                                  maxRecordCount,
1067                                  recordIndex,
1068                                  packetId,
1069                                  currentState,
1070                                  newState );
1071
1072         /* Update the output parameter. */
1073         if( status == MQTTSuccess )
1074         {
1075             *pNewState = newState;
1076         }
1077     }
1078     else
1079     {
1080         LogError( ( "No matching record found for publish: PacketId=%u.",
1081                     ( unsigned int ) packetId ) );
1082     }
1083
1084     return status;
1085 }
1086
1087 /*-----------------------------------------------------------*/
1088
1089 uint16_t MQTT_PubrelToResend( const MQTTContext_t * pMqttContext,
1090                               MQTTStateCursor_t * pCursor,
1091                               MQTTPublishState_t * pState )
1092 {
1093     uint16_t packetId = MQTT_PACKET_ID_INVALID;
1094     uint16_t searchStates = 0U;
1095
1096     /* Validate arguments. */
1097     if( ( pMqttContext == NULL ) || ( pCursor == NULL ) || ( pState == NULL ) )
1098     {
1099         LogError( ( "Arguments cannot be NULL pMqttContext=%p, pCursor=%p"
1100                     " pState=%p.",
1101                     ( void * ) pMqttContext,
1102                     ( void * ) pCursor,
1103                     ( void * ) pState ) );
1104     }
1105     else
1106     {
1107         /* PUBREL for packets in state #MQTTPubCompPending and #MQTTPubRelSend
1108          * would need to be resent when a session is reestablished.*/
1109         UINT16_SET_BIT( searchStates, MQTTPubCompPending );
1110         UINT16_SET_BIT( searchStates, MQTTPubRelSend );
1111         packetId = stateSelect( pMqttContext, searchStates, pCursor );
1112
1113         /* The state needs to be in #MQTTPubRelSend for sending PUBREL. */
1114         if( packetId != MQTT_PACKET_ID_INVALID )
1115         {
1116             *pState = MQTTPubRelSend;
1117         }
1118     }
1119
1120     return packetId;
1121 }
1122
1123 /*-----------------------------------------------------------*/
1124
1125 uint16_t MQTT_PublishToResend( const MQTTContext_t * pMqttContext,
1126                                MQTTStateCursor_t * pCursor )
1127 {
1128     uint16_t packetId = MQTT_PACKET_ID_INVALID;
1129     uint16_t searchStates = 0U;
1130
1131     /* Validate arguments. */
1132     if( ( pMqttContext == NULL ) || ( pCursor == NULL ) )
1133     {
1134         LogError( ( "Arguments cannot be NULL pMqttContext=%p, pCursor=%p",
1135                     ( void * ) pMqttContext,
1136                     ( void * ) pCursor ) );
1137     }
1138     else
1139     {
1140         /* Packets in state #MQTTPublishSend, #MQTTPubAckPending and
1141          * #MQTTPubRecPending would need to be resent when a session is
1142          * reestablished. */
1143         UINT16_SET_BIT( searchStates, MQTTPublishSend );
1144         UINT16_SET_BIT( searchStates, MQTTPubAckPending );
1145         UINT16_SET_BIT( searchStates, MQTTPubRecPending );
1146
1147         packetId = stateSelect( pMqttContext, searchStates, pCursor );
1148     }
1149
1150     return packetId;
1151 }
1152
1153 /*-----------------------------------------------------------*/
1154
1155 const char * MQTT_State_strerror( MQTTPublishState_t state )
1156 {
1157     const char * str = NULL;
1158
1159     switch( state )
1160     {
1161         case MQTTStateNull:
1162             str = "MQTTStateNull";
1163             break;
1164
1165         case MQTTPublishSend:
1166             str = "MQTTPublishSend";
1167             break;
1168
1169         case MQTTPubAckSend:
1170             str = "MQTTPubAckSend";
1171             break;
1172
1173         case MQTTPubRecSend:
1174             str = "MQTTPubRecSend";
1175             break;
1176
1177         case MQTTPubRelSend:
1178             str = "MQTTPubRelSend";
1179             break;
1180
1181         case MQTTPubCompSend:
1182             str = "MQTTPubCompSend";
1183             break;
1184
1185         case MQTTPubAckPending:
1186             str = "MQTTPubAckPending";
1187             break;
1188
1189         case MQTTPubRecPending:
1190             str = "MQTTPubRecPending";
1191             break;
1192
1193         case MQTTPubRelPending:
1194             str = "MQTTPubRelPending";
1195             break;
1196
1197         case MQTTPubCompPending:
1198             str = "MQTTPubCompPending";
1199             break;
1200
1201         case MQTTPublishDone:
1202             str = "MQTTPublishDone";
1203             break;
1204
1205         default:
1206             /* Invalid state received. */
1207             str = "Invalid MQTT State";
1208             break;
1209     }
1210
1211     return str;
1212 }
1213
1214 /*-----------------------------------------------------------*/