From c47e8e08cb5386babe616f300d8c3b0f2f97b632 Mon Sep 17 00:00:00 2001 From: Giovanni Giacobbi Date: Wed, 19 Aug 2020 14:16:27 +0200 Subject: [PATCH 1/7] Refactored Repository interface and pending message handling. Details: * Refactored the Repository interface. The new interface stores a generic serializable PendingMessage without knowing which type it is, using two different queues one for incoming and one for outgoing messages. There are currently three types of outgoing messages (PublishedMessage, SubscribeRequest, UnsubscribeRequest) and one incoming message (PublishedMessaged) handled. * The `TopicSubscription` class has been renamed to `Subscription` and now represents only the subscription itself, separating it from the concept of SubscribeRequest which might include several topic filters in one message. * Removed exception `UnexpectedAcknowledgementException`. It is expected to have duplicate packets over the wire, so there is no need for an expection in this case, a notice to the logger is enough. * Added `ProtocolViolationException`. This explicitly marks situations where the broker is misbehaving and the connection should be terminated. * Fixed some problems with the MessageProcessor. --- src/Contracts/MessageProcessor.php | 21 +- src/Contracts/MqttClient.php | 4 +- src/Contracts/Repository.php | 178 +++----- .../PendingMessageAlreadyExistsException.php | 23 + .../PendingMessageNotFoundException.php | 23 + ...lishConfirmationAlreadyExistsException.php | 24 -- src/Exceptions/ProtocolViolationException.php | 23 + .../UnexpectedAcknowledgementException.php | 34 -- src/Logger.php | 6 +- .../Mqtt31MessageProcessor.php | 36 +- src/MqttClient.php | 333 +++++++-------- src/PendingMessage.php | 94 ++++ src/PublishedMessage.php | 111 +---- src/Repositories/MemoryRepository.php | 402 +++++------------- src/SubscribeRequest.php | 38 ++ src/Subscription.php | 122 ++++++ src/TopicSubscription.php | 124 ------ src/UnsubscribeRequest.php | 96 +---- .../Mqtt31MessageProcessorTest.php | 37 +- 19 files changed, 729 insertions(+), 1000 deletions(-) create mode 100644 src/Exceptions/PendingMessageAlreadyExistsException.php create mode 100644 src/Exceptions/PendingMessageNotFoundException.php delete mode 100644 src/Exceptions/PendingPublishConfirmationAlreadyExistsException.php create mode 100644 src/Exceptions/ProtocolViolationException.php delete mode 100644 src/Exceptions/UnexpectedAcknowledgementException.php create mode 100644 src/PendingMessage.php create mode 100644 src/SubscribeRequest.php create mode 100644 src/Subscription.php delete mode 100644 src/TopicSubscription.php diff --git a/src/Contracts/MessageProcessor.php b/src/Contracts/MessageProcessor.php index 37c31a4..a608832 100644 --- a/src/Contracts/MessageProcessor.php +++ b/src/Contracts/MessageProcessor.php @@ -8,8 +8,9 @@ use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException; use PhpMqtt\Client\Exceptions\InvalidMessageException; use PhpMqtt\Client\Exceptions\MqttClientException; -use PhpMqtt\Client\Exceptions\UnexpectedAcknowledgementException; +use PhpMqtt\Client\Exceptions\ProtocolViolationException; use PhpMqtt\Client\Message; +use PhpMqtt\Client\Subscription; /** * Implementations of this interface provide message parsing capabilities. @@ -42,7 +43,7 @@ public function tryFindMessageInBuffer(string $buffer, int $bufferLength, string * @param string $message * @return Message|null * @throws InvalidMessageException - * @throws UnexpectedAcknowledgementException + * @throws ProtocolViolationException * @throws MqttClientException */ public function parseAndValidateMessage(string $message): ?Message; @@ -74,22 +75,22 @@ public function buildDisconnectMessage(): string; /** * Builds a subscribe message from the given parameters. * - * @param int $messageId - * @param string $topic - * @param int $qualityOfService + * @param int $messageId + * @param Subscription[] $subscriptions + * @param bool $isDuplicate * @return string */ - public function buildSubscribeMessage(int $messageId, string $topic, int $qualityOfService): string; + public function buildSubscribeMessage(int $messageId, array $subscriptions, bool $isDuplicate = false): string; /** * Builds an unsubscribe message from the given parameters. * - * @param int $messageId - * @param string $topic - * @param bool $isDuplicate + * @param int $messageId + * @param string[] $topics + * @param bool $isDuplicate * @return string */ - public function buildUnsubscribeMessage(int $messageId, string $topic, bool $isDuplicate = false): string; + public function buildUnsubscribeMessage(int $messageId, array $topics, bool $isDuplicate = false): string; /** * Builds a publish message based on the given parameters. diff --git a/src/Contracts/MqttClient.php b/src/Contracts/MqttClient.php index e5e0372..2b88c92 100644 --- a/src/Contracts/MqttClient.php +++ b/src/Contracts/MqttClient.php @@ -9,7 +9,7 @@ use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException; use PhpMqtt\Client\Exceptions\DataTransferException; use PhpMqtt\Client\Exceptions\TopicNotSubscribedException; -use PhpMqtt\Client\Exceptions\UnexpectedAcknowledgementException; +use PhpMqtt\Client\Exceptions\ProtocolViolationException; /** * An interface for the MQTT client. @@ -137,7 +137,7 @@ public function interrupt(): void; * @param int|null $queueWaitLimit * @return void * @throws DataTransferException - * @throws UnexpectedAcknowledgementException + * @throws ProtocolViolationException */ public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, int $queueWaitLimit = null): void; diff --git a/src/Contracts/Repository.php b/src/Contracts/Repository.php index 1b543a2..2ffbc2f 100644 --- a/src/Contracts/Repository.php +++ b/src/Contracts/Repository.php @@ -5,10 +5,10 @@ namespace PhpMqtt\Client\Contracts; use DateTime; -use PhpMqtt\Client\Exceptions\PendingPublishConfirmationAlreadyExistsException; -use PhpMqtt\Client\PublishedMessage; -use PhpMqtt\Client\TopicSubscription; -use PhpMqtt\Client\UnsubscribeRequest; +use PhpMqtt\Client\Exceptions\PendingMessageAlreadyExistsException; +use PhpMqtt\Client\Exceptions\PendingMessageNotFoundException; +use PhpMqtt\Client\PendingMessage; +use PhpMqtt\Client\Subscription; /** * Implementations of this interface provide storage capabilities to an MQTT client. @@ -35,186 +35,134 @@ interface Repository public function newMessageId(): int; /** - * Releases the given message id, allowing it to be reused in the future. - * - * @param int $messageId - * @return void - */ - public function releaseMessageId(int $messageId): void; - - /** - * Returns the number of registered topic subscriptions. The method does - * not differentiate between pending and acknowledged subscriptions. + * Returns the number of pending outgoing messages. * * @return int */ - public function countTopicSubscriptions(): int; - - /** - * Adds a topic subscription to the repository. - * - * @param TopicSubscription $subscription - * @return void - */ - public function addTopicSubscription(TopicSubscription $subscription): void; + public function countPendingOutgoingMessages(): int; /** - * Get all topic subscriptions with the given message identifier. + * Gets a pending outgoing message with the given message identifier, if found. * * @param int $messageId - * @return TopicSubscription[] + * @return PendingMessage|null */ - public function getTopicSubscriptionsWithMessageId(int $messageId): array; + public function getPendingOutgoingMessage(int $messageId): ?PendingMessage; /** - * Find a topic subscription with the given topic. + * Gets a list of pending outgoing messages last sent before the given date time. * - * @param string $topic - * @return TopicSubscription|null - */ - public function getTopicSubscriptionByTopic(string $topic): ?TopicSubscription; - - /** - * Get all topic subscriptions matching the given topic. - * - * @param string $topic - * @return TopicSubscription[] - */ - public function getTopicSubscriptionsMatchingTopic(string $topic): array; - - /** - * Removes the topic subscription with the given topic from the repository. - * Returns true if a topic subscription existed and has been removed. - * Otherwise, false is returned. + * If date time is `null`, all pending messages are returned. * - * @param string $topic - * @return bool - */ - public function removeTopicSubscription(string $topic): bool; - - /** - * Returns the number of pending publish messages. + * The messages are returned in the same order they were added to the repository. * - * @return int + * @param DateTime|null $dateTime + * @return PendingMessage[] */ - public function countPendingPublishMessages(): int; + public function getPendingOutgoingMessagesLastSentBefore(DateTime $dateTime = null): array; /** - * Adds a pending published message to the repository. + * Adds a pending outgoing message to the repository. * - * @param PublishedMessage $message + * @param PendingMessage $message * @return void + * @throws PendingMessageAlreadyExistsException */ - public function addPendingPublishedMessage(PublishedMessage $message): void; + public function addPendingOutgoingMessage(PendingMessage $message): void; /** - * Gets a pending published message with the given message identifier, if found. + * Marks an existing pending outgoing published message as received in the repository. * - * @param int $messageId - * @return PublishedMessage|null - */ - public function getPendingPublishedMessageWithMessageId(int $messageId): ?PublishedMessage; - - /** - * Gets a list of pending published messages last sent before the given date time. - * - * @param DateTime $dateTime - * @return PublishedMessage[] - */ - public function getPendingPublishedMessagesLastSentBefore(DateTime $dateTime): array; - - /** - * Marks the pending published message with the given message identifier as received. - * If the message has no QoS level of 2, is not found or has already been received, - * false is returned. Otherwise the result will be true. + * If the message does not exists, an exception is thrown, + * otherwise `true` is returned if the message was marked as received, and `false` + * in case it was already marked as received. * * @param int $messageId * @return bool + * @throws PendingMessageNotFoundException */ - public function markPendingPublishedMessageAsReceived(int $messageId): bool; + public function markPendingOutgoingPublishedMessageAsReceived(int $messageId): bool; /** - * Removes a pending published message from the repository. If a pending message - * with the given identifier is found and successfully removed from the repository, - * `true` is returned. Otherwise `false` will be returned. + * Removes a pending outgoing message from the repository. + * + * If a pending message with the given identifier is found and + * successfully removed from the repository, `true` is returned. + * Otherwise `false` will be returned. * * @param int $messageId * @return bool */ - public function removePendingPublishedMessage(int $messageId): bool; + public function removePendingOutgoingMessage(int $messageId): bool; /** - * Returns the number of pending unsubscribe requests. + * Returns the number of pending incoming messages. * * @return int */ - public function countPendingUnsubscribeRequests(): int; + public function countPendingIncomingMessages(): int; /** - * Adds a pending unsubscribe request to the repository. - * - * @param UnsubscribeRequest $request - * @return void - */ - public function addPendingUnsubscribeRequest(UnsubscribeRequest $request): void; - - /** - * Gets a pending unsubscribe request with the given message identifier, if found. + * Gets a pending incoming message with the given message identifier, if found. * * @param int $messageId - * @return UnsubscribeRequest|null + * @return PendingMessage|null */ - public function getPendingUnsubscribeRequestWithMessageId(int $messageId): ?UnsubscribeRequest; + public function getPendingIncomingMessage(int $messageId): ?PendingMessage; /** - * Gets a list of pending unsubscribe requests last sent before the given date time. + * Adds a pending outgoing message to the repository. * - * @param DateTime $dateTime - * @return UnsubscribeRequest[] + * @param PendingMessage $message + * @return void + * @throws PendingMessageAlreadyExistsException */ - public function getPendingUnsubscribeRequestsLastSentBefore(DateTime $dateTime): array; + public function addPendingIncomingMessage(PendingMessage $message): void; /** - * Removes a pending unsubscribe requests from the repository. If a pending request - * with the given identifier is found and successfully removed from the repository, - * `true` is returned. Otherwise `false` will be returned. + * Removes a pending incoming message from the repository. + * + * If a pending message with the given identifier is found and + * successfully removed from the repository, `true` is returned. + * Otherwise `false` will be returned. * * @param int $messageId * @return bool */ - public function removePendingUnsubscribeRequest(int $messageId): bool; + public function removePendingIncomingMessage(int $messageId): bool; /** - * Returns the number of pending publish confirmations. + * Returns the number of registered subscriptions. * * @return int */ - public function countPendingPublishConfirmations(): int; + public function countSubscriptions(): int; /** - * Adds a pending publish confirmation to the repository. + * Adds a subscription to the repository. * - * @param PublishedMessage $message + * @param Subscription $subscription * @return void - * @throws PendingPublishConfirmationAlreadyExistsException */ - public function addPendingPublishConfirmation(PublishedMessage $message): void; + public function addSubscription(Subscription $subscription): void; /** - * Gets a pending publish confirmation with the given message identifier, if found. + * Gets all subscriptions matching the given criteria. * - * @param int $messageId - * @return PublishedMessage|null + * @param string $topicName + * @param int $subscriptionId + * @return Subscription[] */ - public function getPendingPublishConfirmationWithMessageId(int $messageId): ?PublishedMessage; + public function getMatchingSubscriptions(string $topicName = null, int $subscriptionId = null): array; /** - * Removes the pending publish confirmation with the given message identifier - * from the repository. This is normally done as soon as a transaction has been - * successfully finished by the publisher. + * Removes the subscription with the given topic filter from the repository. * - * @param int $messageId + * Returns `true` if a topic subscription existed and has been removed. + * Otherwise, `false` is returned. + * + * @param string $topicFilter * @return bool */ - public function removePendingPublishConfirmation(int $messageId): bool; + public function removeSubscription(string $topicFilter): bool; } diff --git a/src/Exceptions/PendingMessageAlreadyExistsException.php b/src/Exceptions/PendingMessageAlreadyExistsException.php new file mode 100644 index 0000000..08260b0 --- /dev/null +++ b/src/Exceptions/PendingMessageAlreadyExistsException.php @@ -0,0 +1,23 @@ +encodeMessageId($messageId); - // Encode the topic as length prefixed string. - $buffer .= $this->buildLengthPrefixedString($topic); + foreach ($subscriptions as $subscription) { + // Encode the topic as length prefixed string. + $buffer .= $this->buildLengthPrefixedString($subscription->getTopicFilter()); - // Encode the quality of service level. - $buffer .= chr($qualityOfService); + // Encode the quality of service level. + $buffer .= chr($subscription->getQualityOfServiceLevel()); + } // The header consists of the message type 0x82 and the length. $header = chr(0x82) . $this->encodeMessageLength(strlen($buffer)); @@ -300,13 +302,15 @@ public function buildSubscribeMessage(int $messageId, string $topic, int $qualit /** * {@inheritDoc} */ - public function buildUnsubscribeMessage(int $messageId, string $topic, bool $isDuplicate = false): string + public function buildUnsubscribeMessage(int $messageId, array $topics, bool $isDuplicate = false): string { // Encode the message id, it always consists of two bytes. $buffer = $this->encodeMessageId($messageId); - // Encode the topic as length prefixed string. - $buffer .= $this->buildLengthPrefixedString($topic); + foreach ($topics as $topic) { + // Encode the topic as length prefixed string. + $buffer .= $this->buildLengthPrefixedString($topic); + } // The header consists of the message type 0xa2 and the length. // Additionally, the first byte may contain the duplicate flag. @@ -412,12 +416,9 @@ public function parseAndValidateMessage(string $message): ?Message } // Then handle the command accordingly. - switch ($command){ + switch ($command) { case 0x02: - throw new UnexpectedAcknowledgementException( - UnexpectedAcknowledgementException::EXCEPTION_ACK_CONNECT, - 'We unexpectedly received a connection acknowledgement.' - ); + throw new ProtocolViolationException('Unexpected connection acknowledgement.'); case 0x03: return $this->parseAndValidatePublishMessage($data, $qualityOfService); @@ -672,11 +673,14 @@ protected function parseAndValidateSubscribeAcknowledgementMessage(string $data) $messageId = $this->decodeMessageId($this->pop($data, 2)); // Parse and validate the QoS acknowledgements. - $acknowledgements = str_split($data); - foreach ($acknowledgements as $acknowledgement) { + $datalen = strlen($data); + $acknowledgements = []; + for ($i = 0; $i < $datalen; $i++) { + $acknowledgement = ord(substr($data, $i, 1)); if (!in_array($acknowledgement, [0, 1, 2])) { throw new InvalidMessageException('Received subscribe acknowledgement with invalid QoS values from the broker.'); } + $acknowledgements[] = $acknowledgement; } return (new Message(MessageType::SUBSCRIBE_ACKNOWLEDGEMENT())) diff --git a/src/MqttClient.php b/src/MqttClient.php index 5d6834d..3de8d9f 100644 --- a/src/MqttClient.php +++ b/src/MqttClient.php @@ -4,8 +4,6 @@ namespace PhpMqtt\Client; -use DateInterval; -use DateTime; use PhpMqtt\Client\Concerns\GeneratesRandomClientIds; use PhpMqtt\Client\Concerns\OffersHooks; use PhpMqtt\Client\Concerns\ValidatesConfiguration; @@ -17,10 +15,11 @@ use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException; use PhpMqtt\Client\Exceptions\DataTransferException; use PhpMqtt\Client\Exceptions\MqttClientException; -use PhpMqtt\Client\Exceptions\PendingPublishConfirmationAlreadyExistsException; +use PhpMqtt\Client\Exceptions\PendingMessageAlreadyExistsException; +use PhpMqtt\Client\Exceptions\PendingMessageNotFoundException; use PhpMqtt\Client\Exceptions\ProtocolNotSupportedException; use PhpMqtt\Client\Exceptions\TopicNotSubscribedException; -use PhpMqtt\Client\Exceptions\UnexpectedAcknowledgementException; +use PhpMqtt\Client\Exceptions\ProtocolViolationException; use PhpMqtt\Client\MessageProcessors\Mqtt31MessageProcessor; use PhpMqtt\Client\Repositories\MemoryRepository; use Psr\Log\LoggerInterface; @@ -537,7 +536,7 @@ public function publish(string $topic, string $message, int $qualityOfService = $messageId = $this->repository->newMessageId(); $pendingMessage = new PublishedMessage($messageId, $topic, $message, $qualityOfService, $retain); - $this->repository->addPendingPublishedMessage($pendingMessage); + $this->repository->addPendingOutgoingMessage($pendingMessage); } $this->publishMessage($topic, $message, $qualityOfService, $retain, $messageId); @@ -611,58 +610,62 @@ protected function publishMessage( * ); * ``` * - * @param string $topic + * @param string $topicFilter * @param callable $callback * @param int $qualityOfService * @return void * @throws DataTransferException */ - public function subscribe(string $topic, callable $callback, int $qualityOfService = self::QOS_AT_MOST_ONCE): void + public function subscribe(string $topicFilter, callable $callback, int $qualityOfService = self::QOS_AT_MOST_ONCE): void { $this->ensureConnected(); - $messageId = $this->repository->newMessageId(); - $data = $this->messageProcessor->buildSubscribeMessage($messageId, $topic, $qualityOfService); - - $this->logger->debug('Subscribing to topic [{topic}] with QoS [{qos}].', [ - 'topic' => $topic, + $this->logger->debug('Subscribing to topic [{topic}] with maximum QoS [{qos}].', [ + 'topic' => $topicFilter, 'qos' => $qualityOfService, ]); - $pendingMessage = new TopicSubscription($topic, $callback, $messageId, $qualityOfService); - $this->repository->addTopicSubscription($pendingMessage); + // Create the subscription representation now, but it will become an + // actual subscription only upon acknowledgement from the broker. + $subscriptions = [ + new Subscription($topicFilter, null, $callback, $qualityOfService) + ]; + $messageId = $this->repository->newMessageId(); + $pendingMessage = new SubscribeRequest($messageId, $subscriptions); + $this->repository->addPendingOutgoingMessage($pendingMessage); + + $data = $this->messageProcessor->buildSubscribeMessage($messageId, $subscriptions); $this->writeToSocket($data); } /** * Unsubscribe from the given topic. * - * @param string $topic + * @param string $topicFilter * @return void * @throws DataTransferException * @throws TopicNotSubscribedException */ - public function unsubscribe(string $topic): void + public function unsubscribe(string $topicFilter): void { $this->ensureConnected(); - $subscription = $this->repository->getTopicSubscriptionByTopic($topic); - if ($subscription === null) { - throw new TopicNotSubscribedException(sprintf('No subscription found for topic [%s].', $topic)); - } - - $messageId = $this->repository->newMessageId(); - $data = $this->messageProcessor->buildUnsubscribeMessage($messageId, $topic); + // $subscription = $this->repository->getTopicSubscriptionByTopic($topic); + // if ($subscription === null) { + // throw new TopicNotSubscribedException(sprintf('No subscription found for topic [%s].', $topic)); + // } $this->logger->debug('Unsubscribing from topic [{topic}].', [ - 'messageId' => $messageId, - 'topic' => $topic, + 'topic' => $topicFilter, ]); - $pendingMessage = new UnsubscribeRequest($messageId, $topic); - $this->repository->addPendingUnsubscribeRequest($pendingMessage); + $messageId = $this->repository->newMessageId(); + $topicFilters = [ $topicFilter ]; + $pendingMessage = new UnsubscribeRequest($messageId, $topicFilters); + $this->repository->addPendingOutgoingMessage($pendingMessage); + $data = $this->messageProcessor->buildUnsubscribeMessage($messageId, $topicFilters); $this->writeToSocket($data); } @@ -745,11 +748,7 @@ public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, // The result is used by us to perform required actions according to the protocol. if ($message !== null) { - try { - $this->handleMessage($message); - } catch (UnexpectedAcknowledgementException $e) { - $this->logger->warning($e); - } + $this->handleMessage($message); } } } else { @@ -758,23 +757,9 @@ public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, } } - // Once a second we try to republish messages without confirmation. - // This will only trigger the republishing though. If a message really - // gets republished depends on the resend timeout and the last time - // we sent the message. - if (1 < (microtime(true) - $lastRepublishedAt)) { - $this->republishPendingMessages(); - $lastRepublishedAt = microtime(true); - } - - // Once a second we try to resend unconfirmed unsubscribe requests. - // This will also only trigger the resending process. If an unsubscribe - // request really gets resend depends on the resend timeout and the last - // time we sent the unsubscribe request. - if (1 < (microtime(true) - $lastResendUnsubscribedAt)) { - $this->republishPendingUnsubscribeRequests(); - $lastResendUnsubscribedAt = microtime(true); - } + // Republish messages expired without confirmation. + // This includes published messages, subscribe and unsubscribe requests. + $this->resendPendingMessages(); // If the last message of the broker has been received more seconds ago // than specified by the keep alive time, we will send a ping to ensure @@ -786,7 +771,7 @@ public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, // This check will ensure, that, if we want to exit as soon as all queues // are empty and they really are empty, we quit. if ($exitWhenQueuesEmpty) { - if ($this->allQueuesAreEmpty() && $this->repository->countTopicSubscriptions() === 0) { + if ($this->allQueuesAreEmpty() && $this->repository->countSubscriptions() === 0) { break; } @@ -794,7 +779,7 @@ public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, // and we reached the time limit. if ($queueWaitLimit !== null && (microtime(true) - $loopStartedAt) > $queueWaitLimit && - $this->repository->countTopicSubscriptions() === 0) { + $this->repository->countSubscriptions() === 0) { break; } } @@ -806,19 +791,19 @@ public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, * * @param Message $message * @throws DataTransferException - * @throws UnexpectedAcknowledgementException */ protected function handleMessage(Message $message): void { - // PUBLISH + // PUBLISH (incoming) if ($message->getType()->equals(MessageType::PUBLISH())) { if ($message->getQualityOfService() === self::QOS_AT_LEAST_ONCE) { + // QoS 1. $this->sendPublishAcknowledgement($message->getMessageId()); } if ($message->getQualityOfService() === self::QOS_EXACTLY_ONCE) { + // QoS 2, part 1. try { - $this->sendPublishReceived($message->getMessageId()); $pendingMessage = new PublishedMessage( $message->getMessageId(), $message->getTopic(), @@ -826,131 +811,143 @@ protected function handleMessage(Message $message): void 2, false ); - $this->repository->addPendingPublishConfirmation($pendingMessage); - } catch (PendingPublishConfirmationAlreadyExistsException $e) { - // We already received and processed this message, therefore we do not respond - // with a receipt a second time and wait for the release instead. + $this->repository->addPendingIncomingMessage($pendingMessage); + } catch (PendingMessageAlreadyExistsException $e) { + // We already received and processed this message. } + + // Always acknowledge, even if we received it multiple times. + $this->sendPublishReceived($message->getMessageId()); + // We only deliver this published message as soon as we receive a publish complete. return; } + // For QoS 0 and QoS 1 we can deliver right away. $this->deliverPublishedMessage($message->getTopic(), $message->getContent(), $message->getQualityOfService()); + return; } - // PUBACK + // PUBACK (outgoing, QoS 1) if ($message->getType()->equals(MessageType::PUBLISH_ACKNOWLEDGEMENT())) { - $result = $this->repository->removePendingPublishedMessage($message->getMessageId()); + $result = $this->repository->removePendingOutgoingMessage($message->getMessageId()); if ($result === false) { - $this->logger->notice('Received publish acknowledgement from the broker for already acknowledged message.'); - throw new UnexpectedAcknowledgementException( - UnexpectedAcknowledgementException::EXCEPTION_ACK_PUBLISH, - 'The MQTT broker acknowledged a publish that has not been pending anymore.' - ); + $this->logger->notice('Received publish acknowledgement from the broker for already acknowledged message.', [ + 'messageId' => $message->getMessageId() + ]); } - - $this->repository->releaseMessageId($message->getMessageId()); + return; } - // PUBREC + // PUBREC (outgoing, QoS 2, part 1) if ($message->getType()->equals(MessageType::PUBLISH_RECEIPT())) { - $result = $this->repository->markPendingPublishedMessageAsReceived($message->getMessageId()); + try { + $result = $this->repository->markPendingOutgoingPublishedMessageAsReceived($message->getMessageId()); + } catch (PendingMessageNotFoundException $e) { + // This should never happen as we should have received all + // PUBRECs before we we see the first PUBCOMP which actually + // remove the message, but better staying on the safe side. + $result = false; + } if ($result === false) { - $this->logger->notice('Received publish receipt from the broker for already acknowledged message.'); - throw new UnexpectedAcknowledgementException( - UnexpectedAcknowledgementException::EXCEPTION_ACK_RECEIVE, - 'The MQTT broker sent a receipt for a publish that has not been pending anymore.' - ); + $this->logger->notice('Received publish receipt from the broker for already acknowledged message.', [ + 'messageId' => $message->getMessageId() + ]); } + // We always reply blindly to keep the flow moving. $this->sendPublishRelease($message->getMessageId()); + return; } - // PUBREL + // PUBREL (incoming, QoS 2, part 2) if ($message->getType()->equals(MessageType::PUBLISH_RELEASE())) { - $publishedMessage = $this->repository->getPendingPublishConfirmationWithMessageId($message->getMessageId()); - $result = $this->repository->removePendingPublishConfirmation($message->getMessageId()); - if ($publishedMessage === null || $result === false) { - $this->logger->notice('Received publish release from the broker for already released message.'); - throw new UnexpectedAcknowledgementException( - UnexpectedAcknowledgementException::EXCEPTION_ACK_RELEASE, - 'The MQTT broker released a publish that has not been pending anymore.' + $pendingMessage = $this->repository->getPendingIncomingMessage($message->getMessageId()); + if (!$pendingMessage || !$pendingMessage instanceof PublishedMessage) { + $this->logger->notice('Received publish release from the broker for already released message.', [ + 'messageId' => $message->getMessageId(), + ]); + } else { + $this->deliverPublishedMessage( + $pendingMessage->getTopicName(), + $pendingMessage->getMessage(), + $pendingMessage->getQualityOfServiceLevel() ); - } - $this->deliverPublishedMessage( - $publishedMessage->getTopic(), - $publishedMessage->getMessage(), - $publishedMessage->getQualityOfServiceLevel() - ); + $this->repository->removePendingIncomingMessage($message->getMessageId()); + } + // Always reply with the PUBCOMP packet so it stops resending it. $this->sendPublishComplete($message->getMessageId()); + return; } - // PUBCOMP + // PUBCOMP (outgoing, QoS 2 part 3) if ($message->getType()->equals(MessageType::PUBLISH_COMPLETE())) { - $result = $this->repository->removePendingPublishedMessage($message->getMessageId()); + $result = $this->repository->removePendingOutgoingMessage($message->getMessageId()); if ($result === false) { - $this->logger->notice('Received publish completion from the broker for already acknowledged message.'); - throw new UnexpectedAcknowledgementException( - UnexpectedAcknowledgementException::EXCEPTION_ACK_COMPLETE, - 'The MQTT broker sent a completion for a publish that has not been pending anymore.' - ); + $this->logger->notice('Received publish completion from the broker for already acknowledged message.', [ + 'messageId' => $message->getMessageId(), + ]); } - - $this->repository->releaseMessageId($message->getMessageId()); + return; } // SUBACK if ($message->getType()->equals(MessageType::SUBSCRIBE_ACKNOWLEDGEMENT())) { - $subscriptions = $this->repository->getTopicSubscriptionsWithMessageId($message->getMessageId()); - - if (count($message->getAcknowledgedQualityOfServices()) !== count($subscriptions)) { - $this->logger->notice('Received subscribe acknowledgement from the broker with wrong number of QoS acknowledgements.', [ - 'required' => count($subscriptions), - 'received' => count($message->getAcknowledgedQualityOfServices()), + $pendingMessage = $this->repository->getPendingOutgoingMessage($message->getMessageId()); + if (!$pendingMessage || !$pendingMessage instanceof SubscribeRequest) { + $this->logger->notice('Received subscribe acknowledgement from the broker for already acknowledged request.', [ + 'messageId' => $message->getMessageId(), ]); - throw new UnexpectedAcknowledgementException( - UnexpectedAcknowledgementException::EXCEPTION_ACK_SUBSCRIBE, - sprintf( - 'The MQTT broker responded with a different amount of QoS acknowledgements as we have subscriptions.' - . ' Subscriptions: %s, QoS Acknowledgements: %s', - count($subscriptions), - count($message->getAcknowledgedQualityOfServices()) - ) - ); + return; } - foreach ($message->getAcknowledgedQualityOfServices() as $index => $qualityOfServiceLevel) { - $subscriptions[$index]->setAcknowledgedQualityOfServiceLevel(intval($qualityOfServiceLevel)); + $acknowledgedSubscriptions = $pendingMessage->getSubscriptions(); + if (count($acknowledgedSubscriptions) != count($message->getAcknowledgedQualityOfServices())) { + throw new ProtocolViolationException(sprintf( + 'The MQTT broker responded with a different amount of QoS acknowledgements (%d) than we expected (%d).', + count($message->getAcknowledgedQualityOfServices()), + count($acknowledgedSubscriptions) + )); } - $this->repository->releaseMessageId($message->getMessageId()); + foreach ($message->getAcknowledgedQualityOfServices() as $index => $qualityOfService) { + // It may happen that the server registers our subscription + // with a lower quality of service than requested, in this + // case this is the one that we will record. + $acknowledgedSubscriptions[$index]->setQualityOfServiceLevel($qualityOfService); + + $this->repository->addSubscription($acknowledgedSubscriptions[$index]); + } + + $this->repository->removePendingOutgoingMessage($message->getMessageId()); + return; } // UNSUBACK if ($message->getType()->equals(MessageType::UNSUBSCRIBE_ACKNOWLEDGEMENT())) { - $unsubscribeRequest = $this->repository->getPendingUnsubscribeRequestWithMessageId($message->getMessageId()); - $result = $this->repository->removePendingUnsubscribeRequest($message->getMessageId()); - if ($result === false) { - $this->logger->notice('Received unsubscribe acknowledgement from the broker for already acknowledged request.'); - throw new UnexpectedAcknowledgementException( - UnexpectedAcknowledgementException::EXCEPTION_ACK_PUBLISH, - 'The MQTT broker acknowledged an unsubscribe request that has not been pending anymore.' - ); + $pendingMessage = $this->repository->getPendingOutgoingMessage($message->getMessageId()); + if (!$pendingMessage || !$pendingMessage instanceof UnsubscribeRequest) { + $this->logger->notice('Received unsubscribe acknowledgement from the broker for already acknowledged request.', [ + 'messageId' => $message->getMessageId(), + ]); + return; } - if ($unsubscribeRequest !== null) { - $this->repository->removeTopicSubscription($unsubscribeRequest->getTopic()); + foreach ($pendingMessage->getTopicFilters() as $topicFilter) { + $this->repository->removeSubscription($topicFilter); } - $this->repository->releaseMessageId($message->getMessageId()); + $this->repository->removePendingOutgoingMessage($message->getMessageId()); + return; } // PINGREQ if ($message->getType()->equals(MessageType::PING_REQUEST())) { // Respond with PINGRESP. $this->writeToSocket(chr(0xd0) . chr(0x00)); + return; } } @@ -961,9 +958,8 @@ protected function handleMessage(Message $message): void */ protected function allQueuesAreEmpty(): bool { - return $this->repository->countPendingPublishMessages() === 0 && - $this->repository->countPendingUnsubscribeRequests() === 0 && - $this->repository->countPendingPublishConfirmations() === 0; + return $this->repository->countPendingOutgoingMessages() === 0 && + $this->repository->countPendingIncomingMessages() === 0; } /** @@ -977,7 +973,7 @@ protected function allQueuesAreEmpty(): bool */ protected function deliverPublishedMessage(string $topic, string $message, int $qualityOfServiceLevel, bool $retained = false): void { - $subscribers = $this->repository->getTopicSubscriptionsMatchingTopic($topic); + $subscribers = $this->repository->getMatchingSubscriptions($topic); $this->logger->debug('Delivering message received on topic [{topic}] from the broker to [{subscribers}] subscribers.', [ 'topic' => $topic, @@ -1004,57 +1000,46 @@ protected function deliverPublishedMessage(string $topic, string $message, int $ * @return void * @throws DataTransferException */ - protected function republishPendingMessages(): void + protected function resendPendingMessages(): void { - $this->logger->debug('Re-publishing pending messages to the broker.'); - /** @noinspection PhpUnhandledExceptionInspection */ - $dateTime = (new DateTime())->sub(new DateInterval('PT' . $this->settings->getResendTimeout() . 'S')); - $messages = $this->repository->getPendingPublishedMessagesLastSentBefore($dateTime); - - foreach ($messages as $message) { - $this->logger->debug('Re-publishing pending message to the broker.', ['messageId' => $message->getMessageId()]); - - $this->publishMessage( - $message->getTopic(), - $message->getMessage(), - $message->getQualityOfServiceLevel(), - $message->wantsToBeRetained(), - $message->getMessageId(), - true - ); - - $message->setLastSentAt(new DateTime()); - $message->incrementSendingAttempts(); - } - } + $dateTime = (new \DateTime())->sub(new \DateInterval('PT' . $this->settings->getResendTimeout() . 'S')); + $messages = $this->repository->getPendingOutgoingMessagesLastSentBefore($dateTime); - /** - * Re-sends pending unsubscribe requests. - * - * @return void - * @throws DataTransferException - */ - protected function republishPendingUnsubscribeRequests(): void - { - $this->logger->debug('Re-sending pending unsubscribe requests to the broker.'); - - /** @noinspection PhpUnhandledExceptionInspection */ - $dateTime = (new DateTime())->sub(new DateInterval('PT' . $this->settings->getResendTimeout() . 'S')); - $requests = $this->repository->getPendingUnsubscribeRequestsLastSentBefore($dateTime); + foreach ($messages as $pendingMessage) { + if ($pendingMessage instanceof PublishedMessage) { + $this->logger->debug('Re-publishing pending message to the broker.', [ + 'messageId' => $pendingMessage->getMessageId(), + ]); - foreach ($requests as $request) { - $data = $this->messageProcessor->buildUnsubscribeMessage($request->getMessageId(), $request->getTopic(), true); + $this->publishMessage( + $pendingMessage->getTopicName(), + $pendingMessage->getMessage(), + $pendingMessage->getQualityOfServiceLevel(), + $pendingMessage->wantsToBeRetained(), + $pendingMessage->getMessageId(), + true + ); + } elseif ($pendingMessage instanceof SubscribeRequest) { + $this->logger->debug('Re-sending pending subscribe request to the broker.', [ + 'messageId' => $pendingMessage->getMessageId(), + ]); - $this->logger->debug('Re-sending pending unsubscribe request to the broker.', [ - 'messageId' => $request->getMessageId(), - 'topic' => $request->getTopic(), - ]); + $data = $this->messageProcessor->buildSubscribeMessage($pendingMessage->getMessageId(), $pendingMessage->getSubscriptions(), true); + $this->writeToSocket($data); + } elseif ($pendingMessage instanceof UnsubscribeRequest) { + $this->logger->debug('Re-sending pending unsubscribe request to the broker.', [ + 'messageId' => $pendingMessage->getMessageId(), + ]); - $this->writeToSocket($data); + $data = $this->messageProcessor->buildUnsubscribeMessage($pendingMessage->getMessageId(), $pendingMessage->getTopicFilters(), true); + $this->writeToSocket($data); + } else { + throw new \RuntimeException('Unexpected pending message type'); + } - $request->setLastSentAt(new DateTime()); - $request->incrementSendingAttempts(); + $pendingMessage->setLastSentAt(new \DateTime()); + $pendingMessage->incrementSendingAttempts(); } } diff --git a/src/PendingMessage.php b/src/PendingMessage.php new file mode 100644 index 0000000..0d989da --- /dev/null +++ b/src/PendingMessage.php @@ -0,0 +1,94 @@ +messageId = $messageId; + $this->lastSentAt = $sentAt ?? new DateTime(); + } + + /** + * Returns the message identifier. + * + * @return int + */ + public function getMessageId(): int + { + return $this->messageId; + } + + /** + * Returns the date time when the message was last attempted to be sent. + * + * @return DateTime + */ + public function getLastSentAt(): DateTime + { + return $this->lastSentAt; + } + + /** + * Returns the number of times the message has been attempted to be sent. + * + * @return int + */ + public function getSendingAttempts(): int + { + return $this->sendingAttempts; + } + + /** + * Sets the date time when the message was last attempted to be sent. + * + * @param DateTime|null $value + * @return static + */ + public function setLastSentAt(DateTime $value = null): self + { + $this->lastSentAt = $value ?? new DateTime(); + + return $this; + } + + /** + * Increments the sending attempts by one. + * + * @return static + */ + public function incrementSendingAttempts(): self + { + $this->sendingAttempts++; + + return $this; + } +} diff --git a/src/PublishedMessage.php b/src/PublishedMessage.php index fb0a751..2896490 100644 --- a/src/PublishedMessage.php +++ b/src/PublishedMessage.php @@ -12,81 +12,55 @@ * * @package PhpMqtt\Client */ -class PublishedMessage +class PublishedMessage extends PendingMessage { - /** @var int */ - private $messageId; - /** @var string */ - private $topic; + protected $topicName; /** @var string */ - private $message; + protected $message; /** @var int */ - private $qualityOfService; + protected $qualityOfService; /** @var bool */ - private $retain; - - /** @var DateTime */ - private $lastSentAt; - - /** @var int */ - private $sendingAttempts = 1; + protected $retain; /** @var bool */ - private $received = false; + protected $received = false; /** * Creates a new published message object. * * @param int $messageId - * @param string $topic + * @param string $topicName * @param string $message * @param int $qualityOfService * @param bool $retain - * @param DateTime|null $sentAt */ public function __construct( int $messageId, - string $topic, + string $topicName, string $message, int $qualityOfService, - bool $retain, - DateTime $sentAt = null + bool $retain ) { - if ($sentAt === null) { - $sentAt = new DateTime(); - } - - $this->messageId = $messageId; - $this->topic = $topic; + parent::__construct($messageId); + $this->topicName = $topicName; $this->message = $message; $this->qualityOfService = $qualityOfService; $this->retain = $retain; - $this->lastSentAt = $sentAt; - } - - /** - * Returns the message identifier. - * - * @return int - */ - public function getMessageId(): int - { - return $this->messageId; } /** - * Returns the topic of the published message. + * Returns the topic name of the published message. * * @return string */ - public function getTopic(): string + public function getTopicName(): string { - return $this->topic; + return $this->topicName; } /** @@ -119,26 +93,6 @@ public function wantsToBeRetained(): bool return $this->retain; } - /** - * Returns the date time when the message was last attempted to be sent. - * - * @return DateTime - */ - public function getLastSentAt(): DateTime - { - return $this->lastSentAt; - } - - /** - * Returns the number of times the message has been attempted to be sent. - * - * @return int - */ - public function getSendingAttempts(): int - { - return $this->sendingAttempts; - } - /** * Determines whether the message has been confirmed as received. * @@ -150,40 +104,19 @@ public function hasBeenReceived(): bool } /** - * Sets the date time when the message was last attempted to be sent. + * Marks the published message as received (QoS level 2). * - * @param DateTime $value - * @return static - */ - public function setLastSentAt(DateTime $value): self - { - $this->lastSentAt = $value; - - return $this; - } - - /** - * Increments the sending attempts by one. + * Returns `true` if the message was not previously received. Otherwise + * `false` will be returned. * - * @return static + * @return bool */ - public function incrementSendingAttempts(): self + public function markAsReceived(): bool { - $this->sendingAttempts++; - - return $this; - } + $result = !$this->received; - /** - * Sets the received state. - * - * @param bool $value - * @return static - */ - public function setReceived(bool $value): self - { - $this->received = $value; + $this->received = true; - return $this; + return $result; } } diff --git a/src/Repositories/MemoryRepository.php b/src/Repositories/MemoryRepository.php index 6538f25..b70f112 100644 --- a/src/Repositories/MemoryRepository.php +++ b/src/Repositories/MemoryRepository.php @@ -4,49 +4,38 @@ namespace PhpMqtt\Client\Repositories; -use Datetime; use PhpMqtt\Client\Contracts\Repository; -use PhpMqtt\Client\Exceptions\PendingPublishConfirmationAlreadyExistsException; +use PhpMqtt\Client\Exceptions\PendingMessageAlreadyExistsException; +use PhpMqtt\Client\Exceptions\PendingMessageNotFoundException; +use PhpMqtt\Client\PendingMessage; use PhpMqtt\Client\PublishedMessage; -use PhpMqtt\Client\TopicSubscription; -use PhpMqtt\Client\UnsubscribeRequest; -use SplObjectStorage; +use PhpMqtt\Client\Subscription; /** * Provides an in-memory implementation which manages message ids, subscriptions and pending messages. - * Instances of this type do not persist any data and are only meant for simple uses cases and testing. + * Instances of this type do not persist any data and are only meant for simple uses cases. * * @package PhpMqtt\Client\Repositories */ class MemoryRepository implements Repository { /** @var int */ - private $lastMessageId = 0; + private $nextMessageId = 1; - /** @var int[] */ - private $reservedMessageIds = []; + /** @var array */ + private $pendingOutgoingMessages = []; - /** @var SplObjectStorage|TopicSubscription[] */ - private $topicSubscriptions; + /** @var array */ + private $pendingIncomingMessages = []; - /** @var SplObjectStorage|PublishedMessage[] */ - private $pendingPublishedMessages; - - /** @var SplObjectStorage|UnsubscribeRequest[] */ - private $pendingUnsubscribeRequests; - - /** @var SplObjectStorage|PublishedMessage[] */ - private $pendingPublishConfirmations; + /** @var array */ + private $subscriptions = []; /** * MemoryRepository constructor. */ public function __construct() { - $this->topicSubscriptions = new SplObjectStorage(); - $this->pendingPublishedMessages = new SplObjectStorage(); - $this->pendingUnsubscribeRequests = new SplObjectStorage(); - $this->pendingPublishConfirmations = new SplObjectStorage(); } /** @@ -57,89 +46,52 @@ public function __construct() */ public function newMessageId(): int { - do { - $this->rotateMessageId(); - - $messageId = $this->lastMessageId; - } while ($this->isReservedMessageId($messageId)); - - $this->reservedMessageIds[] = $messageId; - - return $messageId; - } - - /** - * Releases the given message id, allowing it to be reused in the future. - * - * @param int $messageId - * @return void - */ - public function releaseMessageId(int $messageId): void - { - $this->reservedMessageIds = array_diff($this->reservedMessageIds, [$messageId]); - } - - /** - * This method rotates the message id. This normally means incrementing it, - * but when we reach the limit (65535), the message id is reset to zero. - * - * @return void - */ - protected function rotateMessageId(): void - { - if ($this->lastMessageId === 65535) { - $this->lastMessageId = 0; + if (count($this->pendingOutgoingMessages) >= 65535) { + // This should never happen, as the server receive queue is + // normally smaller than the actual total number of message ids. + // Also, when using MQTT 5.0 the server can specify a smaller + // receive queue size (mosquitto for example has 20 by default), + // so the client has to implement the logic to honor this + // restriction and fallback to the protocol limit. + throw new \RuntimeException("No more message IDs available"); } - $this->lastMessageId++; - } + while (isset($this->pendingOutgoingMessages[$this->nextMessageId])) { + $this->nextMessageId++; + if ($this->nextMessageId > 65535) { + $this->nextMessageId = 1; + } + } - /** - * Determines if the given message id is currently reserved. - * - * @param int $messageId - * @return bool - */ - protected function isReservedMessageId(int $messageId): bool - { - return in_array($messageId, $this->reservedMessageIds); + return $this->nextMessageId; } /** - * Returns the number of registered topic subscriptions. The method does - * not differentiate between pending and acknowledged subscriptions. - * - * @return int + * {@inheritDoc} */ - public function countTopicSubscriptions(): int + public function countPendingOutgoingMessages(): int { - return $this->topicSubscriptions->count(); + return count($this->pendingOutgoingMessages); } /** - * Adds a topic subscription to the repository. - * - * @param TopicSubscription $subscription - * @return void + * {@inheritDoc} */ - public function addTopicSubscription(TopicSubscription $subscription): void + public function getPendingOutgoingMessage(int $messageId): ?PendingMessage { - $this->topicSubscriptions->attach($subscription); + return $this->pendingOutgoingMessages[$messageId] ?? null; } /** - * Get all topic subscriptions with the given message identifier. - * - * @param int $messageId - * @return TopicSubscription[] + * {@inheritDoc} */ - public function getTopicSubscriptionsWithMessageId(int $messageId): array + public function getPendingOutgoingMessagesLastSentBefore(\DateTime $dateTime = null): array { $result = []; - foreach ($this->topicSubscriptions as $subscription) { - if ($subscription->getMessageId() === $messageId) { - $result[] = $subscription; + foreach ($this->pendingOutgoingMessages as $pendingMessage) { + if ($pendingMessage->getLastSentAt() < $dateTime) { + $result[] = $pendingMessage; } } @@ -147,301 +99,137 @@ public function getTopicSubscriptionsWithMessageId(int $messageId): array } /** - * Find a topic subscription with the given topic. - * - * @param string $topic - * @return TopicSubscription|null + * {@inheritDoc} */ - public function getTopicSubscriptionByTopic(string $topic): ?TopicSubscription + public function addPendingOutgoingMessage(PendingMessage $message): void { - foreach ($this->topicSubscriptions as $subscription) { - if ($subscription->getTopic() === $topic) { - return $subscription; - } + if (isset($this->pendingOutgoingMessages[$message->getMessageId()])) { + throw new PendingMessageAlreadyExistsException($message->getMessageId()); } - return null; + $this->pendingOutgoingMessages[$message->getMessageId()] = $message; } /** - * Get all topic subscriptions matching the given topic. - * - * @param string $topic - * @return TopicSubscription[] + * {@inheritDoc} */ - public function getTopicSubscriptionsMatchingTopic(string $topic): array + public function markPendingOutgoingPublishedMessageAsReceived(int $messageId): bool { - $result = []; - - foreach ($this->topicSubscriptions as $subscription) { - if (preg_match($subscription->getRegexifiedTopic(), $topic)) { - $result[] = $subscription; - } + if (!isset($this->pendingOutgoingMessages[$messageId]) || + !$this->pendingOutgoingMessages[$messageId] instanceof PublishedMessage) { + throw new PendingMessageNotFoundException($messageId); } - return $result; + return $this->pendingOutgoingMessages[$messageId]->markAsReceived(); } /** - * Removes the topic subscription with the given topic from the repository. - * Returns true if a topic subscription existed and has been removed. - * Otherwise, false is returned. - * - * @param string $topic - * @return bool + * {@inheritDoc} */ - public function removeTopicSubscription(string $topic): bool + public function removePendingOutgoingMessage(int $messageId): bool { - $result = false; - - foreach ($this->topicSubscriptions as $subscription) { - if ($subscription->getTopic() === $topic) { - $this->topicSubscriptions->detach($subscription); - $result = true; - break; - } + if (!isset($this->pendingOutgoingMessages[$messageId])) { + return false; } - return $result; + unset($this->pendingOutgoingMessages[$messageId]); + return true; } /** - * Returns the number of pending publish messages. - * - * @return int + * {@inheritDoc} */ - public function countPendingPublishMessages(): int + public function countPendingIncomingMessages(): int { - return $this->pendingPublishedMessages->count(); + return count($this->pendingIncomingMessages); } /** - * Adds a pending published message to the repository. - * - * @param PublishedMessage $message - * @return void + * {@inheritDoc} */ - public function addPendingPublishedMessage(PublishedMessage $message): void + public function getPendingIncomingMessage(int $messageId): ?PendingMessage { - $this->pendingPublishedMessages->attach($message); + return $this->pendingIncomingMessages[$messageId] ?? null; } /** - * Gets a pending published message with the given message identifier, if found. - * - * @param int $messageId - * @return PublishedMessage|null + * {@inheritDoc} */ - public function getPendingPublishedMessageWithMessageId(int $messageId): ?PublishedMessage + public function addPendingIncomingMessage(PendingMessage $message): void { - foreach ($this->pendingPublishedMessages as $message) { - if ($message->getMessageId() === $messageId) { - return $message; - } + if (isset($this->pendingIncomingMessages[$message->getMessageId()])) { + throw new PendingMessageAlreadyExistsException($message->getMessageId()); } - return null; + $this->pendingIncomingMessages[$message->getMessageId()] = $message; } /** - * Gets a list of pending published messages last sent before the given date time. - * - * @param DateTime $dateTime - * @return PublishedMessage[] + * {@inheritDoc} */ - public function getPendingPublishedMessagesLastSentBefore(DateTime $dateTime): array + public function removePendingIncomingMessage(int $messageId): bool { - $result = []; - - foreach ($this->pendingPublishedMessages as $message) { - if ($message->hasBeenReceived() === false && $message->getLastSentAt() < $dateTime) { - $result[] = $message; - } - } - - return $result; - } - - /** - * Marks the pending published message with the given message identifier as received. - * If the message has no QoS level of 2, is not found or has already been received, - * false is returned. Otherwise the result will be true. - * - * @param int $messageId - * @return bool - */ - public function markPendingPublishedMessageAsReceived(int $messageId): bool - { - $message = $this->getPendingPublishedMessageWithMessageId($messageId); - - if ($message === null || $message->getQualityOfServiceLevel() < 2 || $message->hasBeenReceived()) { + if (!isset($this->pendingIncomingMessages[$messageId])) { return false; } - $message->setReceived(true); - + unset($this->pendingIncomingMessages[$messageId]); return true; } /** - * Removes a pending published message from the repository. If a pending message - * with the given identifier is found and successfully removed from the repository, - * `true` is returned. Otherwise `false` will be returned. - * - * @param int $messageId - * @return bool + * {@inheritDoc} */ - public function removePendingPublishedMessage(int $messageId): bool + public function countSubscriptions(): int { - $message = $this->getPendingPublishedMessageWithMessageId($messageId); - - if ($message === null) { - return false; - } - - $this->pendingPublishedMessages->detach($message); - - return true; + return count($this->subscriptions); } /** - * Returns the number of pending unsubscribe requests. - * - * @return int + * {@inheritDoc} */ - public function countPendingUnsubscribeRequests(): int + public function addSubscription(Subscription $subscription): void { - return $this->pendingUnsubscribeRequests->count(); + $this->subscriptions[] = $subscription; } /** - * Adds a pending unsubscribe request to the repository. - * - * @param UnsubscribeRequest $request - * @return void + * {@inheritDoc} */ - public function addPendingUnsubscribeRequest(UnsubscribeRequest $request): void - { - $this->pendingUnsubscribeRequests->attach($request); - } - - /** - * Gets a pending unsubscribe request with the given message identifier, if found. - * - * @param int $messageId - * @return UnsubscribeRequest|null - */ - public function getPendingUnsubscribeRequestWithMessageId(int $messageId): ?UnsubscribeRequest - { - foreach ($this->pendingUnsubscribeRequests as $request) { - if ($request->getMessageId() === $messageId) { - return $request; - } - } - - return null; - } - - /** - * Gets a list of pending unsubscribe requests last sent before the given date time. - * - * @param DateTime $dateTime - * @return UnsubscribeRequest[] - */ - public function getPendingUnsubscribeRequestsLastSentBefore(DateTime $dateTime): array + public function getMatchingSubscriptions(string $topicName = null, int $subscriptionId = null): array { $result = []; - foreach ($this->pendingUnsubscribeRequests as $request) { - if ($request->getLastSentAt() < $dateTime) { - $result[] = $request; + foreach ($this->subscriptions as $subscription) { + if (($topicName !== null) && !$subscription->matchTopicFilter($topicName)) { + continue; } - } - - return $result; - } - /** - * Removes a pending unsubscribe requests from the repository. If a pending request - * with the given identifier is found and successfully removed from the repository, - * `true` is returned. Otherwise `false` will be returned. - * - * @param int $messageId - * @return bool - */ - public function removePendingUnsubscribeRequest(int $messageId): bool - { - $request = $this->getPendingUnsubscribeRequestWithMessageId($messageId); + if (($subscriptionId !== null) && ($subscription->getSubscriptionId() !== $subscriptionId)) { + continue; + } - if ($request === null) { - return false; + $result[] = $subscription; } - $this->pendingUnsubscribeRequests->detach($request); - - return true; + return $result; } /** - * Returns the number of pending publish confirmations. - * - * @return int + * {@inheritDoc} */ - public function countPendingPublishConfirmations(): int + public function removeSubscription(string $topicFilter): bool { - return $this->pendingPublishConfirmations->count(); - } - - /** - * Adds a pending publish confirmation to the repository. - * - * @param PublishedMessage $message - * @return void - * @throws PendingPublishConfirmationAlreadyExistsException - */ - public function addPendingPublishConfirmation(PublishedMessage $message): void - { - if ($this->getPendingPublishConfirmationWithMessageId($message->getMessageId()) !== null) { - throw new PendingPublishConfirmationAlreadyExistsException($message->getMessageId()); - } - - $this->pendingPublishConfirmations->attach($message); - } + $result = false; - /** - * Gets a pending publish confirmation with the given message identifier, if found. - * - * @param int $messageId - * @return PublishedMessage|null - */ - public function getPendingPublishConfirmationWithMessageId(int $messageId): ?PublishedMessage - { - foreach ($this->pendingPublishConfirmations as $confirmation) { - if ($confirmation->getMessageId() === $messageId) { - return $confirmation; + foreach ($this->subscriptions as $index => $subscription) { + if ($subscription->getTopicFilter() === $topicFilter) { + unset($this->subscriptions[$index]); + $result = true; + break; } } - return null; - } - - /** - * Removes the given message identifier from the list of pending publish confirmations. - * This is normally done as soon as a transaction has been successfully finished. - * - * @param int $messageId - * @return bool - */ - public function removePendingPublishConfirmation(int $messageId): bool - { - $confirmation = $this->getPendingPublishConfirmationWithMessageId($messageId); - - if ($confirmation === null) { - return false; - } - - $this->pendingPublishConfirmations->detach($confirmation); - - return true; + return $result; } } diff --git a/src/SubscribeRequest.php b/src/SubscribeRequest.php new file mode 100644 index 0000000..28a4d2d --- /dev/null +++ b/src/SubscribeRequest.php @@ -0,0 +1,38 @@ +subscriptions = array_values($subscriptions); + } + + /** + * Returns the subscriptions in this request. + * + * @return Subscription[] + */ + public function getSubscriptions(): array + { + return $this->subscriptions; + } +} diff --git a/src/Subscription.php b/src/Subscription.php new file mode 100644 index 0000000..08842bb --- /dev/null +++ b/src/Subscription.php @@ -0,0 +1,122 @@ +topicFilter = $topicFilter; + $this->subscriptionId = $subscriptionId; + $this->qualityOfService = $qualityOfService; + + if ($callback !== null) { + $this->callback = SerializableClosure::from($callback); + } + + $this->regexifyTopicFilter(); + } + + /** + * Converts the topic filter into a regular expression. + * + * @return void + */ + private function regexifyTopicFilter(): void + { + $this->regexifiedTopicFilter = '/^' . str_replace(['$', '/', '+', '#'], ['\$', '\/', '[^\/]*', '.*'], $this->topicFilter) . '$/'; + } + + /** + * Returns the topic of the subscription. + * + * @return string + */ + public function getTopicFilter(): string + { + return $this->topicFilter; + } + + /** + * Matches the given topic name matches to the subscription's topic filter. + * + * @param string $topicName + * @return bool + */ + public function matchTopicFilter(string $topicName): bool + { + return (bool) preg_match($this->regexifiedTopicFilter, $topicName); + } + + /** + * Returns the subscription identifier. + * + * @return int|null + */ + public function getSubscriptionId(): ?int + { + return $this->subscriptionId; + } + + /** + * Returns the callback for this subscription. + * + * @return \Closure|null + */ + public function getCallback(): ?\Closure + { + return ($this->callback ? $this->callback->getClosure() : null); + } + + /** + * Returns the requested quality of service level. + * + * @return int + */ + public function getQualityOfServiceLevel(): int + { + return $this->qualityOfService; + } + + /** + * Sets the actual quality of service level. + * + * @param int $qualityOfService + * @return void + */ + public function setQualityOfServiceLevel(int $qualityOfService): void + { + $this->qualityOfService = $qualityOfService; + } +} diff --git a/src/TopicSubscription.php b/src/TopicSubscription.php deleted file mode 100644 index 8384f52..0000000 --- a/src/TopicSubscription.php +++ /dev/null @@ -1,124 +0,0 @@ -topic = $topic; - $this->regexifiedTopic = '/^' . str_replace(['$', '/', '+', '#'], ['\$', '\/', '[^\/]*', '.*'], $topic) . '$/'; - $this->callback = SerializableClosure::from($callback); - $this->messageId = $messageId; - $this->qualityOfService = $qualityOfService; - } - - /** - * Returns the topic of the subscription. - * - * @return string - */ - public function getTopic(): string - { - return $this->topic; - } - - /** - * Returns the regexified topic. This regex can be used to match - * incoming messages to subscriptions. - * - * @return string - */ - public function getRegexifiedTopic(): string - { - return $this->regexifiedTopic; - } - - /** - * Returns the callback for this subscription. - * - * @return \Closure - */ - public function getCallback(): \Closure - { - return $this->callback->getClosure(); - } - - /** - * Returns the message identifier. - * - * @return int - */ - public function getMessageId(): int - { - return $this->messageId; - } - - /** - * Returns the requested quality of service level. - * - * @return int - */ - public function getQualityOfServiceLevel(): int - { - return $this->qualityOfService; - } - - /** - * Returns the acknowledged quality of service level. - * - * @return int|null - */ - public function getAcknowledgedQualityOfServiceLevel(): ?int - { - return $this->acknowledgedQualityOfService; - } - - /** - * Sets the acknowledged quality of service level. - * - * @param int $value - * @return static - */ - public function setAcknowledgedQualityOfServiceLevel(int $value): self - { - $this->acknowledgedQualityOfService = $value; - - return $this; - } -} diff --git a/src/UnsubscribeRequest.php b/src/UnsubscribeRequest.php index a5c83da..7edd443 100644 --- a/src/UnsubscribeRequest.php +++ b/src/UnsubscribeRequest.php @@ -4,105 +4,35 @@ namespace PhpMqtt\Client; -use DateTime; - /** - * Represents an unsubscribe request. Is used to store pending unsubscribe requests. - * If an unsubscribe request is not acknowledged by the broker, having one of these - * objects allows the client to resend the request. + * Represents an unsubscribe request. * * @package PhpMqtt\Client */ -class UnsubscribeRequest +class UnsubscribeRequest extends PendingMessage { - /** @var int */ - private $messageId; - - /** @var string */ - private $topic; - - /** @var DateTime */ - private $lastSentAt; - - /** @var int */ - private $sendingAttempts = 1; + /** @var string[] */ + protected $topicFilters; /** * Creates a new unsubscribe request object. * - * @param int $messageId - * @param string $topic - * @param DateTime|null $sentAt - */ - public function __construct(int $messageId, string $topic, DateTime $sentAt = null) - { - $this->messageId = $messageId; - $this->topic = $topic; - $this->lastSentAt = $sentAt ?? new DateTime(); - } - - /** - * Returns the message identifier. - * - * @return int - */ - public function getMessageId(): int - { - return $this->messageId; - } - - /** - * Returns the topic of the subscription. - * - * @return string - */ - public function getTopic(): string - { - return $this->topic; - } - - /** - * Returns the date time when the message was last attempted to be sent. - * - * @return DateTime - */ - public function getLastSentAt(): DateTime - { - return $this->lastSentAt; - } - - /** - * Returns the number of times the message has been attempted to be sent. - * - * @return int + * @param int $messageId + * @param string[] $topicFilters */ - public function getSendingAttempts(): int + public function __construct(int $messageId, array $topicFilters) { - return $this->sendingAttempts; + parent::__construct($messageId); + $this->topicFilters = array_values($topicFilters); } /** - * Sets the date time when the message was last attempted to be sent. + * Returns the topic filters in this request. * - * @param DateTime $value - * @return static + * @return string[] */ - public function setLastSentAt(DateTime $value): self + public function getTopicFilters(): array { - $this->lastSentAt = $value; - - return $this; - } - - /** - * Increments the sending attempts by one. - * - * @return static - */ - public function incrementSendingAttempts(): self - { - $this->sendingAttempts++; - - return $this; + return $this->topicFilters; } } diff --git a/tests/Unit/MessageProcessors/Mqtt31MessageProcessorTest.php b/tests/Unit/MessageProcessors/Mqtt31MessageProcessorTest.php index be2eef9..9ac228c 100644 --- a/tests/Unit/MessageProcessors/Mqtt31MessageProcessorTest.php +++ b/tests/Unit/MessageProcessors/Mqtt31MessageProcessorTest.php @@ -6,6 +6,7 @@ use PhpMqtt\Client\ConnectionSettings; use PhpMqtt\Client\Logger; +use PhpMqtt\Client\Subscription; use PhpMqtt\Client\MessageProcessors\Mqtt31MessageProcessor; use PHPUnit\Framework\TestCase; @@ -158,32 +159,30 @@ public function buildSubscribeMessage_testDataProvider(): array return [ // Simple QoS 0 subscription - [42, 'test/foo', 0, hex2bin('82'.'0d00'.'2a00'.'08') . 'test/foo' . hex2bin('00')], + [42, [new Subscription('test/foo', null, null, 0)], hex2bin('82'.'0d00'.'2a00'.'08') . 'test/foo' . hex2bin('00')], // Wildcard QoS 2 subscription with high message id - [43764, 'test/foo/bar/baz/#', 2, hex2bin('82'.'17aa'.'f400'.'12') . 'test/foo/bar/baz/#' . hex2bin('02')], + [43764, [new Subscription('test/foo/bar/baz/#', null, null, 2)], hex2bin('82'.'17aa'.'f400'.'12') . 'test/foo/bar/baz/#' . hex2bin('02')], // Long QoS 1 subscription with high message id - [62304, $longTopic, 1, hex2bin('82'.'8701'.'f360'.'0082') . $longTopic . hex2bin('01')], + [62304, [new Subscription($longTopic, null, null, 1)], hex2bin('82'.'8701'.'f360'.'0082') . $longTopic . hex2bin('01')], ]; } /** * @dataProvider buildSubscribeMessage_testDataProvider * - * @param int $messageId - * @param string $topic - * @param int $qualityOfService - * @param string $expectedResult + * @param int $messageId + * @param Subscription[] $subscriptions + * @param string $expectedResult */ public function test_buildSubscribeMessage_builds_correct_message( int $messageId, - string $topic, - int $qualityOfService, + array $subscriptions, string $expectedResult ): void { - $result = $this->messageProcessor->buildSubscribeMessage($messageId, $topic, $qualityOfService); + $result = $this->messageProcessor->buildSubscribeMessage($messageId, $subscriptions); $this->assertEquals($expectedResult, $result); } @@ -202,32 +201,32 @@ public function buildUnsubscribeMessage_testDataProvider(): array return [ // Simple unsubscribe without duplicate - [42, 'test/foo', false, hex2bin('a2'.'0c00'.'2a00'.'08') . 'test/foo'], + [42, ['test/foo'], false, hex2bin('a2'.'0c00'.'2a00'.'08') . 'test/foo'], // Wildcard unsubscribe with high message id as duplicate - [43764, 'test/foo/bar/baz/#', true, hex2bin('aa'.'16aa'.'f400'.'12') . 'test/foo/bar/baz/#'], + [43764, ['test/foo/bar/baz/#'], true, hex2bin('aa'.'16aa'.'f400'.'12') . 'test/foo/bar/baz/#'], // Long unsubscribe with high message id as duplicate - [62304, $longTopic, true, hex2bin('aa'.'8601'.'f360'.'0082') . $longTopic], + [62304, [$longTopic], true, hex2bin('aa'.'8601'.'f360'.'0082') . $longTopic], ]; } /** * @dataProvider buildUnsubscribeMessage_testDataProvider * - * @param int $messageId - * @param string $topic - * @param bool $isDuplicate - * @param string $expectedResult + * @param int $messageId + * @param string[] $topics + * @param bool $isDuplicate + * @param string $expectedResult */ public function test_buildUnsubscribeMessage_builds_correct_message( int $messageId, - string $topic, + array $topics, bool $isDuplicate, string $expectedResult ): void { - $result = $this->messageProcessor->buildUnsubscribeMessage($messageId, $topic, $isDuplicate); + $result = $this->messageProcessor->buildUnsubscribeMessage($messageId, $topics, $isDuplicate); $this->assertEquals($expectedResult, $result); } From 4b8e236ec4b98e8be484c8badc518fd55b705a6a Mon Sep 17 00:00:00 2001 From: Giovanni Giacobbi Date: Fri, 21 Aug 2020 18:00:10 +0200 Subject: [PATCH 2/7] Removed Redis repository implementation until updated to the new Repository interface --- src/Repositories/RedisConnectionSettings.php | 109 ---- src/Repositories/RedisRepository.php | 554 ------------------- 2 files changed, 663 deletions(-) delete mode 100644 src/Repositories/RedisConnectionSettings.php delete mode 100644 src/Repositories/RedisRepository.php diff --git a/src/Repositories/RedisConnectionSettings.php b/src/Repositories/RedisConnectionSettings.php deleted file mode 100644 index 4943ae9..0000000 --- a/src/Repositories/RedisConnectionSettings.php +++ /dev/null @@ -1,109 +0,0 @@ -host; - } - - /** - * @param string $host - * @return RedisConnectionSettings - */ - public function setHost(string $host): RedisConnectionSettings - { - $clone = clone $this; - - $clone->host = $host; - - return $clone; - } - - /** - * @return int - */ - public function getPort(): int - { - return $this->port; - } - - /** - * @param int $port - * @return RedisConnectionSettings - */ - public function setPort(int $port): RedisConnectionSettings - { - $clone = clone $this; - - $clone->port = $port; - - return $clone; - } - - /** - * @return float - */ - public function getConnectTimeout(): float - { - return $this->connectTimeout; - } - - /** - * @param float $connectTimeout - * @return RedisConnectionSettings - */ - public function setConnectTimeout(float $connectTimeout): RedisConnectionSettings - { - $clone = clone $this; - - $clone->connectTimeout = $connectTimeout; - - return $clone; - } - - /** - * @return int - */ - public function getDatabase(): int - { - return $this->database; - } - - /** - * @param int $database - * @return RedisConnectionSettings - */ - public function setDatabase(int $database): RedisConnectionSettings - { - $clone = clone $this; - - $clone->database = $database; - - return $clone; - } -} diff --git a/src/Repositories/RedisRepository.php b/src/Repositories/RedisRepository.php deleted file mode 100644 index 13a2bc2..0000000 --- a/src/Repositories/RedisRepository.php +++ /dev/null @@ -1,554 +0,0 @@ -isConnected())) { - throw new ConfigurationInvalidException('Redis repository requires connection settings or connected Redis instance.'); - } - - if ($redis !== null && $redis->isConnected()) { - $this->redis = $redis; - } else { - $this->ensureConnectionSettingsAreValid($connectionSettings); - - $redis = new \Redis(); - $result = $redis->connect($connectionSettings->getHost(), $connectionSettings->getPort(), $connectionSettings->getConnectTimeout()); - - if ($result === false) { - throw new ConfigurationInvalidException('Connecting to the Redis server failed. Is the configuration correct?'); - } - - $redis->select($connectionSettings->getDatabase()); - - $this->redis = $redis; - } - - $this->identifier = $identifier; - - $this->redis->setOption(\Redis::OPT_PREFIX, $this->identifier . ':'); - $this->redis->setOption(\Redis::OPT_SCAN, \Redis::SCAN_RETRY); - $this->redis->setOption(\Redis::OPT_SERIALIZER, \Redis::SERIALIZER_PHP); - - $this->ensureRepositoryIsInitialized(); - } - - /** - * Ensures the given connection settings are valid (i.e. usable to connect to a Redis instance). - * This method does not validate whether connecting is actually possible. - * - * @param RedisConnectionSettings $connectionSettings - * @return void - * @throws ConfigurationInvalidException - */ - protected function ensureConnectionSettingsAreValid(RedisConnectionSettings $connectionSettings): void - { - if ($connectionSettings->getHost() === null) { - throw new ConfigurationInvalidException('No host has been configured for the Redis repository.'); - } - - if ($connectionSettings->getDatabase() < 0 || $connectionSettings->getDatabase() > 15) { - throw new ConfigurationInvalidException('The configured Redis database is invalid. Only databases 0 to 15 are supported.'); - } - } - - /** - * This method initializes the required keys for this repository using the established Redis connection. - * - * @return void - */ - protected function ensureRepositoryIsInitialized(): void - { - // Set the last used message id to zero, making the first message id to be used a 1. - $this->redis->setnx(self::KEY_LAST_MESSAGE_ID, 0); - } - - /** - * Returns a new message id. The message id might have been used before, - * but it is currently not being used (i.e. in a resend queue). - * - * @return int - */ - public function newMessageId(): int - { - do { - $this->rotateMessageId(); - - $messageId = $this->redis->get(self::KEY_LAST_MESSAGE_ID); - } while ($this->isReservedMessageId($messageId)); - - $this->redis->sAdd(self::KEY_RESERVED_MESSAGE_IDS, $messageId); - - return $messageId; - } - - /** - * Releases the given message id, allowing it to be reused in the future. - * - * @param int $messageId - * @return void - */ - public function releaseMessageId(int $messageId): void - { - $this->redis->sRem(self::KEY_RESERVED_MESSAGE_IDS, $messageId); - } - - /** - * This method rotates the message id. This normally means incrementing it, - * but when we reach the limit (65535), the message id is reset to zero. - * - * @return void - */ - protected function rotateMessageId(): void - { - $lastMessageId = $this->redis->get(self::KEY_LAST_MESSAGE_ID); - - if ($lastMessageId === 65535) { - $this->redis->set(self::KEY_LAST_MESSAGE_ID, 1); - } else { - $this->redis->set(self::KEY_LAST_MESSAGE_ID, $lastMessageId + 1); - } - } - - /** - * Determines if the given message id is currently reserved. - * - * @param int $messageId - * @return bool - */ - protected function isReservedMessageId(int $messageId): bool - { - return $this->redis->sIsMember(self::KEY_RESERVED_MESSAGE_IDS, $messageId); - } - - /** - * Returns the number of registered topic subscriptions. The method does - * not differentiate between pending and acknowledged subscriptions. - * - * @return int - */ - public function countTopicSubscriptions(): int - { - return $this->redis->sCard(self::KEY_TOPIC_SUBSCRIPTIONS); - } - - /** - * Adds a topic subscription to the repository. - * - * @param TopicSubscription $subscription - * @return void - */ - public function addTopicSubscription(TopicSubscription $subscription): void - { - $this->redis->sAdd(self::KEY_TOPIC_SUBSCRIPTIONS, $subscription); - } - - /** - * Get all topic subscriptions with the given message identifier. - * - * @param int $messageId - * @return TopicSubscription[] - */ - public function getTopicSubscriptionsWithMessageId(int $messageId): array - { - $result = []; - - $iterator = null; - while ($subscriptions = $this->redis->sScan(self::KEY_TOPIC_SUBSCRIPTIONS, $iterator)) { - /** @var TopicSubscription[] $subscriptions */ - foreach ($subscriptions as $subscription) { - if ($subscription->getMessageId() === $messageId) { - $result[] = $subscription; - } - } - } - - return $result; - } - - /** - * Find a topic subscription with the given topic. - * - * @param string $topic - * @return TopicSubscription|null - */ - public function getTopicSubscriptionByTopic(string $topic): ?TopicSubscription - { - $iterator = null; - while ($subscriptions = $this->redis->sScan(self::KEY_TOPIC_SUBSCRIPTIONS, $iterator)) { - /** @var TopicSubscription[] $subscriptions */ - foreach ($subscriptions as $subscription) { - if ($subscription->getTopic() === $topic) { - return $subscription; - } - } - } - - return null; - } - - /** - * Get all topic subscriptions matching the given topic. - * - * @param string $topic - * @return TopicSubscription[] - */ - public function getTopicSubscriptionsMatchingTopic(string $topic): array - { - $result = []; - - $iterator = null; - while ($subscriptions = $this->redis->sScan(self::KEY_TOPIC_SUBSCRIPTIONS, $iterator)) { - /** @var TopicSubscription[] $subscriptions */ - foreach ($subscriptions as $subscription) { - if (preg_match($subscription->getRegexifiedTopic(), $topic)) { - $result[] = $subscription; - } - } - } - - return $result; - } - - /** - * Removes the topic subscription with the given topic from the repository. - * Returns true if a topic subscription existed and has been removed. - * Otherwise, false is returned. - * - * @param string $topic - * @return bool - */ - public function removeTopicSubscription(string $topic): bool - { - $subscription = $this->getTopicSubscriptionByTopic($topic); - - if ($subscription === null) { - return false; - } - - if ($this->redis->sRem(self::KEY_TOPIC_SUBSCRIPTIONS, $subscription) === false) { - return false; - } - - return true; - } - - /** - * Returns the number of pending publish messages. - * - * @return int - */ - public function countPendingPublishMessages(): int - { - return $this->redis->sCard(self::KEY_PENDING_PUBLISH_MESSAGES); - } - - /** - * Adds a pending published message to the repository. - * - * @param PublishedMessage $message - * @return void - */ - public function addPendingPublishedMessage(PublishedMessage $message): void - { - $this->redis->sAdd(self::KEY_TOPIC_SUBSCRIPTIONS, $message); - } - - /** - * Gets a pending published message with the given message identifier, if found. - * - * @param int $messageId - * @return PublishedMessage|null - */ - public function getPendingPublishedMessageWithMessageId(int $messageId): ?PublishedMessage - { - $iterator = null; - while ($messages = $this->redis->sScan(self::KEY_PENDING_PUBLISH_MESSAGES, $iterator)) { - /** @var PublishedMessage[] $messages */ - foreach ($messages as $message) { - if ($message->getMessageId() === $messageId) { - return $message; - } - } - } - - return null; - } - - /** - * Gets a list of pending published messages last sent before the given date time. - * - * @param DateTime $dateTime - * @return PublishedMessage[] - */ - public function getPendingPublishedMessagesLastSentBefore(DateTime $dateTime): array - { - $result = []; - - $iterator = null; - while ($messages = $this->redis->sScan(self::KEY_PENDING_PUBLISH_MESSAGES, $iterator)) { - /** @var PublishedMessage[] $messages */ - foreach ($messages as $message) { - if ($message->hasBeenReceived() === false && $message->getLastSentAt() < $dateTime) { - $result[] = $message; - } - } - } - - return $result; - } - - /** - * Marks the pending published message with the given message identifier as received. - * If the message has no QoS level of 2, is not found or has already been received, - * false is returned. Otherwise the result will be true. - * - * @param int $messageId - * @return bool - */ - public function markPendingPublishedMessageAsReceived(int $messageId): bool - { - $message = $this->getPendingPublishedMessageWithMessageId($messageId); - - if ($message === null || $message->getQualityOfServiceLevel() < 2 || $message->hasBeenReceived()) { - return false; - } - - if ($this->redis->sRem(self::KEY_PENDING_PUBLISH_MESSAGES, $message) === false) { - return false; - } - - $message->setReceived(true); - - $this->addPendingPublishedMessage($message); - - return true; - } - - /** - * Removes a pending published message from the repository. If a pending message - * with the given identifier is found and successfully removed from the repository, - * `true` is returned. Otherwise `false` will be returned. - * - * @param int $messageId - * @return bool - */ - public function removePendingPublishedMessage(int $messageId): bool - { - $message = $this->getPendingPublishedMessageWithMessageId($messageId); - - if ($message === null) { - return false; - } - - if ($this->redis->sRem(self::KEY_PENDING_PUBLISH_MESSAGES, $message) === false) { - return false; - } - - return true; - } - - /** - * Returns the number of pending unsubscribe requests. - * - * @return int - */ - public function countPendingUnsubscribeRequests(): int - { - return $this->redis->sCard(self::KEY_PENDING_UNSUBSCRIBE_REQUESTS); - } - - /** - * Adds a pending unsubscribe request to the repository. - * - * @param UnsubscribeRequest $request - * @return void - */ - public function addPendingUnsubscribeRequest(UnsubscribeRequest $request): void - { - $this->redis->sAdd(self::KEY_PENDING_UNSUBSCRIBE_REQUESTS, $request); - } - - /** - * Gets a pending unsubscribe request with the given message identifier, if found. - * - * @param int $messageId - * @return UnsubscribeRequest|null - */ - public function getPendingUnsubscribeRequestWithMessageId(int $messageId): ?UnsubscribeRequest - { - $iterator = null; - while ($requests = $this->redis->sScan(self::KEY_PENDING_UNSUBSCRIBE_REQUESTS, $iterator)) { - /** @var UnsubscribeRequest[] $requests */ - foreach ($requests as $request) { - if ($request->getMessageId() === $messageId) { - return $request; - } - } - } - - return null; - } - - /** - * Gets a list of pending unsubscribe requests last sent before the given date time. - * - * @param DateTime $dateTime - * @return UnsubscribeRequest[] - */ - public function getPendingUnsubscribeRequestsLastSentBefore(DateTime $dateTime): array - { - $result = []; - - $iterator = null; - while ($requests = $this->redis->sScan(self::KEY_PENDING_UNSUBSCRIBE_REQUESTS, $iterator)) { - /** @var UnsubscribeRequest[] $requests */ - foreach ($requests as $request) { - if ($request->getLastSentAt() < $dateTime) { - $result[] = $request; - } - } - } - - return $result; - } - - /** - * Removes a pending unsubscribe requests from the repository. If a pending request - * with the given identifier is found and successfully removed from the repository, - * `true` is returned. Otherwise `false` will be returned. - * - * @param int $messageId - * @return bool - */ - public function removePendingUnsubscribeRequest(int $messageId): bool - { - $request = $this->getPendingUnsubscribeRequestWithMessageId($messageId); - - if ($request === null) { - return false; - } - - if ($this->redis->sRem(self::KEY_PENDING_UNSUBSCRIBE_REQUESTS, $request) === false) { - return false; - } - - return true; - } - - /** - * Returns the number of pending publish confirmations. - * - * @return int - */ - public function countPendingPublishConfirmations(): int - { - return $this->redis->sCard(self::KEY_PENDING_PUBLISH_CONFIRMATIONS); - } - - /** - * Adds a pending publish confirmation to the repository. - * - * @param PublishedMessage $message - * @return void - * @throws PendingPublishConfirmationAlreadyExistsException - */ - public function addPendingPublishConfirmation(PublishedMessage $message): void - { - if ($this->getPendingPublishConfirmationWithMessageId($message->getMessageId()) !== null) { - throw new PendingPublishConfirmationAlreadyExistsException($message->getMessageId()); - } - - $this->redis->sAdd(self::KEY_PENDING_PUBLISH_CONFIRMATIONS, $message); - } - - /** - * Gets a pending publish confirmation with the given message identifier, if found. - * - * @param int $messageId - * @return PublishedMessage|null - */ - public function getPendingPublishConfirmationWithMessageId(int $messageId): ?PublishedMessage - { - $iterator = null; - while ($confirmations = $this->redis->sScan(self::KEY_PENDING_PUBLISH_CONFIRMATIONS, $iterator)) { - /** @var PublishedMessage[] $confirmations */ - foreach ($confirmations as $confirmation) { - if ($confirmation->getMessageId() === $messageId) { - return $confirmation; - } - } - } - - return null; - } - - /** - * Removes the given message identifier from the list of pending publish confirmations. - * This is normally done as soon as a transaction has been successfully finished. - * - * @param int $messageId - * @return bool - */ - public function removePendingPublishConfirmation(int $messageId): bool - { - $confirmation = $this->getPendingPublishConfirmationWithMessageId($messageId); - - if ($confirmation === null) { - return false; - } - - if ($this->redis->sRem(self::KEY_PENDING_PUBLISH_CONFIRMATIONS, $confirmation) === false) { - return false; - } - - return true; - } -} From 5fb0516f11ebed2079e9870f69e30250785447f4 Mon Sep 17 00:00:00 2001 From: Marvin Mall Date: Sun, 23 Aug 2020 09:58:52 +0200 Subject: [PATCH 3/7] Fix MessageProcessor, exceptions and documentation --- src/Contracts/MessageProcessor.php | 11 ++++- src/Contracts/MqttClient.php | 5 ++- src/Contracts/Repository.php | 6 ++- .../PendingMessageAlreadyExistsException.php | 2 +- .../PendingMessageNotFoundException.php | 2 +- src/Exceptions/RepositoryException.php | 14 +++++++ .../TopicNotSubscribedException.php | 15 ------- .../Mqtt31MessageProcessor.php | 15 ++++++- src/MqttClient.php | 42 ++++++++++--------- src/Repositories/MemoryRepository.php | 3 +- .../Mqtt31MessageProcessorTest.php | 2 +- 11 files changed, 71 insertions(+), 46 deletions(-) create mode 100644 src/Exceptions/RepositoryException.php delete mode 100644 src/Exceptions/TopicNotSubscribedException.php diff --git a/src/Contracts/MessageProcessor.php b/src/Contracts/MessageProcessor.php index a608832..2464dda 100644 --- a/src/Contracts/MessageProcessor.php +++ b/src/Contracts/MessageProcessor.php @@ -59,11 +59,18 @@ public function parseAndValidateMessage(string $message): ?Message; public function buildConnectMessage(ConnectionSettings $connectionSettings, bool $useCleanSession = false): string; /** - * Builds a ping message. + * Builds a ping request message. * * @return string */ - public function buildPingMessage(): string; + public function buildPingRequestMessage(): string; + + /** + * Builds a ping response message. + * + * @return string + */ + public function buildPingResponseMessage(): string; /** * Builds a disconnect message. diff --git a/src/Contracts/MqttClient.php b/src/Contracts/MqttClient.php index 2b88c92..7e09c9c 100644 --- a/src/Contracts/MqttClient.php +++ b/src/Contracts/MqttClient.php @@ -8,8 +8,8 @@ use PhpMqtt\Client\Exceptions\ConfigurationInvalidException; use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException; use PhpMqtt\Client\Exceptions\DataTransferException; -use PhpMqtt\Client\Exceptions\TopicNotSubscribedException; use PhpMqtt\Client\Exceptions\ProtocolViolationException; +use PhpMqtt\Client\Exceptions\RepositoryException; /** * An interface for the MQTT client. @@ -57,6 +57,7 @@ public function isConnected(): bool; * @param bool $retain * @return void * @throws DataTransferException + * @throws RepositoryException */ public function publish(string $topic, string $message, int $qualityOfService = 0, bool $retain = false): void; @@ -86,6 +87,7 @@ public function publish(string $topic, string $message, int $qualityOfService = * @param int $qualityOfService * @return void * @throws DataTransferException + * @throws RepositoryException */ public function subscribe(string $topic, callable $callback, int $qualityOfService = 0): void; @@ -95,7 +97,6 @@ public function subscribe(string $topic, callable $callback, int $qualityOfServi * @param string $topic * @return void * @throws DataTransferException - * @throws TopicNotSubscribedException */ public function unsubscribe(string $topic): void; diff --git a/src/Contracts/Repository.php b/src/Contracts/Repository.php index 2ffbc2f..81f0ef7 100644 --- a/src/Contracts/Repository.php +++ b/src/Contracts/Repository.php @@ -7,6 +7,7 @@ use DateTime; use PhpMqtt\Client\Exceptions\PendingMessageAlreadyExistsException; use PhpMqtt\Client\Exceptions\PendingMessageNotFoundException; +use PhpMqtt\Client\Exceptions\RepositoryException; use PhpMqtt\Client\PendingMessage; use PhpMqtt\Client\Subscription; @@ -31,6 +32,7 @@ interface Repository * but it is currently not being used (i.e. in a resend queue). * * @return int + * @throws RepositoryException */ public function newMessageId(): int; @@ -149,8 +151,8 @@ public function addSubscription(Subscription $subscription): void; /** * Gets all subscriptions matching the given criteria. * - * @param string $topicName - * @param int $subscriptionId + * @param string|null $topicName + * @param int|null $subscriptionId * @return Subscription[] */ public function getMatchingSubscriptions(string $topicName = null, int $subscriptionId = null): array; diff --git a/src/Exceptions/PendingMessageAlreadyExistsException.php b/src/Exceptions/PendingMessageAlreadyExistsException.php index 08260b0..bb20329 100644 --- a/src/Exceptions/PendingMessageAlreadyExistsException.php +++ b/src/Exceptions/PendingMessageAlreadyExistsException.php @@ -9,7 +9,7 @@ * * @package PhpMqtt\Client\Exceptions */ -class PendingMessageAlreadyExistsException extends MqttClientException +class PendingMessageAlreadyExistsException extends RepositoryException { /** * PendingMessageAlreadyExistsException constructor. diff --git a/src/Exceptions/PendingMessageNotFoundException.php b/src/Exceptions/PendingMessageNotFoundException.php index 72c7bb3..ef3fe5a 100644 --- a/src/Exceptions/PendingMessageNotFoundException.php +++ b/src/Exceptions/PendingMessageNotFoundException.php @@ -9,7 +9,7 @@ * * @package PhpMqtt\Client\Exceptions */ -class PendingMessageNotFoundException extends MqttClientException +class PendingMessageNotFoundException extends RepositoryException { /** * PendingMessageNotFoundException constructor. diff --git a/src/Exceptions/RepositoryException.php b/src/Exceptions/RepositoryException.php new file mode 100644 index 0000000..95a22e4 --- /dev/null +++ b/src/Exceptions/RepositoryException.php @@ -0,0 +1,14 @@ +ensureConnected(); - // $subscription = $this->repository->getTopicSubscriptionByTopic($topic); - // if ($subscription === null) { - // throw new TopicNotSubscribedException(sprintf('No subscription found for topic [%s].', $topic)); - // } - $this->logger->debug('Unsubscribing from topic [{topic}].', [ 'topic' => $topicFilter, ]); @@ -704,9 +703,7 @@ public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, { $this->logger->debug('Starting client loop to process incoming messages and the resend queue.'); - $loopStartedAt = microtime(true); - $lastRepublishedAt = microtime(true); - $lastResendUnsubscribedAt = microtime(true); + $loopStartedAt = microtime(true); while (true) { if ($this->interrupted) { @@ -791,6 +788,7 @@ public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, * * @param Message $message * @throws DataTransferException + * @throws ProtocolViolationException */ protected function handleMessage(Message $message): void { @@ -829,6 +827,7 @@ protected function handleMessage(Message $message): void } // PUBACK (outgoing, QoS 1) + // Receiving an acknowledgement allows us to remove the published message from the retry queue. if ($message->getType()->equals(MessageType::PUBLISH_ACKNOWLEDGEMENT())) { $result = $this->repository->removePendingOutgoingMessage($message->getMessageId()); if ($result === false) { @@ -840,13 +839,13 @@ protected function handleMessage(Message $message): void } // PUBREC (outgoing, QoS 2, part 1) + // Receiving a receipt allows us to mark the published message as received. if ($message->getType()->equals(MessageType::PUBLISH_RECEIPT())) { try { $result = $this->repository->markPendingOutgoingPublishedMessageAsReceived($message->getMessageId()); } catch (PendingMessageNotFoundException $e) { - // This should never happen as we should have received all - // PUBRECs before we we see the first PUBCOMP which actually - // remove the message, but better staying on the safe side. + // This should never happen as we should have received all PUBREC messages before we see the first + // PUBCOMP which actually remove the message. So we do this for safety only. $result = false; } if ($result === false) { @@ -861,6 +860,7 @@ protected function handleMessage(Message $message): void } // PUBREL (incoming, QoS 2, part 2) + // When the broker tells us we can release the received published message, we deliver it to subscribed callbacks. if ($message->getType()->equals(MessageType::PUBLISH_RELEASE())) { $pendingMessage = $this->repository->getPendingIncomingMessage($message->getMessageId()); if (!$pendingMessage || !$pendingMessage instanceof PublishedMessage) { @@ -883,6 +883,8 @@ protected function handleMessage(Message $message): void } // PUBCOMP (outgoing, QoS 2 part 3) + // Receiving a completion allows us to remove a published message from the retry queue. + // At this point, the publish process is complete. if ($message->getType()->equals(MessageType::PUBLISH_COMPLETE())) { $result = $this->repository->removePendingOutgoingMessage($message->getMessageId()); if ($result === false) { @@ -946,7 +948,7 @@ protected function handleMessage(Message $message): void // PINGREQ if ($message->getType()->equals(MessageType::PING_REQUEST())) { // Respond with PINGRESP. - $this->writeToSocket(chr(0xd0) . chr(0x00)); + $this->writeToSocket($this->messageProcessor->buildPingResponseMessage()); return; } } @@ -975,9 +977,10 @@ protected function deliverPublishedMessage(string $topic, string $message, int $ { $subscribers = $this->repository->getMatchingSubscriptions($topic); - $this->logger->debug('Delivering message received on topic [{topic}] from the broker to [{subscribers}] subscribers.', [ + $this->logger->debug('Delivering message received on topic [{topic}] with QoS [{qos}] from the broker to [{subscribers}] subscribers.', [ 'topic' => $topic, 'message' => $message, + 'qos' => $qualityOfServiceLevel, 'subscribers' => count($subscribers), ]); @@ -999,6 +1002,7 @@ protected function deliverPublishedMessage(string $topic, string $message, int $ * * @return void * @throws DataTransferException + * @throws InvalidMessageException */ protected function resendPendingMessages(): void { @@ -1035,7 +1039,7 @@ protected function resendPendingMessages(): void $data = $this->messageProcessor->buildUnsubscribeMessage($pendingMessage->getMessageId(), $pendingMessage->getTopicFilters(), true); $this->writeToSocket($data); } else { - throw new \RuntimeException('Unexpected pending message type'); + throw new InvalidMessageException('Unexpected message type encountered while resending pending messages.'); } $pendingMessage->setLastSentAt(new \DateTime()); @@ -1108,7 +1112,7 @@ protected function ping(): void { $this->logger->debug('Sending ping to the broker to keep the connection alive.'); - $this->writeToSocket($this->messageProcessor->buildPingMessage()); + $this->writeToSocket($this->messageProcessor->buildPingRequestMessage()); } /** diff --git a/src/Repositories/MemoryRepository.php b/src/Repositories/MemoryRepository.php index b70f112..4806060 100644 --- a/src/Repositories/MemoryRepository.php +++ b/src/Repositories/MemoryRepository.php @@ -7,6 +7,7 @@ use PhpMqtt\Client\Contracts\Repository; use PhpMqtt\Client\Exceptions\PendingMessageAlreadyExistsException; use PhpMqtt\Client\Exceptions\PendingMessageNotFoundException; +use PhpMqtt\Client\Exceptions\RepositoryException; use PhpMqtt\Client\PendingMessage; use PhpMqtt\Client\PublishedMessage; use PhpMqtt\Client\Subscription; @@ -53,7 +54,7 @@ public function newMessageId(): int // receive queue size (mosquitto for example has 20 by default), // so the client has to implement the logic to honor this // restriction and fallback to the protocol limit. - throw new \RuntimeException("No more message IDs available"); + throw new RepositoryException('No more message identifiers available. The queue is full.'); } while (isset($this->pendingOutgoingMessages[$this->nextMessageId])) { diff --git a/tests/Unit/MessageProcessors/Mqtt31MessageProcessorTest.php b/tests/Unit/MessageProcessors/Mqtt31MessageProcessorTest.php index 9ac228c..2c527b1 100644 --- a/tests/Unit/MessageProcessors/Mqtt31MessageProcessorTest.php +++ b/tests/Unit/MessageProcessors/Mqtt31MessageProcessorTest.php @@ -383,7 +383,7 @@ public function test_buildPublishCompleteMessage_builds_correct_message(int $mes public function test_buildPingMessage_builds_correct_message(): void { - $this->assertEquals(hex2bin('c000'), $this->messageProcessor->buildPingMessage()); + $this->assertEquals(hex2bin('c000'), $this->messageProcessor->buildPingRequestMessage()); } public function test_buildDisconnectMessage_builds_correct_message(): void From c75ff9df64c341d374a4b99594b411c8eabdd6ef Mon Sep 17 00:00:00 2001 From: Marvin Mall Date: Sun, 23 Aug 2020 16:38:25 +0200 Subject: [PATCH 4/7] Change visibility and description of DTOs --- src/PendingMessage.php | 19 +++++++++++-------- src/PublishedMessage.php | 22 ++++++++++------------ src/SubscribeRequest.php | 2 +- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/src/PendingMessage.php b/src/PendingMessage.php index 0d989da..904c2ba 100644 --- a/src/PendingMessage.php +++ b/src/PendingMessage.php @@ -9,21 +9,24 @@ /** * Represents a pending message. * - * If the message is not acknowledged by the broker, having one of these - * objects allows the client to resend the request. + * For messages with QoS 1 and 2 the client is responsible to resend the message if no + * acknowledgement is received from the broker within a given time period. + * + * This class serves as common base for message objects which need to be resent if no + * acknowledgement is received. * * @package PhpMqtt\Client */ abstract class PendingMessage { /** @var int */ - protected $messageId; + private $messageId; /** @var int */ - protected $sendingAttempts = 1; + private $sendingAttempts = 1; /** @var DateTime */ - protected $lastSentAt; + private $lastSentAt; /** * Creates a new pending message object. @@ -48,7 +51,7 @@ public function getMessageId(): int } /** - * Returns the date time when the message was last attempted to be sent. + * Returns the date time when the message was last sent. * * @return DateTime */ @@ -58,7 +61,7 @@ public function getLastSentAt(): DateTime } /** - * Returns the number of times the message has been attempted to be sent. + * Returns the number of times the message has been sent. * * @return int */ @@ -68,7 +71,7 @@ public function getSendingAttempts(): int } /** - * Sets the date time when the message was last attempted to be sent. + * Sets the date time when the message was last sent. * * @param DateTime|null $value * @return static diff --git a/src/PublishedMessage.php b/src/PublishedMessage.php index 2896490..5e851e6 100644 --- a/src/PublishedMessage.php +++ b/src/PublishedMessage.php @@ -4,8 +4,6 @@ namespace PhpMqtt\Client; -use DateTime; - /** * A simple DTO for published messages which need to be stored in a repository * while waiting for the confirmation to be deliverable. @@ -15,28 +13,28 @@ class PublishedMessage extends PendingMessage { /** @var string */ - protected $topicName; + private $topicName; /** @var string */ - protected $message; + private $message; /** @var int */ - protected $qualityOfService; + private $qualityOfService; /** @var bool */ - protected $retain; + private $retain; /** @var bool */ - protected $received = false; + private $received = false; /** * Creates a new published message object. * - * @param int $messageId - * @param string $topicName - * @param string $message - * @param int $qualityOfService - * @param bool $retain + * @param int $messageId + * @param string $topicName + * @param string $message + * @param int $qualityOfService + * @param bool $retain */ public function __construct( int $messageId, diff --git a/src/SubscribeRequest.php b/src/SubscribeRequest.php index 28a4d2d..2055463 100644 --- a/src/SubscribeRequest.php +++ b/src/SubscribeRequest.php @@ -12,7 +12,7 @@ class SubscribeRequest extends PendingMessage { /** @var Subscription[] */ - protected $subscriptions; + private $subscriptions; /** * Creates a new subscribe request message. From b22fb79e2bc1855a55764a97299bc927d25cd90e Mon Sep 17 00:00:00 2001 From: Marvin Mall Date: Sun, 23 Aug 2020 16:48:58 +0200 Subject: [PATCH 5/7] Simplify acknowledgement parsing logic --- src/MessageProcessors/Mqtt31MessageProcessor.php | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/MessageProcessors/Mqtt31MessageProcessor.php b/src/MessageProcessors/Mqtt31MessageProcessor.php index 5a29b7d..b191c97 100644 --- a/src/MessageProcessors/Mqtt31MessageProcessor.php +++ b/src/MessageProcessors/Mqtt31MessageProcessor.php @@ -684,14 +684,11 @@ protected function parseAndValidateSubscribeAcknowledgementMessage(string $data) $messageId = $this->decodeMessageId($this->pop($data, 2)); // Parse and validate the QoS acknowledgements. - $datalen = strlen($data); - $acknowledgements = []; - for ($i = 0; $i < $datalen; $i++) { - $acknowledgement = ord(substr($data, $i, 1)); + $acknowledgements = array_map('ord', str_split($data)); + foreach ($acknowledgements as $acknowledgement) { if (!in_array($acknowledgement, [0, 1, 2])) { throw new InvalidMessageException('Received subscribe acknowledgement with invalid QoS values from the broker.'); } - $acknowledgements[] = $acknowledgement; } return (new Message(MessageType::SUBSCRIBE_ACKNOWLEDGEMENT())) From 6017c93e24015a2de71f8275713e6a8a56ef8d54 Mon Sep 17 00:00:00 2001 From: Marvin Mall Date: Sun, 23 Aug 2020 16:51:02 +0200 Subject: [PATCH 6/7] Change code formatting --- src/MqttClient.php | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/MqttClient.php b/src/MqttClient.php index d4a6e0a..0a26287 100644 --- a/src/MqttClient.php +++ b/src/MqttClient.php @@ -629,13 +629,12 @@ public function subscribe(string $topicFilter, callable $callback, int $qualityO 'qos' => $qualityOfService, ]); + $messageId = $this->repository->newMessageId(); + // Create the subscription representation now, but it will become an // actual subscription only upon acknowledgement from the broker. - $subscriptions = [ - new Subscription($topicFilter, null, $callback, $qualityOfService) - ]; + $subscriptions = [new Subscription($topicFilter, null, $callback, $qualityOfService)]; - $messageId = $this->repository->newMessageId(); $pendingMessage = new SubscribeRequest($messageId, $subscriptions); $this->repository->addPendingOutgoingMessage($pendingMessage); @@ -655,12 +654,11 @@ public function unsubscribe(string $topicFilter): void { $this->ensureConnected(); - $this->logger->debug('Unsubscribing from topic [{topic}].', [ - 'topic' => $topicFilter, - ]); + $this->logger->debug('Unsubscribing from topic [{topic}].', ['topic' => $topicFilter]); + + $messageId = $this->repository->newMessageId(); + $topicFilters = [$topicFilter]; - $messageId = $this->repository->newMessageId(); - $topicFilters = [ $topicFilter ]; $pendingMessage = new UnsubscribeRequest($messageId, $topicFilters); $this->repository->addPendingOutgoingMessage($pendingMessage); From ac224d0e4c20639ed71dd5d76c4febf4657483ea Mon Sep 17 00:00:00 2001 From: Marvin Mall Date: Sun, 23 Aug 2020 16:54:26 +0200 Subject: [PATCH 7/7] Change field visibility and formatting --- src/SubscribeRequest.php | 1 + src/UnsubscribeRequest.php | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/SubscribeRequest.php b/src/SubscribeRequest.php index 2055463..177578f 100644 --- a/src/SubscribeRequest.php +++ b/src/SubscribeRequest.php @@ -23,6 +23,7 @@ class SubscribeRequest extends PendingMessage public function __construct(int $messageId, array $subscriptions) { parent::__construct($messageId); + $this->subscriptions = array_values($subscriptions); } diff --git a/src/UnsubscribeRequest.php b/src/UnsubscribeRequest.php index 7edd443..1ac4aee 100644 --- a/src/UnsubscribeRequest.php +++ b/src/UnsubscribeRequest.php @@ -12,7 +12,7 @@ class UnsubscribeRequest extends PendingMessage { /** @var string[] */ - protected $topicFilters; + private $topicFilters; /** * Creates a new unsubscribe request object. @@ -23,6 +23,7 @@ class UnsubscribeRequest extends PendingMessage public function __construct(int $messageId, array $topicFilters) { parent::__construct($messageId); + $this->topicFilters = array_values($topicFilters); }