Skip to content

Commit

Permalink
Add MqttClient_PropsAdd_ex for multithreaded apps
Browse files Browse the repository at this point in the history
  • Loading branch information
embhorn committed Aug 10, 2023
1 parent 65a88f9 commit e837f5e
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 21 deletions.
82 changes: 69 additions & 13 deletions examples/multithread/multithread.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ static int mNumMsgsDone;
#define THREAD_CREATE(h, f, c) ((*h = CreateThread(NULL, 0, f, c, 0, NULL)) == NULL)
#define THREAD_JOIN(h, c) WaitForMultipleObjects(c, h, TRUE, INFINITE)
#define THREAD_EXIT(e) ExitThread(e)
#define SLEEP(n) Sleep(n*1000)
#else
/* Posix (Linux/Mac) */
#include <pthread.h>
Expand All @@ -67,6 +68,7 @@ static int mNumMsgsDone;
#define THREAD_CREATE(h, f, c) ({ int ret = pthread_create(h, NULL, f, c); if (ret) { errno = ret; } ret; })
#define THREAD_JOIN(h, c) ({ int ret, x; for(x=0;x<c;x++) { ret = pthread_join(h[x], NULL); if (ret) { errno = ret; break; }} ret; })
#define THREAD_EXIT(e) pthread_exit((void*)e)
#define SLEEP(n) sleep(n)
#endif

static wm_Sem mtLock; /* Protect "packetId" and "stop" */
Expand Down Expand Up @@ -196,6 +198,25 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg,
if (msg_done) {
PRINTF("MQTT Message: Done");
}
#ifdef WOLFMQTT_V5
{
/* Properties can be checked in the message callback */
MqttProp *prop = msg->props;
while (prop != NULL)
{
if (prop->type == MQTT_PROP_CONTENT_TYPE) {
PRINTF("Content type: %.*s", prop->data_str.len,
prop->data_str.str);
}
if (prop->type == MQTT_PROP_USER_PROP) {
PRINTF("User property: key=\"%.*s\", value=\"%.*s\"",
prop->data_str.len, prop->data_str.str,
prop->data_str2.len, prop->data_str2.str);
}
prop = prop->next;
}
}
#endif
wm_SemUnlock(&mtLock);

/* Return negative to terminate publish processing */
Expand Down Expand Up @@ -396,6 +417,9 @@ static void *subscribe_task(void *param)
uint16_t i;
MQTTCtx *mqttCtx = (MQTTCtx*)param;
word32 startSec = 0;
#ifdef WOLFMQTT_V5
MqttProp prop;
#endif

/* Build list of topics */
XMEMSET(&mqttCtx->subscribe, 0, sizeof(MqttSubscribe));
Expand All @@ -406,10 +430,14 @@ static void *subscribe_task(void *param)
#ifdef WOLFMQTT_V5
if (mqttCtx->subId_not_avail != 1) {
/* Subscription Identifier */
MqttProp* prop;
prop = MqttClient_PropsAdd(&mqttCtx->subscribe.props);
prop->type = MQTT_PROP_SUBSCRIPTION_ID;
prop->data_int = DEFAULT_SUB_ID;
XMEMSET(&prop, 0, sizeof(MqttProp));
prop.type = MQTT_PROP_SUBSCRIPTION_ID;
prop.data_int = DEFAULT_SUB_ID;
rc = MqttClient_PropsAdd_ex(&mqttCtx->subscribe.props, &prop);
if (rc != MQTT_CODE_SUCCESS) {
PRINTF("MQTT Subscribe property add failure: %s (%d)",
MqttClient_ReturnCodeToString(rc), rc);
}
}
#endif

Expand Down Expand Up @@ -441,12 +469,6 @@ static void *subscribe_task(void *param)
}
}

#ifdef WOLFMQTT_V5
if (mqttCtx->subscribe.props != NULL) {
MqttClient_PropsFree(mqttCtx->subscribe.props);
}
#endif

THREAD_EXIT(0);
}

Expand Down Expand Up @@ -516,8 +538,6 @@ static void *waitMessage_task(void *param)
if (XFGETS((char*)mqttCtx->rx_buf, MAX_BUFFER_SIZE - 1,
stdin) != NULL)
{
rc = (int)XSTRLEN((char*)mqttCtx->rx_buf);

/* Publish Topic */
mqttCtx->stat = WMQ_PUB;
XMEMSET(&mqttCtx->publish, 0, sizeof(MqttPublish));
Expand All @@ -527,7 +547,8 @@ static void *waitMessage_task(void *param)
mqttCtx->publish.topic_name = mqttCtx->topic_name;
mqttCtx->publish.packet_id = mqtt_get_packetid_threadsafe();
mqttCtx->publish.buffer = mqttCtx->rx_buf;
mqttCtx->publish.total_len = (word16)rc;
mqttCtx->publish.total_len =
(word16)XSTRLEN((char*)mqttCtx->rx_buf);
rc = MqttClient_Publish(&mqttCtx->client,
&mqttCtx->publish);
PRINTF("MQTT Publish: Topic %s, %s (%d)",
Expand All @@ -544,13 +565,19 @@ static void *waitMessage_task(void *param)
}

/* Keep Alive handled in ping thread */
PRINTF("Keep-alive timeout");

/* Signal keep alive thread */
wm_SemUnlock(&pingSignal);

/* Allow ping_task to get scheduled */
sleep(1);
}
else if (rc != MQTT_CODE_SUCCESS) {
/* There was an error */
PRINTF("MQTT Message Wait Error: %s (%d)",
MqttClient_ReturnCodeToString(rc), rc);
mqtt_stop_set();
break;
}
startSec = 0;
Expand All @@ -575,6 +602,10 @@ static void *publish_task(void *param)
MQTTCtx *mqttCtx = (MQTTCtx*)param;
MqttPublish publish;
word32 startSec = 0;
#ifdef WOLFMQTT_V5
MqttProp prop1;
MqttProp prop2;
#endif

/* Publish Topic */
XMEMSET(&publish, 0, sizeof(MqttPublish));
Expand All @@ -588,6 +619,30 @@ static void *publish_task(void *param)
buf[5] = '0' + (publish.packet_id % 10);
publish.buffer = (byte*)buf;
publish.total_len = (word16)XSTRLEN(buf);
#ifdef WOLFMQTT_V5
/* Payload Format Indicator */
XMEMSET(&prop1, 0, sizeof(MqttProp));
prop1.type = MQTT_PROP_PAYLOAD_FORMAT_IND;
prop1.data_byte = 1;
rc = MqttClient_PropsAdd_ex(&publish.props, &prop1);
if (rc != MQTT_CODE_SUCCESS) {
PRINTF("MQTT Publish property add failure: %s (%d)",
MqttClient_ReturnCodeToString(rc), rc);
mqtt_stop_set();
}

/* Content Type */
XMEMSET(&prop2, 0, sizeof(MqttProp));
prop2.type = MQTT_PROP_CONTENT_TYPE;
prop2.data_str.str = (char*)"wolf_type";
prop2.data_str.len = (word16)XSTRLEN(prop2.data_str.str);
rc = MqttClient_PropsAdd_ex(&publish.props, &prop2);
if (rc != MQTT_CODE_SUCCESS) {
PRINTF("MQTT Publish property add failure: %s (%d)",
MqttClient_ReturnCodeToString(rc), rc);
mqtt_stop_set();
}
#endif

do {
rc = MqttClient_Publish_WriteOnly(&mqttCtx->client, &publish, NULL);
Expand Down Expand Up @@ -641,6 +696,7 @@ static void *ping_task(void *param)
if (rc != MQTT_CODE_SUCCESS) {
PRINTF("MQTT Ping Keep Alive Error: %s (%d)",
MqttClient_ReturnCodeToString(rc), rc);
mqtt_stop_set();
break;
}
} while (!mqtt_stop_get());
Expand Down
17 changes: 11 additions & 6 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,7 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect)
mc_connect->stat.write == MQTT_MSG_AUTH)
{
MqttAuth auth, *p_auth = &auth;
MqttProp* prop, *conn_prop;
MqttProp prop, *conn_prop;

/* Find the AUTH property in the connect structure */
for (conn_prop = mc_connect->props;
Expand All @@ -1526,14 +1526,14 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect)
p_auth->reason_code = MQTT_REASON_CONT_AUTH;

/* Use the same authentication method property from connect */
prop = MqttProps_Add(&p_auth->props);
prop->type = MQTT_PROP_AUTH_METHOD;
prop->data_str.str = conn_prop->data_str.str;
prop->data_str.len = conn_prop->data_str.len;
XMEMSET(&prop, 0, sizeof(MqttProp));
rc = MqttProps_Add_ex(&p_auth->props, &prop);
prop.type = MQTT_PROP_AUTH_METHOD;
prop.data_str.str = conn_prop->data_str.str;
prop.data_str.len = conn_prop->data_str.len;

/* Send the AUTH packet */
rc = MqttClient_Auth(client, p_auth);
MqttClient_PropsFree(p_auth->props);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE)
return rc;
Expand Down Expand Up @@ -2471,6 +2471,11 @@ int MqttClient_PropsFree(MqttProp *head)
return MqttProps_Free(head);
}

int MqttClient_PropsAdd_ex(MqttProp **head, MqttProp *new_prop)
{
return MqttProps_Add_ex(head, new_prop);
}

#endif /* WOLFMQTT_V5 */

int MqttClient_WaitMessage_ex(MqttClient *client, MqttObject* msg,
Expand Down
37 changes: 37 additions & 0 deletions src/mqtt_packet.c
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,12 @@ int MqttDecode_Props(MqttPacketType packet, MqttProp** props, byte* pbuf,
break;
}

#ifdef WOLFMQTT_MULTITHREAD
rc = wm_SemLock(&clientPropStack_lock);
if (rc != 0) {
break;
}
#endif
/* Decode the Identifier */
rc = MqttDecode_Vbi(buf, (word32*)&cur_prop->type,
(word32)(buf_len - (buf - pbuf)));
Expand Down Expand Up @@ -638,6 +644,9 @@ int MqttDecode_Props(MqttPacketType packet, MqttProp** props, byte* pbuf,
break;
}
}
#ifdef WOLFMQTT_MULTITHREAD
(void)wm_SemUnlock(&clientPropStack_lock);
#endif
};

if (rc < 0) {
Expand Down Expand Up @@ -1904,6 +1913,34 @@ int MqttProps_Free(MqttProp *head)
return ret;
}

int MqttProps_Add_ex(MqttProp **head, MqttProp *new_prop)
{
MqttProp *prev = NULL, *cur;

if ((head == NULL) || (new_prop == NULL)) {
return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
}

cur = *head;

/* Find the end of the parameter list */
while (cur != NULL) {
prev = cur;
cur = cur->next;
};

/* set placeholder until caller sets it to a real type */
if (prev == NULL) {
/* Start a new list */
*head = new_prop;
}
else {
/* Add to the existing list */
prev->next = new_prop;
}

return MQTT_CODE_SUCCESS;
}
#endif /* WOLFMQTT_V5 */

static int MqttPacket_HandleNetError(MqttClient *client, int rc)
Expand Down
20 changes: 18 additions & 2 deletions wolfmqtt/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,14 @@ WOLFMQTT_API int MqttClient_Auth(
/*! \brief Add a new property.
* Allocate a property structure and add it to the head of the list
pointed to by head. To be used prior to calling packet command.
Properties added using this method use the internal stack, and must be
freed using MqttClient_PropsFree after the operation is complete.
Note:
This API is not thread-safe, use MqttClient_PropsAdd_ex with multi-threaded
applications.
* \param head Pointer-pointer to a property structure
* \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_BAD_ARG
* \return pointer to new MqttProp or NULL
*/
WOLFMQTT_API MqttProp* MqttClient_PropsAdd(
MqttProp **head);
Expand All @@ -424,8 +430,18 @@ WOLFMQTT_API MqttProp* MqttClient_PropsAdd(
*/
WOLFMQTT_API int MqttClient_PropsFree(
MqttProp *head);
#endif

/*! \brief Add a new property.
* Allocate a property structure and add it to the head of the list
pointed to by head. To be used prior to calling packet command.
* \param head Pointer-pointer to a property structure
* \param new_prop Pointer to new property structure to be added.
* \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_BAD_ARG
*/
WOLFMQTT_API int MqttClient_PropsAdd_ex(
MqttProp **head, MqttProp *new_prop);

#endif

/*! \brief Encodes and sends the MQTT Disconnect packet (no response)
* \note This is a non-blocking function that will try and send using
Expand Down
1 change: 1 addition & 0 deletions wolfmqtt/mqtt_packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ WOLFMQTT_LOCAL int MqttDecode_Props(MqttPacketType packet, MqttProp** props,
WOLFMQTT_LOCAL int MqttProps_Init(void);
WOLFMQTT_LOCAL int MqttProps_ShutDown(void);
WOLFMQTT_LOCAL MqttProp* MqttProps_Add(MqttProp **head);
WOLFMQTT_LOCAL int MqttProps_Add_ex(MqttProp **head, MqttProp *new_prop);
WOLFMQTT_LOCAL int MqttProps_Free(MqttProp *head);
WOLFMQTT_LOCAL MqttProp* MqttProps_FindType(MqttProp *head,
MqttPropertyType type);
Expand Down

0 comments on commit e837f5e

Please sign in to comment.