Skip to content

Commit 319cbd8

Browse files
thg2kNamoshek
andauthored
Refactory repository interface with single pending message and subscriptions management (#33)
* Refactored Repository interface and pending message handling. Details: * Refactored the Repository interface. The new interface stores a generic serializable PendingMessage without knowing which type it is, using two different queues one for incoming and one for outgoing messages. There are currently three types of outgoing messages (PublishedMessage, SubscribeRequest, UnsubscribeRequest) and one incoming message (PublishedMessaged) handled. * The `TopicSubscription` class has been renamed to `Subscription` and now represents only the subscription itself, separating it from the concept of SubscribeRequest which might include several topic filters in one message. * Removed exception `UnexpectedAcknowledgementException`. It is expected to have duplicate packets over the wire, so there is no need for an expection in this case, a notice to the logger is enough. * Added `ProtocolViolationException`. This explicitly marks situations where the broker is misbehaving and the connection should be terminated. * Fixed some problems with the MessageProcessor. * Removed Redis repository implementation until updated to the new Repository interface * Fix MessageProcessor, exceptions and documentation * Change visibility and description of DTOs * Simplify acknowledgement parsing logic * Change code formatting * Change field visibility and formatting Co-authored-by: Marvin Mall <marvin-mall@msn.com>
1 parent 19dcbfd commit 319cbd8

23 files changed

+786
-1697
lines changed

src/Contracts/MessageProcessor.php

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88
use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException;
99
use PhpMqtt\Client\Exceptions\InvalidMessageException;
1010
use PhpMqtt\Client\Exceptions\MqttClientException;
11-
use PhpMqtt\Client\Exceptions\UnexpectedAcknowledgementException;
11+
use PhpMqtt\Client\Exceptions\ProtocolViolationException;
1212
use PhpMqtt\Client\Message;
13+
use PhpMqtt\Client\Subscription;
1314

1415
/**
1516
* Implementations of this interface provide message parsing capabilities.
@@ -42,7 +43,7 @@ public function tryFindMessageInBuffer(string $buffer, int $bufferLength, string
4243
* @param string $message
4344
* @return Message|null
4445
* @throws InvalidMessageException
45-
* @throws UnexpectedAcknowledgementException
46+
* @throws ProtocolViolationException
4647
* @throws MqttClientException
4748
*/
4849
public function parseAndValidateMessage(string $message): ?Message;
@@ -58,11 +59,18 @@ public function parseAndValidateMessage(string $message): ?Message;
5859
public function buildConnectMessage(ConnectionSettings $connectionSettings, bool $useCleanSession = false): string;
5960

6061
/**
61-
* Builds a ping message.
62+
* Builds a ping request message.
6263
*
6364
* @return string
6465
*/
65-
public function buildPingMessage(): string;
66+
public function buildPingRequestMessage(): string;
67+
68+
/**
69+
* Builds a ping response message.
70+
*
71+
* @return string
72+
*/
73+
public function buildPingResponseMessage(): string;
6674

6775
/**
6876
* Builds a disconnect message.
@@ -74,22 +82,22 @@ public function buildDisconnectMessage(): string;
7482
/**
7583
* Builds a subscribe message from the given parameters.
7684
*
77-
* @param int $messageId
78-
* @param string $topic
79-
* @param int $qualityOfService
85+
* @param int $messageId
86+
* @param Subscription[] $subscriptions
87+
* @param bool $isDuplicate
8088
* @return string
8189
*/
82-
public function buildSubscribeMessage(int $messageId, string $topic, int $qualityOfService): string;
90+
public function buildSubscribeMessage(int $messageId, array $subscriptions, bool $isDuplicate = false): string;
8391

8492
/**
8593
* Builds an unsubscribe message from the given parameters.
8694
*
87-
* @param int $messageId
88-
* @param string $topic
89-
* @param bool $isDuplicate
95+
* @param int $messageId
96+
* @param string[] $topics
97+
* @param bool $isDuplicate
9098
* @return string
9199
*/
92-
public function buildUnsubscribeMessage(int $messageId, string $topic, bool $isDuplicate = false): string;
100+
public function buildUnsubscribeMessage(int $messageId, array $topics, bool $isDuplicate = false): string;
93101

94102
/**
95103
* Builds a publish message based on the given parameters.

src/Contracts/MqttClient.php

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
use PhpMqtt\Client\Exceptions\ConfigurationInvalidException;
99
use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException;
1010
use PhpMqtt\Client\Exceptions\DataTransferException;
11-
use PhpMqtt\Client\Exceptions\TopicNotSubscribedException;
12-
use PhpMqtt\Client\Exceptions\UnexpectedAcknowledgementException;
11+
use PhpMqtt\Client\Exceptions\ProtocolViolationException;
12+
use PhpMqtt\Client\Exceptions\RepositoryException;
1313

1414
/**
1515
* An interface for the MQTT client.
@@ -57,6 +57,7 @@ public function isConnected(): bool;
5757
* @param bool $retain
5858
* @return void
5959
* @throws DataTransferException
60+
* @throws RepositoryException
6061
*/
6162
public function publish(string $topic, string $message, int $qualityOfService = 0, bool $retain = false): void;
6263

@@ -86,6 +87,7 @@ public function publish(string $topic, string $message, int $qualityOfService =
8687
* @param int $qualityOfService
8788
* @return void
8889
* @throws DataTransferException
90+
* @throws RepositoryException
8991
*/
9092
public function subscribe(string $topic, callable $callback, int $qualityOfService = 0): void;
9193

@@ -95,7 +97,6 @@ public function subscribe(string $topic, callable $callback, int $qualityOfServi
9597
* @param string $topic
9698
* @return void
9799
* @throws DataTransferException
98-
* @throws TopicNotSubscribedException
99100
*/
100101
public function unsubscribe(string $topic): void;
101102

@@ -137,7 +138,7 @@ public function interrupt(): void;
137138
* @param int|null $queueWaitLimit
138139
* @return void
139140
* @throws DataTransferException
140-
* @throws UnexpectedAcknowledgementException
141+
* @throws ProtocolViolationException
141142
*/
142143
public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, int $queueWaitLimit = null): void;
143144

src/Contracts/Repository.php

Lines changed: 65 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
namespace PhpMqtt\Client\Contracts;
66

77
use DateTime;
8-
use PhpMqtt\Client\Exceptions\PendingPublishConfirmationAlreadyExistsException;
9-
use PhpMqtt\Client\PublishedMessage;
10-
use PhpMqtt\Client\TopicSubscription;
11-
use PhpMqtt\Client\UnsubscribeRequest;
8+
use PhpMqtt\Client\Exceptions\PendingMessageAlreadyExistsException;
9+
use PhpMqtt\Client\Exceptions\PendingMessageNotFoundException;
10+
use PhpMqtt\Client\Exceptions\RepositoryException;
11+
use PhpMqtt\Client\PendingMessage;
12+
use PhpMqtt\Client\Subscription;
1213

1314
/**
1415
* Implementations of this interface provide storage capabilities to an MQTT client.
@@ -31,190 +32,139 @@ interface Repository
3132
* but it is currently not being used (i.e. in a resend queue).
3233
*
3334
* @return int
35+
* @throws RepositoryException
3436
*/
3537
public function newMessageId(): int;
3638

3739
/**
38-
* Releases the given message id, allowing it to be reused in the future.
39-
*
40-
* @param int $messageId
41-
* @return void
42-
*/
43-
public function releaseMessageId(int $messageId): void;
44-
45-
/**
46-
* Returns the number of registered topic subscriptions. The method does
47-
* not differentiate between pending and acknowledged subscriptions.
40+
* Returns the number of pending outgoing messages.
4841
*
4942
* @return int
5043
*/
51-
public function countTopicSubscriptions(): int;
52-
53-
/**
54-
* Adds a topic subscription to the repository.
55-
*
56-
* @param TopicSubscription $subscription
57-
* @return void
58-
*/
59-
public function addTopicSubscription(TopicSubscription $subscription): void;
44+
public function countPendingOutgoingMessages(): int;
6045

6146
/**
62-
* Get all topic subscriptions with the given message identifier.
47+
* Gets a pending outgoing message with the given message identifier, if found.
6348
*
6449
* @param int $messageId
65-
* @return TopicSubscription[]
50+
* @return PendingMessage|null
6651
*/
67-
public function getTopicSubscriptionsWithMessageId(int $messageId): array;
52+
public function getPendingOutgoingMessage(int $messageId): ?PendingMessage;
6853

6954
/**
70-
* Find a topic subscription with the given topic.
55+
* Gets a list of pending outgoing messages last sent before the given date time.
7156
*
72-
* @param string $topic
73-
* @return TopicSubscription|null
74-
*/
75-
public function getTopicSubscriptionByTopic(string $topic): ?TopicSubscription;
76-
77-
/**
78-
* Get all topic subscriptions matching the given topic.
79-
*
80-
* @param string $topic
81-
* @return TopicSubscription[]
82-
*/
83-
public function getTopicSubscriptionsMatchingTopic(string $topic): array;
84-
85-
/**
86-
* Removes the topic subscription with the given topic from the repository.
87-
* Returns true if a topic subscription existed and has been removed.
88-
* Otherwise, false is returned.
57+
* If date time is `null`, all pending messages are returned.
8958
*
90-
* @param string $topic
91-
* @return bool
92-
*/
93-
public function removeTopicSubscription(string $topic): bool;
94-
95-
/**
96-
* Returns the number of pending publish messages.
59+
* The messages are returned in the same order they were added to the repository.
9760
*
98-
* @return int
61+
* @param DateTime|null $dateTime
62+
* @return PendingMessage[]
9963
*/
100-
public function countPendingPublishMessages(): int;
64+
public function getPendingOutgoingMessagesLastSentBefore(DateTime $dateTime = null): array;
10165

10266
/**
103-
* Adds a pending published message to the repository.
67+
* Adds a pending outgoing message to the repository.
10468
*
105-
* @param PublishedMessage $message
69+
* @param PendingMessage $message
10670
* @return void
71+
* @throws PendingMessageAlreadyExistsException
10772
*/
108-
public function addPendingPublishedMessage(PublishedMessage $message): void;
73+
public function addPendingOutgoingMessage(PendingMessage $message): void;
10974

11075
/**
111-
* Gets a pending published message with the given message identifier, if found.
76+
* Marks an existing pending outgoing published message as received in the repository.
11277
*
113-
* @param int $messageId
114-
* @return PublishedMessage|null
115-
*/
116-
public function getPendingPublishedMessageWithMessageId(int $messageId): ?PublishedMessage;
117-
118-
/**
119-
* Gets a list of pending published messages last sent before the given date time.
120-
*
121-
* @param DateTime $dateTime
122-
* @return PublishedMessage[]
123-
*/
124-
public function getPendingPublishedMessagesLastSentBefore(DateTime $dateTime): array;
125-
126-
/**
127-
* Marks the pending published message with the given message identifier as received.
128-
* If the message has no QoS level of 2, is not found or has already been received,
129-
* false is returned. Otherwise the result will be true.
78+
* If the message does not exists, an exception is thrown,
79+
* otherwise `true` is returned if the message was marked as received, and `false`
80+
* in case it was already marked as received.
13081
*
13182
* @param int $messageId
13283
* @return bool
84+
* @throws PendingMessageNotFoundException
13385
*/
134-
public function markPendingPublishedMessageAsReceived(int $messageId): bool;
86+
public function markPendingOutgoingPublishedMessageAsReceived(int $messageId): bool;
13587

13688
/**
137-
* Removes a pending published message from the repository. If a pending message
138-
* with the given identifier is found and successfully removed from the repository,
139-
* `true` is returned. Otherwise `false` will be returned.
89+
* Removes a pending outgoing message from the repository.
90+
*
91+
* If a pending message with the given identifier is found and
92+
* successfully removed from the repository, `true` is returned.
93+
* Otherwise `false` will be returned.
14094
*
14195
* @param int $messageId
14296
* @return bool
14397
*/
144-
public function removePendingPublishedMessage(int $messageId): bool;
98+
public function removePendingOutgoingMessage(int $messageId): bool;
14599

146100
/**
147-
* Returns the number of pending unsubscribe requests.
101+
* Returns the number of pending incoming messages.
148102
*
149103
* @return int
150104
*/
151-
public function countPendingUnsubscribeRequests(): int;
105+
public function countPendingIncomingMessages(): int;
152106

153107
/**
154-
* Adds a pending unsubscribe request to the repository.
155-
*
156-
* @param UnsubscribeRequest $request
157-
* @return void
158-
*/
159-
public function addPendingUnsubscribeRequest(UnsubscribeRequest $request): void;
160-
161-
/**
162-
* Gets a pending unsubscribe request with the given message identifier, if found.
108+
* Gets a pending incoming message with the given message identifier, if found.
163109
*
164110
* @param int $messageId
165-
* @return UnsubscribeRequest|null
111+
* @return PendingMessage|null
166112
*/
167-
public function getPendingUnsubscribeRequestWithMessageId(int $messageId): ?UnsubscribeRequest;
113+
public function getPendingIncomingMessage(int $messageId): ?PendingMessage;
168114

169115
/**
170-
* Gets a list of pending unsubscribe requests last sent before the given date time.
116+
* Adds a pending outgoing message to the repository.
171117
*
172-
* @param DateTime $dateTime
173-
* @return UnsubscribeRequest[]
118+
* @param PendingMessage $message
119+
* @return void
120+
* @throws PendingMessageAlreadyExistsException
174121
*/
175-
public function getPendingUnsubscribeRequestsLastSentBefore(DateTime $dateTime): array;
122+
public function addPendingIncomingMessage(PendingMessage $message): void;
176123

177124
/**
178-
* Removes a pending unsubscribe requests from the repository. If a pending request
179-
* with the given identifier is found and successfully removed from the repository,
180-
* `true` is returned. Otherwise `false` will be returned.
125+
* Removes a pending incoming message from the repository.
126+
*
127+
* If a pending message with the given identifier is found and
128+
* successfully removed from the repository, `true` is returned.
129+
* Otherwise `false` will be returned.
181130
*
182131
* @param int $messageId
183132
* @return bool
184133
*/
185-
public function removePendingUnsubscribeRequest(int $messageId): bool;
134+
public function removePendingIncomingMessage(int $messageId): bool;
186135

187136
/**
188-
* Returns the number of pending publish confirmations.
137+
* Returns the number of registered subscriptions.
189138
*
190139
* @return int
191140
*/
192-
public function countPendingPublishConfirmations(): int;
141+
public function countSubscriptions(): int;
193142

194143
/**
195-
* Adds a pending publish confirmation to the repository.
144+
* Adds a subscription to the repository.
196145
*
197-
* @param PublishedMessage $message
146+
* @param Subscription $subscription
198147
* @return void
199-
* @throws PendingPublishConfirmationAlreadyExistsException
200148
*/
201-
public function addPendingPublishConfirmation(PublishedMessage $message): void;
149+
public function addSubscription(Subscription $subscription): void;
202150

203151
/**
204-
* Gets a pending publish confirmation with the given message identifier, if found.
152+
* Gets all subscriptions matching the given criteria.
205153
*
206-
* @param int $messageId
207-
* @return PublishedMessage|null
154+
* @param string|null $topicName
155+
* @param int|null $subscriptionId
156+
* @return Subscription[]
208157
*/
209-
public function getPendingPublishConfirmationWithMessageId(int $messageId): ?PublishedMessage;
158+
public function getMatchingSubscriptions(string $topicName = null, int $subscriptionId = null): array;
210159

211160
/**
212-
* Removes the pending publish confirmation with the given message identifier
213-
* from the repository. This is normally done as soon as a transaction has been
214-
* successfully finished by the publisher.
161+
* Removes the subscription with the given topic filter from the repository.
215162
*
216-
* @param int $messageId
163+
* Returns `true` if a topic subscription existed and has been removed.
164+
* Otherwise, `false` is returned.
165+
*
166+
* @param string $topicFilter
217167
* @return bool
218168
*/
219-
public function removePendingPublishConfirmation(int $messageId): bool;
169+
public function removeSubscription(string $topicFilter): bool;
220170
}

0 commit comments

Comments
 (0)