Skip to content

Commit 2ad9d0f

Browse files
committed
bind and setup manual puback
1 parent 25a2e61 commit 2ad9d0f

File tree

5 files changed

+104
-1
lines changed

5 files changed

+104
-1
lines changed

include/aws/crt/mqtt/Mqtt5Client.h

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,13 +250,50 @@ namespace Aws
250250
OnStoppedEventData() {}
251251
};
252252

253+
/**
254+
* An opaque handle representing manual control over a QoS 1 PUBACK for a received PUBLISH packet.
255+
*
256+
* Obtained by calling acquirePubackControl() within the OnPublishReceivedHandler callback.
257+
* Pass this handle to Mqtt5Client::InvokePuback() at any later time to send the PUBACK.
258+
*
259+
* @note acquirePubackControl() must be called within the OnPublishReceivedHandler callback.
260+
* Calling it after the callback returns will return nullptr.
261+
*/
262+
class AWS_CRT_CPP_API PubackControlHandle
263+
{
264+
friend class Mqtt5Client;
265+
friend class Mqtt5ClientCore;
266+
267+
public:
268+
PubackControlHandle() noexcept : m_controlId(0) {}
269+
270+
private:
271+
explicit PubackControlHandle(uint64_t controlId) noexcept : m_controlId(controlId) {}
272+
273+
uint64_t m_controlId;
274+
};
275+
253276
/**
254277
* The data returned when a publish is made to a topic the MQTT5 client is subscribed to.
255278
*/
256279
struct AWS_CRT_CPP_API PublishReceivedEventData
257280
{
258281
PublishReceivedEventData() : publishPacket(nullptr) {}
282+
259283
std::shared_ptr<PublishPacket> publishPacket;
284+
285+
/**
286+
* Call this function within the OnPublishReceivedHandler callback to take manual control of the
287+
* PUBACK for this QoS 1 message, preventing the client from automatically sending a PUBACK.
288+
*
289+
* Returns a shared_ptr to a PubackControlHandle that can be passed to Mqtt5Client::InvokePuback()
290+
* to send the PUBACK to the broker.
291+
*
292+
* @note This function must be called within the OnPublishReceivedHandler callback.
293+
* Calling it after the callback returns will return nullptr.
294+
* @note Only relevant for QoS 1 messages. Returns nullptr for QoS 0 messages.
295+
*/
296+
std::function<std::shared_ptr<PubackControlHandle>()> acquirePubackControl;
260297
};
261298

262299
/**
@@ -433,6 +470,19 @@ namespace Aws
433470
*/
434471
const Mqtt5ClientOperationStatistics &GetOperationStatistics() noexcept;
435472

473+
/**
474+
* Sends a PUBACK packet for a QoS 1 PUBLISH that was previously acquired for manual control.
475+
*
476+
* To use manual PUBACK control, call eventData.acquirePubackControl() within the
477+
* OnPublishReceivedHandler callback to obtain a PubackControlHandle. Then call this method
478+
* to send the PUBACK.
479+
*
480+
* @param pubackControlHandle handle obtained from acquirePubackControl()
481+
*
482+
* @return true if the operation succeeded, otherwise false
483+
*/
484+
bool InvokePuback(const PubackControlHandle &pubackControlHandle) noexcept;
485+
436486
~Mqtt5Client();
437487

438488
struct aws_mqtt5_client *GetUnderlyingHandle() const noexcept;

include/aws/crt/mqtt/private/Mqtt5ClientCore.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@ namespace Aws
9696
std::shared_ptr<UnsubscribePacket> unsubscribeOptions,
9797
OnUnsubscribeCompletionHandler onUnsubscribeCompletionCallback = NULL) noexcept;
9898

99+
/**
100+
* Sends a PUBACK packet for a QoS 1 PUBLISH that was previously acquired for manual control.
101+
*
102+
* @param pubackControlHandle handle obtained from acquirePubackControl()
103+
*
104+
* @return true if the operation succeeded, otherwise false
105+
*/
106+
bool InvokePuback(const PubackControlHandle &pubackControlHandle) noexcept;
107+
99108
/**
100109
* Tells the Mqtt5ClientCore to release the native client and clean up unhandled the resources
101110
* and operations before destroying it. You MUST only call this function when you

source/mqtt/Mqtt5Client.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,16 @@ namespace Aws
158158
return m_client_core->Unsubscribe(unsubscribeOptions, onUnsubscribeCompletionCallback);
159159
}
160160

161+
bool Mqtt5Client::InvokePuback(const PubackControlHandle &pubackControlHandle) noexcept
162+
{
163+
if (m_client_core == nullptr)
164+
{
165+
AWS_LOGF_DEBUG(AWS_LS_MQTT5_CLIENT, "Failed to invoke puback: Mqtt5 Client is invalid.");
166+
return false;
167+
}
168+
return m_client_core->InvokePuback(pubackControlHandle);
169+
}
170+
161171
const Mqtt5ClientOperationStatistics &Mqtt5Client::GetOperationStatistics() noexcept
162172
{
163173
aws_mqtt5_client_operation_statistics m_operationStatisticsNative = {0, 0, 0, 0};

source/mqtt/Mqtt5ClientCore.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,29 @@ namespace Aws
189189
client_core->m_allocator, *publish, client_core->m_allocator);
190190
PublishReceivedEventData eventData;
191191
eventData.publishPacket = packet;
192+
193+
/*
194+
* Set up the acquirePubackControl function for QoS 1 messages.
195+
* This lambda captures the raw client pointer and the publish view pointer,
196+
* both of which are valid only during this callback invocation.
197+
* Calling acquirePubackControl after the callback returns will return nullptr
198+
* because the publish pointer will no longer be valid. This is handled.
199+
*/
200+
if (publish->qos == AWS_MQTT5_QOS_AT_LEAST_ONCE)
201+
{
202+
aws_mqtt5_client *rawClient = client_core->m_client;
203+
eventData.acquirePubackControl = [rawClient,
204+
publish]() -> std::shared_ptr<PubackControlHandle>
205+
{
206+
if (rawClient == nullptr)
207+
{
208+
return nullptr;
209+
}
210+
uint64_t controlId = aws_mqtt5_client_acquire_puback(rawClient, publish);
211+
return std::make_shared<PubackControlHandle>(PubackControlHandle(controlId));
212+
};
213+
}
214+
192215
client_core->onPublishReceived(eventData);
193216
}
194217
else
@@ -612,6 +635,17 @@ namespace Aws
612635
return result == AWS_OP_SUCCESS;
613636
}
614637

638+
bool Mqtt5ClientCore::InvokePuback(const PubackControlHandle &pubackControlHandle) noexcept
639+
{
640+
if (m_client == nullptr)
641+
{
642+
AWS_LOGF_DEBUG(AWS_LS_MQTT5_CLIENT, "Failed to invoke puback: Mqtt5ClientCore is invalid.");
643+
return false;
644+
}
645+
return aws_mqtt5_client_invoke_puback(m_client, pubackControlHandle.m_controlId, nullptr) ==
646+
AWS_OP_SUCCESS;
647+
}
648+
615649
void Mqtt5ClientCore::Close() noexcept
616650
{
617651
std::lock_guard<std::recursive_mutex> lock(m_callback_lock);

0 commit comments

Comments
 (0)