/*
|
* coreMQTT v2.1.1
|
* Copyright (C) 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
*
|
* SPDX-License-Identifier: MIT
|
*
|
* Permission is hereby granted, free of charge, to any person obtaining a copy of
|
* this software and associated documentation files (the "Software"), to deal in
|
* the Software without restriction, including without limitation the rights to
|
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
* the Software, and to permit persons to whom the Software is furnished to do so,
|
* subject to the following conditions:
|
*
|
* The above copyright notice and this permission notice shall be included in all
|
* copies or substantial portions of the Software.
|
*
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
*/
|
|
/**
|
* @file core_mqtt_state.c
|
* @brief Implements the functions in core_mqtt_state.h.
|
*/
|
#include <assert.h>
|
#include <string.h>
|
#include "core_mqtt_state.h"
|
|
/* Include config defaults header to get default values of configs. */
|
#include "core_mqtt_config_defaults.h"
|
|
#include "core_mqtt_default_logging.h"
|
|
/*-----------------------------------------------------------*/
|
|
/**
|
* @brief A global static variable used to generate the macro
|
* #MQTT_INVALID_STATE_COUNT of size_t length.
|
*/
|
static const size_t ZERO_SIZE_T = 0U;
|
|
/**
|
* @brief This macro depicts the invalid value for the state publishes.
|
*/
|
#define MQTT_INVALID_STATE_COUNT ( ~ZERO_SIZE_T )
|
|
/**
|
* @brief Create a 16-bit bitmap with bit set at specified position.
|
*
|
* @param[in] position The position at which the bit need to be set.
|
*/
|
#define UINT16_BITMAP_BIT_SET_AT( position ) ( ( uint16_t ) 0x01U << ( ( uint16_t ) position ) )
|
|
/**
|
* @brief Set a bit in an 16-bit unsigned integer.
|
*
|
* @param[in] x The 16-bit unsigned integer to set a bit.
|
* @param[in] position The position at which the bit need to be set.
|
*/
|
#define UINT16_SET_BIT( x, position ) ( ( x ) = ( uint16_t ) ( ( x ) | ( UINT16_BITMAP_BIT_SET_AT( position ) ) ) )
|
|
/**
|
* @brief Macro for checking if a bit is set in a 16-bit unsigned integer.
|
*
|
* @param[in] x The unsigned 16-bit integer to check.
|
* @param[in] position Which bit to check.
|
*/
|
#define UINT16_CHECK_BIT( x, position ) ( ( ( x ) & ( UINT16_BITMAP_BIT_SET_AT( position ) ) ) == ( UINT16_BITMAP_BIT_SET_AT( position ) ) )
|
|
/*-----------------------------------------------------------*/
|
|
/**
|
* @brief Test if a transition to new state is possible, when dealing with PUBLISHes.
|
*
|
* @param[in] currentState The current state.
|
* @param[in] newState State to transition to.
|
* @param[in] opType Reserve, Send, or Receive.
|
* @param[in] qos 0, 1, or 2.
|
*
|
* @note This function does not validate the current state, or the new state
|
* based on either the operation type or QoS. It assumes the new state is valid
|
* given the opType and QoS, which will be the case if calculated by
|
* MQTT_CalculateStatePublish().
|
*
|
* @return `true` if transition is possible, else `false`
|
*/
|
static bool validateTransitionPublish( MQTTPublishState_t currentState,
|
MQTTPublishState_t newState,
|
MQTTStateOperation_t opType,
|
MQTTQoS_t qos );
|
|
/**
|
* @brief Test if a transition to a new state is possible, when dealing with acks.
|
*
|
* @param[in] currentState The current state.
|
* @param[in] newState State to transition to.
|
*
|
* @return `true` if transition is possible, else `false`.
|
*/
|
static bool validateTransitionAck( MQTTPublishState_t currentState,
|
MQTTPublishState_t newState );
|
|
/**
|
* @brief Test if the publish corresponding to an ack is outgoing or incoming.
|
*
|
* @param[in] packetType PUBACK, PUBREC, PUBREL, or PUBCOMP.
|
* @param[in] opType Send, or Receive.
|
*
|
* @return `true` if corresponds to outgoing publish, else `false`.
|
*/
|
static bool isPublishOutgoing( MQTTPubAckType_t packetType,
|
MQTTStateOperation_t opType );
|
|
/**
|
* @brief Find a packet ID in the state record.
|
*
|
* @param[in] records State record array.
|
* @param[in] recordCount Length of record array.
|
* @param[in] packetId packet ID to search for.
|
* @param[out] pQos QoS retrieved from record.
|
* @param[out] pCurrentState state retrieved from record.
|
*
|
* @return index of the packet id in the record if it exists, else the record length.
|
*/
|
static size_t findInRecord( const MQTTPubAckInfo_t * records,
|
size_t recordCount,
|
uint16_t packetId,
|
MQTTQoS_t * pQos,
|
MQTTPublishState_t * pCurrentState );
|
|
/**
|
* @brief Compact records.
|
*
|
* Records are arranged in the relative order to maintain message ordering.
|
* This will lead to fragmentation and this function will help in defragmenting
|
* the records array.
|
*
|
* @param[in] records State record array.
|
* @param[in] recordCount Length of record array.
|
*/
|
static void compactRecords( MQTTPubAckInfo_t * records,
|
size_t recordCount );
|
|
/**
|
* @brief Store a new entry in the state record.
|
*
|
* @param[in] records State record array.
|
* @param[in] recordCount Length of record array.
|
* @param[in] packetId Packet ID of new entry.
|
* @param[in] qos QoS of new entry.
|
* @param[in] publishState State of new entry.
|
*
|
* @return #MQTTSuccess, #MQTTNoMemory, or #MQTTStateCollision.
|
*/
|
static MQTTStatus_t addRecord( MQTTPubAckInfo_t * records,
|
size_t recordCount,
|
uint16_t packetId,
|
MQTTQoS_t qos,
|
MQTTPublishState_t publishState );
|
|
/**
|
* @brief Update and possibly delete an entry in the state record.
|
*
|
* @param[in] records State record array.
|
* @param[in] recordIndex index of record to update.
|
* @param[in] newState New state to update.
|
* @param[in] shouldDelete Whether an existing entry should be deleted.
|
*/
|
static void updateRecord( MQTTPubAckInfo_t * records,
|
size_t recordIndex,
|
MQTTPublishState_t newState,
|
bool shouldDelete );
|
|
/**
|
* @brief Get the packet ID and index of an outgoing publish in specified
|
* states.
|
*
|
* @param[in] pMqttContext Initialized MQTT context.
|
* @param[in] searchStates The states to search for in 2-byte bit map.
|
* @param[in,out] pCursor Index at which to start searching.
|
*
|
* @return Packet ID of the outgoing publish.
|
*/
|
static uint16_t stateSelect( const MQTTContext_t * pMqttContext,
|
uint16_t searchStates,
|
MQTTStateCursor_t * pCursor );
|
|
/**
|
* @brief Update the state records for an ACK after state transition
|
* validations.
|
*
|
* @param[in] records State records pointer.
|
* @param[in] maxRecordCount The maximum number of records.
|
* @param[in] recordIndex Index at which the record is stored.
|
* @param[in] packetId Packet id of the packet.
|
* @param[in] currentState Current state of the publish record.
|
* @param[in] newState New state of the publish.
|
*
|
* @return #MQTTIllegalState, or #MQTTSuccess.
|
*/
|
static MQTTStatus_t updateStateAck( MQTTPubAckInfo_t * records,
|
size_t maxRecordCount,
|
size_t recordIndex,
|
uint16_t packetId,
|
MQTTPublishState_t currentState,
|
MQTTPublishState_t newState );
|
|
/**
|
* @brief Update the state record for a PUBLISH packet after validating
|
* the state transitions.
|
*
|
* @param[in] pMqttContext Initialized MQTT context.
|
* @param[in] recordIndex Index in state records at which publish record exists.
|
* @param[in] packetId ID of the PUBLISH packet.
|
* @param[in] opType Send or Receive.
|
* @param[in] qos 0, 1, or 2.
|
* @param[in] currentState Current state of the publish record.
|
* @param[in] newState New state of the publish record.
|
*
|
* @return #MQTTIllegalState, #MQTTStateCollision or #MQTTSuccess.
|
*/
|
static MQTTStatus_t updateStatePublish( const MQTTContext_t * pMqttContext,
|
size_t recordIndex,
|
uint16_t packetId,
|
MQTTStateOperation_t opType,
|
MQTTQoS_t qos,
|
MQTTPublishState_t currentState,
|
MQTTPublishState_t newState );
|
|
/*-----------------------------------------------------------*/
|
|
static bool validateTransitionPublish( MQTTPublishState_t currentState,
|
MQTTPublishState_t newState,
|
MQTTStateOperation_t opType,
|
MQTTQoS_t qos )
|
{
|
bool isValid = false;
|
|
switch( currentState )
|
{
|
case MQTTStateNull:
|
|
/* Transitions from null occur when storing a new entry into the record. */
|
if( opType == MQTT_RECEIVE )
|
{
|
isValid = ( newState == MQTTPubAckSend ) || ( newState == MQTTPubRecSend );
|
}
|
|
break;
|
|
case MQTTPublishSend:
|
|
/* Outgoing publish. All such publishes start in this state due to
|
* the reserve operation. */
|
switch( qos )
|
{
|
case MQTTQoS1:
|
isValid = newState == MQTTPubAckPending;
|
break;
|
|
case MQTTQoS2:
|
isValid = newState == MQTTPubRecPending;
|
break;
|
|
case MQTTQoS0:
|
default:
|
/* QoS 0 is checked before calling this function. */
|
break;
|
}
|
|
break;
|
|
/* Below cases are for validating the resends of publish when a session is
|
* reestablished. */
|
case MQTTPubAckPending:
|
|
/* When a session is reestablished, outgoing QoS1 publishes in state
|
* #MQTTPubAckPending can be resent. The state remains the same. */
|
isValid = newState == MQTTPubAckPending;
|
|
break;
|
|
case MQTTPubRecPending:
|
|
/* When a session is reestablished, outgoing QoS2 publishes in state
|
* #MQTTPubRecPending can be resent. The state remains the same. */
|
isValid = newState == MQTTPubRecPending;
|
|
break;
|
|
case MQTTPubAckSend:
|
case MQTTPubCompPending:
|
case MQTTPubCompSend:
|
case MQTTPubRecSend:
|
case MQTTPubRelPending:
|
case MQTTPubRelSend:
|
case MQTTPublishDone:
|
default:
|
/* For a PUBLISH, we should not start from any other state. */
|
break;
|
}
|
|
return isValid;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
static bool validateTransitionAck( MQTTPublishState_t currentState,
|
MQTTPublishState_t newState )
|
{
|
bool isValid = false;
|
|
switch( currentState )
|
{
|
case MQTTPubAckSend:
|
/* Incoming publish, QoS 1. */
|
case MQTTPubAckPending:
|
/* Outgoing publish, QoS 1. */
|
isValid = newState == MQTTPublishDone;
|
break;
|
|
case MQTTPubRecSend:
|
/* Incoming publish, QoS 2. */
|
isValid = newState == MQTTPubRelPending;
|
break;
|
|
case MQTTPubRelPending:
|
|
/* Incoming publish, QoS 2.
|
* There are 2 valid transitions possible.
|
* 1. MQTTPubRelPending -> MQTTPubCompSend : A PUBREL ack is received
|
* when publish record state is MQTTPubRelPending. This is the
|
* normal state transition without any connection interruptions.
|
* 2. MQTTPubRelPending -> MQTTPubRelPending : Receiving a duplicate
|
* QoS2 publish can result in a transition to the same state.
|
* This can happen in the below state transition.
|
* 1. Incoming publish received.
|
* 2. PUBREC ack sent and state is now MQTTPubRelPending.
|
* 3. TCP connection failure and broker didn't receive the PUBREC.
|
* 4. Reestablished MQTT session.
|
* 5. MQTT broker resent the un-acked publish.
|
* 6. Publish is received when publish record state is in
|
* MQTTPubRelPending.
|
* 7. Sending out a PUBREC will result in this transition
|
* to the same state. */
|
isValid = ( newState == MQTTPubCompSend ) ||
|
( newState == MQTTPubRelPending );
|
break;
|
|
case MQTTPubCompSend:
|
|
/* Incoming publish, QoS 2.
|
* There are 2 valid transitions possible.
|
* 1. MQTTPubCompSend -> MQTTPublishDone : A PUBCOMP ack is sent
|
* after receiving a PUBREL from broker. This is the
|
* normal state transition without any connection interruptions.
|
* 2. MQTTPubCompSend -> MQTTPubCompSend : Receiving a duplicate PUBREL
|
* can result in a transition to the same state.
|
* This can happen in the below state transition.
|
* 1. A TCP connection failure happened before sending a PUBCOMP
|
* for an incoming PUBREL.
|
* 2. Reestablished an MQTT session.
|
* 3. MQTT broker resent the un-acked PUBREL.
|
* 4. Receiving the PUBREL again will result in this transition
|
* to the same state. */
|
isValid = ( newState == MQTTPublishDone ) ||
|
( newState == MQTTPubCompSend );
|
break;
|
|
case MQTTPubRecPending:
|
/* Outgoing publish, Qos 2. */
|
isValid = newState == MQTTPubRelSend;
|
break;
|
|
case MQTTPubRelSend:
|
/* Outgoing publish, Qos 2. */
|
isValid = newState == MQTTPubCompPending;
|
break;
|
|
case MQTTPubCompPending:
|
|
/* Outgoing publish, Qos 2.
|
* There are 2 valid transitions possible.
|
* 1. MQTTPubCompPending -> MQTTPublishDone : A PUBCOMP is received.
|
* This marks the complete state transition for the publish packet.
|
* This is the normal state transition without any connection
|
* interruptions.
|
* 2. MQTTPubCompPending -> MQTTPubCompPending : Resending a PUBREL for
|
* packets in state #MQTTPubCompPending can result in this
|
* transition to the same state.
|
* This can happen in the below state transition.
|
* 1. A TCP connection failure happened before receiving a PUBCOMP
|
* for an outgoing PUBREL.
|
* 2. An MQTT session is reestablished.
|
* 3. Resending the un-acked PUBREL results in this transition
|
* to the same state. */
|
isValid = ( newState == MQTTPublishDone ) ||
|
( newState == MQTTPubCompPending );
|
break;
|
|
case MQTTPublishDone:
|
/* Done state should transition to invalid since it will be removed from the record. */
|
case MQTTPublishSend:
|
/* If an ack was sent/received we shouldn't have been in this state. */
|
case MQTTStateNull:
|
/* If an ack was sent/received the record should exist. */
|
default:
|
/* Invalid. */
|
break;
|
}
|
|
return isValid;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
static bool isPublishOutgoing( MQTTPubAckType_t packetType,
|
MQTTStateOperation_t opType )
|
{
|
bool isOutgoing = false;
|
|
switch( packetType )
|
{
|
case MQTTPuback:
|
case MQTTPubrec:
|
case MQTTPubcomp:
|
isOutgoing = opType == MQTT_RECEIVE;
|
break;
|
|
case MQTTPubrel:
|
isOutgoing = opType == MQTT_SEND;
|
break;
|
|
default:
|
/* No other ack type. */
|
break;
|
}
|
|
return isOutgoing;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
static size_t findInRecord( const MQTTPubAckInfo_t * records,
|
size_t recordCount,
|
uint16_t packetId,
|
MQTTQoS_t * pQos,
|
MQTTPublishState_t * pCurrentState )
|
{
|
size_t index = 0;
|
|
assert( packetId != MQTT_PACKET_ID_INVALID );
|
|
*pCurrentState = MQTTStateNull;
|
|
for( index = 0; index < recordCount; index++ )
|
{
|
if( records[ index ].packetId == packetId )
|
{
|
*pQos = records[ index ].qos;
|
*pCurrentState = records[ index ].publishState;
|
break;
|
}
|
}
|
|
if( index == recordCount )
|
{
|
index = MQTT_INVALID_STATE_COUNT;
|
}
|
|
return index;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
static void compactRecords( MQTTPubAckInfo_t * records,
|
size_t recordCount )
|
{
|
size_t index = 0;
|
size_t emptyIndex = MQTT_INVALID_STATE_COUNT;
|
|
assert( records != NULL );
|
|
/* Find the empty spots and fill those with non empty values. */
|
for( ; index < recordCount; index++ )
|
{
|
/* Find the first empty spot. */
|
if( records[ index ].packetId == MQTT_PACKET_ID_INVALID )
|
{
|
if( emptyIndex == MQTT_INVALID_STATE_COUNT )
|
{
|
emptyIndex = index;
|
}
|
}
|
else
|
{
|
if( emptyIndex != MQTT_INVALID_STATE_COUNT )
|
{
|
/* Copy over the contents at non empty index to empty index. */
|
records[ emptyIndex ].packetId = records[ index ].packetId;
|
records[ emptyIndex ].qos = records[ index ].qos;
|
records[ emptyIndex ].publishState = records[ index ].publishState;
|
|
/* Mark the record at current non empty index as invalid. */
|
records[ index ].packetId = MQTT_PACKET_ID_INVALID;
|
records[ index ].qos = MQTTQoS0;
|
records[ index ].publishState = MQTTStateNull;
|
|
/* Advance the emptyIndex. */
|
emptyIndex++;
|
}
|
}
|
}
|
}
|
|
/*-----------------------------------------------------------*/
|
|
static MQTTStatus_t addRecord( MQTTPubAckInfo_t * records,
|
size_t recordCount,
|
uint16_t packetId,
|
MQTTQoS_t qos,
|
MQTTPublishState_t publishState )
|
{
|
MQTTStatus_t status = MQTTNoMemory;
|
int32_t index = 0;
|
size_t availableIndex = recordCount;
|
bool validEntryFound = false;
|
|
assert( packetId != MQTT_PACKET_ID_INVALID );
|
assert( qos != MQTTQoS0 );
|
|
/* Check if we have to compact the records. This is known by checking if
|
* the last spot in the array is filled. */
|
if( records[ recordCount - 1U ].packetId != MQTT_PACKET_ID_INVALID )
|
{
|
compactRecords( records, recordCount );
|
}
|
|
/* Start from end so first available index will be populated.
|
* Available index is always found after the last element in the records.
|
* This is to make sure the relative order of the records in order to meet
|
* the message ordering requirement of MQTT spec 3.1.1. */
|
for( index = ( ( int32_t ) recordCount - 1 ); index >= 0; index-- )
|
{
|
/* Available index is only found after packet at the highest index. */
|
if( records[ index ].packetId == MQTT_PACKET_ID_INVALID )
|
{
|
if( validEntryFound == false )
|
{
|
availableIndex = ( size_t ) index;
|
}
|
}
|
else
|
{
|
/* A non-empty spot found in the records. */
|
validEntryFound = true;
|
|
if( records[ index ].packetId == packetId )
|
{
|
/* Collision. */
|
LogError( ( "Collision when adding PacketID=%u at index=%d.",
|
( unsigned int ) packetId,
|
( int ) index ) );
|
|
status = MQTTStateCollision;
|
availableIndex = recordCount;
|
break;
|
}
|
}
|
}
|
|
if( availableIndex < recordCount )
|
{
|
records[ availableIndex ].packetId = packetId;
|
records[ availableIndex ].qos = qos;
|
records[ availableIndex ].publishState = publishState;
|
status = MQTTSuccess;
|
}
|
|
return status;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
static void updateRecord( MQTTPubAckInfo_t * records,
|
size_t recordIndex,
|
MQTTPublishState_t newState,
|
bool shouldDelete )
|
{
|
assert( records != NULL );
|
|
if( shouldDelete == true )
|
{
|
/* Mark the record as invalid. */
|
records[ recordIndex ].packetId = MQTT_PACKET_ID_INVALID;
|
records[ recordIndex ].qos = MQTTQoS0;
|
records[ recordIndex ].publishState = MQTTStateNull;
|
}
|
else
|
{
|
records[ recordIndex ].publishState = newState;
|
}
|
}
|
|
/*-----------------------------------------------------------*/
|
|
static uint16_t stateSelect( const MQTTContext_t * pMqttContext,
|
uint16_t searchStates,
|
MQTTStateCursor_t * pCursor )
|
{
|
uint16_t packetId = MQTT_PACKET_ID_INVALID;
|
uint16_t outgoingStates = 0U;
|
const MQTTPubAckInfo_t * records = NULL;
|
size_t maxCount;
|
bool stateCheck = false;
|
|
assert( pMqttContext != NULL );
|
assert( searchStates != 0U );
|
assert( pCursor != NULL );
|
|
/* Create a bit map with all the outgoing publish states. */
|
UINT16_SET_BIT( outgoingStates, MQTTPublishSend );
|
UINT16_SET_BIT( outgoingStates, MQTTPubAckPending );
|
UINT16_SET_BIT( outgoingStates, MQTTPubRecPending );
|
UINT16_SET_BIT( outgoingStates, MQTTPubRelSend );
|
UINT16_SET_BIT( outgoingStates, MQTTPubCompPending );
|
|
/* Only outgoing publish records need to be searched. */
|
assert( ( outgoingStates & searchStates ) > 0U );
|
assert( ( ~outgoingStates & searchStates ) == 0U );
|
|
records = pMqttContext->outgoingPublishRecords;
|
maxCount = pMqttContext->outgoingPublishRecordMaxCount;
|
|
while( *pCursor < maxCount )
|
{
|
/* Check if any of the search states are present. */
|
stateCheck = UINT16_CHECK_BIT( searchStates, records[ *pCursor ].publishState );
|
|
if( stateCheck == true )
|
{
|
packetId = records[ *pCursor ].packetId;
|
( *pCursor )++;
|
break;
|
}
|
|
( *pCursor )++;
|
}
|
|
return packetId;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
MQTTPublishState_t MQTT_CalculateStateAck( MQTTPubAckType_t packetType,
|
MQTTStateOperation_t opType,
|
MQTTQoS_t qos )
|
{
|
MQTTPublishState_t calculatedState = MQTTStateNull;
|
/* There are more QoS2 cases than QoS1, so initialize to that. */
|
bool qosValid = qos == MQTTQoS2;
|
|
switch( packetType )
|
{
|
case MQTTPuback:
|
qosValid = qos == MQTTQoS1;
|
calculatedState = MQTTPublishDone;
|
break;
|
|
case MQTTPubrec:
|
|
/* Incoming publish: send PUBREC, PUBREL pending.
|
* Outgoing publish: receive PUBREC, send PUBREL. */
|
calculatedState = ( opType == MQTT_SEND ) ? MQTTPubRelPending : MQTTPubRelSend;
|
break;
|
|
case MQTTPubrel:
|
|
/* Incoming publish: receive PUBREL, send PUBCOMP.
|
* Outgoing publish: send PUBREL, PUBCOMP pending. */
|
calculatedState = ( opType == MQTT_SEND ) ? MQTTPubCompPending : MQTTPubCompSend;
|
break;
|
|
case MQTTPubcomp:
|
calculatedState = MQTTPublishDone;
|
break;
|
|
default:
|
/* No other ack type. */
|
break;
|
}
|
|
/* Sanity check, make sure ack and QoS agree. */
|
if( qosValid == false )
|
{
|
calculatedState = MQTTStateNull;
|
}
|
|
return calculatedState;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
static MQTTStatus_t updateStateAck( MQTTPubAckInfo_t * records,
|
size_t maxRecordCount,
|
size_t recordIndex,
|
uint16_t packetId,
|
MQTTPublishState_t currentState,
|
MQTTPublishState_t newState )
|
{
|
MQTTStatus_t status = MQTTIllegalState;
|
bool shouldDeleteRecord = false;
|
bool isTransitionValid = false;
|
|
assert( records != NULL );
|
|
/* Record to be deleted if the state transition is completed or if a PUBREC
|
* is received for an outgoing QoS2 publish. When a PUBREC is received,
|
* record is deleted and added back to the end of the records to maintain
|
* ordering for PUBRELs. */
|
shouldDeleteRecord = ( newState == MQTTPublishDone ) || ( newState == MQTTPubRelSend );
|
isTransitionValid = validateTransitionAck( currentState, newState );
|
|
if( isTransitionValid == true )
|
{
|
status = MQTTSuccess;
|
|
/* Update record for acks. When sending or receiving acks for packets that
|
* are resent during a session reestablishment, the new state and
|
* current state can be the same. No update of record required in that case. */
|
if( currentState != newState )
|
{
|
updateRecord( records,
|
recordIndex,
|
newState,
|
shouldDeleteRecord );
|
|
/* For QoS2 messages, in order to preserve the message ordering, when
|
* a PUBREC is received for an outgoing publish, the record should be
|
* moved to the last. This move will help preserve the order in which
|
* a PUBREL needs to be resent in case of a session reestablishment. */
|
if( newState == MQTTPubRelSend )
|
{
|
status = addRecord( records,
|
maxRecordCount,
|
packetId,
|
MQTTQoS2,
|
MQTTPubRelSend );
|
}
|
}
|
}
|
else
|
{
|
/* Invalid state transition. */
|
LogError( ( "Invalid transition from state %s to state %s.",
|
MQTT_State_strerror( currentState ),
|
MQTT_State_strerror( newState ) ) );
|
}
|
|
return status;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
static MQTTStatus_t updateStatePublish( const MQTTContext_t * pMqttContext,
|
size_t recordIndex,
|
uint16_t packetId,
|
MQTTStateOperation_t opType,
|
MQTTQoS_t qos,
|
MQTTPublishState_t currentState,
|
MQTTPublishState_t newState )
|
{
|
MQTTStatus_t status = MQTTSuccess;
|
bool isTransitionValid = false;
|
|
assert( pMqttContext != NULL );
|
assert( packetId != MQTT_PACKET_ID_INVALID );
|
assert( qos != MQTTQoS0 );
|
|
/* This will always succeed for an incoming publish. This is due to the fact
|
* that the passed in currentState must be MQTTStateNull, since
|
* #MQTT_UpdateStatePublish does not perform a lookup for receives. */
|
isTransitionValid = validateTransitionPublish( currentState, newState, opType, qos );
|
|
if( isTransitionValid == true )
|
{
|
/* addRecord will check for collisions. */
|
if( opType == MQTT_RECEIVE )
|
{
|
status = addRecord( pMqttContext->incomingPublishRecords,
|
pMqttContext->incomingPublishRecordMaxCount,
|
packetId,
|
qos,
|
newState );
|
}
|
/* Send operation. */
|
else
|
{
|
/* Skip updating record when publish is resend and no state
|
* update is required. */
|
if( currentState != newState )
|
{
|
updateRecord( pMqttContext->outgoingPublishRecords,
|
recordIndex,
|
newState,
|
false );
|
}
|
}
|
}
|
else
|
{
|
status = MQTTIllegalState;
|
LogError( ( "Invalid transition from state %s to state %s.",
|
MQTT_State_strerror( currentState ),
|
MQTT_State_strerror( newState ) ) );
|
}
|
|
return status;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
MQTTStatus_t MQTT_ReserveState( const MQTTContext_t * pMqttContext,
|
uint16_t packetId,
|
MQTTQoS_t qos )
|
{
|
MQTTStatus_t status = MQTTSuccess;
|
|
if( qos == MQTTQoS0 )
|
{
|
status = MQTTSuccess;
|
}
|
else if( ( packetId == MQTT_PACKET_ID_INVALID ) || ( pMqttContext == NULL ) )
|
{
|
status = MQTTBadParameter;
|
}
|
else
|
{
|
/* Collisions are detected when adding the record. */
|
status = addRecord( pMqttContext->outgoingPublishRecords,
|
pMqttContext->outgoingPublishRecordMaxCount,
|
packetId,
|
qos,
|
MQTTPublishSend );
|
}
|
|
return status;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
MQTTPublishState_t MQTT_CalculateStatePublish( MQTTStateOperation_t opType,
|
MQTTQoS_t qos )
|
{
|
MQTTPublishState_t calculatedState = MQTTStateNull;
|
|
switch( qos )
|
{
|
case MQTTQoS0:
|
calculatedState = MQTTPublishDone;
|
break;
|
|
case MQTTQoS1:
|
calculatedState = ( opType == MQTT_SEND ) ? MQTTPubAckPending : MQTTPubAckSend;
|
break;
|
|
case MQTTQoS2:
|
calculatedState = ( opType == MQTT_SEND ) ? MQTTPubRecPending : MQTTPubRecSend;
|
break;
|
|
default:
|
/* No other QoS values. */
|
break;
|
}
|
|
return calculatedState;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
MQTTStatus_t MQTT_UpdateStatePublish( const MQTTContext_t * pMqttContext,
|
uint16_t packetId,
|
MQTTStateOperation_t opType,
|
MQTTQoS_t qos,
|
MQTTPublishState_t * pNewState )
|
{
|
MQTTPublishState_t newState = MQTTStateNull;
|
MQTTPublishState_t currentState = MQTTStateNull;
|
MQTTStatus_t mqttStatus = MQTTSuccess;
|
size_t recordIndex = MQTT_INVALID_STATE_COUNT;
|
MQTTQoS_t foundQoS = MQTTQoS0;
|
|
if( ( pMqttContext == NULL ) || ( pNewState == NULL ) )
|
{
|
LogError( ( "Argument cannot be NULL: pMqttContext=%p, pNewState=%p",
|
( void * ) pMqttContext,
|
( void * ) pNewState ) );
|
|
mqttStatus = MQTTBadParameter;
|
}
|
else if( qos == MQTTQoS0 )
|
{
|
/* QoS 0 publish. Do nothing. */
|
*pNewState = MQTTPublishDone;
|
}
|
else if( packetId == MQTT_PACKET_ID_INVALID )
|
{
|
/* Publishes > QoS 0 need a valid packet ID. */
|
mqttStatus = MQTTBadParameter;
|
}
|
else if( opType == MQTT_SEND )
|
{
|
/* Search record for entry so we can check QoS. */
|
recordIndex = findInRecord( pMqttContext->outgoingPublishRecords,
|
pMqttContext->outgoingPublishRecordMaxCount,
|
packetId,
|
&foundQoS,
|
¤tState );
|
|
if( ( recordIndex == MQTT_INVALID_STATE_COUNT ) || ( foundQoS != qos ) )
|
{
|
/* Entry should match with supplied QoS. */
|
mqttStatus = MQTTBadParameter;
|
}
|
}
|
else
|
{
|
/* QoS 1 or 2 receive. Nothing to be done. */
|
}
|
|
if( ( qos != MQTTQoS0 ) && ( mqttStatus == MQTTSuccess ) )
|
{
|
newState = MQTT_CalculateStatePublish( opType, qos );
|
/* Validate state transition and update state records. */
|
mqttStatus = updateStatePublish( pMqttContext,
|
recordIndex,
|
packetId,
|
opType,
|
qos,
|
currentState,
|
newState );
|
|
/* Update output parameter on success. */
|
if( mqttStatus == MQTTSuccess )
|
{
|
*pNewState = newState;
|
}
|
}
|
|
return mqttStatus;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
MQTTStatus_t MQTT_RemoveStateRecord( const MQTTContext_t * pMqttContext,
|
uint16_t packetId )
|
{
|
MQTTStatus_t status = MQTTSuccess;
|
MQTTPubAckInfo_t * records;
|
size_t recordIndex;
|
/* Current state is updated by the findInRecord function. */
|
MQTTPublishState_t currentState;
|
MQTTQoS_t qos = MQTTQoS0;
|
|
|
if( ( pMqttContext == NULL ) || ( ( pMqttContext->outgoingPublishRecords == NULL ) ) )
|
{
|
status = MQTTBadParameter;
|
}
|
else
|
{
|
records = pMqttContext->outgoingPublishRecords;
|
|
recordIndex = findInRecord( records,
|
pMqttContext->outgoingPublishRecordMaxCount,
|
packetId,
|
&qos,
|
¤tState );
|
|
if( currentState == MQTTStateNull )
|
{
|
status = MQTTBadParameter;
|
}
|
else if( ( qos != MQTTQoS1 ) && ( qos != MQTTQoS2 ) )
|
{
|
status = MQTTBadParameter;
|
}
|
else
|
{
|
/* Delete the record. */
|
updateRecord( records,
|
recordIndex,
|
MQTTStateNull,
|
true );
|
}
|
}
|
|
return status;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
MQTTStatus_t MQTT_UpdateStateAck( const MQTTContext_t * pMqttContext,
|
uint16_t packetId,
|
MQTTPubAckType_t packetType,
|
MQTTStateOperation_t opType,
|
MQTTPublishState_t * pNewState )
|
{
|
MQTTPublishState_t newState = MQTTStateNull;
|
MQTTPublishState_t currentState = MQTTStateNull;
|
bool isOutgoingPublish = isPublishOutgoing( packetType, opType );
|
MQTTQoS_t qos = MQTTQoS0;
|
size_t maxRecordCount = MQTT_INVALID_STATE_COUNT;
|
size_t recordIndex = MQTT_INVALID_STATE_COUNT;
|
|
MQTTPubAckInfo_t * records = NULL;
|
MQTTStatus_t status = MQTTBadResponse;
|
|
if( ( pMqttContext == NULL ) || ( pNewState == NULL ) )
|
{
|
LogError( ( "Argument cannot be NULL: pMqttContext=%p, pNewState=%p.",
|
( void * ) pMqttContext,
|
( void * ) pNewState ) );
|
status = MQTTBadParameter;
|
}
|
else if( packetId == MQTT_PACKET_ID_INVALID )
|
{
|
LogError( ( "Packet ID must be nonzero." ) );
|
status = MQTTBadParameter;
|
}
|
else if( packetType > MQTTPubcomp )
|
{
|
LogError( ( "Invalid packet type %u.", ( unsigned int ) packetType ) );
|
status = MQTTBadParameter;
|
}
|
else
|
{
|
if( isOutgoingPublish == true )
|
{
|
records = pMqttContext->outgoingPublishRecords;
|
maxRecordCount = pMqttContext->outgoingPublishRecordMaxCount;
|
}
|
else
|
{
|
records = pMqttContext->incomingPublishRecords;
|
maxRecordCount = pMqttContext->incomingPublishRecordMaxCount;
|
}
|
|
recordIndex = findInRecord( records,
|
maxRecordCount,
|
packetId,
|
&qos,
|
¤tState );
|
}
|
|
if( recordIndex != MQTT_INVALID_STATE_COUNT )
|
{
|
newState = MQTT_CalculateStateAck( packetType, opType, qos );
|
|
/* Validate state transition and update state record. */
|
status = updateStateAck( records,
|
maxRecordCount,
|
recordIndex,
|
packetId,
|
currentState,
|
newState );
|
|
/* Update the output parameter. */
|
if( status == MQTTSuccess )
|
{
|
*pNewState = newState;
|
}
|
}
|
else
|
{
|
LogError( ( "No matching record found for publish: PacketId=%u.",
|
( unsigned int ) packetId ) );
|
}
|
|
return status;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
uint16_t MQTT_PubrelToResend( const MQTTContext_t * pMqttContext,
|
MQTTStateCursor_t * pCursor,
|
MQTTPublishState_t * pState )
|
{
|
uint16_t packetId = MQTT_PACKET_ID_INVALID;
|
uint16_t searchStates = 0U;
|
|
/* Validate arguments. */
|
if( ( pMqttContext == NULL ) || ( pCursor == NULL ) || ( pState == NULL ) )
|
{
|
LogError( ( "Arguments cannot be NULL pMqttContext=%p, pCursor=%p"
|
" pState=%p.",
|
( void * ) pMqttContext,
|
( void * ) pCursor,
|
( void * ) pState ) );
|
}
|
else
|
{
|
/* PUBREL for packets in state #MQTTPubCompPending and #MQTTPubRelSend
|
* would need to be resent when a session is reestablished.*/
|
UINT16_SET_BIT( searchStates, MQTTPubCompPending );
|
UINT16_SET_BIT( searchStates, MQTTPubRelSend );
|
packetId = stateSelect( pMqttContext, searchStates, pCursor );
|
|
/* The state needs to be in #MQTTPubRelSend for sending PUBREL. */
|
if( packetId != MQTT_PACKET_ID_INVALID )
|
{
|
*pState = MQTTPubRelSend;
|
}
|
}
|
|
return packetId;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
uint16_t MQTT_PublishToResend( const MQTTContext_t * pMqttContext,
|
MQTTStateCursor_t * pCursor )
|
{
|
uint16_t packetId = MQTT_PACKET_ID_INVALID;
|
uint16_t searchStates = 0U;
|
|
/* Validate arguments. */
|
if( ( pMqttContext == NULL ) || ( pCursor == NULL ) )
|
{
|
LogError( ( "Arguments cannot be NULL pMqttContext=%p, pCursor=%p",
|
( void * ) pMqttContext,
|
( void * ) pCursor ) );
|
}
|
else
|
{
|
/* Packets in state #MQTTPublishSend, #MQTTPubAckPending and
|
* #MQTTPubRecPending would need to be resent when a session is
|
* reestablished. */
|
UINT16_SET_BIT( searchStates, MQTTPublishSend );
|
UINT16_SET_BIT( searchStates, MQTTPubAckPending );
|
UINT16_SET_BIT( searchStates, MQTTPubRecPending );
|
|
packetId = stateSelect( pMqttContext, searchStates, pCursor );
|
}
|
|
return packetId;
|
}
|
|
/*-----------------------------------------------------------*/
|
|
const char * MQTT_State_strerror( MQTTPublishState_t state )
|
{
|
const char * str = NULL;
|
|
switch( state )
|
{
|
case MQTTStateNull:
|
str = "MQTTStateNull";
|
break;
|
|
case MQTTPublishSend:
|
str = "MQTTPublishSend";
|
break;
|
|
case MQTTPubAckSend:
|
str = "MQTTPubAckSend";
|
break;
|
|
case MQTTPubRecSend:
|
str = "MQTTPubRecSend";
|
break;
|
|
case MQTTPubRelSend:
|
str = "MQTTPubRelSend";
|
break;
|
|
case MQTTPubCompSend:
|
str = "MQTTPubCompSend";
|
break;
|
|
case MQTTPubAckPending:
|
str = "MQTTPubAckPending";
|
break;
|
|
case MQTTPubRecPending:
|
str = "MQTTPubRecPending";
|
break;
|
|
case MQTTPubRelPending:
|
str = "MQTTPubRelPending";
|
break;
|
|
case MQTTPubCompPending:
|
str = "MQTTPubCompPending";
|
break;
|
|
case MQTTPublishDone:
|
str = "MQTTPublishDone";
|
break;
|
|
default:
|
/* Invalid state received. */
|
str = "Invalid MQTT State";
|
break;
|
}
|
|
return str;
|
}
|
|
/*-----------------------------------------------------------*/
|