From 4011c971e43b1da909c95e338798d9af20aae363 Mon Sep 17 00:00:00 2001 From: Giovanni Giacobbi Date: Wed, 12 Aug 2020 12:16:39 +0200 Subject: [PATCH] WIP: fix subscriptions --- src/MQTTClient.php | 25 ++++------------- src/Repositories/MemoryRepository.php | 40 ++++++++++++++++++++------- 2 files changed, 36 insertions(+), 29 deletions(-) diff --git a/src/MQTTClient.php b/src/MQTTClient.php index d617f0a..c922707 100644 --- a/src/MQTTClient.php +++ b/src/MQTTClient.php @@ -879,6 +879,8 @@ protected function handlePublishedMessage(string $buffer, int $qualityOfServiceL $topic = substr($buffer, 2, $topicLength); $message = substr($buffer, ($topicLength + 2)); + $this->logger->debug("GOT PUBLISH MESSAGE"); + if ($qualityOfServiceLevel > 0) { if (strlen($message) < 2) { $this->logger->error(sprintf( @@ -1224,17 +1226,10 @@ protected function deliverPublishedMessage(string $topic, string $message, int $ ]); foreach ($subscribers as $subscriber) { - if ($subscriber->getQualityOfServiceLevel() > $qualityOfServiceLevel) { - // At this point we need to assume that this subscriber does not want to receive - // the message, but maybe there are other subscribers waiting for the message. - continue; - } - - try { - call_user_func($subscriber->getCallback(), $topic, $message, $retained); - } catch (\Throwable $e) { - // We ignore errors produced by custom callbacks. - } + // We deliver the message to the first subscriber which matches this topic, + // if there are overlapping topics the message will be delivered multiple times by the broker! + call_user_func($subscriber->getCallback(), $topic, $message, $retained); + break; } } @@ -1297,10 +1292,6 @@ protected function sendPublishComplete(int $messageId): void */ protected function republishPendingMessages(): void { - $this->logger->debug('Re-publishing pending messages to MQTT broker.', [ - 'broker' => sprintf('%s:%s', $this->host, $this->port), - ]); - /** @noinspection PhpUnhandledExceptionInspection */ $dateTime = (new DateTime())->sub(new DateInterval('PT' . $this->settings->getResendTimeout() . 'S')); $messages = $this->repository->getPendingPublishedMessagesLastSentBefore($dateTime); @@ -1333,10 +1324,6 @@ protected function republishPendingMessages(): void */ protected function republishPendingUnsubscribeRequests(): void { - $this->logger->debug('Re-sending pending unsubscribe requests to MQTT broker.', [ - 'broker' => sprintf('%s:%s', $this->host, $this->port), - ]); - /** @noinspection PhpUnhandledExceptionInspection */ $dateTime = (new DateTime())->sub(new DateInterval('PT' . $this->settings->getResendTimeout() . 'S')); $requests = $this->repository->getPendingUnsubscribeRequestsLastSentBefore($dateTime); diff --git a/src/Repositories/MemoryRepository.php b/src/Repositories/MemoryRepository.php index 8ce86c1..f5e0c88 100644 --- a/src/Repositories/MemoryRepository.php +++ b/src/Repositories/MemoryRepository.php @@ -21,8 +21,14 @@ */ class MemoryRepository implements Repository { - /** @var SplObjectStorage|TopicSubscription[] */ - private $topicSubscriptions; + /** @var array */ + private $topicSubscriptions = array(); + + /** @var array */ + private $topicSubscriptionsQueue = array(); + + /** @var int */ + private $topicSubscriptionsPtr = 0; /** @var SplObjectStorage|PublishedMessage[] */ private $pendingPublishedMessages; @@ -38,7 +44,6 @@ class MemoryRepository implements Repository */ public function __construct() { - $this->topicSubscriptions = new SplObjectStorage(); $this->pendingPublishedMessages = new SplObjectStorage(); $this->pendingUnsubscribeRequests = new SplObjectStorage(); $this->pendingPublishConfirmations = new SplObjectStorage(); @@ -52,7 +57,7 @@ public function __construct() */ public function countTopicSubscriptions(): int { - return $this->topicSubscriptions->count(); + return count($this->topicSubscriptions); } /** @@ -63,7 +68,13 @@ public function countTopicSubscriptions(): int */ public function addTopicSubscription(TopicSubscription $subscription): void { - $this->topicSubscriptions->attach($subscription); + // Finds the next available subscription id (FIXME) + while (isset($this->topicSubscriptions[$this->topicSubscriptionsPtr])) { + $this->topicSubscriptionsPtr++; + } + + $this->topicSubscriptions[$this->topicSubscriptionsPtr] = $subscription; + $this->topicSubscriptionsQueue[] = $this->topicSubscriptionsPtr; } /** @@ -113,13 +124,21 @@ public function getTopicSubscriptionsMatchingTopic(string $topic): array { $result = []; - foreach ($this->topicSubscriptions as $subscription) { + for ($i = 0; $i < count($this->topicSubscriptionsQueue); $i++) { + $subscriptionId = $this->topicSubscriptionsQueue[$i]; + $subscription = $this->topicSubscriptions[$subscriptionId]; + if (preg_match($subscription->getRegexifiedTopic(), $topic)) { - $result[] = $subscription; + // We have a match! Move this to the bottom + $this->topicSubscriptionsQueue[] = $subscriptionId; + unset($this->topicSubscriptionsQueue[$i]); + $this->topicSubscriptionsQueue = array_values($this->topicSubscriptionsQueue); + + return array($subscription); } } - return $result; + return array(); } /** @@ -134,10 +153,11 @@ public function removeTopicSubscription(string $topic): bool { $result = false; - foreach ($this->topicSubscriptions as $subscription) { + foreach ($this->topicSubscriptions as $id => $subscription) { if ($subscription->getTopic() === $topic) { - $this->topicSubscriptions->detach($subscription); + unset($this->topicSubscriptions[$topic]); $result = true; + // FIXME: broken break; } }