特定用例下的Combine全面如何使用详解

引言 在之前的文章中,我们了解了 Combine 的基础知识:了解了 Publisher、Subscriber 和 Subscription 如何工作以及这些部

引言

在之前的文章中,我们了解了 Combine 的基础知识:了解了 Publisher、Subscriber 和 Subscription 如何工作以及这些部分之间的相互关系,以及如何使用 Operator 来操作 Publisher 及处理其事件。

本文将 Combine 用于用于特定用例,更贴近实际的应用开发。我们将了解如何利用 Combine 进行网络任务、如何调试 Combine Publisher、如何使用 Timer、观察对象,以及了解 Combine 中的资源管理。

网络

Combine 提供了 API 来帮助开发者以声明方式执行常见任务。这些 API 围绕两个关键功能:

使用 URLSession 执行网络请求。

使用 Codable 协议对 JSON 数据进行编码和解码。

URLSession Extension

URLSession 是 Apple 平台下执行网络相关任务的标准方式,可以帮助我们完成多种操作。例如:

  • 用于检索 URL 的内容的数据传输任务;
  • 用于获取 URL 的内容的数据下载任务;
  • 用于向 URL 上传数据或文件的上传任务;
  • 在两方之间传输数据的流式传输任务;
  • 连接到 Websocket 的 Websocket 任务。

其中,只有数据传输任务公开了一个 Combine Publisher。 Combine 使用具有两个变体的单个 API 处理这些任务。入参数为 URLRequestURL

func dataTaskPublisher(for url: URL) -> URLSession.DataTaskPublisher
func dataTaskPublisher(for request: URLRequest) -> URLSession.DataTaskPublisher

下面看看如何使用这个 API:

import Combine
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
func example(_ desc: String, _ action:() -> Void) {
    print("--- \(desc) ---")
    action()
}
var subscriptions = Set<AnyCancellable>()
example("URLSession") { 
    guard let url = URL(string: "https://random-data-api.com/api/v2/appliances") else { 
        return 
    }
    URLSession.shared
        .dataTaskPublisher(for: url)
        .sink(receiveCompletion: { completion in
            if case .failure(let err) = completion {
                print("Retrieving data failed with error \(err)")
            }
        }, receiveValue: { data, response in
            print("Retrieved data of size \(data.count), response = \(response)")
        })
        .store(in: &subscriptions)
}

在一些基础代码后,进入我们的 example 函数。我们使用 URL 作为参数的 dataTaskPublisher(for:) 。确保我们处理了错误。请求结果是包含 DataURLResponse 的元组。Combine 在 URLSession.dataTask 上提供了 Publisher 而不是闭包。最后保留 Subscription,否请求它会立即被取消,并且请求永远不会执行。

Codable

Codable 协议是我们绝对应该了解的 Swift 的编码和解码机制。Foundation 通过 JSONEncoderJSONDecoder 对 JSON 进行编码和解码。 我们也可以使用 PropertyListEncoderPropertyListDecoder,但这些在网络请求的上下文中用处不大。

在前面的示例中,我们获取了一些 JSON。 我们可以使用 JSONDecoder 对其进行解码:

example("URLSession") { 
    guard let url = URL(string: "https://random-data-api.com/api/v2/appliances") else { 
        return 
    }
    URLSession.shared
        .dataTaskPublisher(for: url)
        .tryMap({ data, response in
            try JSONDecoder().decode([String:String].self, from: data)
        })
        .sink(receiveCompletion: { completion in
            if case .failure(let err) = completion {
                print("Retrieving data failed with error \(err)")
            }
        }, receiveValue: { data in
            print("Retrieved data: \(data)")
        })
        .store(in: &subscriptions)
}

我们在 tryMap Operator 中解码 JSON,该方法有效,但 Combine 提供了一个在该场景下更合适的 Operator 来帮助减少代码:decode(type:decoder:)

    URLSession.shared
        .dataTaskPublisher(for: url)
        .map(\.data)
        .decode(type: [String:String].self, decoder: JSONDecoder())
        .sink(receiveCompletion: { completion in
            if case .failure(let err) = completion {
                print("Retrieving data failed with error \(err)")
            }
        }, receiveValue: { data in
            print("Retrieved data: \(data)")
        })
        .store(in: &amp;subscriptions)

但由于 dataTaskPublisher(for:) 发出一个元组,我们不能直接使用 decode(type:decoder:), 需要使用 map(_:) 只处理部分数据。其他的优点包括我们只在设置 Publisher 时实例化 JSONDecoder 一次,而不是每次在 tryMap(_:) 闭包中创建它。

向多个 Subscriber 发布网络数据

每次订阅 Publisher 时,它都会开始工作。在网络请求的情况下,如果多个 Subscriber 需要结果,则多次发送相同的请求。

Combine 没有像其他框架那样容易实现这一点的 Operator。 我们可以使用 share() Operator,但这需要在结果返回之前设置所有的 Subscription。

还有另一种解决方案:使用 multicast() Operator,它返回一个 ConnectablePublisher,该 Publisher 为每个 Subscriber 创建一个单独的 Subject。 它允许我们多次订阅 Subject,然后在我们准备好时,调用 Publisher 的 connect() 方法:

example("connect") { 
    guard let url = URL(string: "https://random-data-api.com/api/v2/appliances") else { 
        return 
    }
    let publisher = URLSession.shared
        .dataTaskPublisher(for: url)
        .map(\.data)
        .multicast { PassthroughSubject&lt;Data, URLError&gt;() }
    let subscription1 = publisher
        .sink(receiveCompletion: { completion in
            if case .failure(let err) = completion {
                print("Sink1 Retrieving data failed with error \(err)")
            }
        }, receiveValue: { object in
            print("Sink1 Retrieved object \(object)")
        })
        .store(in: &amp;subscriptions)
    let subscription2 = publisher
        .sink(receiveCompletion: { completion in
            if case .failure(let err) = completion {
                print("Sink2 Retrieving data failed with error \(err)")
            }
        }, receiveValue: { object in
            print("Sink2 Retrieved object \(object)")
        })
        .store(in: &amp;subscriptions)
    let subscription = publisher.connect()
}

在上述代码中,创建 DataTaskPublisher 后,map data,然后使用 multicast。传递给 multicast 的闭包必须返回适当类型的 Subject。 我们会在后文中了解有关 multicast 的更多信息。首次订阅 Publisher,由于它是一个 ConnectablePublisher,它不会立即开始工作。准备好后使用 publisher.connect() 它将开始工作并向所有 Subscriber 推送值。

通过上述代码,我们可以一次请求并与两个 Subscriber 共享结果。这个过程仍然有点复杂,因为 Combine 不像其他响应式框架那样为这种场景提供 Operator。后续文章我们将探索如何设计一个更好的解决方案。

调试

理解异步代码中的事件流一直是一个挑战。在 Combine 的上下文中尤其如此,因为 Publisher 中的 Operator 链可能不会立即发出事件。 例如,像 throttle(for:scheduler:latest:) 这样的 Operator 不会发出它们接收到的所有事件,所以我们需要了解发生了什么。 Combine 提供了一些 Operator 来帮助我们进行调试。

打印事件

print(_:to:) Operator 是我们在不确定是否有任何内容通过时,应该使用的第一个 Operator。它返回一个PassthroughPublisher ,可以打印很多关于正在发生的事情的信息:

即使是这样的简单案例:

let subscription = (1...3).publisher
  .print("publisher")
  .sink { _ in }

输出非常详细:

publisher: receive subscription: (1...3)
publisher: request unlimited
publisher: receive value: (1)
publisher: receive value: (2)
publisher: receive value: (3)
publisher: receive finished

我们会看到 print(_:to:) Operator 显示了很多信息:

  • 在收到订阅时打印并显示其上游 Publisher 的描述;
  • 打印 Subscriber 的 demand request,以便我们查看请求的值的数量。
  • 打印上游 Publisher 发出的每个值。
  • 最后,打印完成事件。

print 有一个额外的参数接受一个 TextOutputStream 对象。 我们可以使用它来重定向字符串以打印到自定义的记录器中。我们还可以在日志中添加额外信息,例如当前日期和时间等。

我们可以创建一个简单的记录器来显示每个字符串之间的时间间隔,以便了解发布者发出值的速度:

example("print") { 
    class TimeLogger: TextOutputStream {
      private var previous = Date()
      private let formatter = NumberFormatter()
      init() {
        formatter.maximumFractionDigits = 5
        formatter.minimumFractionDigits = 5
      }
      func write(_ string: String) {
        let trimmed = string.trimmingCharacters(in: .whitespacesAndNewlines)
        guard !trimmed.isEmpty else { return }
        let now = Date()
        print("+\(formatter.string(for: now.timeIntervalSince(previous))!)s: \(string)")
        previous = now
      }
    }
    let subscription = (1...3).publisher
      .print("publisher", to: TimeLogger())
      .sink { _ in }
}

结果显示每条打印行之间的时间:

--- print ---
+0.00064s: publisher: receive subscription: (1...3)
+0.00145s: publisher: request unlimited
+0.00035s: publisher: receive value: (1)
+0.00026s: publisher: receive value: (2)
+0.00028s: publisher: receive value: (3)
+0.00026s: publisher: receive finished

执行副作用

除了打印信息外,对特定事件执行操作通常很有用,我们将此称为执行副作用:额外的操作不会直接影响下游的其他发 Publisher,但会产生类似于修改外部变量的效果。

handleEvents(receiveSubscription:receiveOutput:receiveCompletion:receiveCancel:receiveRequest:) 让我们可以拦截 Publisher 生命周期中的所有事件,然后在每个步骤中进行额外的操作。

考虑这段代码:

example("handleEvents", {
    guard let url = URL(string: "https://random-data-api.com/api/v2/appliances") else { 
        return 
    }
    URLSession.shared
        .dataTaskPublisher(for: url)
        .map(\.data)
        .decode(type: [String:String].self, decoder: JSONDecoder())
        .sink(receiveCompletion: { completion in
            print("\(completion)")
        }, receiveValue: { data in
            print("\(data)")
        })
})

我们运行它,从来没有看到任何打印。我们使用 handleEvents 来跟踪正在发生的事情。 你可以在 publishersink 之间插入此 Operator:

.handleEvents(receiveSubscription: { _ in
  print("Network request will start")
}, receiveOutput: { _ in
  print("Network request data received")
}, receiveCancel: {
  print("Network request cancelled")
})

再次运行代码,这次我们会看到一些调试输出:

--- handleEvents ---
Network request will start
Network request cancelled

我们忘记保留 AnyCancellable。 因此 Subscription 开始但立即被取消。

使用 Debugger Operator

Debugger 操作符是我们在万不得已的时候确实需要使用的 Operator。

第一个简单的 Operator 是 breakpointOnError()。 顾名思义,当我们使用此 Operator 时,如果任何上游 Publisher 发出错误,Xcode 将在调试器中中断。

一个更完整的变体是 breakpoint(receiveSubscription:receiveOutput:receiveCompletion:)。 它允许你拦截所有事件并根据具体情况决定是否要暂停。

例如,只有当某些值通过时才中断:

.breakpoint(receiveOutput: { value in
  return value > 0 && value < 5
})

假设上游 Publisher 发出整数值,但值 1 到 5 永远不会被发出,我们可以将断点配置为仅在这种情况下中断。

Timer

Timer 在编码时经常用到,除了异步执行代码之外,可能还需要控制任务应该重复的时间和频率。

在 Dispatch 框架可用之前,开发人员依靠 RunLoop 来异步执行任务并实现并发。以上所有方法都能够创建 Timer,但在 Combine 中并非所有 Timer 都相同。

使用 RunLoop

线程可以拥有自己的 RunLoop,只需从当前线程调用 RunLoop.current。请注意,除非我们了解 RunLoop 是如何运行的——特别是真的需要一个 RunLoop —— 否则最好只使用主线程的 RunLoop。

注意:Apple 文档中的一个重要说明和警告是 RunLoop 类不是线程安全的。 我们应该只为当前线程的 RunLoop 用 RunLoop 方法。

RunLoop 实现了我们将后续文章中了解的 Scheduler 协议。它定义了几种相对低级别的方法,并且是唯一一种可以让你创建可取消 Timer 的方法:

example("Timer RunLoop") { 
    let runLoop = RunLoop.main
    let subscription = runLoop.schedule(
      after: runLoop.now,
      interval: .seconds(1),
      tolerance: .milliseconds(100)
    ) {
      print("Timer fired")
    }
    .store(in: &amp;subscriptions)
}

此 Timer 不传递任何值,也不创建 Publisher。 它从 after: 参数中指定的 date 开始,具有指定的间隔 interval 和容差 tolerance。 它与 Combine 相关的唯一用处是它返回的 Cancelable 可让我们在一段时间后将其停止:

example("Timer RunLoop") { 
    let runLoop = RunLoop.main
    let subscription = runLoop.schedule(
      after: runLoop.now,
      interval: .seconds(1),
      tolerance: .milliseconds(100)
    ) {
      print("Timer fired")
    }
    runLoop.schedule(after: .init(Date(timeIntervalSinceNow: 3.0))) {
      subscription.cancel()
    }
}

考虑到所有因素,RunLoop 并不是创建 Timer 的最佳方式,使用 Timer 类会更好。

使用 Timer 类

Timer 是 Mac OS X 中可用的最古老的计时器。 由于它的委托模式和与 RunLoop 的紧密关系,它一直很难使用。 Combine 带来了一个现代变体,我们可以直接用作 Publisher:

let publisher = Timer.publish(every: 1.0, on: .main, in: .common)

onin 两个参数确定:

  • Timer 附加到哪个 RunLoop,这里是主线程的 RunLoop。
  • Timer 在哪个 RunLoop 模式下运行,这里是默认的 RunLoop 模式。

RunLoop 是 macOS 中异步事件处理的基本机制,但它们的 API 有点繁琐。我们可以通过调用 RunLoop.current 为我们自己创建或从 Foundation 获取的任何线程获取 RunLoop,因此我们也可以编写以下代码:

let publisher = Timer.publish(every: 1.0, on: .current, in: .common)

注意:在 DispatchQueue.main 以外的 Dispatch 队列上运行此代码可能会导致不可预知的结果。 Dispatch 框架不使用 RunLoop 来管理其线程。 由于 RunLoop 需要调用其方法来处理事件,因此我们永远不会看到 Timer 在除主队列之外的任何队列上触发。 为 Timer 设置为 RunLoop.main 是最简单安全的选择。

计时器返回的发布者是 ConnectablePublisher,在我们显式调用它的 connect() 方法之前,它不会在 Subscription 时开始触发。我们还可以使用 autoconnect() Operator,它会在第一个 Subscriber 订阅时自动连接。

因此,创建将在订阅时启动 Timer 的 Publisher 的最佳方法是编写:

let publisher = Timer
  .publish(every: 1.0, on: .main, in: .common)
  .autoconnect()

Timer Publisher 发出当前日期,其 Publisher.Output 类型为 Date。 我们可以使用 scan 制作一个发出递增值的计时器:

example("Timer Timer") { 
    let subscription = Timer
        .publish(every: 1.0, on: .main, in: .common)
        .autoconnect()
        .scan(0) { counter, _ in counter + 1 }
        .sink { counter in
            print("Counter is \(counter)")
        }
        .store(in: &amp;subscriptions)
}

还有一个我们在这里没有使用的 Timer.publish() 参数:容差(Tolerance)。 它以 TimeInterval 形式指定可接受的偏差。但请注意,使用低于 RunLoop 的 minimumTolerance 值的值可能会产生不符合预期的结果。

使用 DispatchQueue

我们可以使用 DispatchQueue 来生成 Timer。虽然 Dispatch 框架有一个 DispatchTimerSource,但 Combine 没有为其提供 Timer 接口。 相反,我们将使用另一种方法生成 Timer 事件:

example("Timer DispatchQueue") { 
    let queue = DispatchQueue.main
    let source = PassthroughSubject&lt;Int, Never&gt;()
    var counter = 0
    let cancellable = queue.schedule(
      after: queue.now,
      interval: .seconds(1)
    ) {
      source.send(counter)
      counter += 1
    }
    .store(in: &amp;subscriptions)
    let subscription = source.sink {
      print("Timer emitted \($0)")
    }
    .store(in: &amp;subscriptions)
}

这代码并不漂亮。我们创建一个 subject source,我们将向其发送 counter 值。每次计时触发时,counter 都会增加它。每秒在所选队列上安排一个重复操作,这将立即开始。订阅 source 获取 counter 值。

KVO

处理变化是 Combine 的核心。Publisher 让我们订阅它们以处理异步事件。我们了解了 assign(to:on:),它使我们能够在每次 Publisher 发出新值时更新对象属性的值。

此外,Combine 还提供了观察单个变量变化的机制:

  • 它为符合 KVO(Key-Value Observing)的对象的任何属性提供 Publisher。
  • ObservableObject 协议处理多个变量可能发生变化的情况。

publisher(for:options:)

KVO 一直是 Objective-C 的重要组成部分。 Foundation、UIKit 和 AppKit 类的大量属性都符合 KVO 的要求。我们可以使用 KVO 来观察它们的变化。

下面是一个对 OperationQueue 的 operationCount 属性 KVO 的示例:

let queue = OperationQueue()
let subscription = queue.publisher(for: \.operationCount)
  .sink {
    print("Outstanding operations in queue: \($0)")
  }

每次向队列添加新 Operation 时,它的 operationCount 都会增加,并且我们的 sink 会收到新的计数值。当队列消耗了一个 Operation 时,计数也相应会减少,并且我们的 sink 会再次收到更新的计数值。

还有许多其他框架类公开了符合 KVO 的属性。只需将 publisher(for:) 与 KVO 兼容的属性一起使用,我们将获得一个能够发出值变化的 Publisher。

自定义的 KVO 兼容属性

我们还可以在自己的代码中使用 Key-Value Observing,前提是:

  • 对象是 NSObject 子类;
  • 使用 @objc dynamic 标记属性。

完成此操作后,我们标记的对象和属性将与 KVO 兼容,并且可以使用 Combine。

注意:虽然 Swift 语言不直接支持 KVO,但将属性标记为 @objc dynamic 会强制编译器生成触发 KVO 机制的方法,该机制依赖 NSObject 协议中的特定方法。

在 Playground 上尝试一个例子:

example("KVO") { 
    class TestObject: NSObject {
      @objc dynamic var value: Int = 0
    }
    let obj = TestObject()
    let subscription = obj.publisher(for: \.value)
      .sink {
        print("value changes to \($0)")
      }
    obj.value = 100
    obj.value = 200
}

在上面的代码中,我们创建了一个 TestObject 类,继承自 NSObject 这是 KVO 所必需的。将我们要使其可观察的属性标记为 @objc dynamic。创建并订阅 objvalue 属性的 Publisher。更新属性几次:

--- KVO ---
value changes to 0
value changes to 100
value changes to 200

我们注意到在 TestObject 中我们使用的是 Swift 类型 Int,而作为 Objective-C 特性的 KVO 仍然有效? KVO 可以与任何 Objective-C 类型以及任何桥接到 Objective-C 的 Swift 类型一起正常工作。这包括所有原生 Swift 类型以及数组和字典,只要它们的值都可以桥接到 Objective-C。

Observation options

publisher(for:options:)options 参数是一个具有四个值的选项集:.initial.prior.old.new。 默认值为 [.initial],这就是为什么我们会看到 Publisher 在发出任何更改之前发出初始值。以下是选项的细分:

.initial 发出初始值。

.prior 在发生更改时发出先前的值和新的值。

.old.new 在此 Publisher 中未使用,它们都什么都不做(只是让新值通过)。

如果我们不想要初始值,你可以简单地写:

obj.publisher(for: \.value, options: [])

如果我们指定 .prior,则每次发生更改时都会获得两个单独的值。 修改 integerProperty 示例:

let subscription = obj.publisher(for: \.value, options: [.prior])

你现在将在 integerProperty 订阅的调试控制台中看到以下内容:

--- KVO ---
value changes to 0
value changes to 100
value changes to 100
value changes to 200

该属性首先从 0 更改为 100,因此我们获得两个值:0 和 100。然后,它从 100 更改为 200,因此我们再次获得两个值:100 和 200。

ObservableObject

Combine 的 ObservableObject 协议不仅仅适用于派生自 NSObject 的对象,而且适用于 Swift 的对象。 它与 @Published 属性包装器合作,帮助我们使用编译器生成的 objectWillChange Publisher 创建类。

它使我们免于编写大量重复代码,并允许创建可以自我监控的属性,并在它们中的任何一个发生更改时通知的对象。

这是一个例子:

example("ObservableObject") { 
    class MonitorObject: ObservableObject {
      @Published var someProperty = false
      @Published var someOtherProperty = ""
    }
    let object = MonitorObject()
    let subscription = object.objectWillChange.sink {
      print("object will change")
    }
    object.someProperty = true
}
--- ObservableObject ---
object will change

ObservableObject 协议使编译器自动生成 objectWillChange 属性。 它是一个 ObservableObjectPublisher,它发出 Void 值并且永不失败。

每次对象的 @Published 变量之一发生更改时,都会触发 objectWillChange。不幸的是,我们无法知道实际更改了哪个属性。 这旨在与 SwiftUI 很好地配合使用,它可以合并事件以简化屏幕更新。

资源管理

在前面的内容中,我们发现有时我们希望共享网络请求、图像处理和文件解码等资源,而不是进行重复的工作。换句话说,我们希望在多个订阅者之间共享单个资源的结果—— Publisher 发出的值,而不是复制该结果。

Combine 提供了两个操作符来管理资源:share() Operator 和 multicast(_:) Operator。

share()

该 Operator 的目的是让我们通过引用而不是通过值来获取 Publisher。 Publisher 通常是结构体:当我们将 Publisher 传递给函数或将其存储在多个属性中时,Swift 会多次复制它。当我们订阅每个副本时,Publisher 只能做一件事:开始其工作并交付值。

share() Operator 返回 Publishers.Share 类的实例。通常,Publisher 被实现为结构,但在 share() 的情况下, Operator 获取对 Publisher 的引用而不是使用值语义,这允许它共享底层 Publisher。

这个新 Publisher “共享”上游 Publisher。它将与第一个传入的 Subscriber 一起订阅一次上游 Publisher。然后它将从上游 Publisher 接收到的值转发给这个 Subscriber 以及所有在它之后订阅的 Subscriber。

注意:新 Subscriber 只会收到上游 Publisher 在订阅后发出的值。不涉及缓冲或重放。如果 Subscriber 在上游Publisher 完成后订阅 share Publisher,则该新 Subscriber 只会收到完成事件。

假设我们正在执行一个网络请求,你希望多个 Subscriber 无需多次请求即可接收结果:

example("share") { 
    let shared = URLSession.shared
      .dataTaskPublisher(for: URL(string: "https://random-data-api.com/api/v2/appliances")!)
      .map(\.data)
      .print("shared")
      .share()
    print("subscribing first")
    let subscription1 = shared.sink(
      receiveCompletion: { _ in },
      receiveValue: { print("subscription1 received: '\($0)'") }
    )
    .store(in: &subscriptions)
    print("subscribing second")
    let subscription2 = shared.sink(
      receiveCompletion: { _ in },
      receiveValue: { print("subscription2 received: '\($0)'") }
    )
    .store(in: &subscriptions)
}

第一个 Subscriber 触发 share() 的上游 Publisher 工作(执行网络请求)。 第二个 Subscriber 将简单地“连接”到它并与第一个 Subscriber 同时接收值。

在 Playground 中运行此代码:

--- share ---
subscribing first
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
subscribing second
shared: receive value: (91 bytes)
subscription2 received: '91 bytes'
subscription1 received: '91 bytes'
shared: receive finished

我们可以看到,第一个 Subscription 触发对 DataTaskPublisher 的订阅。第二个 Subscription 没有任何改变:Publisher 继续运行,没有第二个请求发出。当请求完成时,Publisher 将结果数据发送给两个 Subscriber,然后完成。

要验证请求只发送一次,我们可以注释掉 share(),输出将类似于以下内容:

--- share ---
subscribing first
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
subscribing second
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
shared: receive value: (109 bytes)
subscription1 received: '109 bytes'
shared: receive finished
shared: receive value: (94 bytes)
subscription2 received: '94 bytes'
shared: receive finished

可以清楚的看到,当 DataTaskPublisher 不共享时,它收到了两个 Subscription! 在这种情况下,请求会运行两次。

但是有一个问题:如果第二个订阅者是在共享请求完成之后来的呢? 我们可以通过延迟第二次订阅来模拟这种情况:

example("share") { 
    let shared = URLSession.shared
        .dataTaskPublisher(for: URL(string: "https://random-data-api.com/api/v2/appliances")!)
        .map(\.data)
        .print("shared")
        .share()
    print("subscribing first")
    let subscription1 = shared.sink(
        receiveCompletion: { _ in },
        receiveValue: { print("subscription1 received: '\($0)'") }
    )
    .store(in: &amp;subscriptions)
    DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
        print("subscribing second")
        let subscription2 = shared.sink(
            receiveCompletion: { print("subscription2 completion \($0)") },
            receiveValue: { print("subscription2 received: '\($0)'") }
        )
        .store(in: &amp;subscriptions)
    }
}

运行 Playground,我们会看到 subscription2 什么值也没有收到:

--- share ---
subscribing first
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
shared: receive value: (102 bytes)
subscription1 received: '102 bytes'
shared: receive finished
subscribing second
subscription2 completion finished

在创建 subscription2 时,请求已经完成并且结果数据已经发出。如何确保两个 Subscription 都收到请求结果?

multicast(_:)

在上游 Publisher 完成后,要与 Publisher 共享单个 Subscription 并将值重播给新 Subscriber,我们需要类似 shareReplay() Operator。不幸的是,这个 Operator 不是 Combine 的一部分。我们将在后续文章中创建一个。

在“网络”中,我们使用了 multicast(_:)。此 Operator 基于 share() 构建,并使用我们选择的 Subject 将值发布给Subscriber。 multicast(_:) 的独特之处在于它返回的 Publisher 是一个 ConnectablePublisher。这意味着它不会订阅上游 Publisher,直到我们调用它的 connect() 方法。这让你有足够的时间来设置我们需要的所有 Subscriber,然后再让它连接到上游 Publisher 并开始工作。

要调整前面的示例以使用 multicast(_:),我们可以编写:

example("multicast") { 
    let subject = PassthroughSubject<Data, URLError>()
    let multicasted = URLSession.shared
        .dataTaskPublisher(for: URL(string: "https://random-data-api.com/api/v2/appliances")!)
        .map(\.data)
        .print("multicast")
        .multicast(subject: subject)
    let subscription1 = multicasted
        .sink(
            receiveCompletion: { _ in },
            receiveValue: { print("subscription1 received: '\($0)'") }
        )
        .store(in: &subscriptions)
    let subscription2 = multicasted
        .sink(
            receiveCompletion: { _ in },
            receiveValue: { print("subscription2 received: '\($0)'") }
        )
        .store(in: &subscriptions)
    let cancellable = multicasted.connect()
        .store(in: &subscriptions)
}

我们准备一个 subject,它传递上游 Publisher 发出的值和完成事件。使用上述 subject 准备多播 Publisher。

运行 Playground,结果输出:

--- multicast ---
multicast: receive subscription: (DataTaskPublisher)
multicast: request unlimited
multicast: receive value: (116 bytes)
subscription1 received: '116 bytes'
subscription2 received: '116 bytes'
multicast: receive finished

一个多播 Publisher,和所有的 ConnectablePublisher 一样,也提供了一个 autoconnect() 方法,这使它像 share() 一样工作:第一次订阅它时,它会连接到上游 Publisher 并立即开始工作。

Future

虽然 share()multicast(_:) 为你提供了成熟的 Publisher,Combine 还提供了另一种让我们共享计算结果的方法:Future

example("future") { 
    func performSomeWork() throws -&gt; Int {
        print("Performing some work and returning a result")
        return 5
    }
    let future = Future&lt;Int, Error&gt; { fulfill in
        do {
            let result = try performSomeWork()
            fulfill(.success(result))
        } catch {
            fulfill(.failure(error))
        }
    }
    print("Subscribing to future...")
    let subscription1 = future
        .sink(
            receiveCompletion: { _ in print("subscription1 completed") },
            receiveValue: { print("subscription1 received: '\($0)'") }
        )
        .store(in: &amp;subscriptions)
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) { 
        let subscription2 = future
            .sink(
                receiveCompletion: { _ in print("subscription2 completed") },
                receiveValue: { print("subscription2 received: '\($0)'") }
            )
            .store(in: &amp;subscriptions)
    }
}

运行将输出:

--- future ---
Performing some work and returning a result
Subscribing to future...
subscription1 received: '5'
subscription1 completed
subscription2 received: '5'
subscription2 completed

在代码中,我们提供一个模拟 Future 执行的工作。创造新的 Future, 工作立即开始,无需等待 Subscriber。

如果成功,则给 Promise 提供值。如果失败,将错误传递给 Promise。Subscription 一次表明我们收到了结果。第二次 Subscription 表明我们也收到了结果,没有执行两次工作。

从资源的角度来看:

  • Future 是一个类,而不是一个结构。
  • 创建后,它立即调用闭包开始计算结果。
  • 它存储 Promise 的结果并将其交付给当前和未来的 Subscriber。

在实践中,这意味着 Future 是一种便捷的方式,可以立即开始执行某些工作,同时只执行一次工作并将结果交付给任意数量的 Subscriber。但它执行工作并返回单个结果,而不是结果流,因此使用场景比成熟的 Subscriber 要更少。当我们需要共享网络请求产生的单个结果时,它是一个很好的选择!

内容参考

以上就是特定用例下的Combine全面使用详解的详细内容,更多关于Combine 特定用例的资料请关注好代码网其它相关文章!

标签: Combine 用例