/* * coreMQTT v2.1.1 * Copyright (C) 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * SPDX-License-Identifier: MIT * * Permission is hereby granted, free of charge, to any person obtaining a copy of * this software and associated documentation files (the "Software"), to deal in * the Software without restriction, including without limitation the rights to * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of * the Software, and to permit persons to whom the Software is furnished to do so, * subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ /** * @file core_mqtt_state.c * @brief Implements the functions in core_mqtt_state.h. */ #include #include #include "core_mqtt_state.h" /* Include config defaults header to get default values of configs. */ #include "core_mqtt_config_defaults.h" #include "core_mqtt_default_logging.h" /*-----------------------------------------------------------*/ /** * @brief A global static variable used to generate the macro * #MQTT_INVALID_STATE_COUNT of size_t length. */ static const size_t ZERO_SIZE_T = 0U; /** * @brief This macro depicts the invalid value for the state publishes. */ #define MQTT_INVALID_STATE_COUNT ( ~ZERO_SIZE_T ) /** * @brief Create a 16-bit bitmap with bit set at specified position. * * @param[in] position The position at which the bit need to be set. */ #define UINT16_BITMAP_BIT_SET_AT( position ) ( ( uint16_t ) 0x01U << ( ( uint16_t ) position ) ) /** * @brief Set a bit in an 16-bit unsigned integer. * * @param[in] x The 16-bit unsigned integer to set a bit. * @param[in] position The position at which the bit need to be set. */ #define UINT16_SET_BIT( x, position ) ( ( x ) = ( uint16_t ) ( ( x ) | ( UINT16_BITMAP_BIT_SET_AT( position ) ) ) ) /** * @brief Macro for checking if a bit is set in a 16-bit unsigned integer. * * @param[in] x The unsigned 16-bit integer to check. * @param[in] position Which bit to check. */ #define UINT16_CHECK_BIT( x, position ) ( ( ( x ) & ( UINT16_BITMAP_BIT_SET_AT( position ) ) ) == ( UINT16_BITMAP_BIT_SET_AT( position ) ) ) /*-----------------------------------------------------------*/ /** * @brief Test if a transition to new state is possible, when dealing with PUBLISHes. * * @param[in] currentState The current state. * @param[in] newState State to transition to. * @param[in] opType Reserve, Send, or Receive. * @param[in] qos 0, 1, or 2. * * @note This function does not validate the current state, or the new state * based on either the operation type or QoS. It assumes the new state is valid * given the opType and QoS, which will be the case if calculated by * MQTT_CalculateStatePublish(). * * @return `true` if transition is possible, else `false` */ static bool validateTransitionPublish( MQTTPublishState_t currentState, MQTTPublishState_t newState, MQTTStateOperation_t opType, MQTTQoS_t qos ); /** * @brief Test if a transition to a new state is possible, when dealing with acks. * * @param[in] currentState The current state. * @param[in] newState State to transition to. * * @return `true` if transition is possible, else `false`. */ static bool validateTransitionAck( MQTTPublishState_t currentState, MQTTPublishState_t newState ); /** * @brief Test if the publish corresponding to an ack is outgoing or incoming. * * @param[in] packetType PUBACK, PUBREC, PUBREL, or PUBCOMP. * @param[in] opType Send, or Receive. * * @return `true` if corresponds to outgoing publish, else `false`. */ static bool isPublishOutgoing( MQTTPubAckType_t packetType, MQTTStateOperation_t opType ); /** * @brief Find a packet ID in the state record. * * @param[in] records State record array. * @param[in] recordCount Length of record array. * @param[in] packetId packet ID to search for. * @param[out] pQos QoS retrieved from record. * @param[out] pCurrentState state retrieved from record. * * @return index of the packet id in the record if it exists, else the record length. */ static size_t findInRecord( const MQTTPubAckInfo_t * records, size_t recordCount, uint16_t packetId, MQTTQoS_t * pQos, MQTTPublishState_t * pCurrentState ); /** * @brief Compact records. * * Records are arranged in the relative order to maintain message ordering. * This will lead to fragmentation and this function will help in defragmenting * the records array. * * @param[in] records State record array. * @param[in] recordCount Length of record array. */ static void compactRecords( MQTTPubAckInfo_t * records, size_t recordCount ); /** * @brief Store a new entry in the state record. * * @param[in] records State record array. * @param[in] recordCount Length of record array. * @param[in] packetId Packet ID of new entry. * @param[in] qos QoS of new entry. * @param[in] publishState State of new entry. * * @return #MQTTSuccess, #MQTTNoMemory, or #MQTTStateCollision. */ static MQTTStatus_t addRecord( MQTTPubAckInfo_t * records, size_t recordCount, uint16_t packetId, MQTTQoS_t qos, MQTTPublishState_t publishState ); /** * @brief Update and possibly delete an entry in the state record. * * @param[in] records State record array. * @param[in] recordIndex index of record to update. * @param[in] newState New state to update. * @param[in] shouldDelete Whether an existing entry should be deleted. */ static void updateRecord( MQTTPubAckInfo_t * records, size_t recordIndex, MQTTPublishState_t newState, bool shouldDelete ); /** * @brief Get the packet ID and index of an outgoing publish in specified * states. * * @param[in] pMqttContext Initialized MQTT context. * @param[in] searchStates The states to search for in 2-byte bit map. * @param[in,out] pCursor Index at which to start searching. * * @return Packet ID of the outgoing publish. */ static uint16_t stateSelect( const MQTTContext_t * pMqttContext, uint16_t searchStates, MQTTStateCursor_t * pCursor ); /** * @brief Update the state records for an ACK after state transition * validations. * * @param[in] records State records pointer. * @param[in] maxRecordCount The maximum number of records. * @param[in] recordIndex Index at which the record is stored. * @param[in] packetId Packet id of the packet. * @param[in] currentState Current state of the publish record. * @param[in] newState New state of the publish. * * @return #MQTTIllegalState, or #MQTTSuccess. */ static MQTTStatus_t updateStateAck( MQTTPubAckInfo_t * records, size_t maxRecordCount, size_t recordIndex, uint16_t packetId, MQTTPublishState_t currentState, MQTTPublishState_t newState ); /** * @brief Update the state record for a PUBLISH packet after validating * the state transitions. * * @param[in] pMqttContext Initialized MQTT context. * @param[in] recordIndex Index in state records at which publish record exists. * @param[in] packetId ID of the PUBLISH packet. * @param[in] opType Send or Receive. * @param[in] qos 0, 1, or 2. * @param[in] currentState Current state of the publish record. * @param[in] newState New state of the publish record. * * @return #MQTTIllegalState, #MQTTStateCollision or #MQTTSuccess. */ static MQTTStatus_t updateStatePublish( const MQTTContext_t * pMqttContext, size_t recordIndex, uint16_t packetId, MQTTStateOperation_t opType, MQTTQoS_t qos, MQTTPublishState_t currentState, MQTTPublishState_t newState ); /*-----------------------------------------------------------*/ static bool validateTransitionPublish( MQTTPublishState_t currentState, MQTTPublishState_t newState, MQTTStateOperation_t opType, MQTTQoS_t qos ) { bool isValid = false; switch( currentState ) { case MQTTStateNull: /* Transitions from null occur when storing a new entry into the record. */ if( opType == MQTT_RECEIVE ) { isValid = ( newState == MQTTPubAckSend ) || ( newState == MQTTPubRecSend ); } break; case MQTTPublishSend: /* Outgoing publish. All such publishes start in this state due to * the reserve operation. */ switch( qos ) { case MQTTQoS1: isValid = newState == MQTTPubAckPending; break; case MQTTQoS2: isValid = newState == MQTTPubRecPending; break; case MQTTQoS0: default: /* QoS 0 is checked before calling this function. */ break; } break; /* Below cases are for validating the resends of publish when a session is * reestablished. */ case MQTTPubAckPending: /* When a session is reestablished, outgoing QoS1 publishes in state * #MQTTPubAckPending can be resent. The state remains the same. */ isValid = newState == MQTTPubAckPending; break; case MQTTPubRecPending: /* When a session is reestablished, outgoing QoS2 publishes in state * #MQTTPubRecPending can be resent. The state remains the same. */ isValid = newState == MQTTPubRecPending; break; case MQTTPubAckSend: case MQTTPubCompPending: case MQTTPubCompSend: case MQTTPubRecSend: case MQTTPubRelPending: case MQTTPubRelSend: case MQTTPublishDone: default: /* For a PUBLISH, we should not start from any other state. */ break; } return isValid; } /*-----------------------------------------------------------*/ static bool validateTransitionAck( MQTTPublishState_t currentState, MQTTPublishState_t newState ) { bool isValid = false; switch( currentState ) { case MQTTPubAckSend: /* Incoming publish, QoS 1. */ case MQTTPubAckPending: /* Outgoing publish, QoS 1. */ isValid = newState == MQTTPublishDone; break; case MQTTPubRecSend: /* Incoming publish, QoS 2. */ isValid = newState == MQTTPubRelPending; break; case MQTTPubRelPending: /* Incoming publish, QoS 2. * There are 2 valid transitions possible. * 1. MQTTPubRelPending -> MQTTPubCompSend : A PUBREL ack is received * when publish record state is MQTTPubRelPending. This is the * normal state transition without any connection interruptions. * 2. MQTTPubRelPending -> MQTTPubRelPending : Receiving a duplicate * QoS2 publish can result in a transition to the same state. * This can happen in the below state transition. * 1. Incoming publish received. * 2. PUBREC ack sent and state is now MQTTPubRelPending. * 3. TCP connection failure and broker didn't receive the PUBREC. * 4. Reestablished MQTT session. * 5. MQTT broker resent the un-acked publish. * 6. Publish is received when publish record state is in * MQTTPubRelPending. * 7. Sending out a PUBREC will result in this transition * to the same state. */ isValid = ( newState == MQTTPubCompSend ) || ( newState == MQTTPubRelPending ); break; case MQTTPubCompSend: /* Incoming publish, QoS 2. * There are 2 valid transitions possible. * 1. MQTTPubCompSend -> MQTTPublishDone : A PUBCOMP ack is sent * after receiving a PUBREL from broker. This is the * normal state transition without any connection interruptions. * 2. MQTTPubCompSend -> MQTTPubCompSend : Receiving a duplicate PUBREL * can result in a transition to the same state. * This can happen in the below state transition. * 1. A TCP connection failure happened before sending a PUBCOMP * for an incoming PUBREL. * 2. Reestablished an MQTT session. * 3. MQTT broker resent the un-acked PUBREL. * 4. Receiving the PUBREL again will result in this transition * to the same state. */ isValid = ( newState == MQTTPublishDone ) || ( newState == MQTTPubCompSend ); break; case MQTTPubRecPending: /* Outgoing publish, Qos 2. */ isValid = newState == MQTTPubRelSend; break; case MQTTPubRelSend: /* Outgoing publish, Qos 2. */ isValid = newState == MQTTPubCompPending; break; case MQTTPubCompPending: /* Outgoing publish, Qos 2. * There are 2 valid transitions possible. * 1. MQTTPubCompPending -> MQTTPublishDone : A PUBCOMP is received. * This marks the complete state transition for the publish packet. * This is the normal state transition without any connection * interruptions. * 2. MQTTPubCompPending -> MQTTPubCompPending : Resending a PUBREL for * packets in state #MQTTPubCompPending can result in this * transition to the same state. * This can happen in the below state transition. * 1. A TCP connection failure happened before receiving a PUBCOMP * for an outgoing PUBREL. * 2. An MQTT session is reestablished. * 3. Resending the un-acked PUBREL results in this transition * to the same state. */ isValid = ( newState == MQTTPublishDone ) || ( newState == MQTTPubCompPending ); break; case MQTTPublishDone: /* Done state should transition to invalid since it will be removed from the record. */ case MQTTPublishSend: /* If an ack was sent/received we shouldn't have been in this state. */ case MQTTStateNull: /* If an ack was sent/received the record should exist. */ default: /* Invalid. */ break; } return isValid; } /*-----------------------------------------------------------*/ static bool isPublishOutgoing( MQTTPubAckType_t packetType, MQTTStateOperation_t opType ) { bool isOutgoing = false; switch( packetType ) { case MQTTPuback: case MQTTPubrec: case MQTTPubcomp: isOutgoing = opType == MQTT_RECEIVE; break; case MQTTPubrel: isOutgoing = opType == MQTT_SEND; break; default: /* No other ack type. */ break; } return isOutgoing; } /*-----------------------------------------------------------*/ static size_t findInRecord( const MQTTPubAckInfo_t * records, size_t recordCount, uint16_t packetId, MQTTQoS_t * pQos, MQTTPublishState_t * pCurrentState ) { size_t index = 0; assert( packetId != MQTT_PACKET_ID_INVALID ); *pCurrentState = MQTTStateNull; for( index = 0; index < recordCount; index++ ) { if( records[ index ].packetId == packetId ) { *pQos = records[ index ].qos; *pCurrentState = records[ index ].publishState; break; } } if( index == recordCount ) { index = MQTT_INVALID_STATE_COUNT; } return index; } /*-----------------------------------------------------------*/ static void compactRecords( MQTTPubAckInfo_t * records, size_t recordCount ) { size_t index = 0; size_t emptyIndex = MQTT_INVALID_STATE_COUNT; assert( records != NULL ); /* Find the empty spots and fill those with non empty values. */ for( ; index < recordCount; index++ ) { /* Find the first empty spot. */ if( records[ index ].packetId == MQTT_PACKET_ID_INVALID ) { if( emptyIndex == MQTT_INVALID_STATE_COUNT ) { emptyIndex = index; } } else { if( emptyIndex != MQTT_INVALID_STATE_COUNT ) { /* Copy over the contents at non empty index to empty index. */ records[ emptyIndex ].packetId = records[ index ].packetId; records[ emptyIndex ].qos = records[ index ].qos; records[ emptyIndex ].publishState = records[ index ].publishState; /* Mark the record at current non empty index as invalid. */ records[ index ].packetId = MQTT_PACKET_ID_INVALID; records[ index ].qos = MQTTQoS0; records[ index ].publishState = MQTTStateNull; /* Advance the emptyIndex. */ emptyIndex++; } } } } /*-----------------------------------------------------------*/ static MQTTStatus_t addRecord( MQTTPubAckInfo_t * records, size_t recordCount, uint16_t packetId, MQTTQoS_t qos, MQTTPublishState_t publishState ) { MQTTStatus_t status = MQTTNoMemory; int32_t index = 0; size_t availableIndex = recordCount; bool validEntryFound = false; assert( packetId != MQTT_PACKET_ID_INVALID ); assert( qos != MQTTQoS0 ); /* Check if we have to compact the records. This is known by checking if * the last spot in the array is filled. */ if( records[ recordCount - 1U ].packetId != MQTT_PACKET_ID_INVALID ) { compactRecords( records, recordCount ); } /* Start from end so first available index will be populated. * Available index is always found after the last element in the records. * This is to make sure the relative order of the records in order to meet * the message ordering requirement of MQTT spec 3.1.1. */ for( index = ( ( int32_t ) recordCount - 1 ); index >= 0; index-- ) { /* Available index is only found after packet at the highest index. */ if( records[ index ].packetId == MQTT_PACKET_ID_INVALID ) { if( validEntryFound == false ) { availableIndex = ( size_t ) index; } } else { /* A non-empty spot found in the records. */ validEntryFound = true; if( records[ index ].packetId == packetId ) { /* Collision. */ LogError( ( "Collision when adding PacketID=%u at index=%d.", ( unsigned int ) packetId, ( int ) index ) ); status = MQTTStateCollision; availableIndex = recordCount; break; } } } if( availableIndex < recordCount ) { records[ availableIndex ].packetId = packetId; records[ availableIndex ].qos = qos; records[ availableIndex ].publishState = publishState; status = MQTTSuccess; } return status; } /*-----------------------------------------------------------*/ static void updateRecord( MQTTPubAckInfo_t * records, size_t recordIndex, MQTTPublishState_t newState, bool shouldDelete ) { assert( records != NULL ); if( shouldDelete == true ) { /* Mark the record as invalid. */ records[ recordIndex ].packetId = MQTT_PACKET_ID_INVALID; records[ recordIndex ].qos = MQTTQoS0; records[ recordIndex ].publishState = MQTTStateNull; } else { records[ recordIndex ].publishState = newState; } } /*-----------------------------------------------------------*/ static uint16_t stateSelect( const MQTTContext_t * pMqttContext, uint16_t searchStates, MQTTStateCursor_t * pCursor ) { uint16_t packetId = MQTT_PACKET_ID_INVALID; uint16_t outgoingStates = 0U; const MQTTPubAckInfo_t * records = NULL; size_t maxCount; bool stateCheck = false; assert( pMqttContext != NULL ); assert( searchStates != 0U ); assert( pCursor != NULL ); /* Create a bit map with all the outgoing publish states. */ UINT16_SET_BIT( outgoingStates, MQTTPublishSend ); UINT16_SET_BIT( outgoingStates, MQTTPubAckPending ); UINT16_SET_BIT( outgoingStates, MQTTPubRecPending ); UINT16_SET_BIT( outgoingStates, MQTTPubRelSend ); UINT16_SET_BIT( outgoingStates, MQTTPubCompPending ); /* Only outgoing publish records need to be searched. */ assert( ( outgoingStates & searchStates ) > 0U ); assert( ( ~outgoingStates & searchStates ) == 0U ); records = pMqttContext->outgoingPublishRecords; maxCount = pMqttContext->outgoingPublishRecordMaxCount; while( *pCursor < maxCount ) { /* Check if any of the search states are present. */ stateCheck = UINT16_CHECK_BIT( searchStates, records[ *pCursor ].publishState ); if( stateCheck == true ) { packetId = records[ *pCursor ].packetId; ( *pCursor )++; break; } ( *pCursor )++; } return packetId; } /*-----------------------------------------------------------*/ MQTTPublishState_t MQTT_CalculateStateAck( MQTTPubAckType_t packetType, MQTTStateOperation_t opType, MQTTQoS_t qos ) { MQTTPublishState_t calculatedState = MQTTStateNull; /* There are more QoS2 cases than QoS1, so initialize to that. */ bool qosValid = qos == MQTTQoS2; switch( packetType ) { case MQTTPuback: qosValid = qos == MQTTQoS1; calculatedState = MQTTPublishDone; break; case MQTTPubrec: /* Incoming publish: send PUBREC, PUBREL pending. * Outgoing publish: receive PUBREC, send PUBREL. */ calculatedState = ( opType == MQTT_SEND ) ? MQTTPubRelPending : MQTTPubRelSend; break; case MQTTPubrel: /* Incoming publish: receive PUBREL, send PUBCOMP. * Outgoing publish: send PUBREL, PUBCOMP pending. */ calculatedState = ( opType == MQTT_SEND ) ? MQTTPubCompPending : MQTTPubCompSend; break; case MQTTPubcomp: calculatedState = MQTTPublishDone; break; default: /* No other ack type. */ break; } /* Sanity check, make sure ack and QoS agree. */ if( qosValid == false ) { calculatedState = MQTTStateNull; } return calculatedState; } /*-----------------------------------------------------------*/ static MQTTStatus_t updateStateAck( MQTTPubAckInfo_t * records, size_t maxRecordCount, size_t recordIndex, uint16_t packetId, MQTTPublishState_t currentState, MQTTPublishState_t newState ) { MQTTStatus_t status = MQTTIllegalState; bool shouldDeleteRecord = false; bool isTransitionValid = false; assert( records != NULL ); /* Record to be deleted if the state transition is completed or if a PUBREC * is received for an outgoing QoS2 publish. When a PUBREC is received, * record is deleted and added back to the end of the records to maintain * ordering for PUBRELs. */ shouldDeleteRecord = ( newState == MQTTPublishDone ) || ( newState == MQTTPubRelSend ); isTransitionValid = validateTransitionAck( currentState, newState ); if( isTransitionValid == true ) { status = MQTTSuccess; /* Update record for acks. When sending or receiving acks for packets that * are resent during a session reestablishment, the new state and * current state can be the same. No update of record required in that case. */ if( currentState != newState ) { updateRecord( records, recordIndex, newState, shouldDeleteRecord ); /* For QoS2 messages, in order to preserve the message ordering, when * a PUBREC is received for an outgoing publish, the record should be * moved to the last. This move will help preserve the order in which * a PUBREL needs to be resent in case of a session reestablishment. */ if( newState == MQTTPubRelSend ) { status = addRecord( records, maxRecordCount, packetId, MQTTQoS2, MQTTPubRelSend ); } } } else { /* Invalid state transition. */ LogError( ( "Invalid transition from state %s to state %s.", MQTT_State_strerror( currentState ), MQTT_State_strerror( newState ) ) ); } return status; } /*-----------------------------------------------------------*/ static MQTTStatus_t updateStatePublish( const MQTTContext_t * pMqttContext, size_t recordIndex, uint16_t packetId, MQTTStateOperation_t opType, MQTTQoS_t qos, MQTTPublishState_t currentState, MQTTPublishState_t newState ) { MQTTStatus_t status = MQTTSuccess; bool isTransitionValid = false; assert( pMqttContext != NULL ); assert( packetId != MQTT_PACKET_ID_INVALID ); assert( qos != MQTTQoS0 ); /* This will always succeed for an incoming publish. This is due to the fact * that the passed in currentState must be MQTTStateNull, since * #MQTT_UpdateStatePublish does not perform a lookup for receives. */ isTransitionValid = validateTransitionPublish( currentState, newState, opType, qos ); if( isTransitionValid == true ) { /* addRecord will check for collisions. */ if( opType == MQTT_RECEIVE ) { status = addRecord( pMqttContext->incomingPublishRecords, pMqttContext->incomingPublishRecordMaxCount, packetId, qos, newState ); } /* Send operation. */ else { /* Skip updating record when publish is resend and no state * update is required. */ if( currentState != newState ) { updateRecord( pMqttContext->outgoingPublishRecords, recordIndex, newState, false ); } } } else { status = MQTTIllegalState; LogError( ( "Invalid transition from state %s to state %s.", MQTT_State_strerror( currentState ), MQTT_State_strerror( newState ) ) ); } return status; } /*-----------------------------------------------------------*/ MQTTStatus_t MQTT_ReserveState( const MQTTContext_t * pMqttContext, uint16_t packetId, MQTTQoS_t qos ) { MQTTStatus_t status = MQTTSuccess; if( qos == MQTTQoS0 ) { status = MQTTSuccess; } else if( ( packetId == MQTT_PACKET_ID_INVALID ) || ( pMqttContext == NULL ) ) { status = MQTTBadParameter; } else { /* Collisions are detected when adding the record. */ status = addRecord( pMqttContext->outgoingPublishRecords, pMqttContext->outgoingPublishRecordMaxCount, packetId, qos, MQTTPublishSend ); } return status; } /*-----------------------------------------------------------*/ MQTTPublishState_t MQTT_CalculateStatePublish( MQTTStateOperation_t opType, MQTTQoS_t qos ) { MQTTPublishState_t calculatedState = MQTTStateNull; switch( qos ) { case MQTTQoS0: calculatedState = MQTTPublishDone; break; case MQTTQoS1: calculatedState = ( opType == MQTT_SEND ) ? MQTTPubAckPending : MQTTPubAckSend; break; case MQTTQoS2: calculatedState = ( opType == MQTT_SEND ) ? MQTTPubRecPending : MQTTPubRecSend; break; default: /* No other QoS values. */ break; } return calculatedState; } /*-----------------------------------------------------------*/ MQTTStatus_t MQTT_UpdateStatePublish( const MQTTContext_t * pMqttContext, uint16_t packetId, MQTTStateOperation_t opType, MQTTQoS_t qos, MQTTPublishState_t * pNewState ) { MQTTPublishState_t newState = MQTTStateNull; MQTTPublishState_t currentState = MQTTStateNull; MQTTStatus_t mqttStatus = MQTTSuccess; size_t recordIndex = MQTT_INVALID_STATE_COUNT; MQTTQoS_t foundQoS = MQTTQoS0; if( ( pMqttContext == NULL ) || ( pNewState == NULL ) ) { LogError( ( "Argument cannot be NULL: pMqttContext=%p, pNewState=%p", ( void * ) pMqttContext, ( void * ) pNewState ) ); mqttStatus = MQTTBadParameter; } else if( qos == MQTTQoS0 ) { /* QoS 0 publish. Do nothing. */ *pNewState = MQTTPublishDone; } else if( packetId == MQTT_PACKET_ID_INVALID ) { /* Publishes > QoS 0 need a valid packet ID. */ mqttStatus = MQTTBadParameter; } else if( opType == MQTT_SEND ) { /* Search record for entry so we can check QoS. */ recordIndex = findInRecord( pMqttContext->outgoingPublishRecords, pMqttContext->outgoingPublishRecordMaxCount, packetId, &foundQoS, ¤tState ); if( ( recordIndex == MQTT_INVALID_STATE_COUNT ) || ( foundQoS != qos ) ) { /* Entry should match with supplied QoS. */ mqttStatus = MQTTBadParameter; } } else { /* QoS 1 or 2 receive. Nothing to be done. */ } if( ( qos != MQTTQoS0 ) && ( mqttStatus == MQTTSuccess ) ) { newState = MQTT_CalculateStatePublish( opType, qos ); /* Validate state transition and update state records. */ mqttStatus = updateStatePublish( pMqttContext, recordIndex, packetId, opType, qos, currentState, newState ); /* Update output parameter on success. */ if( mqttStatus == MQTTSuccess ) { *pNewState = newState; } } return mqttStatus; } /*-----------------------------------------------------------*/ MQTTStatus_t MQTT_RemoveStateRecord( const MQTTContext_t * pMqttContext, uint16_t packetId ) { MQTTStatus_t status = MQTTSuccess; MQTTPubAckInfo_t * records; size_t recordIndex; /* Current state is updated by the findInRecord function. */ MQTTPublishState_t currentState; MQTTQoS_t qos = MQTTQoS0; if( ( pMqttContext == NULL ) || ( ( pMqttContext->outgoingPublishRecords == NULL ) ) ) { status = MQTTBadParameter; } else { records = pMqttContext->outgoingPublishRecords; recordIndex = findInRecord( records, pMqttContext->outgoingPublishRecordMaxCount, packetId, &qos, ¤tState ); if( currentState == MQTTStateNull ) { status = MQTTBadParameter; } else if( ( qos != MQTTQoS1 ) && ( qos != MQTTQoS2 ) ) { status = MQTTBadParameter; } else { /* Delete the record. */ updateRecord( records, recordIndex, MQTTStateNull, true ); } } return status; } /*-----------------------------------------------------------*/ MQTTStatus_t MQTT_UpdateStateAck( const MQTTContext_t * pMqttContext, uint16_t packetId, MQTTPubAckType_t packetType, MQTTStateOperation_t opType, MQTTPublishState_t * pNewState ) { MQTTPublishState_t newState = MQTTStateNull; MQTTPublishState_t currentState = MQTTStateNull; bool isOutgoingPublish = isPublishOutgoing( packetType, opType ); MQTTQoS_t qos = MQTTQoS0; size_t maxRecordCount = MQTT_INVALID_STATE_COUNT; size_t recordIndex = MQTT_INVALID_STATE_COUNT; MQTTPubAckInfo_t * records = NULL; MQTTStatus_t status = MQTTBadResponse; if( ( pMqttContext == NULL ) || ( pNewState == NULL ) ) { LogError( ( "Argument cannot be NULL: pMqttContext=%p, pNewState=%p.", ( void * ) pMqttContext, ( void * ) pNewState ) ); status = MQTTBadParameter; } else if( packetId == MQTT_PACKET_ID_INVALID ) { LogError( ( "Packet ID must be nonzero." ) ); status = MQTTBadParameter; } else if( packetType > MQTTPubcomp ) { LogError( ( "Invalid packet type %u.", ( unsigned int ) packetType ) ); status = MQTTBadParameter; } else { if( isOutgoingPublish == true ) { records = pMqttContext->outgoingPublishRecords; maxRecordCount = pMqttContext->outgoingPublishRecordMaxCount; } else { records = pMqttContext->incomingPublishRecords; maxRecordCount = pMqttContext->incomingPublishRecordMaxCount; } recordIndex = findInRecord( records, maxRecordCount, packetId, &qos, ¤tState ); } if( recordIndex != MQTT_INVALID_STATE_COUNT ) { newState = MQTT_CalculateStateAck( packetType, opType, qos ); /* Validate state transition and update state record. */ status = updateStateAck( records, maxRecordCount, recordIndex, packetId, currentState, newState ); /* Update the output parameter. */ if( status == MQTTSuccess ) { *pNewState = newState; } } else { LogError( ( "No matching record found for publish: PacketId=%u.", ( unsigned int ) packetId ) ); } return status; } /*-----------------------------------------------------------*/ uint16_t MQTT_PubrelToResend( const MQTTContext_t * pMqttContext, MQTTStateCursor_t * pCursor, MQTTPublishState_t * pState ) { uint16_t packetId = MQTT_PACKET_ID_INVALID; uint16_t searchStates = 0U; /* Validate arguments. */ if( ( pMqttContext == NULL ) || ( pCursor == NULL ) || ( pState == NULL ) ) { LogError( ( "Arguments cannot be NULL pMqttContext=%p, pCursor=%p" " pState=%p.", ( void * ) pMqttContext, ( void * ) pCursor, ( void * ) pState ) ); } else { /* PUBREL for packets in state #MQTTPubCompPending and #MQTTPubRelSend * would need to be resent when a session is reestablished.*/ UINT16_SET_BIT( searchStates, MQTTPubCompPending ); UINT16_SET_BIT( searchStates, MQTTPubRelSend ); packetId = stateSelect( pMqttContext, searchStates, pCursor ); /* The state needs to be in #MQTTPubRelSend for sending PUBREL. */ if( packetId != MQTT_PACKET_ID_INVALID ) { *pState = MQTTPubRelSend; } } return packetId; } /*-----------------------------------------------------------*/ uint16_t MQTT_PublishToResend( const MQTTContext_t * pMqttContext, MQTTStateCursor_t * pCursor ) { uint16_t packetId = MQTT_PACKET_ID_INVALID; uint16_t searchStates = 0U; /* Validate arguments. */ if( ( pMqttContext == NULL ) || ( pCursor == NULL ) ) { LogError( ( "Arguments cannot be NULL pMqttContext=%p, pCursor=%p", ( void * ) pMqttContext, ( void * ) pCursor ) ); } else { /* Packets in state #MQTTPublishSend, #MQTTPubAckPending and * #MQTTPubRecPending would need to be resent when a session is * reestablished. */ UINT16_SET_BIT( searchStates, MQTTPublishSend ); UINT16_SET_BIT( searchStates, MQTTPubAckPending ); UINT16_SET_BIT( searchStates, MQTTPubRecPending ); packetId = stateSelect( pMqttContext, searchStates, pCursor ); } return packetId; } /*-----------------------------------------------------------*/ const char * MQTT_State_strerror( MQTTPublishState_t state ) { const char * str = NULL; switch( state ) { case MQTTStateNull: str = "MQTTStateNull"; break; case MQTTPublishSend: str = "MQTTPublishSend"; break; case MQTTPubAckSend: str = "MQTTPubAckSend"; break; case MQTTPubRecSend: str = "MQTTPubRecSend"; break; case MQTTPubRelSend: str = "MQTTPubRelSend"; break; case MQTTPubCompSend: str = "MQTTPubCompSend"; break; case MQTTPubAckPending: str = "MQTTPubAckPending"; break; case MQTTPubRecPending: str = "MQTTPubRecPending"; break; case MQTTPubRelPending: str = "MQTTPubRelPending"; break; case MQTTPubCompPending: str = "MQTTPubCompPending"; break; case MQTTPublishDone: str = "MQTTPublishDone"; break; default: /* Invalid state received. */ str = "Invalid MQTT State"; break; } return str; } /*-----------------------------------------------------------*/