|
| 1 | +// |
| 2 | +// DelaySubscription.swift |
| 3 | +// CombineExt |
| 4 | +// |
| 5 | +// Created by Jack Stone on 06/03/2021. |
| 6 | +// Copyright © 2021 Combine Community. All rights reserved. |
| 7 | +// |
| 8 | + |
| 9 | +#if canImport(Combine) |
| 10 | +import Combine |
| 11 | + |
| 12 | +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) |
| 13 | +public extension Publisher { |
| 14 | + |
| 15 | + /// Time shifts the delivery of all output to the downstream receiver by delaying |
| 16 | + /// the time a subscriber starts receiving elements from its subscription. |
| 17 | + /// |
| 18 | + /// Note that delaying a subscription may result in skipped elements for "hot" publishers. |
| 19 | + /// However, this won't make a difference for "cold" publishers. |
| 20 | + /// |
| 21 | + /// - Parameter interval: The amount of delay time. |
| 22 | + /// - Parameter tolerance: The allowed tolerance in the firing of the delayed subscription. |
| 23 | + /// - Parameter scheduler: The scheduler to schedule the subscription delay on. |
| 24 | + /// - Parameter options: Any additional scheduler options. |
| 25 | + /// |
| 26 | + /// - Returns: A publisher with its subscription delayed. |
| 27 | + /// |
| 28 | + func delaySubscription<S: Scheduler>(for interval: S.SchedulerTimeType.Stride, |
| 29 | + tolerance: S.SchedulerTimeType.Stride? = nil, |
| 30 | + scheduler: S, |
| 31 | + options: S.SchedulerOptions? = nil) -> Publishers.DelaySubscription<Self, S> { |
| 32 | + return Publishers.DelaySubscription(upstream: self, |
| 33 | + interval: interval, |
| 34 | + tolerance: tolerance ?? scheduler.minimumTolerance, |
| 35 | + scheduler: scheduler, |
| 36 | + options: options) |
| 37 | + } |
| 38 | +} |
| 39 | + |
| 40 | +// MARK: - Publisher |
| 41 | +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) |
| 42 | +public extension Publishers { |
| 43 | + |
| 44 | + /// A publisher that delays the upstream subscription. |
| 45 | + struct DelaySubscription<U: Publisher, S: Scheduler>: Publisher { |
| 46 | + |
| 47 | + public typealias Output = U.Output // Upstream output |
| 48 | + public typealias Failure = U.Failure // Upstream failure |
| 49 | + |
| 50 | + /// The publisher that this publisher receives signals from. |
| 51 | + public let upstream: U |
| 52 | + |
| 53 | + /// The amount of delay time. |
| 54 | + public let interval: S.SchedulerTimeType.Stride |
| 55 | + |
| 56 | + /// The allowed tolerance in the firing of the delayed subscription. |
| 57 | + public let tolerance: S.SchedulerTimeType.Stride |
| 58 | + |
| 59 | + /// The scheduler to run the subscription delay timer on. |
| 60 | + public let scheduler: S |
| 61 | + |
| 62 | + /// Any additional scheduler options. |
| 63 | + public let options: S.SchedulerOptions? |
| 64 | + |
| 65 | + init(upstream: U, |
| 66 | + interval: S.SchedulerTimeType.Stride, |
| 67 | + tolerance: S.SchedulerTimeType.Stride, |
| 68 | + scheduler: S, |
| 69 | + options: S.SchedulerOptions?) { |
| 70 | + self.upstream = upstream |
| 71 | + self.interval = interval |
| 72 | + self.tolerance = tolerance |
| 73 | + self.scheduler = scheduler |
| 74 | + self.options = options |
| 75 | + } |
| 76 | + |
| 77 | + public func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input { |
| 78 | + self.upstream.subscribe(DelayedSubscription(publisher: self, downstream: subscriber)) |
| 79 | + } |
| 80 | + } |
| 81 | +} |
| 82 | + |
| 83 | +// MARK: - Subscription |
| 84 | +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) |
| 85 | +private extension Publishers.DelaySubscription { |
| 86 | + |
| 87 | + /// The delayed subscription where the scheduler advancing takes place. |
| 88 | + final class DelayedSubscription<D: Subscriber>: Subscriber where D.Input == Output, D.Failure == U.Failure { |
| 89 | + |
| 90 | + typealias Input = U.Output // Upstream output |
| 91 | + typealias Failure = U.Failure // Upstream failure |
| 92 | + |
| 93 | + private let interval: S.SchedulerTimeType.Stride |
| 94 | + private let tolerance: S.SchedulerTimeType.Stride |
| 95 | + private let scheduler: S |
| 96 | + private let options: S.SchedulerOptions? |
| 97 | + |
| 98 | + private let downstream: D |
| 99 | + |
| 100 | + init(publisher: Publishers.DelaySubscription<U, S>, |
| 101 | + downstream: D) { |
| 102 | + self.interval = publisher.interval |
| 103 | + self.tolerance = publisher.tolerance |
| 104 | + self.scheduler = publisher.scheduler |
| 105 | + self.options = publisher.options |
| 106 | + self.downstream = downstream |
| 107 | + } |
| 108 | + |
| 109 | + func receive(subscription: Subscription) { |
| 110 | + scheduler.schedule(after: scheduler.now.advanced(by: interval), |
| 111 | + tolerance: tolerance, |
| 112 | + options: options) { [weak self] in |
| 113 | + self?.downstream.receive(subscription: subscription) |
| 114 | + } |
| 115 | + } |
| 116 | + |
| 117 | + func receive(_ input: U.Output) -> Subscribers.Demand { |
| 118 | + return downstream.receive(input) |
| 119 | + } |
| 120 | + |
| 121 | + func receive(completion: Subscribers.Completion<U.Failure>) { |
| 122 | + downstream.receive(completion: completion) |
| 123 | + } |
| 124 | + } |
| 125 | +} |
| 126 | +#endif |
0 commit comments