Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
544 views
in Technique[技术] by (71.8m points)

swift - Swift Combine:检查Subject是否有观察者?(Swift Combine: Check if Subject has observer?)

在RxSwift中,我们可以使用hasObserver检查*Subject是否有任何观察者,如何在例如PassthroughSubject Combine中做到这一点?

  ask by Sajjon translate from so

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

No one time needed this... Apple does not provide this by API, and, actually, I do not recommend such thing, because it is like manually checking value of retainCount in pre-ARC Objective-C for some decision in code.

(不需要任何时间... Apple并没有通过API提供此功能,实际上,我不推荐这样的事情,因为这就像在ARC Objective-C之前手动检查retainCount值以决定代码。)

Anyway it is possible.

(无论如何都是可能的。)

Let's consider it as a lab exercise.

(让我们将其视为实验室练习。)

Hope someone find this helpful.

(希望有人觉得这有帮助。)

Disclaimer: below code was not tested with all Publisher(s) and not safe as for some real-world project.

(免责声明:以下代码并未在所有发布服务器上进行测试,因此对于某些实际项目并不安全。)

It is just approach demo.

(这只是方法演示。)

So, as there are many kind of publishers and all of them are final and private and, moreover there might be come via type-eraser, we needed generic thing applying to any publisher, thus operator

(因此,由于存在多种类型的发布者,并且所有发布者都是最终的和私有的,而且可能通过类型擦除器来实现,因此我们需要将通用的东西应用于任何发布者,从而使运算符)

extension Publisher {
    public func countingSubscribers(_ callback: ((Int) -> Void)? = nil)
        -> Publishers.SubscribersCounter<Self> {
            return Publishers.SubscribersCounter<Self>(upstream: self, callback: callback)
    }
}

Operator gives us possibility to inject in any place of of publishers chain and provide interesting value via callback.

(运算符使我们有可能注入发布者链的任何位置,并通过回调提供有趣的价值。)

Interesting value in our case will be count of subscribers.

(在我们的案例中,有趣的价值将是订阅者的数量。)

As operator is injected in both Upstream & Downstream we need bidirectional custom pipe implementation, ie.

(由于在上游和下游都注入了操作员,因此我们需要双向自定义管道实施。)

custom publisher, custom subscriber, custom subscription.

(自定义发布者,自定义订阅者,自定义订阅。)

In our case they must be transparent, as we don't need to modify streams... actually it will be Combine-proxy.

(在我们的情况下,它们必须是透明的,因为我们不需要修改流……实际上,它将是Combine-proxy。)

Posible usage:

(可能的用法:)
1) when SubscribersCounter publisher is last in chain, the numberOfSubscribers property can be used directly

(1)当SubscribersCounter发布者位于链中的最后一位时,可以直接使用numberOfSubscribers属性)

let publisher = NotificationCenter.default
    .publisher(for: UIApplication.didBecomeActiveNotification)
    .countingSubscribers()
...
publisher.numberOfSubscribers

2) when it somewhere in the middle of the chain, then receive callback about changed subscribers count

(2)当它位于链的中间时,则接收有关已更改订户数的回调)

let publisher = URLSession.shared
        .dataTaskPublisher(for: URL(string: "https://www.google.com")!)
        .countingSubscribers({ count in print("Observers: (count)") })
        .receive(on: DispatchQueue.main)
        .map { _ in "Data received" }
        .replaceError(with: "An error occurred")

Here is implementation:

(这是实现:)

import Combine

extension Publishers {

    public class SubscribersCounter<Upstream> : Publisher where Upstream : Publisher {

        private(set) var numberOfSubscribers = 0

        public typealias Output = Upstream.Output
        public typealias Failure = Upstream.Failure

        public let upstream: Upstream
        public let callback: ((Int) -> Void)?

        public init(upstream: Upstream, callback: ((Int) -> Void)?) {
            self.upstream = upstream
            self.callback = callback
        }

        public func receive<S>(subscriber: S) where S : Subscriber,
            Upstream.Failure == S.Failure, Upstream.Output == S.Input {
                self.increase()
                upstream.receive(subscriber: SubscribersCounterSubscriber<S>(counter: self, subscriber: subscriber))
        }

        fileprivate func increase() {
            numberOfSubscribers += 1
            self.callback?(numberOfSubscribers)
        }

        fileprivate func decrease() {
            numberOfSubscribers -= 1
            self.callback?(numberOfSubscribers)
        }

        // own subscriber is needed to intercept upstream/downstream events
        private class SubscribersCounterSubscriber<S> : Subscriber where S: Subscriber {
            let counter: SubscribersCounter<Upstream>
            let subscriber: S

            init (counter: SubscribersCounter<Upstream>, subscriber: S) {
                self.counter = counter
                self.subscriber = subscriber
            }

            deinit {
                Swift.print(">> Subscriber deinit")
            }

            func receive(subscription: Subscription) {
                subscriber.receive(subscription: SubscribersCounterSubscription<Upstream>(counter: counter, subscription: subscription))
            }

            func receive(_ input: S.Input) -> Subscribers.Demand {
                return subscriber.receive(input)
            }

            func receive(completion: Subscribers.Completion<S.Failure>) {
                subscriber.receive(completion: completion)
            }

            typealias Input = S.Input
            typealias Failure = S.Failure
        }

        // own subcription is needed to handle cancel and decrease
        private class SubscribersCounterSubscription<Upstream>: Subscription where Upstream: Publisher {
            let counter: SubscribersCounter<Upstream>
            let wrapped: Subscription

            private var cancelled = false
            init(counter: SubscribersCounter<Upstream>, subscription: Subscription) {
                self.counter = counter
                self.wrapped = subscription
            }

            deinit {
                Swift.print(">> Subscription deinit")
                if !cancelled {
                    counter.decrease()
                }
            }

            func request(_ demand: Subscribers.Demand) {
                wrapped.request(demand)
            }

            func cancel() {
                wrapped.cancel()
                if !cancelled {
                    cancelled = true
                    counter.decrease()
                }
            }
        }
    }
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...