commit | author | age
|
d6b4a7
|
1 |
/* |
G |
2 |
* coreMQTT v2.1.1 |
|
3 |
* Copyright (C) 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. |
|
4 |
* |
|
5 |
* SPDX-License-Identifier: MIT |
|
6 |
* |
|
7 |
* Permission is hereby granted, free of charge, to any person obtaining a copy of |
|
8 |
* this software and associated documentation files (the "Software"), to deal in |
|
9 |
* the Software without restriction, including without limitation the rights to |
|
10 |
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of |
|
11 |
* the Software, and to permit persons to whom the Software is furnished to do so, |
|
12 |
* subject to the following conditions: |
|
13 |
* |
|
14 |
* The above copyright notice and this permission notice shall be included in all |
|
15 |
* copies or substantial portions of the Software. |
|
16 |
* |
|
17 |
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|
18 |
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS |
|
19 |
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR |
|
20 |
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER |
|
21 |
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN |
|
22 |
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
|
23 |
*/ |
|
24 |
|
|
25 |
/** |
|
26 |
* @file core_mqtt.c |
|
27 |
* @brief Implements the user-facing functions in core_mqtt.h. |
|
28 |
*/ |
|
29 |
#include <string.h> |
|
30 |
#include <assert.h> |
|
31 |
|
|
32 |
#include "core_mqtt.h" |
|
33 |
#include "core_mqtt_state.h" |
|
34 |
|
|
35 |
/* Include config defaults header to get default values of configs. */ |
|
36 |
#include "core_mqtt_config_defaults.h" |
|
37 |
|
|
38 |
#include "core_mqtt_default_logging.h" |
|
39 |
|
|
40 |
#ifndef MQTT_PRE_SEND_HOOK |
|
41 |
|
|
42 |
/** |
|
43 |
* @brief Hook called before a 'send' operation is executed. |
|
44 |
*/ |
|
45 |
#define MQTT_PRE_SEND_HOOK( pContext ) |
|
46 |
#endif /* !MQTT_PRE_SEND_HOOK */ |
|
47 |
|
|
48 |
#ifndef MQTT_POST_SEND_HOOK |
|
49 |
|
|
50 |
/** |
|
51 |
* @brief Hook called after the 'send' operation is complete. |
|
52 |
*/ |
|
53 |
#define MQTT_POST_SEND_HOOK( pContext ) |
|
54 |
#endif /* !MQTT_POST_SEND_HOOK */ |
|
55 |
|
|
56 |
#ifndef MQTT_PRE_STATE_UPDATE_HOOK |
|
57 |
|
|
58 |
/** |
|
59 |
* @brief Hook called just before an update to the MQTT state is made. |
|
60 |
*/ |
|
61 |
#define MQTT_PRE_STATE_UPDATE_HOOK( pContext ) |
|
62 |
#endif /* !MQTT_PRE_STATE_UPDATE_HOOK */ |
|
63 |
|
|
64 |
#ifndef MQTT_POST_STATE_UPDATE_HOOK |
|
65 |
|
|
66 |
/** |
|
67 |
* @brief Hook called just after an update to the MQTT state has |
|
68 |
* been made. |
|
69 |
*/ |
|
70 |
#define MQTT_POST_STATE_UPDATE_HOOK( pContext ) |
|
71 |
#endif /* !MQTT_POST_STATE_UPDATE_HOOK */ |
|
72 |
|
|
73 |
/*-----------------------------------------------------------*/ |
|
74 |
|
|
75 |
/** |
|
76 |
* @brief Sends provided buffer to network using transport send. |
|
77 |
* |
|
78 |
* @param[in] pContext Initialized MQTT context. |
|
79 |
* @param[in] pBufferToSend Buffer to be sent to network. |
|
80 |
* @param[in] bytesToSend Number of bytes to be sent. |
|
81 |
* |
|
82 |
* @note This operation may call the transport send function |
|
83 |
* repeatedly to send bytes over the network until either: |
|
84 |
* 1. The requested number of bytes @a bytesToSend have been sent. |
|
85 |
* OR |
|
86 |
* 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this |
|
87 |
* function. |
|
88 |
* OR |
|
89 |
* 3. There is an error in sending data over the network. |
|
90 |
* |
|
91 |
* @return Total number of bytes sent, or negative value on network error. |
|
92 |
*/ |
|
93 |
static int32_t sendBuffer( MQTTContext_t * pContext, |
|
94 |
const uint8_t * pBufferToSend, |
|
95 |
size_t bytesToSend ); |
|
96 |
|
|
97 |
/** |
|
98 |
* @brief Sends MQTT connect without copying the users data into any buffer. |
|
99 |
* |
|
100 |
* @param[in] pContext Initialized MQTT context. |
|
101 |
* @param[in] pConnectInfo MQTT CONNECT packet information. |
|
102 |
* @param[in] pWillInfo Last Will and Testament. Pass NULL if Last Will and |
|
103 |
* Testament is not used. |
|
104 |
* @param[in] remainingLength The remaining length of the CONNECT packet. |
|
105 |
* |
|
106 |
* @note This operation may call the transport send function |
|
107 |
* repeatedly to send bytes over the network until either: |
|
108 |
* 1. The requested number of bytes @a remainingLength have been sent. |
|
109 |
* OR |
|
110 |
* 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this |
|
111 |
* function. |
|
112 |
* OR |
|
113 |
* 3. There is an error in sending data over the network. |
|
114 |
* |
|
115 |
* @return #MQTTSendFailed or #MQTTSuccess. |
|
116 |
*/ |
|
117 |
static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, |
|
118 |
const MQTTConnectInfo_t * pConnectInfo, |
|
119 |
const MQTTPublishInfo_t * pWillInfo, |
|
120 |
size_t remainingLength ); |
|
121 |
|
|
122 |
/** |
|
123 |
* @brief Sends the vector array passed through the parameters over the network. |
|
124 |
* |
|
125 |
* @note The preference is given to 'writev' function if it is present in the |
|
126 |
* transport interface. Otherwise, a send call is made repeatedly to achieve the |
|
127 |
* result. |
|
128 |
* |
|
129 |
* @param[in] pContext Initialized MQTT context. |
|
130 |
* @param[in] pIoVec The vector array to be sent. |
|
131 |
* @param[in] ioVecCount The number of elements in the array. |
|
132 |
* |
|
133 |
* @note This operation may call the transport send or writev functions |
|
134 |
* repeatedly to send bytes over the network until either: |
|
135 |
* 1. The requested number of bytes have been sent. |
|
136 |
* OR |
|
137 |
* 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this |
|
138 |
* function. |
|
139 |
* OR |
|
140 |
* 3. There is an error in sending data over the network. |
|
141 |
* |
|
142 |
* @return The total number of bytes sent or the error code as received from the |
|
143 |
* transport interface. |
|
144 |
*/ |
|
145 |
static int32_t sendMessageVector( MQTTContext_t * pContext, |
|
146 |
TransportOutVector_t * pIoVec, |
|
147 |
size_t ioVecCount ); |
|
148 |
|
|
149 |
/** |
|
150 |
* @brief Add a string and its length after serializing it in a manner outlined by |
|
151 |
* the MQTT specification. |
|
152 |
* |
|
153 |
* @param[in] serailizedLength Array of two bytes to which the vector will point. |
|
154 |
* The array must remain in scope until the message has been sent. |
|
155 |
* @param[in] string The string to be serialized. |
|
156 |
* @param[in] length The length of the string to be serialized. |
|
157 |
* @param[in] iterator The iterator pointing to the first element in the |
|
158 |
* transport interface IO array. |
|
159 |
* @param[out] updatedLength The value pointed to is increased by the number of |
|
160 |
* bytes added to the vector. |
|
161 |
* |
|
162 |
* @return The number of vectors added. |
|
163 |
*/ |
|
164 |
static size_t addEncodedStringToVector( uint8_t serailizedLength[ 2 ], |
|
165 |
const char * const string, |
|
166 |
uint16_t length, |
|
167 |
TransportOutVector_t * iterator, |
|
168 |
size_t * updatedLength ); |
|
169 |
|
|
170 |
/** |
|
171 |
* @brief Send MQTT SUBSCRIBE message without copying the user data into a buffer and |
|
172 |
* directly sending it. |
|
173 |
* |
|
174 |
* @param[in] pContext Initialized MQTT context. |
|
175 |
* @param[in] pSubscriptionList List of MQTT subscription info. |
|
176 |
* @param[in] subscriptionCount The count of elements in the list. |
|
177 |
* @param[in] packetId The packet ID of the subscribe packet. |
|
178 |
* @param[in] remainingLength The remaining length of the subscribe packet. |
|
179 |
* |
|
180 |
* @return #MQTTSuccess or #MQTTSendFailed. |
|
181 |
*/ |
|
182 |
static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext, |
|
183 |
const MQTTSubscribeInfo_t * pSubscriptionList, |
|
184 |
size_t subscriptionCount, |
|
185 |
uint16_t packetId, |
|
186 |
size_t remainingLength ); |
|
187 |
|
|
188 |
/** |
|
189 |
* @brief Send MQTT UNSUBSCRIBE message without copying the user data into a buffer and |
|
190 |
* directly sending it. |
|
191 |
* |
|
192 |
* @param[in] pContext Initialized MQTT context. |
|
193 |
* @param[in] pSubscriptionList MQTT subscription info. |
|
194 |
* @param[in] subscriptionCount The count of elements in the list. |
|
195 |
* @param[in] packetId The packet ID of the unsubscribe packet. |
|
196 |
* @param[in] remainingLength The remaining length of the unsubscribe packet. |
|
197 |
* |
|
198 |
* @return #MQTTSuccess or #MQTTSendFailed. |
|
199 |
*/ |
|
200 |
static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext, |
|
201 |
const MQTTSubscribeInfo_t * pSubscriptionList, |
|
202 |
size_t subscriptionCount, |
|
203 |
uint16_t packetId, |
|
204 |
size_t remainingLength ); |
|
205 |
|
|
206 |
/** |
|
207 |
* @brief Calculate the interval between two millisecond timestamps, including |
|
208 |
* when the later value has overflowed. |
|
209 |
* |
|
210 |
* @note In C, the operands are promoted to signed integers in subtraction. |
|
211 |
* Using this function avoids the need to cast the result of subtractions back |
|
212 |
* to uint32_t. |
|
213 |
* |
|
214 |
* @param[in] later The later time stamp, in milliseconds. |
|
215 |
* @param[in] start The earlier time stamp, in milliseconds. |
|
216 |
* |
|
217 |
* @return later - start. |
|
218 |
*/ |
|
219 |
static uint32_t calculateElapsedTime( uint32_t later, |
|
220 |
uint32_t start ); |
|
221 |
|
|
222 |
/** |
|
223 |
* @brief Convert a byte indicating a publish ack type to an #MQTTPubAckType_t. |
|
224 |
* |
|
225 |
* @param[in] packetType First byte of fixed header. |
|
226 |
* |
|
227 |
* @return Type of ack. |
|
228 |
*/ |
|
229 |
static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType ); |
|
230 |
|
|
231 |
/** |
|
232 |
* @brief Receive bytes into the network buffer. |
|
233 |
* |
|
234 |
* @param[in] pContext Initialized MQTT Context. |
|
235 |
* @param[in] bytesToRecv Number of bytes to receive. |
|
236 |
* |
|
237 |
* @note This operation calls the transport receive function |
|
238 |
* repeatedly to read bytes from the network until either: |
|
239 |
* 1. The requested number of bytes @a bytesToRecv are read. |
|
240 |
* OR |
|
241 |
* 2. No data is received from the network for MQTT_RECV_POLLING_TIMEOUT_MS duration. |
|
242 |
* |
|
243 |
* OR |
|
244 |
* 3. There is an error in reading from the network. |
|
245 |
* |
|
246 |
* |
|
247 |
* @return Number of bytes received, or negative number on network error. |
|
248 |
*/ |
|
249 |
static int32_t recvExact( const MQTTContext_t * pContext, |
|
250 |
size_t bytesToRecv ); |
|
251 |
|
|
252 |
/** |
|
253 |
* @brief Discard a packet from the transport interface. |
|
254 |
* |
|
255 |
* @param[in] pContext MQTT Connection context. |
|
256 |
* @param[in] remainingLength Remaining length of the packet to dump. |
|
257 |
* @param[in] timeoutMs Time remaining to discard the packet. |
|
258 |
* |
|
259 |
* @return #MQTTRecvFailed or #MQTTNoDataAvailable. |
|
260 |
*/ |
|
261 |
static MQTTStatus_t discardPacket( const MQTTContext_t * pContext, |
|
262 |
size_t remainingLength, |
|
263 |
uint32_t timeoutMs ); |
|
264 |
|
|
265 |
/** |
|
266 |
* @brief Discard a packet from the MQTT buffer and the transport interface. |
|
267 |
* |
|
268 |
* @param[in] pContext MQTT Connection context. |
|
269 |
* @param[in] pPacketInfo Information struct of the packet to be discarded. |
|
270 |
* |
|
271 |
* @return #MQTTRecvFailed or #MQTTNoDataAvailable. |
|
272 |
*/ |
|
273 |
static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext, |
|
274 |
const MQTTPacketInfo_t * pPacketInfo ); |
|
275 |
|
|
276 |
/** |
|
277 |
* @brief Receive a packet from the transport interface. |
|
278 |
* |
|
279 |
* @param[in] pContext MQTT Connection context. |
|
280 |
* @param[in] incomingPacket packet struct with remaining length. |
|
281 |
* @param[in] remainingTimeMs Time remaining to receive the packet. |
|
282 |
* |
|
283 |
* @return #MQTTSuccess or #MQTTRecvFailed. |
|
284 |
*/ |
|
285 |
static MQTTStatus_t receivePacket( const MQTTContext_t * pContext, |
|
286 |
MQTTPacketInfo_t incomingPacket, |
|
287 |
uint32_t remainingTimeMs ); |
|
288 |
|
|
289 |
/** |
|
290 |
* @brief Get the correct ack type to send. |
|
291 |
* |
|
292 |
* @param[in] state Current state of publish. |
|
293 |
* |
|
294 |
* @return Packet Type byte of PUBACK, PUBREC, PUBREL, or PUBCOMP if one of |
|
295 |
* those should be sent, else 0. |
|
296 |
*/ |
|
297 |
static uint8_t getAckTypeToSend( MQTTPublishState_t state ); |
|
298 |
|
|
299 |
/** |
|
300 |
* @brief Send acks for received QoS 1/2 publishes. |
|
301 |
* |
|
302 |
* @param[in] pContext MQTT Connection context. |
|
303 |
* @param[in] packetId packet ID of original PUBLISH. |
|
304 |
* @param[in] publishState Current publish state in record. |
|
305 |
* |
|
306 |
* @return #MQTTSuccess, #MQTTIllegalState or #MQTTSendFailed. |
|
307 |
*/ |
|
308 |
static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, |
|
309 |
uint16_t packetId, |
|
310 |
MQTTPublishState_t publishState ); |
|
311 |
|
|
312 |
/** |
|
313 |
* @brief Send a keep alive PINGREQ if the keep alive interval has elapsed. |
|
314 |
* |
|
315 |
* @param[in] pContext Initialized MQTT Context. |
|
316 |
* |
|
317 |
* @return #MQTTKeepAliveTimeout if a PINGRESP is not received in time, |
|
318 |
* #MQTTSendFailed if the PINGREQ cannot be sent, or #MQTTSuccess. |
|
319 |
*/ |
|
320 |
static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext ); |
|
321 |
|
|
322 |
/** |
|
323 |
* @brief Handle received MQTT PUBLISH packet. |
|
324 |
* |
|
325 |
* @param[in] pContext MQTT Connection context. |
|
326 |
* @param[in] pIncomingPacket Incoming packet. |
|
327 |
* |
|
328 |
* @return MQTTSuccess, MQTTIllegalState or deserialization error. |
|
329 |
*/ |
|
330 |
static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext, |
|
331 |
MQTTPacketInfo_t * pIncomingPacket ); |
|
332 |
|
|
333 |
/** |
|
334 |
* @brief Handle received MQTT publish acks. |
|
335 |
* |
|
336 |
* @param[in] pContext MQTT Connection context. |
|
337 |
* @param[in] pIncomingPacket Incoming packet. |
|
338 |
* |
|
339 |
* @return MQTTSuccess, MQTTIllegalState, or deserialization error. |
|
340 |
*/ |
|
341 |
static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext, |
|
342 |
MQTTPacketInfo_t * pIncomingPacket ); |
|
343 |
|
|
344 |
/** |
|
345 |
* @brief Handle received MQTT ack. |
|
346 |
* |
|
347 |
* @param[in] pContext MQTT Connection context. |
|
348 |
* @param[in] pIncomingPacket Incoming packet. |
|
349 |
* @param[in] manageKeepAlive Flag indicating if PINGRESPs should not be given |
|
350 |
* to the application |
|
351 |
* |
|
352 |
* @return MQTTSuccess, MQTTIllegalState, or deserialization error. |
|
353 |
*/ |
|
354 |
static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext, |
|
355 |
MQTTPacketInfo_t * pIncomingPacket, |
|
356 |
bool manageKeepAlive ); |
|
357 |
|
|
358 |
/** |
|
359 |
* @brief Run a single iteration of the receive loop. |
|
360 |
* |
|
361 |
* @param[in] pContext MQTT Connection context. |
|
362 |
* @param[in] manageKeepAlive Flag indicating if keep alive should be handled. |
|
363 |
* |
|
364 |
* @return #MQTTRecvFailed if a network error occurs during reception; |
|
365 |
* #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ; |
|
366 |
* #MQTTBadResponse if an invalid packet is received; |
|
367 |
* #MQTTKeepAliveTimeout if the server has not sent a PINGRESP before |
|
368 |
* #MQTT_PINGRESP_TIMEOUT_MS milliseconds; |
|
369 |
* #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an |
|
370 |
* invalid transition for the internal state machine; |
|
371 |
* #MQTTSuccess on success. |
|
372 |
*/ |
|
373 |
static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext, |
|
374 |
bool manageKeepAlive ); |
|
375 |
|
|
376 |
/** |
|
377 |
* @brief Validates parameters of #MQTT_Subscribe or #MQTT_Unsubscribe. |
|
378 |
* |
|
379 |
* @param[in] pContext Initialized MQTT context. |
|
380 |
* @param[in] pSubscriptionList List of MQTT subscription info. |
|
381 |
* @param[in] subscriptionCount The number of elements in pSubscriptionList. |
|
382 |
* @param[in] packetId Packet identifier. |
|
383 |
* |
|
384 |
* @return #MQTTBadParameter if invalid parameters are passed; |
|
385 |
* #MQTTSuccess otherwise. |
|
386 |
*/ |
|
387 |
static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext, |
|
388 |
const MQTTSubscribeInfo_t * pSubscriptionList, |
|
389 |
size_t subscriptionCount, |
|
390 |
uint16_t packetId ); |
|
391 |
|
|
392 |
/** |
|
393 |
* @brief Receives a CONNACK MQTT packet. |
|
394 |
* |
|
395 |
* @param[in] pContext Initialized MQTT context. |
|
396 |
* @param[in] timeoutMs Timeout for waiting for CONNACK packet. |
|
397 |
* @param[in] cleanSession Clean session flag set by application. |
|
398 |
* @param[out] pIncomingPacket Output parameter for the incoming packet. |
|
399 |
* @param[out] pSessionPresent Whether a previous session was present. |
|
400 |
* Only relevant if not establishing a clean session. |
|
401 |
* |
|
402 |
* @return #MQTTBadResponse if a bad response is received; |
|
403 |
* #MQTTNoDataAvailable if no data available for transport recv; |
|
404 |
* #MQTTRecvFailed if transport recv failed; |
|
405 |
* #MQTTSuccess otherwise. |
|
406 |
*/ |
|
407 |
static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext, |
|
408 |
uint32_t timeoutMs, |
|
409 |
bool cleanSession, |
|
410 |
MQTTPacketInfo_t * pIncomingPacket, |
|
411 |
bool * pSessionPresent ); |
|
412 |
|
|
413 |
/** |
|
414 |
* @brief Resends pending acks for a re-established MQTT session, or |
|
415 |
* clears existing state records for a clean session. |
|
416 |
* |
|
417 |
* @param[in] pContext Initialized MQTT context. |
|
418 |
* @param[in] sessionPresent Session present flag received from the MQTT broker. |
|
419 |
* |
|
420 |
* @return #MQTTSendFailed if transport send during resend failed; |
|
421 |
* #MQTTSuccess otherwise. |
|
422 |
*/ |
|
423 |
static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext, |
|
424 |
bool sessionPresent ); |
|
425 |
|
|
426 |
|
|
427 |
/** |
|
428 |
* @brief Send the publish packet without copying the topic string and payload in |
|
429 |
* the buffer. |
|
430 |
* |
|
431 |
* @param[in] pContext Initialized MQTT context. |
|
432 |
* @param[in] pPublishInfo MQTT PUBLISH packet parameters. |
|
433 |
* @param[in] pMqttHeader The serialized MQTT header with the header byte; |
|
434 |
* the encoded length of the packet; and the encoded length of the topic string. |
|
435 |
* @param[in] headerSize Size of the serialized PUBLISH header. |
|
436 |
* @param[in] packetId Packet Id of the publish packet. |
|
437 |
* |
|
438 |
* @return #MQTTSendFailed if transport send during resend failed; |
|
439 |
* #MQTTSuccess otherwise. |
|
440 |
*/ |
|
441 |
static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, |
|
442 |
const MQTTPublishInfo_t * pPublishInfo, |
|
443 |
const uint8_t * pMqttHeader, |
|
444 |
size_t headerSize, |
|
445 |
uint16_t packetId ); |
|
446 |
|
|
447 |
/** |
|
448 |
* @brief Function to validate #MQTT_Publish parameters. |
|
449 |
* |
|
450 |
* @param[in] pContext Initialized MQTT context. |
|
451 |
* @param[in] pPublishInfo MQTT PUBLISH packet parameters. |
|
452 |
* @param[in] packetId Packet Id for the MQTT PUBLISH packet. |
|
453 |
* |
|
454 |
* @return #MQTTBadParameter if invalid parameters are passed; |
|
455 |
* #MQTTSuccess otherwise. |
|
456 |
*/ |
|
457 |
static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext, |
|
458 |
const MQTTPublishInfo_t * pPublishInfo, |
|
459 |
uint16_t packetId ); |
|
460 |
|
|
461 |
/** |
|
462 |
* @brief Performs matching for special cases when a topic filter ends |
|
463 |
* with a wildcard character. |
|
464 |
* |
|
465 |
* When the topic name has been consumed but there are remaining characters to |
|
466 |
* match in the topic filter, this function handles the following 2 cases: |
|
467 |
* - When the topic filter ends with "/+" or "/#" characters, but the topic |
|
468 |
* name only ends with '/'. |
|
469 |
* - When the topic filter ends with "/#" characters, but the topic name |
|
470 |
* ends at the parent level. |
|
471 |
* |
|
472 |
* @note This function ASSUMES that the topic name has been consumed in linear |
|
473 |
* matching with the topic filter, but the topic filter has remaining characters |
|
474 |
* to be matched. |
|
475 |
* |
|
476 |
* @param[in] pTopicFilter The topic filter containing the wildcard. |
|
477 |
* @param[in] topicFilterLength Length of the topic filter being examined. |
|
478 |
* @param[in] filterIndex Index of the topic filter being examined. |
|
479 |
* |
|
480 |
* @return Returns whether the topic filter and the topic name match. |
|
481 |
*/ |
|
482 |
static bool matchEndWildcardsSpecialCases( const char * pTopicFilter, |
|
483 |
uint16_t topicFilterLength, |
|
484 |
uint16_t filterIndex ); |
|
485 |
|
|
486 |
/** |
|
487 |
* @brief Attempt to match topic name with a topic filter starting with a wildcard. |
|
488 |
* |
|
489 |
* If the topic filter starts with a '+' (single-level) wildcard, the function |
|
490 |
* advances the @a pNameIndex by a level in the topic name. |
|
491 |
* If the topic filter starts with a '#' (multi-level) wildcard, the function |
|
492 |
* concludes that both the topic name and topic filter match. |
|
493 |
* |
|
494 |
* @param[in] pTopicName The topic name to match. |
|
495 |
* @param[in] topicNameLength Length of the topic name. |
|
496 |
* @param[in] pTopicFilter The topic filter to match. |
|
497 |
* @param[in] topicFilterLength Length of the topic filter. |
|
498 |
* @param[in,out] pNameIndex Current index in the topic name being examined. It is |
|
499 |
* advanced by one level for `+` wildcards. |
|
500 |
* @param[in,out] pFilterIndex Current index in the topic filter being examined. |
|
501 |
* It is advanced to position of '/' level separator for '+' wildcard. |
|
502 |
* @param[out] pMatch Whether the topic filter and topic name match. |
|
503 |
* |
|
504 |
* @return `true` if the caller of this function should exit; `false` if the |
|
505 |
* caller should continue parsing the topics. |
|
506 |
*/ |
|
507 |
static bool matchWildcards( const char * pTopicName, |
|
508 |
uint16_t topicNameLength, |
|
509 |
const char * pTopicFilter, |
|
510 |
uint16_t topicFilterLength, |
|
511 |
uint16_t * pNameIndex, |
|
512 |
uint16_t * pFilterIndex, |
|
513 |
bool * pMatch ); |
|
514 |
|
|
515 |
/** |
|
516 |
* @brief Match a topic name and topic filter allowing the use of wildcards. |
|
517 |
* |
|
518 |
* @param[in] pTopicName The topic name to check. |
|
519 |
* @param[in] topicNameLength Length of the topic name. |
|
520 |
* @param[in] pTopicFilter The topic filter to check. |
|
521 |
* @param[in] topicFilterLength Length of topic filter. |
|
522 |
* |
|
523 |
* @return `true` if the topic name and topic filter match; `false` otherwise. |
|
524 |
*/ |
|
525 |
static bool matchTopicFilter( const char * pTopicName, |
|
526 |
uint16_t topicNameLength, |
|
527 |
const char * pTopicFilter, |
|
528 |
uint16_t topicFilterLength ); |
|
529 |
|
|
530 |
/*-----------------------------------------------------------*/ |
|
531 |
|
|
532 |
/* Decide whether a topic filter still matches once the topic name has been
 * fully consumed but the filter has characters left that end in a wildcard.
 *
 * pTopicFilter       Topic filter being examined; must not be NULL.
 * topicFilterLength  Number of characters in pTopicFilter; must be non-zero.
 * filterIndex        Index in pTopicFilter of the character that was paired
 *                    with the last character of the topic name.
 *
 * Returns true when either
 *   - exactly two characters follow filterIndex and they are "/#", or
 *   - the character at filterIndex is '/' and it is followed by exactly one
 *     more character which is '+' or '#';
 * otherwise returns false. */
static bool matchEndWildcardsSpecialCases( const char * pTopicFilter,
                                           uint16_t topicFilterLength,
                                           uint16_t filterIndex )
{
    bool matchFound = false;

    assert( pTopicFilter != NULL );
    assert( topicFilterLength != 0U );

    /* Check if the topic filter has 2 remaining characters and it ends in
     * "/#". This check handles the case to match filter "sport/#" with topic
     * "sport". The reason is that the '#' wildcard represents the parent and
     * any number of child levels in the topic name. */
    if( ( topicFilterLength >= 3U ) &&
        ( filterIndex == ( topicFilterLength - 3U ) ) &&
        ( pTopicFilter[ filterIndex + 1U ] == '/' ) &&
        ( pTopicFilter[ filterIndex + 2U ] == '#' ) )
    {
        matchFound = true;
    }

    /* Check if the next character is "#" or "+" and the topic filter ends in
     * "/#" or "/+". This check handles the cases to match:
     *
     * - Topic filter "sport/+" with topic "sport/".
     * - Topic filter "sport/#" with topic "sport/".
     *
     * The explicit length guard mirrors the check above so that the
     * subtraction never relies on unsigned wrap-around for short filters. */
    if( ( topicFilterLength >= 2U ) &&
        ( filterIndex == ( topicFilterLength - 2U ) ) &&
        ( pTopicFilter[ filterIndex ] == '/' ) )
    {
        /* Check that the last character is a wildcard. */
        matchFound = ( pTopicFilter[ filterIndex + 1U ] == '+' ) ||
                     ( pTopicFilter[ filterIndex + 1U ] == '#' );
    }

    return matchFound;
}
|
570 |
|
|
571 |
/*-----------------------------------------------------------*/ |
|
572 |
|
|
573 |
/* Handle a character mismatch between topic name and topic filter by
 * interpreting the filter character at *pFilterIndex as a wildcard.
 *
 * pTopicName / topicNameLength      Topic name and its length (non-zero).
 * pTopicFilter / topicFilterLength  Topic filter and its length (non-zero).
 * pNameIndex    In/out index into the topic name. For a '+' wildcard it is
 *               advanced to the next '/' in the topic name, or left on the
 *               last character when no further '/' exists.
 * pFilterIndex  In/out index into the topic filter. For a '+' wildcard it is
 *               advanced onto the following '/' when the topic name also has
 *               a further level.
 * pMatch        Written only when matching must stop: true for a valid
 *               trailing '#', false for a definite mismatch.
 *
 * Returns true when the caller must stop matching (the result is in *pMatch),
 * false when the caller should continue with the next characters. */
static bool matchWildcards( const char * pTopicName,
                            uint16_t topicNameLength,
                            const char * pTopicFilter,
                            uint16_t topicFilterLength,
                            uint16_t * pNameIndex,
                            uint16_t * pFilterIndex,
                            bool * pMatch )
{
    bool shouldStopMatching = false;
    bool locationIsValidForWildcard;

    assert( pTopicName != NULL );
    assert( topicNameLength != 0U );
    assert( pTopicFilter != NULL );
    assert( topicFilterLength != 0U );
    assert( pNameIndex != NULL );
    assert( pFilterIndex != NULL );
    assert( pMatch != NULL );

    /* Wild card in a topic filter is only valid either at the starting position
     * or when it is preceded by a '/'. */
    locationIsValidForWildcard = ( *pFilterIndex == 0U ) ||
                                 ( pTopicFilter[ *pFilterIndex - 1U ] == '/' );

    if( ( pTopicFilter[ *pFilterIndex ] == '+' ) && ( locationIsValidForWildcard == true ) )
    {
        bool nextLevelExistsInTopicName = false;
        bool nextLevelExistsinTopicFilter = false;

        /* Move topic name index to the end of the current level. The end of the
         * current level is identified by the last character before the next level
         * separator '/'. */
        while( *pNameIndex < topicNameLength )
        {
            /* Exit the loop if we hit the level separator. */
            if( pTopicName[ *pNameIndex ] == '/' )
            {
                nextLevelExistsInTopicName = true;
                break;
            }

            ( *pNameIndex )++;
        }

        /* Determine if the topic filter contains a child level after the current level
         * represented by the '+' wildcard. */
        if( ( *pFilterIndex < ( topicFilterLength - 1U ) ) &&
            ( pTopicFilter[ *pFilterIndex + 1U ] == '/' ) )
        {
            nextLevelExistsinTopicFilter = true;
        }

        /* If the topic name contains a child level but the topic filter ends at
         * the current level, then there does not exist a match. */
        if( ( nextLevelExistsInTopicName == true ) &&
            ( nextLevelExistsinTopicFilter == false ) )
        {
            *pMatch = false;
            shouldStopMatching = true;
        }

        /* If the topic name and topic filter have child levels, then advance the
         * filter index to the level separator in the topic filter, so that match
         * can be performed in the next level.
         * Note: The name index already points to the level separator in the topic
         * name. */
        else if( nextLevelExistsInTopicName == true )
        {
            ( *pFilterIndex )++;
        }
        else
        {
            /* If we have reached here, the loop terminated on the
             * ( *pNameIndex < topicNameLength ) condition, which means that we
             * have moved past the end of the topic name, and thus, we decrement
             * the index to the last character in the topic name. */
            ( *pNameIndex )--;
        }
    }

    /* '#' matches everything remaining in the topic name. It must be the
     * last character in a topic filter. */
    else if( ( pTopicFilter[ *pFilterIndex ] == '#' ) &&
             ( *pFilterIndex == ( topicFilterLength - 1U ) ) &&
             ( locationIsValidForWildcard == true ) )
    {
        /* Subsequent characters don't need to be checked for the
         * multi-level wildcard. */
        *pMatch = true;
        shouldStopMatching = true;
    }
    else
    {
        /* Any character mismatch other than '+' or '#' means the topic
         * name does not match the topic filter. */
        *pMatch = false;
        shouldStopMatching = true;
    }

    return shouldStopMatching;
}
|
674 |
|
|
675 |
/*-----------------------------------------------------------*/ |
|
676 |
|
|
677 |
/* Match a topic name against a topic filter that may contain the '+'
 * (single-level) and '#' (multi-level) wildcards.
 *
 * pTopicName / topicNameLength      Topic name and its length (non-zero).
 * pTopicFilter / topicFilterLength  Topic filter and its length (non-zero).
 *
 * Both strings are walked in lock-step. Equal characters advance both
 * indexes; a mismatch is handed to matchWildcards(), which either consumes a
 * '+' level, accepts a trailing '#', or reports a definite mismatch.
 *
 * Whenever the last character of the topic name has been reached (either by a
 * literal character match or because a '+' wildcard consumed the final level),
 * matchEndWildcardsSpecialCases() is consulted so that filters ending in "/#"
 * or "/+" are honoured, e.g. "sport/#" with "sport", "+/#" with "sport" and
 * "+/+" with "sport/".
 *
 * Returns true if the topic name matches the topic filter, false otherwise. */
static bool matchTopicFilter( const char * pTopicName,
                              uint16_t topicNameLength,
                              const char * pTopicFilter,
                              uint16_t topicFilterLength )
{
    bool matchFound = false, shouldStopMatching = false;
    uint16_t nameIndex = 0U, filterIndex = 0U;

    assert( pTopicName != NULL );
    assert( topicNameLength != 0U );
    assert( pTopicFilter != NULL );
    assert( topicFilterLength != 0U );

    while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) )
    {
        /* Check if the character in the topic name matches the corresponding
         * character in the topic filter string. */
        if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] )
        {
            /* If the topic name has been consumed but the topic filter has not
             * been consumed, match for special cases when the topic filter ends
             * with wildcard character. */
            if( nameIndex == ( topicNameLength - 1U ) )
            {
                matchFound = matchEndWildcardsSpecialCases( pTopicFilter,
                                                            topicFilterLength,
                                                            filterIndex );
            }
        }
        else
        {
            /* Check for matching wildcards. */
            shouldStopMatching = matchWildcards( pTopicName,
                                                 topicNameLength,
                                                 pTopicFilter,
                                                 topicFilterLength,
                                                 &nameIndex,
                                                 &filterIndex,
                                                 &matchFound );

            /* A '+' wildcard may have consumed the remainder of the topic
             * name, leaving nameIndex on its last character. The trailing
             * wildcard special cases must be evaluated here too; otherwise
             * the loop would advance past the end of the topic name and
             * filters such as "+/#" (topic "sport") or "+/+" (topic "sport/")
             * would wrongly be reported as not matching. */
            if( ( shouldStopMatching == false ) &&
                ( nameIndex == ( topicNameLength - 1U ) ) )
            {
                matchFound = matchEndWildcardsSpecialCases( pTopicFilter,
                                                            topicFilterLength,
                                                            filterIndex );
            }
        }

        if( ( matchFound == true ) || ( shouldStopMatching == true ) )
        {
            break;
        }

        /* Increment indexes. */
        nameIndex++;
        filterIndex++;
    }

    if( matchFound == false )
    {
        /* If the end of both strings has been reached, they match. This represents the
         * case when the topic filter contains the '+' wildcard at a non-starting position.
         * For example, when matching either of "sport/+/player" OR "sport/hockey/+" topic
         * filters with "sport/hockey/player" topic name. */
        matchFound = ( nameIndex == topicNameLength ) &&
                     ( filterIndex == topicFilterLength );
    }

    return matchFound;
}
|
740 |
|
|
741 |
/*-----------------------------------------------------------*/ |
|
742 |
|
|
743 |
static int32_t sendMessageVector( MQTTContext_t * pContext, |
|
744 |
TransportOutVector_t * pIoVec, |
|
745 |
size_t ioVecCount ) |
|
746 |
{ |
|
747 |
int32_t sendResult; |
|
748 |
uint32_t timeoutMs; |
|
749 |
TransportOutVector_t * pIoVectIterator; |
|
750 |
size_t vectorsToBeSent = ioVecCount; |
|
751 |
size_t bytesToSend = 0U; |
|
752 |
int32_t bytesSentOrError = 0; |
|
753 |
|
|
754 |
assert( pContext != NULL ); |
|
755 |
assert( pIoVec != NULL ); |
|
756 |
assert( pContext->getTime != NULL ); |
|
757 |
/* Send must always be defined */ |
|
758 |
assert( pContext->transportInterface.send != NULL ); |
|
759 |
|
|
760 |
/* Count the total number of bytes to be sent as outlined in the vector. */ |
|
761 |
for( pIoVectIterator = pIoVec; pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ); pIoVectIterator++ ) |
|
762 |
{ |
|
763 |
bytesToSend += pIoVectIterator->iov_len; |
|
764 |
} |
|
765 |
|
|
766 |
/* Reset the iterator to point to the first entry in the array. */ |
|
767 |
pIoVectIterator = pIoVec; |
|
768 |
|
|
769 |
/* Set the timeout. */ |
|
770 |
timeoutMs = pContext->getTime() + MQTT_SEND_TIMEOUT_MS; |
|
771 |
|
|
772 |
while( ( bytesSentOrError < ( int32_t ) bytesToSend ) && ( bytesSentOrError >= 0 ) ) |
|
773 |
{ |
|
774 |
if( pContext->transportInterface.writev != NULL ) |
|
775 |
{ |
|
776 |
sendResult = pContext->transportInterface.writev( pContext->transportInterface.pNetworkContext, |
|
777 |
pIoVectIterator, |
|
778 |
vectorsToBeSent ); |
|
779 |
} |
|
780 |
else |
|
781 |
{ |
|
782 |
sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext, |
|
783 |
pIoVectIterator->iov_base, |
|
784 |
pIoVectIterator->iov_len ); |
|
785 |
} |
|
786 |
|
|
787 |
if( sendResult > 0 ) |
|
788 |
{ |
|
789 |
/* It is a bug in the application's transport send implementation if |
|
790 |
* more bytes than expected are sent. */ |
|
791 |
assert( sendResult <= ( ( int32_t ) bytesToSend - bytesSentOrError ) ); |
|
792 |
|
|
793 |
bytesSentOrError += sendResult; |
|
794 |
|
|
795 |
/* Set last transmission time. */ |
|
796 |
pContext->lastPacketTxTime = pContext->getTime(); |
|
797 |
|
|
798 |
LogDebug( ( "sendMessageVector: Bytes Sent=%ld, Bytes Remaining=%lu", |
|
799 |
( long int ) sendResult, |
|
800 |
( unsigned long ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) ); |
|
801 |
} |
|
802 |
else if( sendResult < 0 ) |
|
803 |
{ |
|
804 |
bytesSentOrError = sendResult; |
|
805 |
LogError( ( "sendMessageVector: Unable to send packet: Network Error." ) ); |
|
806 |
} |
|
807 |
else |
|
808 |
{ |
|
809 |
/* MISRA Empty body */ |
|
810 |
} |
|
811 |
|
|
812 |
/* Check for timeout. */ |
|
813 |
if( pContext->getTime() >= timeoutMs ) |
|
814 |
{ |
|
815 |
LogError( ( "sendMessageVector: Unable to send packet: Timed out." ) ); |
|
816 |
break; |
|
817 |
} |
|
818 |
|
|
819 |
/* Update the send pointer to the correct vector and offset. */ |
|
820 |
while( ( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) && |
|
821 |
( sendResult >= ( int32_t ) pIoVectIterator->iov_len ) ) |
|
822 |
{ |
|
823 |
sendResult -= ( int32_t ) pIoVectIterator->iov_len; |
|
824 |
pIoVectIterator++; |
|
825 |
/* Update the number of vector which are yet to be sent. */ |
|
826 |
vectorsToBeSent--; |
|
827 |
} |
|
828 |
|
|
829 |
/* Some of the bytes from this vector were sent as well, update the length |
|
830 |
* and the pointer to data in this vector. */ |
|
831 |
if( ( sendResult > 0 ) && |
|
832 |
( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) ) |
|
833 |
{ |
|
834 |
pIoVectIterator->iov_base = ( const void * ) &( ( ( const uint8_t * ) pIoVectIterator->iov_base )[ sendResult ] ); |
|
835 |
pIoVectIterator->iov_len -= ( size_t ) sendResult; |
|
836 |
} |
|
837 |
} |
|
838 |
|
|
839 |
return bytesSentOrError; |
|
840 |
} |
|
841 |
|
|
842 |
/**
 * @brief Send a contiguous buffer through the transport's send function,
 * retrying until every byte is sent, an error occurs or the send times out.
 *
 * @param[in] pContext MQTT context; getTime and transportInterface.send must
 * be set.
 * @param[in] pBufferToSend First byte of the data to send.
 * @param[in] bytesToSend Number of bytes to send.
 *
 * @return The total number of bytes sent (fewer than @p bytesToSend if the
 * send timed out), or the negative value returned by the transport on error.
 */
static int32_t sendBuffer( MQTTContext_t * pContext,
                           const uint8_t * pBufferToSend,
                           size_t bytesToSend )
{
    int32_t sendResult;
    uint32_t startTimeMs;
    int32_t bytesSentOrError = 0;
    const uint8_t * pIndex = pBufferToSend;

    assert( pContext != NULL );
    assert( pContext->getTime != NULL );
    assert( pContext->transportInterface.send != NULL );
    assert( pIndex != NULL );

    /* Remember when sending started. The timeout is evaluated as an elapsed
     * time (unsigned subtraction) rather than as an absolute deadline, so it
     * stays correct when the 32-bit millisecond counter wraps around. */
    startTimeMs = pContext->getTime();

    while( ( bytesSentOrError < ( int32_t ) bytesToSend ) && ( bytesSentOrError >= 0 ) )
    {
        sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
                                                        pIndex,
                                                        bytesToSend - ( size_t ) bytesSentOrError );

        if( sendResult > 0 )
        {
            /* It is a bug in the application's transport send implementation if
             * more bytes than expected are sent. */
            assert( sendResult <= ( ( int32_t ) bytesToSend - bytesSentOrError ) );

            bytesSentOrError += sendResult;
            pIndex = &pIndex[ sendResult ];

            /* Set last transmission time. */
            pContext->lastPacketTxTime = pContext->getTime();

            LogDebug( ( "sendBuffer: Bytes Sent=%ld, Bytes Remaining=%lu",
                        ( long int ) sendResult,
                        ( unsigned long ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) );
        }
        else if( sendResult < 0 )
        {
            bytesSentOrError = sendResult;
            LogError( ( "sendBuffer: Unable to send packet: Network Error." ) );
        }
        else
        {
            /* MISRA Empty body */
        }

        /* Check for timeout (wrap-safe elapsed time comparison). */
        if( ( uint32_t ) ( pContext->getTime() - startTimeMs ) >= ( uint32_t ) MQTT_SEND_TIMEOUT_MS )
        {
            LogError( ( "sendBuffer: Unable to send packet: Timed out." ) );
            break;
        }
    }

    return bytesSentOrError;
}
|
901 |
|
|
902 |
/*-----------------------------------------------------------*/ |
|
903 |
|
|
904 |
/**
 * @brief Compute the difference between two timestamps.
 *
 * The subtraction is performed on unsigned 32-bit values, so the result is
 * reduced modulo 2^32 and remains correct when @p later has wrapped around
 * past zero relative to @p start.
 *
 * @param[in] later The more recent timestamp.
 * @param[in] start The earlier timestamp.
 *
 * @return later - start, modulo 2^32.
 */
static uint32_t calculateElapsedTime( uint32_t later,
                                      uint32_t start )
{
    const uint32_t elapsed = later - start;

    return elapsed;
}
|
909 |
|
|
910 |
/*-----------------------------------------------------------*/ |
|
911 |
|
|
912 |
static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType ) |
|
913 |
{ |
|
914 |
MQTTPubAckType_t ackType = MQTTPuback; |
|
915 |
|
|
916 |
switch( packetType ) |
|
917 |
{ |
|
918 |
case MQTT_PACKET_TYPE_PUBACK: |
|
919 |
ackType = MQTTPuback; |
|
920 |
break; |
|
921 |
|
|
922 |
case MQTT_PACKET_TYPE_PUBREC: |
|
923 |
ackType = MQTTPubrec; |
|
924 |
break; |
|
925 |
|
|
926 |
case MQTT_PACKET_TYPE_PUBREL: |
|
927 |
ackType = MQTTPubrel; |
|
928 |
break; |
|
929 |
|
|
930 |
case MQTT_PACKET_TYPE_PUBCOMP: |
|
931 |
default: |
|
932 |
|
|
933 |
/* This function is only called after checking the type is one of |
|
934 |
* the above four values, so packet type must be PUBCOMP here. */ |
|
935 |
assert( packetType == MQTT_PACKET_TYPE_PUBCOMP ); |
|
936 |
ackType = MQTTPubcomp; |
|
937 |
break; |
|
938 |
} |
|
939 |
|
|
940 |
return ackType; |
|
941 |
} |
|
942 |
|
|
943 |
/*-----------------------------------------------------------*/ |
|
944 |
|
|
945 |
static int32_t recvExact( const MQTTContext_t * pContext, |
|
946 |
size_t bytesToRecv ) |
|
947 |
{ |
|
948 |
uint8_t * pIndex = NULL; |
|
949 |
size_t bytesRemaining = bytesToRecv; |
|
950 |
int32_t totalBytesRecvd = 0, bytesRecvd; |
|
951 |
uint32_t lastDataRecvTimeMs = 0U, timeSinceLastRecvMs = 0U; |
|
952 |
TransportRecv_t recvFunc = NULL; |
|
953 |
MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL; |
|
954 |
bool receiveError = false; |
|
955 |
|
|
956 |
assert( pContext != NULL ); |
|
957 |
assert( bytesToRecv <= pContext->networkBuffer.size ); |
|
958 |
assert( pContext->getTime != NULL ); |
|
959 |
assert( pContext->transportInterface.recv != NULL ); |
|
960 |
assert( pContext->networkBuffer.pBuffer != NULL ); |
|
961 |
|
|
962 |
pIndex = pContext->networkBuffer.pBuffer; |
|
963 |
recvFunc = pContext->transportInterface.recv; |
|
964 |
getTimeStampMs = pContext->getTime; |
|
965 |
|
|
966 |
/* Part of the MQTT packet has been read before calling this function. */ |
|
967 |
lastDataRecvTimeMs = getTimeStampMs(); |
|
968 |
|
|
969 |
while( ( bytesRemaining > 0U ) && ( receiveError == false ) ) |
|
970 |
{ |
|
971 |
bytesRecvd = recvFunc( pContext->transportInterface.pNetworkContext, |
|
972 |
pIndex, |
|
973 |
bytesRemaining ); |
|
974 |
|
|
975 |
if( bytesRecvd < 0 ) |
|
976 |
{ |
|
977 |
LogError( ( "Network error while receiving packet: ReturnCode=%ld.", |
|
978 |
( long int ) bytesRecvd ) ); |
|
979 |
totalBytesRecvd = bytesRecvd; |
|
980 |
receiveError = true; |
|
981 |
} |
|
982 |
else if( bytesRecvd > 0 ) |
|
983 |
{ |
|
984 |
/* Reset the starting time as we have received some data from the network. */ |
|
985 |
lastDataRecvTimeMs = getTimeStampMs(); |
|
986 |
|
|
987 |
/* It is a bug in the application's transport receive implementation |
|
988 |
* if more bytes than expected are received. To avoid a possible |
|
989 |
* overflow in converting bytesRemaining from unsigned to signed, |
|
990 |
* this assert must exist after the check for bytesRecvd being |
|
991 |
* negative. */ |
|
992 |
assert( ( size_t ) bytesRecvd <= bytesRemaining ); |
|
993 |
|
|
994 |
bytesRemaining -= ( size_t ) bytesRecvd; |
|
995 |
totalBytesRecvd += ( int32_t ) bytesRecvd; |
|
996 |
/* Increment the index. */ |
|
997 |
pIndex = &pIndex[ bytesRecvd ]; |
|
998 |
LogDebug( ( "BytesReceived=%ld, BytesRemaining=%lu, TotalBytesReceived=%ld.", |
|
999 |
( long int ) bytesRecvd, |
|
1000 |
( unsigned long ) bytesRemaining, |
|
1001 |
( long int ) totalBytesRecvd ) ); |
|
1002 |
} |
|
1003 |
else |
|
1004 |
{ |
|
1005 |
/* No bytes were read from the network. */ |
|
1006 |
timeSinceLastRecvMs = calculateElapsedTime( getTimeStampMs(), lastDataRecvTimeMs ); |
|
1007 |
|
|
1008 |
/* Check for timeout if we have been waiting to receive any byte on the network. */ |
|
1009 |
if( timeSinceLastRecvMs >= MQTT_RECV_POLLING_TIMEOUT_MS ) |
|
1010 |
{ |
|
1011 |
LogError( ( "Unable to receive packet: Timed out in transport recv." ) ); |
|
1012 |
receiveError = true; |
|
1013 |
} |
|
1014 |
} |
|
1015 |
} |
|
1016 |
|
|
1017 |
return totalBytesRecvd; |
|
1018 |
} |
|
1019 |
|
|
1020 |
/*-----------------------------------------------------------*/ |
|
1021 |
|
|
1022 |
static MQTTStatus_t discardPacket( const MQTTContext_t * pContext, |
|
1023 |
size_t remainingLength, |
|
1024 |
uint32_t timeoutMs ) |
|
1025 |
{ |
|
1026 |
MQTTStatus_t status = MQTTRecvFailed; |
|
1027 |
int32_t bytesReceived = 0; |
|
1028 |
size_t bytesToReceive = 0U; |
|
1029 |
uint32_t totalBytesReceived = 0U; |
|
1030 |
uint32_t entryTimeMs = 0U; |
|
1031 |
uint32_t elapsedTimeMs = 0U; |
|
1032 |
MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL; |
|
1033 |
bool receiveError = false; |
|
1034 |
|
|
1035 |
assert( pContext != NULL ); |
|
1036 |
assert( pContext->getTime != NULL ); |
|
1037 |
|
|
1038 |
bytesToReceive = pContext->networkBuffer.size; |
|
1039 |
getTimeStampMs = pContext->getTime; |
|
1040 |
|
|
1041 |
entryTimeMs = getTimeStampMs(); |
|
1042 |
|
|
1043 |
while( ( totalBytesReceived < remainingLength ) && ( receiveError == false ) ) |
|
1044 |
{ |
|
1045 |
if( ( remainingLength - totalBytesReceived ) < bytesToReceive ) |
|
1046 |
{ |
|
1047 |
bytesToReceive = remainingLength - totalBytesReceived; |
|
1048 |
} |
|
1049 |
|
|
1050 |
bytesReceived = recvExact( pContext, bytesToReceive ); |
|
1051 |
|
|
1052 |
if( bytesReceived != ( int32_t ) bytesToReceive ) |
|
1053 |
{ |
|
1054 |
LogError( ( "Receive error while discarding packet." |
|
1055 |
"ReceivedBytes=%ld, ExpectedBytes=%lu.", |
|
1056 |
( long int ) bytesReceived, |
|
1057 |
( unsigned long ) bytesToReceive ) ); |
|
1058 |
receiveError = true; |
|
1059 |
} |
|
1060 |
else |
|
1061 |
{ |
|
1062 |
totalBytesReceived += ( uint32_t ) bytesReceived; |
|
1063 |
|
|
1064 |
elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs ); |
|
1065 |
|
|
1066 |
/* Check for timeout. */ |
|
1067 |
if( elapsedTimeMs >= timeoutMs ) |
|
1068 |
{ |
|
1069 |
LogError( ( "Time expired while discarding packet." ) ); |
|
1070 |
receiveError = true; |
|
1071 |
} |
|
1072 |
} |
|
1073 |
} |
|
1074 |
|
|
1075 |
if( totalBytesReceived == remainingLength ) |
|
1076 |
{ |
|
1077 |
LogError( ( "Dumped packet. DumpedBytes=%lu.", |
|
1078 |
( unsigned long ) totalBytesReceived ) ); |
|
1079 |
/* Packet dumped, so no data is available. */ |
|
1080 |
status = MQTTNoDataAvailable; |
|
1081 |
} |
|
1082 |
|
|
1083 |
return status; |
|
1084 |
} |
|
1085 |
|
|
1086 |
/*-----------------------------------------------------------*/ |
|
1087 |
|
|
1088 |
static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext, |
|
1089 |
const MQTTPacketInfo_t * pPacketInfo ) |
|
1090 |
{ |
|
1091 |
MQTTStatus_t status = MQTTRecvFailed; |
|
1092 |
int32_t bytesReceived = 0; |
|
1093 |
size_t bytesToReceive = 0U; |
|
1094 |
uint32_t totalBytesReceived = 0U; |
|
1095 |
bool receiveError = false; |
|
1096 |
size_t mqttPacketSize = 0; |
|
1097 |
size_t remainingLength; |
|
1098 |
|
|
1099 |
assert( pContext != NULL ); |
|
1100 |
assert( pPacketInfo != NULL ); |
|
1101 |
|
|
1102 |
mqttPacketSize = pPacketInfo->remainingLength + pPacketInfo->headerLength; |
|
1103 |
|
|
1104 |
/* Assert that the packet being discarded is bigger than the |
|
1105 |
* receive buffer. */ |
|
1106 |
assert( mqttPacketSize > pContext->networkBuffer.size ); |
|
1107 |
|
|
1108 |
/* Discard these many bytes at a time. */ |
|
1109 |
bytesToReceive = pContext->networkBuffer.size; |
|
1110 |
|
|
1111 |
/* Number of bytes depicted by 'index' have already been received. */ |
|
1112 |
remainingLength = mqttPacketSize - pContext->index; |
|
1113 |
|
|
1114 |
while( ( totalBytesReceived < remainingLength ) && ( receiveError == false ) ) |
|
1115 |
{ |
|
1116 |
if( ( remainingLength - totalBytesReceived ) < bytesToReceive ) |
|
1117 |
{ |
|
1118 |
bytesToReceive = remainingLength - totalBytesReceived; |
|
1119 |
} |
|
1120 |
|
|
1121 |
bytesReceived = recvExact( pContext, bytesToReceive ); |
|
1122 |
|
|
1123 |
if( bytesReceived != ( int32_t ) bytesToReceive ) |
|
1124 |
{ |
|
1125 |
LogError( ( "Receive error while discarding packet." |
|
1126 |
"ReceivedBytes=%ld, ExpectedBytes=%lu.", |
|
1127 |
( long int ) bytesReceived, |
|
1128 |
( unsigned long ) bytesToReceive ) ); |
|
1129 |
receiveError = true; |
|
1130 |
} |
|
1131 |
else |
|
1132 |
{ |
|
1133 |
totalBytesReceived += ( uint32_t ) bytesReceived; |
|
1134 |
} |
|
1135 |
} |
|
1136 |
|
|
1137 |
if( totalBytesReceived == remainingLength ) |
|
1138 |
{ |
|
1139 |
LogError( ( "Dumped packet. DumpedBytes=%lu.", |
|
1140 |
( unsigned long ) totalBytesReceived ) ); |
|
1141 |
/* Packet dumped, so no data is available. */ |
|
1142 |
status = MQTTNoDataAvailable; |
|
1143 |
} |
|
1144 |
|
|
1145 |
/* Clear the buffer */ |
|
1146 |
( void ) memset( pContext->networkBuffer.pBuffer, |
|
1147 |
0, |
|
1148 |
pContext->networkBuffer.size ); |
|
1149 |
|
|
1150 |
/* Reset the index. */ |
|
1151 |
pContext->index = 0; |
|
1152 |
|
|
1153 |
return status; |
|
1154 |
} |
|
1155 |
|
|
1156 |
/*-----------------------------------------------------------*/ |
|
1157 |
|
|
1158 |
static MQTTStatus_t receivePacket( const MQTTContext_t * pContext, |
|
1159 |
MQTTPacketInfo_t incomingPacket, |
|
1160 |
uint32_t remainingTimeMs ) |
|
1161 |
{ |
|
1162 |
MQTTStatus_t status = MQTTSuccess; |
|
1163 |
int32_t bytesReceived = 0; |
|
1164 |
size_t bytesToReceive = 0U; |
|
1165 |
|
|
1166 |
assert( pContext != NULL ); |
|
1167 |
assert( pContext->networkBuffer.pBuffer != NULL ); |
|
1168 |
|
|
1169 |
if( incomingPacket.remainingLength > pContext->networkBuffer.size ) |
|
1170 |
{ |
|
1171 |
LogError( ( "Incoming packet will be dumped: " |
|
1172 |
"Packet length exceeds network buffer size." |
|
1173 |
"PacketSize=%lu, NetworkBufferSize=%lu.", |
|
1174 |
( unsigned long ) incomingPacket.remainingLength, |
|
1175 |
( unsigned long ) pContext->networkBuffer.size ) ); |
|
1176 |
status = discardPacket( pContext, |
|
1177 |
incomingPacket.remainingLength, |
|
1178 |
remainingTimeMs ); |
|
1179 |
} |
|
1180 |
else |
|
1181 |
{ |
|
1182 |
bytesToReceive = incomingPacket.remainingLength; |
|
1183 |
bytesReceived = recvExact( pContext, bytesToReceive ); |
|
1184 |
|
|
1185 |
if( bytesReceived == ( int32_t ) bytesToReceive ) |
|
1186 |
{ |
|
1187 |
/* Receive successful, bytesReceived == bytesToReceive. */ |
|
1188 |
LogDebug( ( "Packet received. ReceivedBytes=%ld.", |
|
1189 |
( long int ) bytesReceived ) ); |
|
1190 |
} |
|
1191 |
else |
|
1192 |
{ |
|
1193 |
LogError( ( "Packet reception failed. ReceivedBytes=%ld, " |
|
1194 |
"ExpectedBytes=%lu.", |
|
1195 |
( long int ) bytesReceived, |
|
1196 |
( unsigned long ) bytesToReceive ) ); |
|
1197 |
status = MQTTRecvFailed; |
|
1198 |
} |
|
1199 |
} |
|
1200 |
|
|
1201 |
return status; |
|
1202 |
} |
|
1203 |
|
|
1204 |
/*-----------------------------------------------------------*/ |
|
1205 |
|
|
1206 |
/**
 * @brief Select the acknowledgement packet type that must be sent for a
 * publish in the given state.
 *
 * @param[in] state Current state of the publish record.
 *
 * @return MQTT_PACKET_TYPE_PUBACK, MQTT_PACKET_TYPE_PUBREC,
 * MQTT_PACKET_TYPE_PUBREL or MQTT_PACKET_TYPE_PUBCOMP for the matching
 * "send" state; 0 for every other state, meaning no ack has to be sent.
 */
static uint8_t getAckTypeToSend( MQTTPublishState_t state )
{
    uint8_t ackPacketType = 0U;

    if( state == MQTTPubAckSend )
    {
        ackPacketType = MQTT_PACKET_TYPE_PUBACK;
    }
    else if( state == MQTTPubRecSend )
    {
        ackPacketType = MQTT_PACKET_TYPE_PUBREC;
    }
    else if( state == MQTTPubRelSend )
    {
        ackPacketType = MQTT_PACKET_TYPE_PUBREL;
    }
    else if( state == MQTTPubCompSend )
    {
        ackPacketType = MQTT_PACKET_TYPE_PUBCOMP;
    }
    else
    {
        /* Pending, done, publish-send and null states (and any unknown
         * value) do not require an ack; leave the result at 0. */
    }

    return ackPacketType;
}
|
1242 |
|
|
1243 |
/*-----------------------------------------------------------*/ |
|
1244 |
|
|
1245 |
static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, |
|
1246 |
uint16_t packetId, |
|
1247 |
MQTTPublishState_t publishState ) |
|
1248 |
{ |
|
1249 |
MQTTStatus_t status = MQTTSuccess; |
|
1250 |
MQTTPublishState_t newState = MQTTStateNull; |
|
1251 |
int32_t sendResult = 0; |
|
1252 |
uint8_t packetTypeByte = 0U; |
|
1253 |
MQTTPubAckType_t packetType; |
|
1254 |
MQTTFixedBuffer_t localBuffer; |
|
1255 |
uint8_t pubAckPacket[ MQTT_PUBLISH_ACK_PACKET_SIZE ]; |
|
1256 |
|
|
1257 |
localBuffer.pBuffer = pubAckPacket; |
|
1258 |
localBuffer.size = MQTT_PUBLISH_ACK_PACKET_SIZE; |
|
1259 |
|
|
1260 |
assert( pContext != NULL ); |
|
1261 |
|
|
1262 |
packetTypeByte = getAckTypeToSend( publishState ); |
|
1263 |
|
|
1264 |
if( packetTypeByte != 0U ) |
|
1265 |
{ |
|
1266 |
packetType = getAckFromPacketType( packetTypeByte ); |
|
1267 |
|
|
1268 |
status = MQTT_SerializeAck( &localBuffer, |
|
1269 |
packetTypeByte, |
|
1270 |
packetId ); |
|
1271 |
|
|
1272 |
if( status == MQTTSuccess ) |
|
1273 |
{ |
|
1274 |
MQTT_PRE_SEND_HOOK( pContext ); |
|
1275 |
|
|
1276 |
/* Here, we are not using the vector approach for efficiency. There is just one buffer |
|
1277 |
* to be sent which can be achieved with a normal send call. */ |
|
1278 |
sendResult = sendBuffer( pContext, |
|
1279 |
localBuffer.pBuffer, |
|
1280 |
MQTT_PUBLISH_ACK_PACKET_SIZE ); |
|
1281 |
|
|
1282 |
MQTT_POST_SEND_HOOK( pContext ); |
|
1283 |
} |
|
1284 |
|
|
1285 |
if( sendResult == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE ) |
|
1286 |
{ |
|
1287 |
pContext->controlPacketSent = true; |
|
1288 |
|
|
1289 |
MQTT_PRE_STATE_UPDATE_HOOK( pContext ); |
|
1290 |
|
|
1291 |
status = MQTT_UpdateStateAck( pContext, |
|
1292 |
packetId, |
|
1293 |
packetType, |
|
1294 |
MQTT_SEND, |
|
1295 |
&newState ); |
|
1296 |
|
|
1297 |
MQTT_POST_STATE_UPDATE_HOOK( pContext ); |
|
1298 |
|
|
1299 |
if( status != MQTTSuccess ) |
|
1300 |
{ |
|
1301 |
LogError( ( "Failed to update state of publish %hu.", |
|
1302 |
( unsigned short ) packetId ) ); |
|
1303 |
} |
|
1304 |
} |
|
1305 |
else |
|
1306 |
{ |
|
1307 |
LogError( ( "Failed to send ACK packet: PacketType=%02x, SentBytes=%ld, " |
|
1308 |
"PacketSize=%lu.", |
|
1309 |
( unsigned int ) packetTypeByte, ( long int ) sendResult, |
|
1310 |
MQTT_PUBLISH_ACK_PACKET_SIZE ) ); |
|
1311 |
status = MQTTSendFailed; |
|
1312 |
} |
|
1313 |
} |
|
1314 |
|
|
1315 |
return status; |
|
1316 |
} |
|
1317 |
|
|
1318 |
/*-----------------------------------------------------------*/ |
|
1319 |
|
|
1320 |
/**
 * @brief Decide whether a PINGREQ must be sent or whether an outstanding
 * PINGREQ has gone unanswered for too long.
 *
 * While a ping response is awaited, only the ping response timeout is
 * checked. Otherwise a ping is sent when nothing has been transmitted for the
 * keep-alive interval (capped at PACKET_TX_TIMEOUT_MS) or, failing that, when
 * nothing has been received for PACKET_RX_TIMEOUT_MS. A keep-alive interval
 * of 0 disables only the transmit-idle check.
 *
 * @param[in] pContext MQTT context; getTime must be set.
 *
 * @return #MQTTKeepAliveTimeout if the awaited ping response is overdue; the
 * status of MQTT_Ping() if a ping was sent; #MQTTSuccess otherwise.
 */
static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext )
{
    MQTTStatus_t status = MQTTSuccess;
    uint32_t currentTimeMs = 0U;
    uint32_t txIdleLimitMs = 0U;
    bool pingDue = false;

    assert( pContext != NULL );
    assert( pContext->getTime != NULL );

    currentTimeMs = pContext->getTime();

    /* Transmit-idle limit: the keep-alive interval in milliseconds, capped. */
    txIdleLimitMs = 1000U * ( uint32_t ) pContext->keepAliveIntervalSec;

    if( txIdleLimitMs > PACKET_TX_TIMEOUT_MS )
    {
        txIdleLimitMs = PACKET_TX_TIMEOUT_MS;
    }

    if( pContext->waitingForPingResp == true )
    {
        /* A PINGREQ is outstanding; has the response taken too long? */
        if( calculateElapsedTime( currentTimeMs, pContext->pingReqSendTimeMs ) >
            MQTT_PINGRESP_TIMEOUT_MS )
        {
            status = MQTTKeepAliveTimeout;
        }
    }
    else
    {
        /* A limit of 0 means the keep-alive interval is 0, which turns the
         * transmit-idle check off. */
        pingDue = ( txIdleLimitMs != 0U ) &&
                  ( calculateElapsedTime( currentTimeMs, pContext->lastPacketTxTime ) >= txIdleLimitMs );

        if( pingDue == false )
        {
            const uint32_t rxIdleTimeMs = calculateElapsedTime( currentTimeMs, pContext->lastPacketRxTime );

            pingDue = ( rxIdleTimeMs != 0U ) && ( rxIdleTimeMs >= PACKET_RX_TIMEOUT_MS );
        }

        if( pingDue == true )
        {
            status = MQTT_Ping( pContext );
        }
    }

    return status;
}
|
1367 |
|
|
1368 |
/*-----------------------------------------------------------*/ |
|
1369 |
|
|
1370 |
static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext, |
|
1371 |
MQTTPacketInfo_t * pIncomingPacket ) |
|
1372 |
{ |
|
1373 |
MQTTStatus_t status = MQTTBadParameter; |
|
1374 |
MQTTPublishState_t publishRecordState = MQTTStateNull; |
|
1375 |
uint16_t packetIdentifier = 0U; |
|
1376 |
MQTTPublishInfo_t publishInfo; |
|
1377 |
MQTTDeserializedInfo_t deserializedInfo; |
|
1378 |
bool duplicatePublish = false; |
|
1379 |
|
|
1380 |
assert( pContext != NULL ); |
|
1381 |
assert( pIncomingPacket != NULL ); |
|
1382 |
assert( pContext->appCallback != NULL ); |
|
1383 |
|
|
1384 |
status = MQTT_DeserializePublish( pIncomingPacket, &packetIdentifier, &publishInfo ); |
|
1385 |
LogInfo( ( "De-serialized incoming PUBLISH packet: DeserializerResult=%s.", |
|
1386 |
MQTT_Status_strerror( status ) ) ); |
|
1387 |
|
|
1388 |
if( ( status == MQTTSuccess ) && |
|
1389 |
( pContext->incomingPublishRecords == NULL ) && |
|
1390 |
( publishInfo.qos > MQTTQoS0 ) ) |
|
1391 |
{ |
|
1392 |
LogError( ( "Incoming publish has QoS > MQTTQoS0 but incoming " |
|
1393 |
"publish records have not been initialized. Dropping the " |
|
1394 |
"incoming publish. Please call MQTT_InitStatefulQoS to enable " |
|
1395 |
"use of QoS1 and QoS2 publishes." ) ); |
|
1396 |
status = MQTTRecvFailed; |
|
1397 |
} |
|
1398 |
|
|
1399 |
if( status == MQTTSuccess ) |
|
1400 |
{ |
|
1401 |
MQTT_PRE_STATE_UPDATE_HOOK( pContext ); |
|
1402 |
|
|
1403 |
status = MQTT_UpdateStatePublish( pContext, |
|
1404 |
packetIdentifier, |
|
1405 |
MQTT_RECEIVE, |
|
1406 |
publishInfo.qos, |
|
1407 |
&publishRecordState ); |
|
1408 |
|
|
1409 |
MQTT_POST_STATE_UPDATE_HOOK( pContext ); |
|
1410 |
|
|
1411 |
if( status == MQTTSuccess ) |
|
1412 |
{ |
|
1413 |
LogInfo( ( "State record updated. New state=%s.", |
|
1414 |
MQTT_State_strerror( publishRecordState ) ) ); |
|
1415 |
} |
|
1416 |
|
|
1417 |
/* Different cases in which an incoming publish with duplicate flag is |
|
1418 |
* handled are as listed below. |
|
1419 |
* 1. No collision - This is the first instance of the incoming publish |
|
1420 |
* packet received or an earlier received packet state is lost. This |
|
1421 |
* will be handled as a new incoming publish for both QoS1 and QoS2 |
|
1422 |
* publishes. |
|
1423 |
* 2. Collision - The incoming packet was received before and a state |
|
1424 |
* record is present in the state engine. For QoS1 and QoS2 publishes |
|
1425 |
* this case can happen at 2 different cases and handling is |
|
1426 |
* different. |
|
1427 |
* a. QoS1 - If a PUBACK is not successfully sent for the incoming |
|
1428 |
* publish due to a connection issue, it can result in broker |
|
1429 |
* sending out a duplicate publish with dup flag set, when a |
|
1430 |
* session is reestablished. It can result in a collision in |
|
1431 |
* state engine. This will be handled by processing the incoming |
|
1432 |
* publish as a new publish ignoring the |
|
1433 |
* #MQTTStateCollision status from the state engine. The publish |
|
1434 |
* data is not passed to the application. |
|
1435 |
* b. QoS2 - If a PUBREC is not successfully sent for the incoming |
|
1436 |
* publish or the PUBREC sent is not successfully received by the |
|
1437 |
* broker due to a connection issue, it can result in broker |
|
1438 |
* sending out a duplicate publish with dup flag set, when a |
|
1439 |
* session is reestablished. It can result in a collision in |
|
1440 |
* state engine. This will be handled by ignoring the |
|
1441 |
* #MQTTStateCollision status from the state engine. The publish |
|
1442 |
* data is not passed to the application. */ |
|
1443 |
else if( status == MQTTStateCollision ) |
|
1444 |
{ |
|
1445 |
status = MQTTSuccess; |
|
1446 |
duplicatePublish = true; |
|
1447 |
|
|
1448 |
/* Calculate the state for the ack packet that needs to be sent out |
|
1449 |
* for the duplicate incoming publish. */ |
|
1450 |
publishRecordState = MQTT_CalculateStatePublish( MQTT_RECEIVE, |
|
1451 |
publishInfo.qos ); |
|
1452 |
|
|
1453 |
LogDebug( ( "Incoming publish packet with packet id %hu already exists.", |
|
1454 |
( unsigned short ) packetIdentifier ) ); |
|
1455 |
|
|
1456 |
if( publishInfo.dup == false ) |
|
1457 |
{ |
|
1458 |
LogError( ( "DUP flag is 0 for duplicate packet (MQTT-3.3.1.-1)." ) ); |
|
1459 |
} |
|
1460 |
} |
|
1461 |
else |
|
1462 |
{ |
|
1463 |
LogError( ( "Error in updating publish state for incoming publish with packet id %hu." |
|
1464 |
" Error is %s", |
|
1465 |
( unsigned short ) packetIdentifier, |
|
1466 |
MQTT_Status_strerror( status ) ) ); |
|
1467 |
} |
|
1468 |
} |
|
1469 |
|
|
1470 |
if( status == MQTTSuccess ) |
|
1471 |
{ |
|
1472 |
/* Set fields of deserialized struct. */ |
|
1473 |
deserializedInfo.packetIdentifier = packetIdentifier; |
|
1474 |
deserializedInfo.pPublishInfo = &publishInfo; |
|
1475 |
deserializedInfo.deserializationResult = status; |
|
1476 |
|
|
1477 |
/* Invoke application callback to hand the buffer over to application |
|
1478 |
* before sending acks. |
|
1479 |
* Application callback will be invoked for all publishes, except for |
|
1480 |
* duplicate incoming publishes. */ |
|
1481 |
if( duplicatePublish == false ) |
|
1482 |
{ |
|
1483 |
pContext->appCallback( pContext, |
|
1484 |
pIncomingPacket, |
|
1485 |
&deserializedInfo ); |
|
1486 |
} |
|
1487 |
|
|
1488 |
/* Send PUBACK or PUBREC if necessary. */ |
|
1489 |
status = sendPublishAcks( pContext, |
|
1490 |
packetIdentifier, |
|
1491 |
publishRecordState ); |
|
1492 |
} |
|
1493 |
|
|
1494 |
return status; |
|
1495 |
} |
|
1496 |
|
|
1497 |
/*-----------------------------------------------------------*/ |
|
1498 |
|
|
1499 |
static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext, |
|
1500 |
MQTTPacketInfo_t * pIncomingPacket ) |
|
1501 |
{ |
|
1502 |
MQTTStatus_t status = MQTTBadResponse; |
|
1503 |
MQTTPublishState_t publishRecordState = MQTTStateNull; |
|
1504 |
uint16_t packetIdentifier; |
|
1505 |
MQTTPubAckType_t ackType; |
|
1506 |
MQTTEventCallback_t appCallback; |
|
1507 |
MQTTDeserializedInfo_t deserializedInfo; |
|
1508 |
|
|
1509 |
assert( pContext != NULL ); |
|
1510 |
assert( pIncomingPacket != NULL ); |
|
1511 |
assert( pContext->appCallback != NULL ); |
|
1512 |
|
|
1513 |
appCallback = pContext->appCallback; |
|
1514 |
|
|
1515 |
ackType = getAckFromPacketType( pIncomingPacket->type ); |
|
1516 |
status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL ); |
|
1517 |
LogInfo( ( "Ack packet deserialized with result: %s.", |
|
1518 |
MQTT_Status_strerror( status ) ) ); |
|
1519 |
|
|
1520 |
if( status == MQTTSuccess ) |
|
1521 |
{ |
|
1522 |
MQTT_PRE_STATE_UPDATE_HOOK( pContext ); |
|
1523 |
|
|
1524 |
status = MQTT_UpdateStateAck( pContext, |
|
1525 |
packetIdentifier, |
|
1526 |
ackType, |
|
1527 |
MQTT_RECEIVE, |
|
1528 |
&publishRecordState ); |
|
1529 |
|
|
1530 |
MQTT_POST_STATE_UPDATE_HOOK( pContext ); |
|
1531 |
|
|
1532 |
if( status == MQTTSuccess ) |
|
1533 |
{ |
|
1534 |
LogInfo( ( "State record updated. New state=%s.", |
|
1535 |
MQTT_State_strerror( publishRecordState ) ) ); |
|
1536 |
} |
|
1537 |
else |
|
1538 |
{ |
|
1539 |
LogError( ( "Updating the state engine for packet id %hu" |
|
1540 |
" failed with error %s.", |
|
1541 |
( unsigned short ) packetIdentifier, |
|
1542 |
MQTT_Status_strerror( status ) ) ); |
|
1543 |
} |
|
1544 |
} |
|
1545 |
|
|
1546 |
if( status == MQTTSuccess ) |
|
1547 |
{ |
|
1548 |
/* Set fields of deserialized struct. */ |
|
1549 |
deserializedInfo.packetIdentifier = packetIdentifier; |
|
1550 |
deserializedInfo.deserializationResult = status; |
|
1551 |
deserializedInfo.pPublishInfo = NULL; |
|
1552 |
|
|
1553 |
/* Invoke application callback to hand the buffer over to application |
|
1554 |
* before sending acks. */ |
|
1555 |
appCallback( pContext, pIncomingPacket, &deserializedInfo ); |
|
1556 |
|
|
1557 |
/* Send PUBREL or PUBCOMP if necessary. */ |
|
1558 |
status = sendPublishAcks( pContext, |
|
1559 |
packetIdentifier, |
|
1560 |
publishRecordState ); |
|
1561 |
} |
|
1562 |
|
|
1563 |
return status; |
|
1564 |
} |
|
1565 |
|
|
1566 |
/*-----------------------------------------------------------*/ |
|
1567 |
|
|
1568 |
static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext, |
|
1569 |
MQTTPacketInfo_t * pIncomingPacket, |
|
1570 |
bool manageKeepAlive ) |
|
1571 |
{ |
|
1572 |
MQTTStatus_t status = MQTTBadResponse; |
|
1573 |
uint16_t packetIdentifier = MQTT_PACKET_ID_INVALID; |
|
1574 |
MQTTDeserializedInfo_t deserializedInfo; |
|
1575 |
|
|
1576 |
/* We should always invoke the app callback unless we receive a PINGRESP |
|
1577 |
* and are managing keep alive, or if we receive an unknown packet. We |
|
1578 |
* initialize this to false since the callback must be invoked before |
|
1579 |
* sending any PUBREL or PUBCOMP. However, for other cases, we invoke it |
|
1580 |
* at the end to reduce the complexity of this function. */ |
|
1581 |
bool invokeAppCallback = false; |
|
1582 |
MQTTEventCallback_t appCallback = NULL; |
|
1583 |
|
|
1584 |
assert( pContext != NULL ); |
|
1585 |
assert( pIncomingPacket != NULL ); |
|
1586 |
assert( pContext->appCallback != NULL ); |
|
1587 |
|
|
1588 |
appCallback = pContext->appCallback; |
|
1589 |
|
|
1590 |
LogDebug( ( "Received packet of type %02x.", |
|
1591 |
( unsigned int ) pIncomingPacket->type ) ); |
|
1592 |
|
|
1593 |
switch( pIncomingPacket->type ) |
|
1594 |
{ |
|
1595 |
case MQTT_PACKET_TYPE_PUBACK: |
|
1596 |
case MQTT_PACKET_TYPE_PUBREC: |
|
1597 |
case MQTT_PACKET_TYPE_PUBREL: |
|
1598 |
case MQTT_PACKET_TYPE_PUBCOMP: |
|
1599 |
|
|
1600 |
/* Handle all the publish acks. The app callback is invoked here. */ |
|
1601 |
status = handlePublishAcks( pContext, pIncomingPacket ); |
|
1602 |
|
|
1603 |
break; |
|
1604 |
|
|
1605 |
case MQTT_PACKET_TYPE_PINGRESP: |
|
1606 |
status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL ); |
|
1607 |
invokeAppCallback = ( status == MQTTSuccess ) && !manageKeepAlive; |
|
1608 |
|
|
1609 |
if( ( status == MQTTSuccess ) && ( manageKeepAlive == true ) ) |
|
1610 |
{ |
|
1611 |
pContext->waitingForPingResp = false; |
|
1612 |
} |
|
1613 |
|
|
1614 |
break; |
|
1615 |
|
|
1616 |
case MQTT_PACKET_TYPE_SUBACK: |
|
1617 |
case MQTT_PACKET_TYPE_UNSUBACK: |
|
1618 |
/* Deserialize and give these to the app provided callback. */ |
|
1619 |
status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL ); |
|
1620 |
invokeAppCallback = ( status == MQTTSuccess ) || ( status == MQTTServerRefused ); |
|
1621 |
break; |
|
1622 |
|
|
1623 |
default: |
|
1624 |
/* Bad response from the server. */ |
|
1625 |
LogError( ( "Unexpected packet type from server: PacketType=%02x.", |
|
1626 |
( unsigned int ) pIncomingPacket->type ) ); |
|
1627 |
status = MQTTBadResponse; |
|
1628 |
break; |
|
1629 |
} |
|
1630 |
|
|
1631 |
if( invokeAppCallback == true ) |
|
1632 |
{ |
|
1633 |
/* Set fields of deserialized struct. */ |
|
1634 |
deserializedInfo.packetIdentifier = packetIdentifier; |
|
1635 |
deserializedInfo.deserializationResult = status; |
|
1636 |
deserializedInfo.pPublishInfo = NULL; |
|
1637 |
appCallback( pContext, pIncomingPacket, &deserializedInfo ); |
|
1638 |
/* In case a SUBACK indicated refusal, reset the status to continue the loop. */ |
|
1639 |
status = MQTTSuccess; |
|
1640 |
} |
|
1641 |
|
|
1642 |
return status; |
|
1643 |
} |
|
1644 |
/*-----------------------------------------------------------*/ |
|
1645 |
|
|
1646 |
/**
 * @brief Perform one receive attempt and process at most one complete packet
 * from the network buffer.
 *
 * Newly received bytes are appended at pContext->index. If a complete packet
 * is present it is dispatched to handleIncomingPublish() or
 * handleIncomingAck(), then removed from the buffer by moving the remaining
 * bytes to the front.
 *
 * @param[in] pContext MQTT context; its network buffer must be set.
 * @param[in] manageKeepAlive true to run handleKeepAlive() when the transport
 * returned no bytes.
 *
 * @return #MQTTRecvFailed if the transport receive returned a negative value;
 * #MQTTNeedMoreBytes if the buffered data does not yet hold a whole packet;
 * #MQTTSuccess if there was nothing to read; otherwise the status of the
 * keep-alive handling, header parsing, packet discarding or packet handler.
 */
static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
                                            bool manageKeepAlive )
{
    MQTTStatus_t status = MQTTSuccess;
    MQTTPacketInfo_t incomingPacket = { 0 };
    int32_t bytesRead;
    size_t packetSize = 0;

    assert( pContext != NULL );
    assert( pContext->networkBuffer.pBuffer != NULL );

    /* Append whatever the transport has to the unused tail of the buffer. */
    bytesRead = pContext->transportInterface.recv( pContext->transportInterface.pNetworkContext,
                                                   &( pContext->networkBuffer.pBuffer[ pContext->index ] ),
                                                   pContext->networkBuffer.size - pContext->index );

    if( bytesRead < 0 )
    {
        /* Transport failure; report it to the caller. */
        status = MQTTRecvFailed;
    }
    else if( ( bytesRead > 0 ) || ( pContext->index != 0U ) )
    {
        /* New bytes arrived, or earlier bytes are still waiting in the
         * buffer (or both): account for them and parse the fixed header. */
        pContext->index += ( size_t ) bytesRead;

        status = MQTT_ProcessIncomingPacketTypeAndLength( pContext->networkBuffer.pBuffer,
                                                          &pContext->index,
                                                          &incomingPacket );

        packetSize = incomingPacket.remainingLength + incomingPacket.headerLength;
    }
    else
    {
        /* Nothing was read and nothing is buffered. */
        status = MQTTNoDataAvailable;
    }

    /* An idle read is the point at which the keep alive timers are checked. */
    if( ( bytesRead == 0 ) && ( manageKeepAlive == true ) )
    {
        const MQTTStatus_t keepAliveStatus = handleKeepAlive( pContext );

        /* Only a keep alive failure replaces the status computed above. */
        if( keepAliveStatus != MQTTSuccess )
        {
            status = keepAliveStatus;
            LogError( ( "Handling of keep alive failed. Status=%s",
                        MQTT_Status_strerror( status ) ) );
        }
    }

    if( status == MQTTSuccess )
    {
        if( packetSize > pContext->networkBuffer.size )
        {
            /* The packet can never fit in the buffer: drop what is stored
             * and drain the rest of it from the transport. */
            status = discardStoredPacket( pContext,
                                          &incomingPacket );
        }
        else if( packetSize > pContext->index )
        {
            /* The packet has not been received in full yet. */
            status = MQTTNeedMoreBytes;
        }
        else
        {
            /* A complete packet is available. */
        }
    }
    else if( ( status != MQTTNeedMoreBytes ) && ( status != MQTTNoDataAvailable ) )
    {
        LogError( ( "Call to receiveSingleIteration failed. Status=%s",
                    MQTT_Status_strerror( status ) ) );
    }
    else
    {
        /* Nothing to process right now; the status is passed on as is. */
    }

    /* Dispatch the packet. Skipped when the data is incomplete or on error. */
    if( status == MQTTSuccess )
    {
        incomingPacket.pRemainingData = &pContext->networkBuffer.pBuffer[ incomingPacket.headerLength ];

        /* Only PUBLISH carries flags in the low nibble of the type byte, so
         * mask them off before comparing. */
        if( ( incomingPacket.type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
        {
            status = handleIncomingPublish( pContext, &incomingPacket );
        }
        else
        {
            status = handleIncomingAck( pContext, &incomingPacket, manageKeepAlive );
        }

        /* Drop the processed packet from the byte count... */
        pContext->index -= packetSize;

        /* ...and shift any bytes that followed it to the front. */
        ( void ) memmove( pContext->networkBuffer.pBuffer,
                          &( pContext->networkBuffer.pBuffer[ packetSize ] ),
                          pContext->index );
    }

    if( status == MQTTNoDataAvailable )
    {
        /* Having nothing to read is not a failure. */
        status = MQTTSuccess;
    }

    return status;
}
|
1777 |
|
|
1778 |
/*-----------------------------------------------------------*/ |
|
1779 |
|
|
1780 |
static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext, |
|
1781 |
const MQTTSubscribeInfo_t * pSubscriptionList, |
|
1782 |
size_t subscriptionCount, |
|
1783 |
uint16_t packetId ) |
|
1784 |
{ |
|
1785 |
MQTTStatus_t status = MQTTSuccess; |
|
1786 |
size_t iterator; |
|
1787 |
|
|
1788 |
/* Validate all the parameters. */ |
|
1789 |
if( ( pContext == NULL ) || ( pSubscriptionList == NULL ) ) |
|
1790 |
{ |
|
1791 |
LogError( ( "Argument cannot be NULL: pContext=%p, " |
|
1792 |
"pSubscriptionList=%p.", |
|
1793 |
( void * ) pContext, |
|
1794 |
( void * ) pSubscriptionList ) ); |
|
1795 |
status = MQTTBadParameter; |
|
1796 |
} |
|
1797 |
else if( subscriptionCount == 0UL ) |
|
1798 |
{ |
|
1799 |
LogError( ( "Subscription count is 0." ) ); |
|
1800 |
status = MQTTBadParameter; |
|
1801 |
} |
|
1802 |
else if( packetId == 0U ) |
|
1803 |
{ |
|
1804 |
LogError( ( "Packet Id for subscription packet is 0." ) ); |
|
1805 |
status = MQTTBadParameter; |
|
1806 |
} |
|
1807 |
else |
|
1808 |
{ |
|
1809 |
if( pContext->incomingPublishRecords == NULL ) |
|
1810 |
{ |
|
1811 |
for( iterator = 0; iterator < subscriptionCount; iterator++ ) |
|
1812 |
{ |
|
1813 |
if( pSubscriptionList->qos > MQTTQoS0 ) |
|
1814 |
{ |
|
1815 |
LogError( ( "The incoming publish record list is not " |
|
1816 |
"initialised for QoS1/QoS2 records. Please call " |
|
1817 |
" MQTT_InitStatefulQoS to enable use of QoS1 and " |
|
1818 |
" QoS2 packets." ) ); |
|
1819 |
status = MQTTBadParameter; |
|
1820 |
break; |
|
1821 |
} |
|
1822 |
} |
|
1823 |
} |
|
1824 |
} |
|
1825 |
|
|
1826 |
return status; |
|
1827 |
} |
|
1828 |
|
|
1829 |
/*-----------------------------------------------------------*/ |
|
1830 |
|
|
1831 |
static size_t addEncodedStringToVector( uint8_t serailizedLength[ 2 ], |
|
1832 |
const char * const string, |
|
1833 |
uint16_t length, |
|
1834 |
TransportOutVector_t * iterator, |
|
1835 |
size_t * updatedLength ) |
|
1836 |
{ |
|
1837 |
size_t packetLength = 0U; |
|
1838 |
const size_t seralizedLengthFieldSize = 2U; |
|
1839 |
TransportOutVector_t * pLocalIterator = iterator; |
|
1840 |
/* This function always adds 2 vectors. */ |
|
1841 |
size_t vectorsAdded = 0U; |
|
1842 |
|
|
1843 |
/* When length is non-zero, the string must be non-NULL. */ |
|
1844 |
assert( ( length != 0U ) == ( string != NULL ) ); |
|
1845 |
|
|
1846 |
serailizedLength[ 0 ] = ( ( uint8_t ) ( ( length ) >> 8 ) ); |
|
1847 |
serailizedLength[ 1 ] = ( ( uint8_t ) ( ( length ) & 0x00ffU ) ); |
|
1848 |
|
|
1849 |
/* Add the serialized length of the string first. */ |
|
1850 |
pLocalIterator[ 0 ].iov_base = serailizedLength; |
|
1851 |
pLocalIterator[ 0 ].iov_len = seralizedLengthFieldSize; |
|
1852 |
vectorsAdded++; |
|
1853 |
packetLength = seralizedLengthFieldSize; |
|
1854 |
|
|
1855 |
/* Sometimes the string can be NULL that is, of 0 length. In that case, |
|
1856 |
* only the length field should be encoded in the vector. */ |
|
1857 |
if( ( string != NULL ) && ( length != 0U ) ) |
|
1858 |
{ |
|
1859 |
/* Then add the pointer to the string itself. */ |
|
1860 |
pLocalIterator[ 1 ].iov_base = string; |
|
1861 |
pLocalIterator[ 1 ].iov_len = length; |
|
1862 |
vectorsAdded++; |
|
1863 |
packetLength += length; |
|
1864 |
} |
|
1865 |
|
|
1866 |
( *updatedLength ) = ( *updatedLength ) + packetLength; |
|
1867 |
|
|
1868 |
return vectorsAdded; |
|
1869 |
} |
|
1870 |
|
|
1871 |
/*-----------------------------------------------------------*/ |
|
1872 |
|
|
1873 |
static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext, |
|
1874 |
const MQTTSubscribeInfo_t * pSubscriptionList, |
|
1875 |
size_t subscriptionCount, |
|
1876 |
uint16_t packetId, |
|
1877 |
size_t remainingLength ) |
|
1878 |
{ |
|
1879 |
MQTTStatus_t status = MQTTSuccess; |
|
1880 |
uint8_t subscribeheader[ 7 ]; |
|
1881 |
uint8_t * pIndex; |
|
1882 |
TransportOutVector_t pIoVector[ MQTT_SUB_UNSUB_MAX_VECTORS ]; |
|
1883 |
TransportOutVector_t * pIterator; |
|
1884 |
uint8_t serializedTopicFieldLength[ MQTT_SUB_UNSUB_MAX_VECTORS ][ 2 ]; |
|
1885 |
size_t totalPacketLength = 0U; |
|
1886 |
size_t ioVectorLength = 0U; |
|
1887 |
size_t subscriptionsSent = 0U; |
|
1888 |
/* For subscribe, only three vector slots are required per topic string. */ |
|
1889 |
const size_t subscriptionStringVectorSlots = 3U; |
|
1890 |
size_t vectorsAdded; |
|
1891 |
size_t topicFieldLengthIndex; |
|
1892 |
|
|
1893 |
/* The vector array should be at least three element long as the topic |
|
1894 |
* string needs these many vector elements to be stored. */ |
|
1895 |
assert( MQTT_SUB_UNSUB_MAX_VECTORS >= subscriptionStringVectorSlots ); |
|
1896 |
|
|
1897 |
pIndex = subscribeheader; |
|
1898 |
pIterator = pIoVector; |
|
1899 |
|
|
1900 |
pIndex = MQTT_SerializeSubscribeHeader( remainingLength, |
|
1901 |
pIndex, |
|
1902 |
packetId ); |
|
1903 |
|
|
1904 |
/* The header is to be sent first. */ |
|
1905 |
pIterator->iov_base = subscribeheader; |
|
1906 |
/* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */ |
|
1907 |
/* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */ |
|
1908 |
/* coverity[misra_c_2012_rule_18_2_violation] */ |
|
1909 |
/* coverity[misra_c_2012_rule_10_8_violation] */ |
|
1910 |
pIterator->iov_len = ( size_t ) ( pIndex - subscribeheader ); |
|
1911 |
totalPacketLength += pIterator->iov_len; |
|
1912 |
pIterator++; |
|
1913 |
ioVectorLength++; |
|
1914 |
|
|
1915 |
while( ( status == MQTTSuccess ) && ( subscriptionsSent < subscriptionCount ) ) |
|
1916 |
{ |
|
1917 |
/* Reset the index for next iteration. */ |
|
1918 |
topicFieldLengthIndex = 0; |
|
1919 |
|
|
1920 |
/* Check whether the subscription topic (with QoS) will fit in the |
|
1921 |
* given vector. */ |
|
1922 |
while( ( ioVectorLength <= ( MQTT_SUB_UNSUB_MAX_VECTORS - subscriptionStringVectorSlots ) ) && |
|
1923 |
( subscriptionsSent < subscriptionCount ) ) |
|
1924 |
{ |
|
1925 |
/* The topic filter gets sent next. */ |
|
1926 |
vectorsAdded = addEncodedStringToVector( serializedTopicFieldLength[ topicFieldLengthIndex ], |
|
1927 |
pSubscriptionList[ subscriptionsSent ].pTopicFilter, |
|
1928 |
pSubscriptionList[ subscriptionsSent ].topicFilterLength, |
|
1929 |
pIterator, |
|
1930 |
&totalPacketLength ); |
|
1931 |
|
|
1932 |
/* Update the pointer after the above operation. */ |
|
1933 |
pIterator = &pIterator[ vectorsAdded ]; |
|
1934 |
|
|
1935 |
/* Lastly, the QoS gets sent. */ |
|
1936 |
pIterator->iov_base = &( pSubscriptionList[ subscriptionsSent ].qos ); |
|
1937 |
pIterator->iov_len = 1U; |
|
1938 |
totalPacketLength += pIterator->iov_len; |
|
1939 |
|
|
1940 |
/* Increment the pointer. */ |
|
1941 |
pIterator++; |
|
1942 |
|
|
1943 |
/* Two slots get used by the topic string length and topic string. |
|
1944 |
* One slot gets used by the quality of service. */ |
|
1945 |
ioVectorLength += vectorsAdded + 1U; |
|
1946 |
|
|
1947 |
subscriptionsSent++; |
|
1948 |
|
|
1949 |
/* The index needs to be updated for next iteration. */ |
|
1950 |
topicFieldLengthIndex++; |
|
1951 |
} |
|
1952 |
|
|
1953 |
if( sendMessageVector( pContext, |
|
1954 |
pIoVector, |
|
1955 |
ioVectorLength ) != ( int32_t ) totalPacketLength ) |
|
1956 |
{ |
|
1957 |
status = MQTTSendFailed; |
|
1958 |
} |
|
1959 |
|
|
1960 |
/* Update the iterator for the next potential loop iteration. */ |
|
1961 |
pIterator = pIoVector; |
|
1962 |
/* Reset the vector length for the next potential loop iteration. */ |
|
1963 |
ioVectorLength = 0U; |
|
1964 |
/* Reset the packet length for the next potential loop iteration. */ |
|
1965 |
totalPacketLength = 0U; |
|
1966 |
} |
|
1967 |
|
|
1968 |
return status; |
|
1969 |
} |
|
1970 |
|
|
1971 |
/*-----------------------------------------------------------*/ |
|
1972 |
|
|
1973 |
static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext, |
|
1974 |
const MQTTSubscribeInfo_t * pSubscriptionList, |
|
1975 |
size_t subscriptionCount, |
|
1976 |
uint16_t packetId, |
|
1977 |
size_t remainingLength ) |
|
1978 |
{ |
|
1979 |
MQTTStatus_t status = MQTTSuccess; |
|
1980 |
uint8_t unsubscribeheader[ 7 ]; |
|
1981 |
uint8_t * pIndex; |
|
1982 |
TransportOutVector_t pIoVector[ MQTT_SUB_UNSUB_MAX_VECTORS ]; |
|
1983 |
TransportOutVector_t * pIterator; |
|
1984 |
uint8_t serializedTopicFieldLength[ MQTT_SUB_UNSUB_MAX_VECTORS ][ 2 ]; |
|
1985 |
size_t totalPacketLength = 0U; |
|
1986 |
size_t unsubscriptionsSent = 0U; |
|
1987 |
size_t ioVectorLength = 0U; |
|
1988 |
/* For unsubscribe, only two vector slots are required per topic string. */ |
|
1989 |
const size_t unsubscribeStringVectorSlots = 2U; |
|
1990 |
size_t vectorsAdded; |
|
1991 |
size_t topicFieldLengthIndex; |
|
1992 |
|
|
1993 |
/* The vector array should be at least three element long as the topic |
|
1994 |
* string needs these many vector elements to be stored. */ |
|
1995 |
assert( MQTT_SUB_UNSUB_MAX_VECTORS >= unsubscribeStringVectorSlots ); |
|
1996 |
|
|
1997 |
pIndex = unsubscribeheader; |
|
1998 |
pIterator = pIoVector; |
|
1999 |
|
|
2000 |
pIndex = MQTT_SerializeUnsubscribeHeader( remainingLength, |
|
2001 |
pIndex, |
|
2002 |
packetId ); |
|
2003 |
|
|
2004 |
/* The header is to be sent first. */ |
|
2005 |
pIterator->iov_base = unsubscribeheader; |
|
2006 |
/* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */ |
|
2007 |
/* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */ |
|
2008 |
/* coverity[misra_c_2012_rule_18_2_violation] */ |
|
2009 |
/* coverity[misra_c_2012_rule_10_8_violation] */ |
|
2010 |
pIterator->iov_len = ( size_t ) ( pIndex - unsubscribeheader ); |
|
2011 |
totalPacketLength += pIterator->iov_len; |
|
2012 |
pIterator++; |
|
2013 |
ioVectorLength++; |
|
2014 |
|
|
2015 |
while( ( status == MQTTSuccess ) && ( unsubscriptionsSent < subscriptionCount ) ) |
|
2016 |
{ |
|
2017 |
/* Reset the index for next iteration. */ |
|
2018 |
topicFieldLengthIndex = 0; |
|
2019 |
|
|
2020 |
/* Check whether the subscription topic will fit in the given vector. */ |
|
2021 |
while( ( ioVectorLength <= ( MQTT_SUB_UNSUB_MAX_VECTORS - unsubscribeStringVectorSlots ) ) && |
|
2022 |
( unsubscriptionsSent < subscriptionCount ) ) |
|
2023 |
{ |
|
2024 |
/* The topic filter gets sent next. */ |
|
2025 |
vectorsAdded = addEncodedStringToVector( serializedTopicFieldLength[ topicFieldLengthIndex ], |
|
2026 |
pSubscriptionList[ unsubscriptionsSent ].pTopicFilter, |
|
2027 |
pSubscriptionList[ unsubscriptionsSent ].topicFilterLength, |
|
2028 |
pIterator, |
|
2029 |
&totalPacketLength ); |
|
2030 |
|
|
2031 |
/* Update the iterator to point to the next empty location. */ |
|
2032 |
pIterator = &pIterator[ vectorsAdded ]; |
|
2033 |
/* Update the total count based on how many vectors were added. */ |
|
2034 |
ioVectorLength += vectorsAdded; |
|
2035 |
|
|
2036 |
unsubscriptionsSent++; |
|
2037 |
|
|
2038 |
/* Update the index for next iteration. */ |
|
2039 |
topicFieldLengthIndex++; |
|
2040 |
} |
|
2041 |
|
|
2042 |
if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalPacketLength ) |
|
2043 |
{ |
|
2044 |
status = MQTTSendFailed; |
|
2045 |
} |
|
2046 |
|
|
2047 |
/* Update the iterator for the next potential loop iteration. */ |
|
2048 |
pIterator = pIoVector; |
|
2049 |
/* Reset the vector length for the next potential loop iteration. */ |
|
2050 |
ioVectorLength = 0U; |
|
2051 |
/* Reset the packet length for the next potential loop iteration. */ |
|
2052 |
totalPacketLength = 0U; |
|
2053 |
} |
|
2054 |
|
|
2055 |
return status; |
|
2056 |
} |
|
2057 |
|
|
2058 |
/*-----------------------------------------------------------*/ |
|
2059 |
|
|
2060 |
static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, |
|
2061 |
const MQTTPublishInfo_t * pPublishInfo, |
|
2062 |
const uint8_t * pMqttHeader, |
|
2063 |
size_t headerSize, |
|
2064 |
uint16_t packetId ) |
|
2065 |
{ |
|
2066 |
MQTTStatus_t status = MQTTSuccess; |
|
2067 |
uint8_t serializedPacketID[ 2 ]; |
|
2068 |
TransportOutVector_t pIoVector[ 4 ]; |
|
2069 |
size_t ioVectorLength; |
|
2070 |
size_t totalMessageLength; |
|
2071 |
const size_t packetIDLength = 2U; |
|
2072 |
|
|
2073 |
/* The header is sent first. */ |
|
2074 |
pIoVector[ 0U ].iov_base = pMqttHeader; |
|
2075 |
pIoVector[ 0U ].iov_len = headerSize; |
|
2076 |
totalMessageLength = headerSize; |
|
2077 |
|
|
2078 |
/* Then the topic name has to be sent. */ |
|
2079 |
pIoVector[ 1U ].iov_base = pPublishInfo->pTopicName; |
|
2080 |
pIoVector[ 1U ].iov_len = pPublishInfo->topicNameLength; |
|
2081 |
totalMessageLength += pPublishInfo->topicNameLength; |
|
2082 |
|
|
2083 |
/* The next field's index should be 2 as the first two fields |
|
2084 |
* have been filled in. */ |
|
2085 |
ioVectorLength = 2U; |
|
2086 |
|
|
2087 |
if( pPublishInfo->qos > MQTTQoS0 ) |
|
2088 |
{ |
|
2089 |
/* Encode the packet ID. */ |
|
2090 |
serializedPacketID[ 0 ] = ( ( uint8_t ) ( ( packetId ) >> 8 ) ); |
|
2091 |
serializedPacketID[ 1 ] = ( ( uint8_t ) ( ( packetId ) & 0x00ffU ) ); |
|
2092 |
|
|
2093 |
pIoVector[ ioVectorLength ].iov_base = serializedPacketID; |
|
2094 |
pIoVector[ ioVectorLength ].iov_len = packetIDLength; |
|
2095 |
|
|
2096 |
ioVectorLength++; |
|
2097 |
totalMessageLength += packetIDLength; |
|
2098 |
} |
|
2099 |
|
|
2100 |
/* Publish packets are allowed to contain no payload. */ |
|
2101 |
if( pPublishInfo->payloadLength > 0U ) |
|
2102 |
{ |
|
2103 |
pIoVector[ ioVectorLength ].iov_base = pPublishInfo->pPayload; |
|
2104 |
pIoVector[ ioVectorLength ].iov_len = pPublishInfo->payloadLength; |
|
2105 |
|
|
2106 |
ioVectorLength++; |
|
2107 |
totalMessageLength += pPublishInfo->payloadLength; |
|
2108 |
} |
|
2109 |
|
|
2110 |
if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength ) |
|
2111 |
{ |
|
2112 |
status = MQTTSendFailed; |
|
2113 |
} |
|
2114 |
|
|
2115 |
return status; |
|
2116 |
} |
|
2117 |
|
|
2118 |
/*-----------------------------------------------------------*/ |
|
2119 |
|
|
2120 |
static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, |
|
2121 |
const MQTTConnectInfo_t * pConnectInfo, |
|
2122 |
const MQTTPublishInfo_t * pWillInfo, |
|
2123 |
size_t remainingLength ) |
|
2124 |
{ |
|
2125 |
MQTTStatus_t status = MQTTSuccess; |
|
2126 |
TransportOutVector_t * iterator; |
|
2127 |
size_t ioVectorLength = 0U; |
|
2128 |
size_t totalMessageLength = 0U; |
|
2129 |
int32_t bytesSentOrError; |
|
2130 |
|
|
2131 |
/* Connect packet header can be of maximum 15 bytes. */ |
|
2132 |
uint8_t connectPacketHeader[ 15 ]; |
|
2133 |
uint8_t * pIndex = connectPacketHeader; |
|
2134 |
TransportOutVector_t pIoVector[ 11 ]; |
|
2135 |
uint8_t serializedClientIDLength[ 2 ]; |
|
2136 |
uint8_t serializedTopicLength[ 2 ]; |
|
2137 |
uint8_t serializedPayloadLength[ 2 ]; |
|
2138 |
uint8_t serializedUsernameLength[ 2 ]; |
|
2139 |
uint8_t serializedPasswordLength[ 2 ]; |
|
2140 |
size_t vectorsAdded; |
|
2141 |
|
|
2142 |
iterator = pIoVector; |
|
2143 |
|
|
2144 |
/* Validate arguments. */ |
|
2145 |
if( ( pWillInfo != NULL ) && ( pWillInfo->pTopicName == NULL ) ) |
|
2146 |
{ |
|
2147 |
LogError( ( "pWillInfo->pTopicName cannot be NULL if Will is present." ) ); |
|
2148 |
status = MQTTBadParameter; |
|
2149 |
} |
|
2150 |
else |
|
2151 |
{ |
|
2152 |
pIndex = MQTT_SerializeConnectFixedHeader( pIndex, |
|
2153 |
pConnectInfo, |
|
2154 |
pWillInfo, |
|
2155 |
remainingLength ); |
|
2156 |
|
|
2157 |
assert( ( pIndex - connectPacketHeader ) <= 15 ); |
|
2158 |
|
|
2159 |
/* The header gets sent first. */ |
|
2160 |
iterator->iov_base = connectPacketHeader; |
|
2161 |
/* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */ |
|
2162 |
/* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */ |
|
2163 |
/* coverity[misra_c_2012_rule_18_2_violation] */ |
|
2164 |
/* coverity[misra_c_2012_rule_10_8_violation] */ |
|
2165 |
iterator->iov_len = ( size_t ) ( pIndex - connectPacketHeader ); |
|
2166 |
totalMessageLength += iterator->iov_len; |
|
2167 |
iterator++; |
|
2168 |
ioVectorLength++; |
|
2169 |
|
|
2170 |
/* Serialize the client ID. */ |
|
2171 |
vectorsAdded = addEncodedStringToVector( serializedClientIDLength, |
|
2172 |
pConnectInfo->pClientIdentifier, |
|
2173 |
pConnectInfo->clientIdentifierLength, |
|
2174 |
iterator, |
|
2175 |
&totalMessageLength ); |
|
2176 |
|
|
2177 |
/* Update the iterator to point to the next empty slot. */ |
|
2178 |
iterator = &iterator[ vectorsAdded ]; |
|
2179 |
ioVectorLength += vectorsAdded; |
|
2180 |
|
|
2181 |
if( pWillInfo != NULL ) |
|
2182 |
{ |
|
2183 |
/* Serialize the topic. */ |
|
2184 |
vectorsAdded = addEncodedStringToVector( serializedTopicLength, |
|
2185 |
pWillInfo->pTopicName, |
|
2186 |
pWillInfo->topicNameLength, |
|
2187 |
iterator, |
|
2188 |
&totalMessageLength ); |
|
2189 |
|
|
2190 |
/* Update the iterator to point to the next empty slot. */ |
|
2191 |
iterator = &iterator[ vectorsAdded ]; |
|
2192 |
ioVectorLength += vectorsAdded; |
|
2193 |
|
|
2194 |
|
|
2195 |
/* Serialize the payload. Payload of last will and testament can be NULL. */ |
|
2196 |
vectorsAdded = addEncodedStringToVector( serializedPayloadLength, |
|
2197 |
pWillInfo->pPayload, |
|
2198 |
( uint16_t ) pWillInfo->payloadLength, |
|
2199 |
iterator, |
|
2200 |
&totalMessageLength ); |
|
2201 |
|
|
2202 |
/* Update the iterator to point to the next empty slot. */ |
|
2203 |
iterator = &iterator[ vectorsAdded ]; |
|
2204 |
ioVectorLength += vectorsAdded; |
|
2205 |
} |
|
2206 |
|
|
2207 |
/* Encode the user name if provided. */ |
|
2208 |
if( pConnectInfo->pUserName != NULL ) |
|
2209 |
{ |
|
2210 |
/* Serialize the user name string. */ |
|
2211 |
vectorsAdded = addEncodedStringToVector( serializedUsernameLength, |
|
2212 |
pConnectInfo->pUserName, |
|
2213 |
pConnectInfo->userNameLength, |
|
2214 |
iterator, |
|
2215 |
&totalMessageLength ); |
|
2216 |
|
|
2217 |
/* Update the iterator to point to the next empty slot. */ |
|
2218 |
iterator = &iterator[ vectorsAdded ]; |
|
2219 |
ioVectorLength += vectorsAdded; |
|
2220 |
} |
|
2221 |
|
|
2222 |
/* Encode the password if provided. */ |
|
2223 |
if( pConnectInfo->pPassword != NULL ) |
|
2224 |
{ |
|
2225 |
/* Serialize the user name string. */ |
|
2226 |
vectorsAdded = addEncodedStringToVector( serializedPasswordLength, |
|
2227 |
pConnectInfo->pPassword, |
|
2228 |
pConnectInfo->passwordLength, |
|
2229 |
iterator, |
|
2230 |
&totalMessageLength ); |
|
2231 |
/* Update the iterator to point to the next empty slot. */ |
|
2232 |
iterator = &iterator[ vectorsAdded ]; |
|
2233 |
ioVectorLength += vectorsAdded; |
|
2234 |
} |
|
2235 |
|
|
2236 |
bytesSentOrError = sendMessageVector( pContext, pIoVector, ioVectorLength ); |
|
2237 |
|
|
2238 |
if( bytesSentOrError != ( int32_t ) totalMessageLength ) |
|
2239 |
{ |
|
2240 |
status = MQTTSendFailed; |
|
2241 |
} |
|
2242 |
} |
|
2243 |
|
|
2244 |
return status; |
|
2245 |
} |
|
2246 |
|
|
2247 |
/*-----------------------------------------------------------*/ |
|
2248 |
|
|
2249 |
static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext, |
|
2250 |
uint32_t timeoutMs, |
|
2251 |
bool cleanSession, |
|
2252 |
MQTTPacketInfo_t * pIncomingPacket, |
|
2253 |
bool * pSessionPresent ) |
|
2254 |
{ |
|
2255 |
MQTTStatus_t status = MQTTSuccess; |
|
2256 |
MQTTGetCurrentTimeFunc_t getTimeStamp = NULL; |
|
2257 |
uint32_t entryTimeMs = 0U, remainingTimeMs = 0U, timeTakenMs = 0U; |
|
2258 |
bool breakFromLoop = false; |
|
2259 |
uint16_t loopCount = 0U; |
|
2260 |
|
|
2261 |
assert( pContext != NULL ); |
|
2262 |
assert( pIncomingPacket != NULL ); |
|
2263 |
assert( pContext->getTime != NULL ); |
|
2264 |
|
|
2265 |
getTimeStamp = pContext->getTime; |
|
2266 |
|
|
2267 |
/* Get the entry time for the function. */ |
|
2268 |
entryTimeMs = getTimeStamp(); |
|
2269 |
|
|
2270 |
do |
|
2271 |
{ |
|
2272 |
/* Transport read for incoming CONNACK packet type and length. |
|
2273 |
* MQTT_GetIncomingPacketTypeAndLength is a blocking call and it is |
|
2274 |
* returned after a transport receive timeout, an error, or a successful |
|
2275 |
* receive of packet type and length. */ |
|
2276 |
status = MQTT_GetIncomingPacketTypeAndLength( pContext->transportInterface.recv, |
|
2277 |
pContext->transportInterface.pNetworkContext, |
|
2278 |
pIncomingPacket ); |
|
2279 |
|
|
2280 |
/* The loop times out based on 2 conditions. |
|
2281 |
* 1. If timeoutMs is greater than 0: |
|
2282 |
* Loop times out based on the timeout calculated by getTime() |
|
2283 |
* function. |
|
2284 |
* 2. If timeoutMs is 0: |
|
2285 |
* Loop times out based on the maximum number of retries config |
|
2286 |
* MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT. This config will control |
|
2287 |
* maximum the number of retry attempts to read the CONNACK packet. |
|
2288 |
* A value of 0 for the config will try once to read CONNACK. */ |
|
2289 |
if( timeoutMs > 0U ) |
|
2290 |
{ |
|
2291 |
breakFromLoop = calculateElapsedTime( getTimeStamp(), entryTimeMs ) >= timeoutMs; |
|
2292 |
} |
|
2293 |
else |
|
2294 |
{ |
|
2295 |
breakFromLoop = loopCount >= MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT; |
|
2296 |
loopCount++; |
|
2297 |
} |
|
2298 |
|
|
2299 |
/* Loop until there is data to read or if we have exceeded the timeout/retries. */ |
|
2300 |
} while( ( status == MQTTNoDataAvailable ) && ( breakFromLoop == false ) ); |
|
2301 |
|
|
2302 |
if( status == MQTTSuccess ) |
|
2303 |
{ |
|
2304 |
/* Time taken in this function so far. */ |
|
2305 |
timeTakenMs = calculateElapsedTime( getTimeStamp(), entryTimeMs ); |
|
2306 |
|
|
2307 |
if( timeTakenMs < timeoutMs ) |
|
2308 |
{ |
|
2309 |
/* Calculate remaining time for receiving the remainder of |
|
2310 |
* the packet. */ |
|
2311 |
remainingTimeMs = timeoutMs - timeTakenMs; |
|
2312 |
} |
|
2313 |
|
|
2314 |
/* Reading the remainder of the packet by transport recv. |
|
2315 |
* Attempt to read once even if the timeout has expired. |
|
2316 |
* Invoking receivePacket with remainingTime as 0 would attempt to |
|
2317 |
* recv from network once. If using retries, the remainder of the |
|
2318 |
* CONNACK packet is tried to be read only once. Reading once would be |
|
2319 |
* good as the packet type and remaining length was already read. Hence, |
|
2320 |
* the probability of the remaining 2 bytes available to read is very high. */ |
|
2321 |
if( pIncomingPacket->type == MQTT_PACKET_TYPE_CONNACK ) |
|
2322 |
{ |
|
2323 |
status = receivePacket( pContext, |
|
2324 |
*pIncomingPacket, |
|
2325 |
remainingTimeMs ); |
|
2326 |
} |
|
2327 |
else |
|
2328 |
{ |
|
2329 |
LogError( ( "Incorrect packet type %X received while expecting" |
|
2330 |
" CONNACK(%X).", |
|
2331 |
( unsigned int ) pIncomingPacket->type, |
|
2332 |
MQTT_PACKET_TYPE_CONNACK ) ); |
|
2333 |
status = MQTTBadResponse; |
|
2334 |
} |
|
2335 |
} |
|
2336 |
|
|
2337 |
if( status == MQTTSuccess ) |
|
2338 |
{ |
|
2339 |
/* Update the packet info pointer to the buffer read. */ |
|
2340 |
pIncomingPacket->pRemainingData = pContext->networkBuffer.pBuffer; |
|
2341 |
|
|
2342 |
/* Deserialize CONNACK. */ |
|
2343 |
status = MQTT_DeserializeAck( pIncomingPacket, NULL, pSessionPresent ); |
|
2344 |
} |
|
2345 |
|
|
2346 |
/* If a clean session is requested, a session present should not be set by |
|
2347 |
* broker. */ |
|
2348 |
if( status == MQTTSuccess ) |
|
2349 |
{ |
|
2350 |
if( ( cleanSession == true ) && ( *pSessionPresent == true ) ) |
|
2351 |
{ |
|
2352 |
LogError( ( "Unexpected session present flag in CONNACK response from broker." |
|
2353 |
" CONNECT request with clean session was made with broker." ) ); |
|
2354 |
status = MQTTBadResponse; |
|
2355 |
} |
|
2356 |
} |
|
2357 |
|
|
2358 |
if( status == MQTTSuccess ) |
|
2359 |
{ |
|
2360 |
LogDebug( ( "Received MQTT CONNACK successfully from broker." ) ); |
|
2361 |
} |
|
2362 |
else |
|
2363 |
{ |
|
2364 |
LogError( ( "CONNACK recv failed with status = %s.", |
|
2365 |
MQTT_Status_strerror( status ) ) ); |
|
2366 |
} |
|
2367 |
|
|
2368 |
return status; |
|
2369 |
} |
|
2370 |
|
|
2371 |
/*-----------------------------------------------------------*/ |
|
2372 |
|
|
2373 |
/* Prepares the context once a CONNACK has been accepted. The receive index and
 * the network buffer are always reset. If the broker reports an existing
 * session, every PUBREL returned by MQTT_PubrelToResend() is sent again;
 * otherwise the outgoing and incoming publish records are zeroed.
 *
 * Returns MQTTSuccess, or the status of the first sendPublishAcks() call that
 * did not succeed. */
static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
                                             bool sessionPresent )
{
    MQTTStatus_t resumeStatus = MQTTSuccess;
    MQTTStateCursor_t resendCursor = MQTT_STATE_CURSOR_INITIALIZER;
    MQTTPublishState_t recordState = MQTTStateNull;
    uint16_t pendingId = MQTT_PACKET_ID_INVALID;

    assert( pContext != NULL );

    /* Start the connection with an empty receive buffer. */
    pContext->index = 0;
    ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );

    if( sessionPresent == false )
    {
        /* New session: nothing recorded earlier is valid anymore. The count
         * checks skip the memset when no record array was registered. */
        if( pContext->outgoingPublishRecordMaxCount > 0U )
        {
            ( void ) memset( pContext->outgoingPublishRecords,
                             0x00,
                             pContext->outgoingPublishRecordMaxCount * sizeof( *pContext->outgoingPublishRecords ) );
        }

        if( pContext->incomingPublishRecordMaxCount > 0U )
        {
            ( void ) memset( pContext->incomingPublishRecords,
                             0x00,
                             pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) );
        }
    }
    else
    {
        /* Resumed session: walk the records that still need a PUBREL and
         * resend each one, stopping at the first send failure. */
        for( pendingId = MQTT_PubrelToResend( pContext, &resendCursor, &recordState );
             ( pendingId != MQTT_PACKET_ID_INVALID ) && ( resumeStatus == MQTTSuccess );
             pendingId = MQTT_PubrelToResend( pContext, &resendCursor, &recordState ) )
        {
            resumeStatus = sendPublishAcks( pContext, pendingId, recordState );
        }
    }

    return resumeStatus;
}
|
2421 |
|
|
2422 |
static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext, |
|
2423 |
const MQTTPublishInfo_t * pPublishInfo, |
|
2424 |
uint16_t packetId ) |
|
2425 |
{ |
|
2426 |
MQTTStatus_t status = MQTTSuccess; |
|
2427 |
|
|
2428 |
/* Validate arguments. */ |
|
2429 |
if( ( pContext == NULL ) || ( pPublishInfo == NULL ) ) |
|
2430 |
{ |
|
2431 |
LogError( ( "Argument cannot be NULL: pContext=%p, " |
|
2432 |
"pPublishInfo=%p.", |
|
2433 |
( void * ) pContext, |
|
2434 |
( void * ) pPublishInfo ) ); |
|
2435 |
status = MQTTBadParameter; |
|
2436 |
} |
|
2437 |
else if( ( pPublishInfo->qos != MQTTQoS0 ) && ( packetId == 0U ) ) |
|
2438 |
{ |
|
2439 |
LogError( ( "Packet Id is 0 for PUBLISH with QoS=%u.", |
|
2440 |
( unsigned int ) pPublishInfo->qos ) ); |
|
2441 |
status = MQTTBadParameter; |
|
2442 |
} |
|
2443 |
else if( ( pPublishInfo->payloadLength > 0U ) && ( pPublishInfo->pPayload == NULL ) ) |
|
2444 |
{ |
|
2445 |
LogError( ( "A nonzero payload length requires a non-NULL payload: " |
|
2446 |
"payloadLength=%lu, pPayload=%p.", |
|
2447 |
( unsigned long ) pPublishInfo->payloadLength, |
|
2448 |
pPublishInfo->pPayload ) ); |
|
2449 |
status = MQTTBadParameter; |
|
2450 |
} |
|
2451 |
else if( ( pContext->outgoingPublishRecords == NULL ) && ( pPublishInfo->qos > MQTTQoS0 ) ) |
|
2452 |
{ |
|
2453 |
LogError( ( "Trying to publish a QoS > MQTTQoS0 packet when outgoing publishes " |
|
2454 |
"for QoS1/QoS2 have not been enabled. Please, call MQTT_InitStatefulQoS " |
|
2455 |
"to initialize and enable the use of QoS1/QoS2 publishes." ) ); |
|
2456 |
status = MQTTBadParameter; |
|
2457 |
} |
|
2458 |
else |
|
2459 |
{ |
|
2460 |
/* MISRA else */ |
|
2461 |
} |
|
2462 |
|
|
2463 |
return status; |
|
2464 |
} |
|
2465 |
|
|
2466 |
/*-----------------------------------------------------------*/ |
|
2467 |
|
|
2468 |
MQTTStatus_t MQTT_Init( MQTTContext_t * pContext, |
|
2469 |
const TransportInterface_t * pTransportInterface, |
|
2470 |
MQTTGetCurrentTimeFunc_t getTimeFunction, |
|
2471 |
MQTTEventCallback_t userCallback, |
|
2472 |
const MQTTFixedBuffer_t * pNetworkBuffer ) |
|
2473 |
{ |
|
2474 |
MQTTStatus_t status = MQTTSuccess; |
|
2475 |
|
|
2476 |
/* Validate arguments. */ |
|
2477 |
if( ( pContext == NULL ) || ( pTransportInterface == NULL ) || |
|
2478 |
( pNetworkBuffer == NULL ) ) |
|
2479 |
{ |
|
2480 |
LogError( ( "Argument cannot be NULL: pContext=%p, " |
|
2481 |
"pTransportInterface=%p, " |
|
2482 |
"pNetworkBuffer=%p", |
|
2483 |
( void * ) pContext, |
|
2484 |
( void * ) pTransportInterface, |
|
2485 |
( void * ) pNetworkBuffer ) ); |
|
2486 |
status = MQTTBadParameter; |
|
2487 |
} |
|
2488 |
else if( getTimeFunction == NULL ) |
|
2489 |
{ |
|
2490 |
LogError( ( "Invalid parameter: getTimeFunction is NULL" ) ); |
|
2491 |
status = MQTTBadParameter; |
|
2492 |
} |
|
2493 |
else if( userCallback == NULL ) |
|
2494 |
{ |
|
2495 |
LogError( ( "Invalid parameter: userCallback is NULL" ) ); |
|
2496 |
status = MQTTBadParameter; |
|
2497 |
} |
|
2498 |
else if( pTransportInterface->recv == NULL ) |
|
2499 |
{ |
|
2500 |
LogError( ( "Invalid parameter: pTransportInterface->recv is NULL" ) ); |
|
2501 |
status = MQTTBadParameter; |
|
2502 |
} |
|
2503 |
else if( pTransportInterface->send == NULL ) |
|
2504 |
{ |
|
2505 |
LogError( ( "Invalid parameter: pTransportInterface->send is NULL" ) ); |
|
2506 |
status = MQTTBadParameter; |
|
2507 |
} |
|
2508 |
else |
|
2509 |
{ |
|
2510 |
( void ) memset( pContext, 0x00, sizeof( MQTTContext_t ) ); |
|
2511 |
|
|
2512 |
pContext->connectStatus = MQTTNotConnected; |
|
2513 |
pContext->transportInterface = *pTransportInterface; |
|
2514 |
pContext->getTime = getTimeFunction; |
|
2515 |
pContext->appCallback = userCallback; |
|
2516 |
pContext->networkBuffer = *pNetworkBuffer; |
|
2517 |
|
|
2518 |
/* Zero is not a valid packet ID per MQTT spec. Start from 1. */ |
|
2519 |
pContext->nextPacketId = 1; |
|
2520 |
} |
|
2521 |
|
|
2522 |
return status; |
|
2523 |
} |
|
2524 |
|
|
2525 |
/*-----------------------------------------------------------*/ |
|
2526 |
|
|
2527 |
MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext, |
|
2528 |
MQTTPubAckInfo_t * pOutgoingPublishRecords, |
|
2529 |
size_t outgoingPublishCount, |
|
2530 |
MQTTPubAckInfo_t * pIncomingPublishRecords, |
|
2531 |
size_t incomingPublishCount ) |
|
2532 |
{ |
|
2533 |
MQTTStatus_t status = MQTTSuccess; |
|
2534 |
|
|
2535 |
if( pContext == NULL ) |
|
2536 |
{ |
|
2537 |
LogError( ( "Argument cannot be NULL: pContext=%p\n", |
|
2538 |
( void * ) pContext ) ); |
|
2539 |
status = MQTTBadParameter; |
|
2540 |
} |
|
2541 |
|
|
2542 |
/* Check whether the arguments make sense. Not equal here behaves |
|
2543 |
* like an exclusive-or operator for boolean values. */ |
|
2544 |
else if( ( outgoingPublishCount == 0U ) != |
|
2545 |
( pOutgoingPublishRecords == NULL ) ) |
|
2546 |
{ |
|
2547 |
LogError( ( "Arguments do not match: pOutgoingPublishRecords=%p, " |
|
2548 |
"outgoingPublishCount=%lu", |
|
2549 |
( void * ) pOutgoingPublishRecords, |
|
2550 |
outgoingPublishCount ) ); |
|
2551 |
status = MQTTBadParameter; |
|
2552 |
} |
|
2553 |
|
|
2554 |
/* Check whether the arguments make sense. Not equal here behaves |
|
2555 |
* like an exclusive-or operator for boolean values. */ |
|
2556 |
else if( ( incomingPublishCount == 0U ) != |
|
2557 |
( pIncomingPublishRecords == NULL ) ) |
|
2558 |
{ |
|
2559 |
LogError( ( "Arguments do not match: pIncomingPublishRecords=%p, " |
|
2560 |
"incomingPublishCount=%lu", |
|
2561 |
( void * ) pIncomingPublishRecords, |
|
2562 |
incomingPublishCount ) ); |
|
2563 |
status = MQTTBadParameter; |
|
2564 |
} |
|
2565 |
else if( pContext->appCallback == NULL ) |
|
2566 |
{ |
|
2567 |
LogError( ( "MQTT_InitStatefulQoS must be called only after MQTT_Init has" |
|
2568 |
" been called succesfully.\n" ) ); |
|
2569 |
status = MQTTBadParameter; |
|
2570 |
} |
|
2571 |
else |
|
2572 |
{ |
|
2573 |
pContext->incomingPublishRecordMaxCount = incomingPublishCount; |
|
2574 |
pContext->incomingPublishRecords = pIncomingPublishRecords; |
|
2575 |
pContext->outgoingPublishRecordMaxCount = outgoingPublishCount; |
|
2576 |
pContext->outgoingPublishRecords = pOutgoingPublishRecords; |
|
2577 |
} |
|
2578 |
|
|
2579 |
return status; |
|
2580 |
} |
|
2581 |
|
|
2582 |
/*-----------------------------------------------------------*/ |
|
2583 |
|
|
2584 |
MQTTStatus_t MQTT_CancelCallback( const MQTTContext_t * pContext, |
|
2585 |
uint16_t packetId ) |
|
2586 |
{ |
|
2587 |
MQTTStatus_t status = MQTTSuccess; |
|
2588 |
|
|
2589 |
if( pContext == NULL ) |
|
2590 |
{ |
|
2591 |
LogWarn( ( "pContext is NULL\n" ) ); |
|
2592 |
status = MQTTBadParameter; |
|
2593 |
} |
|
2594 |
else if( pContext->outgoingPublishRecords == NULL ) |
|
2595 |
{ |
|
2596 |
LogError( ( "QoS1/QoS2 is not initialized for use. Please, " |
|
2597 |
"call MQTT_InitStatefulQoS to enable QoS1 and QoS2 " |
|
2598 |
"publishes.\n" ) ); |
|
2599 |
status = MQTTBadParameter; |
|
2600 |
} |
|
2601 |
else |
|
2602 |
{ |
|
2603 |
MQTT_PRE_STATE_UPDATE_HOOK( pContext ); |
|
2604 |
|
|
2605 |
status = MQTT_RemoveStateRecord( pContext, |
|
2606 |
packetId ); |
|
2607 |
|
|
2608 |
MQTT_POST_STATE_UPDATE_HOOK( pContext ); |
|
2609 |
} |
|
2610 |
|
|
2611 |
return status; |
|
2612 |
} |
|
2613 |
|
|
2614 |
/*-----------------------------------------------------------*/ |
|
2615 |
|
|
2616 |
MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, |
|
2617 |
const MQTTConnectInfo_t * pConnectInfo, |
|
2618 |
const MQTTPublishInfo_t * pWillInfo, |
|
2619 |
uint32_t timeoutMs, |
|
2620 |
bool * pSessionPresent ) |
|
2621 |
{ |
|
2622 |
size_t remainingLength = 0UL, packetSize = 0UL; |
|
2623 |
MQTTStatus_t status = MQTTSuccess; |
|
2624 |
MQTTPacketInfo_t incomingPacket = { 0 }; |
|
2625 |
|
|
2626 |
incomingPacket.type = ( uint8_t ) 0; |
|
2627 |
|
|
2628 |
if( ( pContext == NULL ) || ( pConnectInfo == NULL ) || ( pSessionPresent == NULL ) ) |
|
2629 |
{ |
|
2630 |
LogError( ( "Argument cannot be NULL: pContext=%p, " |
|
2631 |
"pConnectInfo=%p, pSessionPresent=%p.", |
|
2632 |
( void * ) pContext, |
|
2633 |
( void * ) pConnectInfo, |
|
2634 |
( void * ) pSessionPresent ) ); |
|
2635 |
status = MQTTBadParameter; |
|
2636 |
} |
|
2637 |
|
|
2638 |
if( status == MQTTSuccess ) |
|
2639 |
{ |
|
2640 |
/* Get MQTT connect packet size and remaining length. */ |
|
2641 |
status = MQTT_GetConnectPacketSize( pConnectInfo, |
|
2642 |
pWillInfo, |
|
2643 |
&remainingLength, |
|
2644 |
&packetSize ); |
|
2645 |
LogDebug( ( "CONNECT packet size is %lu and remaining length is %lu.", |
|
2646 |
( unsigned long ) packetSize, |
|
2647 |
( unsigned long ) remainingLength ) ); |
|
2648 |
} |
|
2649 |
|
|
2650 |
if( status == MQTTSuccess ) |
|
2651 |
{ |
|
2652 |
MQTT_PRE_SEND_HOOK( pContext ); |
|
2653 |
|
|
2654 |
status = sendConnectWithoutCopy( pContext, |
|
2655 |
pConnectInfo, |
|
2656 |
pWillInfo, |
|
2657 |
remainingLength ); |
|
2658 |
|
|
2659 |
MQTT_POST_SEND_HOOK( pContext ); |
|
2660 |
} |
|
2661 |
|
|
2662 |
/* Read CONNACK from transport layer. */ |
|
2663 |
if( status == MQTTSuccess ) |
|
2664 |
{ |
|
2665 |
status = receiveConnack( pContext, |
|
2666 |
timeoutMs, |
|
2667 |
pConnectInfo->cleanSession, |
|
2668 |
&incomingPacket, |
|
2669 |
pSessionPresent ); |
|
2670 |
} |
|
2671 |
|
|
2672 |
if( status == MQTTSuccess ) |
|
2673 |
{ |
|
2674 |
/* Resend PUBRELs when reestablishing a session, or clear records for new sessions. */ |
|
2675 |
status = handleSessionResumption( pContext, *pSessionPresent ); |
|
2676 |
} |
|
2677 |
|
|
2678 |
if( status == MQTTSuccess ) |
|
2679 |
{ |
|
2680 |
LogInfo( ( "MQTT connection established with the broker." ) ); |
|
2681 |
pContext->connectStatus = MQTTConnected; |
|
2682 |
/* Initialize keep-alive fields after a successful connection. */ |
|
2683 |
pContext->keepAliveIntervalSec = pConnectInfo->keepAliveSeconds; |
|
2684 |
pContext->waitingForPingResp = false; |
|
2685 |
pContext->pingReqSendTimeMs = 0U; |
|
2686 |
} |
|
2687 |
else |
|
2688 |
{ |
|
2689 |
LogError( ( "MQTT connection failed with status = %s.", |
|
2690 |
MQTT_Status_strerror( status ) ) ); |
|
2691 |
} |
|
2692 |
|
|
2693 |
return status; |
|
2694 |
} |
|
2695 |
|
|
2696 |
/*-----------------------------------------------------------*/ |
|
2697 |
|
|
2698 |
MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, |
|
2699 |
const MQTTSubscribeInfo_t * pSubscriptionList, |
|
2700 |
size_t subscriptionCount, |
|
2701 |
uint16_t packetId ) |
|
2702 |
{ |
|
2703 |
size_t remainingLength = 0UL, packetSize = 0UL; |
|
2704 |
|
|
2705 |
/* Validate arguments. */ |
|
2706 |
MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext, |
|
2707 |
pSubscriptionList, |
|
2708 |
subscriptionCount, |
|
2709 |
packetId ); |
|
2710 |
|
|
2711 |
if( status == MQTTSuccess ) |
|
2712 |
{ |
|
2713 |
/* Get the remaining length and packet size.*/ |
|
2714 |
status = MQTT_GetSubscribePacketSize( pSubscriptionList, |
|
2715 |
subscriptionCount, |
|
2716 |
&remainingLength, |
|
2717 |
&packetSize ); |
|
2718 |
LogDebug( ( "SUBSCRIBE packet size is %lu and remaining length is %lu.", |
|
2719 |
( unsigned long ) packetSize, |
|
2720 |
( unsigned long ) remainingLength ) ); |
|
2721 |
} |
|
2722 |
|
|
2723 |
if( status == MQTTSuccess ) |
|
2724 |
{ |
|
2725 |
MQTT_PRE_SEND_HOOK( pContext ); |
|
2726 |
|
|
2727 |
/* Send MQTT SUBSCRIBE packet. */ |
|
2728 |
status = sendSubscribeWithoutCopy( pContext, |
|
2729 |
pSubscriptionList, |
|
2730 |
subscriptionCount, |
|
2731 |
packetId, |
|
2732 |
remainingLength ); |
|
2733 |
|
|
2734 |
MQTT_POST_SEND_HOOK( pContext ); |
|
2735 |
} |
|
2736 |
|
|
2737 |
return status; |
|
2738 |
} |
|
2739 |
|
|
2740 |
/*-----------------------------------------------------------*/ |
|
2741 |
|
|
2742 |
MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext, |
|
2743 |
const MQTTPublishInfo_t * pPublishInfo, |
|
2744 |
uint16_t packetId ) |
|
2745 |
{ |
|
2746 |
size_t headerSize = 0UL; |
|
2747 |
size_t remainingLength = 0UL; |
|
2748 |
size_t packetSize = 0UL; |
|
2749 |
MQTTPublishState_t publishStatus = MQTTStateNull; |
|
2750 |
bool stateUpdateHookExecuted = false; |
|
2751 |
|
|
2752 |
/* 1 header byte + 4 bytes (maximum) required for encoding the length + |
|
2753 |
* 2 bytes for topic string. */ |
|
2754 |
uint8_t mqttHeader[ 7 ]; |
|
2755 |
|
|
2756 |
/* Validate arguments. */ |
|
2757 |
MQTTStatus_t status = validatePublishParams( pContext, pPublishInfo, packetId ); |
|
2758 |
|
|
2759 |
if( status == MQTTSuccess ) |
|
2760 |
{ |
|
2761 |
/* Get the remaining length and packet size.*/ |
|
2762 |
status = MQTT_GetPublishPacketSize( pPublishInfo, |
|
2763 |
&remainingLength, |
|
2764 |
&packetSize ); |
|
2765 |
} |
|
2766 |
|
|
2767 |
if( status == MQTTSuccess ) |
|
2768 |
{ |
|
2769 |
status = MQTT_SerializePublishHeaderWithoutTopic( pPublishInfo, |
|
2770 |
remainingLength, |
|
2771 |
mqttHeader, |
|
2772 |
&headerSize ); |
|
2773 |
} |
|
2774 |
|
|
2775 |
if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) ) |
|
2776 |
{ |
|
2777 |
MQTT_PRE_STATE_UPDATE_HOOK( pContext ); |
|
2778 |
|
|
2779 |
/* Set the flag so that the corresponding hook can be called later. */ |
|
2780 |
stateUpdateHookExecuted = true; |
|
2781 |
|
|
2782 |
status = MQTT_ReserveState( pContext, |
|
2783 |
packetId, |
|
2784 |
pPublishInfo->qos ); |
|
2785 |
|
|
2786 |
/* State already exists for a duplicate packet. |
|
2787 |
* If a state doesn't exist, it will be handled as a new publish in |
|
2788 |
* state engine. */ |
|
2789 |
if( ( status == MQTTStateCollision ) && ( pPublishInfo->dup == true ) ) |
|
2790 |
{ |
|
2791 |
status = MQTTSuccess; |
|
2792 |
} |
|
2793 |
} |
|
2794 |
|
|
2795 |
if( status == MQTTSuccess ) |
|
2796 |
{ |
|
2797 |
/* Take the mutex as multiple send calls are required for sending this |
|
2798 |
* packet. */ |
|
2799 |
MQTT_PRE_SEND_HOOK( pContext ); |
|
2800 |
|
|
2801 |
status = sendPublishWithoutCopy( pContext, |
|
2802 |
pPublishInfo, |
|
2803 |
mqttHeader, |
|
2804 |
headerSize, |
|
2805 |
packetId ); |
|
2806 |
|
|
2807 |
/* Give the mutex away for the next taker. */ |
|
2808 |
MQTT_POST_SEND_HOOK( pContext ); |
|
2809 |
} |
|
2810 |
|
|
2811 |
if( ( status == MQTTSuccess ) && |
|
2812 |
( pPublishInfo->qos > MQTTQoS0 ) ) |
|
2813 |
{ |
|
2814 |
/* Update state machine after PUBLISH is sent. |
|
2815 |
* Only to be done for QoS1 or QoS2. */ |
|
2816 |
status = MQTT_UpdateStatePublish( pContext, |
|
2817 |
packetId, |
|
2818 |
MQTT_SEND, |
|
2819 |
pPublishInfo->qos, |
|
2820 |
&publishStatus ); |
|
2821 |
|
|
2822 |
if( status != MQTTSuccess ) |
|
2823 |
{ |
|
2824 |
LogError( ( "Update state for publish failed with status %s." |
|
2825 |
" However PUBLISH packet was sent to the broker." |
|
2826 |
" Any further handling of ACKs for the packet Id" |
|
2827 |
" will fail.", |
|
2828 |
MQTT_Status_strerror( status ) ) ); |
|
2829 |
} |
|
2830 |
} |
|
2831 |
|
|
2832 |
if( stateUpdateHookExecuted == true ) |
|
2833 |
{ |
|
2834 |
/* Regardless of the status, if the mutex was taken due to the |
|
2835 |
* packet being of QoS > QoS0, then it should be relinquished. */ |
|
2836 |
MQTT_POST_STATE_UPDATE_HOOK( pContext ); |
|
2837 |
} |
|
2838 |
|
|
2839 |
if( status != MQTTSuccess ) |
|
2840 |
{ |
|
2841 |
LogError( ( "MQTT PUBLISH failed with status %s.", |
|
2842 |
MQTT_Status_strerror( status ) ) ); |
|
2843 |
} |
|
2844 |
|
|
2845 |
return status; |
|
2846 |
} |
|
2847 |
|
|
2848 |
/*-----------------------------------------------------------*/ |
|
2849 |
|
|
2850 |
/* Sends a PINGREQ packet to the broker.
 *
 * Returns MQTTBadParameter when pContext is NULL, the failure status of the
 * size/serialize helpers if one of them fails, and MQTTSendFailed when fewer
 * bytes than the PINGREQ packet size were sent. On success the context records
 * the send time of the ping and starts waiting for a PINGRESP. */
MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
{
    int32_t sendResult = 0;
    MQTTStatus_t status = MQTTSuccess;
    size_t packetSize = 0U;
    /* MQTT ping packets are of fixed length. */
    uint8_t pingreqPacket[ 2U ];
    MQTTFixedBuffer_t localBuffer;

    /* Describe the local array; its size is taken from the array itself so
     * the two cannot drift apart. */
    localBuffer.pBuffer = pingreqPacket;
    localBuffer.size = sizeof( pingreqPacket );

    if( pContext == NULL )
    {
        LogError( ( "pContext is NULL." ) );
        status = MQTTBadParameter;
    }

    if( status == MQTTSuccess )
    {
        /* Get MQTT PINGREQ packet size. */
        status = MQTT_GetPingreqPacketSize( &packetSize );

        if( status == MQTTSuccess )
        {
            LogDebug( ( "MQTT PINGREQ packet size is %lu.",
                        ( unsigned long ) packetSize ) );
        }
        else
        {
            LogError( ( "Failed to get the PINGREQ packet size." ) );
        }
    }

    if( status == MQTTSuccess )
    {
        /* Serialize MQTT PINGREQ into the local buffer. */
        status = MQTT_SerializePingreq( &localBuffer );
    }

    if( status == MQTTSuccess )
    {
        /* The send call must not be interleaved with other senders, hence
         * the hooks around it. */
        MQTT_PRE_SEND_HOOK( pContext );

        /* Send the serialized PINGREQ packet to transport layer.
         * Vectored IO is not used here because the ping packet has no fields
         * that live in user provided buffers, so it can be sent directly.
         * The byte count is packetSize, the same value the result is checked
         * against below. */
        sendResult = sendBuffer( pContext,
                                 localBuffer.pBuffer,
                                 packetSize );

        MQTT_POST_SEND_HOOK( pContext );

        /* It is an error to not send the entire PINGREQ packet. */
        if( sendResult < ( int32_t ) packetSize )
        {
            LogError( ( "Transport send failed for PINGREQ packet." ) );
            status = MQTTSendFailed;
        }
        else
        {
            /* Remember when the ping went out and that a response is due. */
            pContext->pingReqSendTimeMs = pContext->lastPacketTxTime;
            pContext->waitingForPingResp = true;
            LogDebug( ( "Sent %ld bytes of PINGREQ packet.",
                        ( long int ) sendResult ) );
        }
    }

    return status;
}
|
2924 |
|
|
2925 |
/*-----------------------------------------------------------*/ |
|
2926 |
|
|
2927 |
MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, |
|
2928 |
const MQTTSubscribeInfo_t * pSubscriptionList, |
|
2929 |
size_t subscriptionCount, |
|
2930 |
uint16_t packetId ) |
|
2931 |
{ |
|
2932 |
size_t remainingLength = 0UL, packetSize = 0UL; |
|
2933 |
|
|
2934 |
/* Validate arguments. */ |
|
2935 |
MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext, |
|
2936 |
pSubscriptionList, |
|
2937 |
subscriptionCount, |
|
2938 |
packetId ); |
|
2939 |
|
|
2940 |
if( status == MQTTSuccess ) |
|
2941 |
{ |
|
2942 |
/* Get the remaining length and packet size.*/ |
|
2943 |
status = MQTT_GetUnsubscribePacketSize( pSubscriptionList, |
|
2944 |
subscriptionCount, |
|
2945 |
&remainingLength, |
|
2946 |
&packetSize ); |
|
2947 |
LogDebug( ( "UNSUBSCRIBE packet size is %lu and remaining length is %lu.", |
|
2948 |
( unsigned long ) packetSize, |
|
2949 |
( unsigned long ) remainingLength ) ); |
|
2950 |
} |
|
2951 |
|
|
2952 |
if( status == MQTTSuccess ) |
|
2953 |
{ |
|
2954 |
/* Take the mutex because the below call should not be interrupted. */ |
|
2955 |
MQTT_PRE_SEND_HOOK( pContext ); |
|
2956 |
|
|
2957 |
status = sendUnsubscribeWithoutCopy( pContext, |
|
2958 |
pSubscriptionList, |
|
2959 |
subscriptionCount, |
|
2960 |
packetId, |
|
2961 |
remainingLength ); |
|
2962 |
|
|
2963 |
/* Give the mutex away. */ |
|
2964 |
MQTT_POST_SEND_HOOK( pContext ); |
|
2965 |
} |
|
2966 |
|
|
2967 |
return status; |
|
2968 |
} |
|
2969 |
|
|
2970 |
/*-----------------------------------------------------------*/ |
|
2971 |
|
|
2972 |
/* Sends a DISCONNECT packet and marks the context as not connected.
 *
 * Returns MQTTBadParameter when pContext is NULL, the failure status of the
 * size/serialize helpers if one of them fails, and MQTTSendFailed when fewer
 * bytes than the DISCONNECT packet size were sent. On success the receive
 * index is reset and the network buffer is zeroed. */
MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
{
    uint8_t disconnectPacket[ 2U ];
    MQTTFixedBuffer_t localBuffer;
    MQTTStatus_t disconnectStatus = MQTTSuccess;
    size_t packetSize = 0U;
    int32_t bytesSent = 0;

    localBuffer.pBuffer = disconnectPacket;
    localBuffer.size = 2U;

    if( pContext == NULL )
    {
        LogError( ( "pContext cannot be NULL." ) );
        disconnectStatus = MQTTBadParameter;
    }
    else
    {
        /* Ask the serializer how large a DISCONNECT packet is. */
        disconnectStatus = MQTT_GetDisconnectPacketSize( &packetSize );
        LogDebug( ( "MQTT DISCONNECT packet size is %lu.",
                    ( unsigned long ) packetSize ) );

        if( disconnectStatus == MQTTSuccess )
        {
            /* Build the packet in the local buffer. */
            disconnectStatus = MQTT_SerializeDisconnect( &localBuffer );
        }

        if( disconnectStatus == MQTTSuccess )
        {
            /* The transmission must not be interleaved with other senders. */
            MQTT_PRE_SEND_HOOK( pContext );

            /* The packet consists of fixed fields only, none of which live in
             * user provided buffers, so a plain send is sufficient. */
            bytesSent = sendBuffer( pContext,
                                    localBuffer.pBuffer,
                                    packetSize );

            MQTT_POST_SEND_HOOK( pContext );

            if( bytesSent < ( int32_t ) packetSize )
            {
                LogError( ( "Transport send failed for DISCONNECT packet." ) );
                disconnectStatus = MQTTSendFailed;
            }
            else
            {
                LogDebug( ( "Sent %ld bytes of DISCONNECT packet.",
                            ( long int ) bytesSent ) );
            }
        }

        if( disconnectStatus == MQTTSuccess )
        {
            LogInfo( ( "Disconnected from the broker." ) );
            pContext->connectStatus = MQTTNotConnected;

            /* Drop any partially received data left in the buffer. */
            pContext->index = 0;
            ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );
        }
    }

    return disconnectStatus;
}
|
3043 |
|
|
3044 |
/*-----------------------------------------------------------*/ |
|
3045 |
|
|
3046 |
/* Runs one receive iteration with the second argument of
 * receiveSingleIteration() set to true, after clearing the context's
 * controlPacketSent flag.
 *
 * Returns MQTTBadParameter when pContext is NULL, or when the context has no
 * getTime function or no network buffer; otherwise returns the result of
 * receiveSingleIteration(). */
MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext )
{
    /* Every rejected input shares this status; only the log text differs. */
    MQTTStatus_t loopStatus = MQTTBadParameter;

    if( pContext != NULL )
    {
        if( pContext->getTime == NULL )
        {
            LogError( ( "Invalid input parameter: MQTT Context must have valid getTime." ) );
        }
        else if( pContext->networkBuffer.pBuffer == NULL )
        {
            LogError( ( "Invalid input parameter: The MQTT context's networkBuffer must not be NULL." ) );
        }
        else
        {
            pContext->controlPacketSent = false;
            loopStatus = receiveSingleIteration( pContext, true );
        }
    }
    else
    {
        LogError( ( "Invalid input parameter: MQTT Context cannot be NULL." ) );
    }

    return loopStatus;
}
|
3070 |
|
|
3071 |
/*-----------------------------------------------------------*/ |
|
3072 |
|
|
3073 |
/* Runs one receive iteration with the second argument of
 * receiveSingleIteration() set to false.
 *
 * Returns MQTTBadParameter when pContext is NULL, or when the context has no
 * getTime function or no network buffer; otherwise returns the result of
 * receiveSingleIteration(). */
MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext )
{
    /* Every rejected input shares this status; only the log text differs. */
    MQTTStatus_t loopStatus = MQTTBadParameter;

    if( pContext != NULL )
    {
        if( pContext->getTime == NULL )
        {
            LogError( ( "Invalid input parameter: MQTT Context must have a valid getTime function." ) );
        }
        else if( pContext->networkBuffer.pBuffer == NULL )
        {
            LogError( ( "Invalid input parameter: MQTT context's networkBuffer must not be NULL." ) );
        }
        else
        {
            loopStatus = receiveSingleIteration( pContext, false );
        }
    }
    else
    {
        LogError( ( "Invalid input parameter: MQTT Context cannot be NULL." ) );
    }

    return loopStatus;
}
|
3096 |
|
|
3097 |
/*-----------------------------------------------------------*/ |
|
3098 |
|
|
3099 |
/* Hands out the context's next packet ID and advances the counter.
 *
 * Returns 0 (not a valid packet ID) when pContext is NULL. The counter wraps
 * from UINT16_MAX back to 1, so 0 is never issued from a valid context whose
 * counter is nonzero. The read and the update happen between the state-update
 * hooks. */
uint16_t MQTT_GetPacketId( MQTTContext_t * pContext )
{
    uint16_t issuedId = 0U;

    if( pContext != NULL )
    {
        MQTT_PRE_STATE_UPDATE_HOOK( pContext );

        issuedId = pContext->nextPacketId;

        /* Zero is reserved as invalid: after the maximum ID, restart at 1. */
        if( issuedId != ( uint16_t ) UINT16_MAX )
        {
            pContext->nextPacketId++;
        }
        else
        {
            pContext->nextPacketId = 1;
        }

        MQTT_POST_STATE_UPDATE_HOOK( pContext );
    }

    return issuedId;
}
|
3125 |
|
|
3126 |
/*-----------------------------------------------------------*/ |
|
3127 |
|
|
3128 |
MQTTStatus_t MQTT_MatchTopic( const char * pTopicName, |
|
3129 |
const uint16_t topicNameLength, |
|
3130 |
const char * pTopicFilter, |
|
3131 |
const uint16_t topicFilterLength, |
|
3132 |
bool * pIsMatch ) |
|
3133 |
{ |
|
3134 |
MQTTStatus_t status = MQTTSuccess; |
|
3135 |
bool topicFilterStartsWithWildcard = false; |
|
3136 |
bool matchStatus = false; |
|
3137 |
|
|
3138 |
if( ( pTopicName == NULL ) || ( topicNameLength == 0u ) ) |
|
3139 |
{ |
|
3140 |
LogError( ( "Invalid paramater: Topic name should be non-NULL and its " |
|
3141 |
"length should be > 0: TopicName=%p, TopicNameLength=%hu", |
|
3142 |
( void * ) pTopicName, |
|
3143 |
( unsigned short ) topicNameLength ) ); |
|
3144 |
|
|
3145 |
status = MQTTBadParameter; |
|
3146 |
} |
|
3147 |
else if( ( pTopicFilter == NULL ) || ( topicFilterLength == 0u ) ) |
|
3148 |
{ |
|
3149 |
LogError( ( "Invalid paramater: Topic filter should be non-NULL and " |
|
3150 |
"its length should be > 0: TopicName=%p, TopicFilterLength=%hu", |
|
3151 |
( void * ) pTopicFilter, |
|
3152 |
( unsigned short ) topicFilterLength ) ); |
|
3153 |
status = MQTTBadParameter; |
|
3154 |
} |
|
3155 |
else if( pIsMatch == NULL ) |
|
3156 |
{ |
|
3157 |
LogError( ( "Invalid paramater: Output parameter, pIsMatch, is NULL" ) ); |
|
3158 |
status = MQTTBadParameter; |
|
3159 |
} |
|
3160 |
else |
|
3161 |
{ |
|
3162 |
/* Check for an exact match if the incoming topic name and the registered |
|
3163 |
* topic filter length match. */ |
|
3164 |
if( topicNameLength == topicFilterLength ) |
|
3165 |
{ |
|
3166 |
matchStatus = strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0; |
|
3167 |
} |
|
3168 |
|
|
3169 |
if( matchStatus == false ) |
|
3170 |
{ |
|
3171 |
/* If an exact match was not found, match against wildcard characters in |
|
3172 |
* topic filter.*/ |
|
3173 |
|
|
3174 |
/* Determine if topic filter starts with a wildcard. */ |
|
3175 |
topicFilterStartsWithWildcard = ( pTopicFilter[ 0 ] == '+' ) || |
|
3176 |
( pTopicFilter[ 0 ] == '#' ); |
|
3177 |
|
|
3178 |
/* Note: According to the MQTT 3.1.1 specification, incoming PUBLISH topic names |
|
3179 |
* starting with "$" character cannot be matched against topic filter starting with |
|
3180 |
* a wildcard, i.e. for example, "$SYS/sport" cannot be matched with "#" or |
|
3181 |
* "+/sport" topic filters. */ |
|
3182 |
if( !( ( pTopicName[ 0 ] == '$' ) && ( topicFilterStartsWithWildcard == true ) ) ) |
|
3183 |
{ |
|
3184 |
matchStatus = matchTopicFilter( pTopicName, topicNameLength, pTopicFilter, topicFilterLength ); |
|
3185 |
} |
|
3186 |
} |
|
3187 |
|
|
3188 |
/* Update the output parameter with the match result. */ |
|
3189 |
*pIsMatch = matchStatus; |
|
3190 |
} |
|
3191 |
|
|
3192 |
return status; |
|
3193 |
} |
|
3194 |
|
|
3195 |
/*-----------------------------------------------------------*/ |
|
3196 |
|
|
3197 |
MQTTStatus_t MQTT_GetSubAckStatusCodes( const MQTTPacketInfo_t * pSubackPacket, |
|
3198 |
uint8_t ** pPayloadStart, |
|
3199 |
size_t * pPayloadSize ) |
|
3200 |
{ |
|
3201 |
MQTTStatus_t status = MQTTSuccess; |
|
3202 |
|
|
3203 |
if( pSubackPacket == NULL ) |
|
3204 |
{ |
|
3205 |
LogError( ( "Invalid parameter: pSubackPacket is NULL." ) ); |
|
3206 |
status = MQTTBadParameter; |
|
3207 |
} |
|
3208 |
else if( pPayloadStart == NULL ) |
|
3209 |
{ |
|
3210 |
LogError( ( "Invalid parameter: pPayloadStart is NULL." ) ); |
|
3211 |
status = MQTTBadParameter; |
|
3212 |
} |
|
3213 |
else if( pPayloadSize == NULL ) |
|
3214 |
{ |
|
3215 |
LogError( ( "Invalid parameter: pPayloadSize is NULL." ) ); |
|
3216 |
status = MQTTBadParameter; |
|
3217 |
} |
|
3218 |
else if( pSubackPacket->type != MQTT_PACKET_TYPE_SUBACK ) |
|
3219 |
{ |
|
3220 |
LogError( ( "Invalid parameter: Input packet is not a SUBACK packet: " |
|
3221 |
"ExpectedType=%02x, InputType=%02x", |
|
3222 |
( int ) MQTT_PACKET_TYPE_SUBACK, |
|
3223 |
( int ) pSubackPacket->type ) ); |
|
3224 |
status = MQTTBadParameter; |
|
3225 |
} |
|
3226 |
else if( pSubackPacket->pRemainingData == NULL ) |
|
3227 |
{ |
|
3228 |
LogError( ( "Invalid parameter: pSubackPacket->pRemainingData is NULL" ) ); |
|
3229 |
status = MQTTBadParameter; |
|
3230 |
} |
|
3231 |
|
|
3232 |
/* A SUBACK must have a remaining length of at least 3 to accommodate the |
|
3233 |
* packet identifier and at least 1 return code. */ |
|
3234 |
else if( pSubackPacket->remainingLength < 3U ) |
|
3235 |
{ |
|
3236 |
LogError( ( "Invalid parameter: Packet remaining length is invalid: " |
|
3237 |
"Should be greater than 2 for SUBACK packet: InputRemainingLength=%lu", |
|
3238 |
( unsigned long ) pSubackPacket->remainingLength ) ); |
|
3239 |
status = MQTTBadParameter; |
|
3240 |
} |
|
3241 |
else |
|
3242 |
{ |
|
3243 |
/* According to the MQTT 3.1.1 protocol specification, the "Remaining Length" field is a |
|
3244 |
* length of the variable header (2 bytes) plus the length of the payload. |
|
3245 |
* Therefore, we add 2 positions for the starting address of the payload, and |
|
3246 |
* subtract 2 bytes from the remaining length for the length of the payload.*/ |
|
3247 |
*pPayloadStart = &pSubackPacket->pRemainingData[ sizeof( uint16_t ) ]; |
|
3248 |
*pPayloadSize = pSubackPacket->remainingLength - sizeof( uint16_t ); |
|
3249 |
} |
|
3250 |
|
|
3251 |
return status; |
|
3252 |
} |
|
3253 |
|
|
3254 |
/*-----------------------------------------------------------*/ |
|
3255 |
|
|
3256 |
/* Maps a status code to its name as a string literal. Codes without a case
 * below yield "Invalid MQTT Status code". The returned pointer refers to a
 * string literal and must not be modified or freed. */
const char * MQTT_Status_strerror( MQTTStatus_t status )
{
    /* Fallback text, kept when no case below matches. */
    const char * pName = "Invalid MQTT Status code";

    switch( status )
    {
        case MQTTSuccess:
            pName = "MQTTSuccess";
            break;

        case MQTTBadParameter:
            pName = "MQTTBadParameter";
            break;

        case MQTTNoMemory:
            pName = "MQTTNoMemory";
            break;

        case MQTTSendFailed:
            pName = "MQTTSendFailed";
            break;

        case MQTTRecvFailed:
            pName = "MQTTRecvFailed";
            break;

        case MQTTBadResponse:
            pName = "MQTTBadResponse";
            break;

        case MQTTServerRefused:
            pName = "MQTTServerRefused";
            break;

        case MQTTNoDataAvailable:
            pName = "MQTTNoDataAvailable";
            break;

        case MQTTIllegalState:
            pName = "MQTTIllegalState";
            break;

        case MQTTStateCollision:
            pName = "MQTTStateCollision";
            break;

        case MQTTKeepAliveTimeout:
            pName = "MQTTKeepAliveTimeout";
            break;

        default:
            /* Unknown code: the fallback text assigned above is returned. */
            break;
    }

    return pName;
}
|
3313 |
|
|
3314 |
/*-----------------------------------------------------------*/ |