It’s no secret.
I’m a Combine fanboy.
I had a chip on my shoulder after missing the RxSwift bandwagon. Therefore, when Combine came out in 2019, I got serious shiny-object syndrome and used it wherever I could.
Then, in 2021, Swift Concurrency was (finally) released, with ergonomics that drastically outclassed Combine for many simple use cases.
Combine still held a cherished place in my heart. I’d built my whole network stack with the framework, and came up with all manner of excuses to resist transitioning.
Once the AsyncAlgorithms package was published in 2022, there was really no excuse left: much of the functionality in Combine was now available in Swift Concurrency.
Today, I’ll demonstrate how to migrate your Combine code over to AsyncAlgorithms, with a fully open-source tutorial project you can code along with.
If you, like me, need to wean yourself off Combine, then code along from Before to After.
The Combine Project structure
The Combine project (Before) is a familiar layered architecture you might find in many indie projects, although you probably aren’t lazy enough to leave ContentView as the default name.
UI layer: ContentView & ContentViewModel
Data access layer: ContentRepo
Networking layer: all the APIs
While this project is pretty basic, it demonstrates a nice mix of Combine use cases you might find in your own projects.
These use cases include:
Displaying a notification badge from a couple of data sources.
Displaying the progress of a long-running download.
Fetching and displaying a username from an API call.
Async Algorithms
There’s one protocol you need to understand to really get to grips with AsyncAlgorithms: AsyncSequence.
AsyncSequence works a lot like the regular Sequence protocol in the Swift Standard Library: it relies on AsyncIteratorProtocol, an asynchronous version of IteratorProtocol, whose next() method is async and returns the next element (or nil once the sequence ends).
This iteration can be invoked using the for-await-in syntax sugar.
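To make this concrete, here’s a minimal sketch of a hand-rolled AsyncSequence and a for-await-in loop consuming it. The Countdown type and the collectCountdown function are hypothetical names invented for illustration; they aren’t part of the tutorial project.

```swift
// A hypothetical countdown sequence, not part of the tutorial project.
struct Countdown: AsyncSequence {
    typealias Element = Int
    let start: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        var current: Int

        // Returning nil ends the sequence, just like IteratorProtocol.
        mutating func next() async -> Int? {
            guard current > 0 else { return nil }
            defer { current -= 1 }
            return current
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(current: start)
    }
}

// for-await-in calls next() repeatedly, suspending between elements.
func collectCountdown(from start: Int) async -> [Int] {
    var values: [Int] = []
    for await value in Countdown(start: start) {
        values.append(value)
    }
    return values
}
```

Calling await collectCountdown(from: 3) should hand back 3, 2, 1 in that order; the loop exits as soon as next() returns nil.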
AsyncSequence should be very familiar if you’re coming from Combine publishers: both deal with streams of values over time. Many of the algorithms in the AsyncAlgorithms package therefore serve as drop-in replacements for corresponding Combine operations.
If you want a more detailed primer on AsyncSequence, and concurrency tools in general, check out this piece I wrote for my friends at Emerge Tools: Async await in Swift: The Full Toolkit.
Let’s jump into the tutorial now.
And code along, dammit!
Starting from Before, the original Combine project, import the AsyncAlgorithms package from GitHub.
Let’s go through each use case step-by-step and migrate the Combine code over to its modern AsyncAlgorithms equivalent.
Combining data sources
Let’s say you’re a social app, an email app, or a personal finance app — the key engagement hook is notifications, often with a single menu which contains them.
The most popular UI pattern is decorating the notification menu button with a badge and number, which counts all the notifications.
Migrating the Repository
The badge count is pulled together from several data sources, which we can set up in our trusty Repository.
var chatNotificationsSubject = CurrentValueSubject<[Notification], Never>([])
var friendNotificationsSubject = CurrentValueSubject<[Notification], Never>([])
func streamChatNotifications() {
    Task {
        let chatNotifications = try? await chatAPI.getUnreadChatMessages()
        chatNotificationsSubject.send(chatNotifications ?? [])
    }
}
func streamFriendNotifications() {
    Task {
        let friendNotifications = try? await friendsAPI.getFriendNotifications()
        friendNotificationsSubject.send(friendNotifications ?? [])
    }
}
Just because we’re migrating doesn’t mean we need to rip out all the Combine code at once. You might even be working with a third-party SDK. What’s important here is taking Combine out of our public interfaces and leaving it as an implementation detail.
Fortunately, it’s easy to convert Combine publishers into Swift-Concurrency-friendly AsyncSequences using the values property.
This property was actually the last update Apple ever gave to Combine, in 2021.
🥲
To keep Combine out of the interface, we can make the Combine subjects private:
private var chatNotificationsSubject = CurrentValueSubject<[Notification], Never>([])
private var friendNotificationsSubject = CurrentValueSubject<[Notification], Never>([])
And use computed properties which convert them into AsyncSequences which can be consumed downstream:
var chatNotificationsSequence: AsyncPublisher<AnyPublisher<[Notification], Never>> {
    chatNotificationsSubject.eraseToAnyPublisher().values
}
var friendNotificationsSequence: AsyncPublisher<AnyPublisher<[Notification], Never>> {
    friendNotificationsSubject.eraseToAnyPublisher().values
}
Since we’re leaving the internal implementation unchanged, we don’t even need to modify our API code in this instance. The conversion is relatively seamless once you get your head around type erasure.
Migrating the View Model
In the view model, we originally used the eponymous combineLatest to merge the streams together. This is Combine’s bread-and-butter: publishing values from multiple data sources and combining them together.
This operator publishes a tuple of the latest values each time either publisher sends anything, once both have emitted at least one value. For our notification badge, this means we can always be sure it’s adding up the current number of new notifications from each source, such as friend requests, chat messages, and likes.
This ensures our users see the correct value for the badge.
private func subscribeToNotifications() {
    repo.chatNotificationsSubject
        .combineLatest(repo.friendNotificationsSubject)
        .map { $0.0.count + $0.1.count }
        .receive(on: RunLoop.main)
        .assign(to: \.notificationCount, on: self)
        .store(in: &cancellables)
}
The most critical thing to understand when moving from Combine to AsyncAlgorithms is the way we iterate through values.
Instead of handling streams of values with .sink, we’re now iterating through an AsyncSequence using the for-await-in syntax.
Wait, that wasn’t the most important thing!
The most important thing is to import AsyncAlgorithms. But the iteration syntax is definitely #2.
Now that our imports are in order, we can merge the AsyncSequence streams together in our view model.
We combine the streams from the repo using the combineLatest() algorithm, which has virtually identical syntax to the Combine form.
private func streamNotificationCount() async {
    for await notificationsCount in combineLatest(
        repo.chatNotificationsSequence,
        repo.friendNotificationsSequence
    ) { ... }
}
If you aren’t familiar with the behaviour of combineLatest, printing the output above looks a little like this (mapping just the id property):
// no notifications to start with
([], [])
// friend notifications published
([], ["4", "5", "6", "6"])
// chat notifications arrive, both sets published
(["1", "2", "3"], ["4", "5", "6", "6"])
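If it helps to see the rule written down, here’s a rough plain-Swift model of that behaviour, with no Combine or AsyncAlgorithms involved. The Event enum and the combineLatestModel function are made up for this sketch; it only mimics the “latest value from each side” bookkeeping, not the real asynchronous implementation.

```swift
// Hypothetical model of combineLatest's bookkeeping; not the real implementation.
enum Event {
    case chat([String])
    case friends([String])
}

func combineLatestModel(_ events: [Event]) -> [([String], [String])] {
    var latestChat: [String]?
    var latestFriends: [String]?
    var output: [([String], [String])] = []

    for event in events {
        // Remember the newest value from whichever side just emitted.
        switch event {
        case .chat(let ids): latestChat = ids
        case .friends(let ids): latestFriends = ids
        }
        // Only emit once both sides have produced at least one value.
        if let chat = latestChat, let friends = latestFriends {
            output.append((chat, friends))
        }
    }
    return output
}

let pairs = combineLatestModel([
    .chat([]),                       // initial value of the chat subject
    .friends([]),                    // initial value of the friends subject
    .friends(["4", "5", "6", "6"]),  // friend notifications published
    .chat(["1", "2", "3"])           // chat notifications arrive
])
```

Feeding it the same sequence of events as the printout above yields the same three pairs: the first event alone emits nothing, because the other side hasn’t produced a value yet.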
Migrating the map is actually much easier: it isn’t even an AsyncAlgorithm; it’s just the standard library’s implementation of map on AsyncSequence.
@MainActor
private func streamNotificationCount() async {
    for await notificationsCount in combineLatest(
        repo.chatNotificationsSequence,
        repo.friendNotificationsSequence
    ).map({
        $0.0.count + $0.1.count
    }) {
        self.notificationCount = notificationsCount
    }
}
Since we’re assigning a property on our observable view model, we put this method on the @MainActor to ensure thread safety.
This single @MainActor attribute neatly replaces the .receive(on: RunLoop.main) operation. The for-await-in syntax introduces a suspension point on each iteration of the stream: while the loop is waiting for the next value, the main thread is free to do other work.
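Here’s a stripped-down sketch of that pattern using only the standard library, so you can run it without the tutorial project. BadgeModel, observe(_:) and runBadgeDemo are hypothetical names, and an AsyncStream of ID arrays stands in for the repo’s notification sequences.

```swift
// Hypothetical view-model stand-in, isolated to the main actor like the tutorial's view model.
@MainActor
final class BadgeModel {
    private(set) var count = 0

    func observe(_ updates: AsyncStream<[String]>) async {
        // map comes from AsyncSequence in the standard library.
        for await newCount in updates.map({ $0.count }) {
            // Runs on the main actor, so no receive(on:) is needed.
            count = newCount
        }
    }
}

func runBadgeDemo() async -> Int {
    // The stream buffers these values until the loop consumes them.
    let updates = AsyncStream<[String]> { continuation in
        continuation.yield(["1"])
        continuation.yield(["1", "2", "3"])
        continuation.finish()
    }
    let model = await BadgeModel()
    await model.observe(updates)
    return await model.count
}
```

The property write happens inside the actor-isolated method, so the compiler enforces what .receive(on:) only promised at runtime.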
Streams and throttling
Many apps include some kind of loading bar, or progress indicator, for any non-trivial download or processing task. This simple UI element can yield a big UX improvement by making the app appear more responsive.
What we don’t want to do, however, is spam the UI with more updates than we have frames per second. So, even if we are inundated with 1000 progress updates per second, we should only update the UI when the user will actually notice.
Migrating the Repository
Our original download listener is doing what Combine does best: handling a rapid stream of delegate callbacks from a closure-based API. This updates the download percentage on our UI.
var downloadSubject = PassthroughSubject<Double, Never>()
private let downloadAPI = DownloadAPI()
func performDownload() {
    Task {
        await downloadAPI.startDownload { [weak self] percentage in
            self?.downloadSubject.send(percentage)
        }
    }
}
To migrate this to pure Swift Concurrency, first we want to convert the Combine stream into an AsyncStream with a Continuation.
AsyncStream is a special concrete generic type which conforms to the AsyncSequence protocol, but gives us full control over creating and sending values. This allows it to handily take up the mantle of “wrapping callback-based APIs”.
Continuations are a runtime entity in Swift Concurrency: lightweight objects that store state at async suspension points and allow resumption in the future.
The stream’s continuation is our handle for feeding it: we manually send values to the stream using yield(). Each yielded value is then returned from a next() call on AsyncStream’s iterator.
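// Not marked async: the function returns the stream immediately,
// which lets the view model call it without await.
func performDownload() -> AsyncStream<Double> {
    AsyncStream { continuation in
        Task {
            await downloadAPI.startDownload { percentage in
                continuation.yield(percentage)
            }
            // End the stream once the download has completed.
            continuation.finish()
        }
    }
}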
This, admittedly, isn’t the most intuitive code if you aren’t used to AsyncStream.
In short, we are replacing Combine’s subject.send(value) with AsyncStream’s continuation.yield(value).
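As a self-contained illustration of the same swap, here’s a sketch that wraps a made-up synchronous callback API in an AsyncStream. fakeDownload, progressStream and collectProgress are hypothetical stand-ins for the tutorial’s DownloadAPI, whose implementation isn’t shown here.

```swift
// Hypothetical stand-in for a callback-based download API.
func fakeDownload(onProgress: (Double) -> Void) {
    for step in 1...4 {
        onProgress(Double(step) / 4)
    }
}

func progressStream() -> AsyncStream<Double> {
    AsyncStream { continuation in
        fakeDownload { percentage in
            // The equivalent of subject.send(percentage) in Combine.
            continuation.yield(percentage)
        }
        // Finishing the stream ends the for-await-in loop downstream.
        continuation.finish()
    }
}

func collectProgress() async -> [Double] {
    var received: [Double] = []
    for await percentage in progressStream() {
        received.append(percentage)
    }
    return received
}
```

Because AsyncStream buffers yielded values by default, nothing is lost even though the fake download finishes before the loop starts reading.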
Migrating the View Model
Now we’ve set up the repository, we can do the fun part: converting the pipelines in our view model. Here’s the original Combine-based form:
private func subscribeToDownloadTask() {
    repo.downloadSubject
        .throttle(for: .seconds(0.05), scheduler: RunLoop.main, latest: true)
        .receive(on: RunLoop.main)
        .assign(to: \.downloadPercentage, on: self)
        .store(in: &cancellables)
}
The APIs of Combine and AsyncAlgorithms are similar enough that we don’t need to change much at all here.
The Combine .throttle() operator needs to be replaced with the AsyncAlgorithms equivalent, the aptly named _throttle(). If it ain’t broke, don’t fix it.
Why does _throttle() have an underscore?
The Swift team added this because they weren’t sure about the semantics they wanted for throttling before AsyncAlgorithms v1.0. When you’re not sure, underscore!
We can replace downloadSubject with the new performDownload() function call, reach for the trusty @MainActor attribute, and et voilà! Our function virtually migrates itself.
@MainActor
private func performDownload() async {
    for await percentage in repo
        .performDownload()
        ._throttle(for: .seconds(0.05), latest: true) {
        self.downloadPercentage = percentage
    }
}
Now, the Swift Concurrency version of the view model matches the original Combine behaviour: updating the loading bar on the UI (at most) 20 times per second in response to a download progress callback.
CurrentValueSubject to AsyncChannel
As iOS engineers, our bread-and-butter is fetching data from an API and displaying it on a view.
In this final section, we’re going to migrate CurrentValueSubject into AsyncChannel.
Migrating the Repository
Our original Combine code in the repository is identical to the rest of the old Combine repo code, so it won’t be tough to see where this is going.
var userSubject = CurrentValueSubject<User?, Error>(nil)
private let userAPI = UserAPI()
func loadUser() {
    Task {
        do {
            let user = try await userAPI.getUser()
            userSubject.send(user)
        } catch {
            userSubject.send(completion: .failure(error))
        }
    }
}
We’re fetching data from UserAPI and sending the user data — or an error — to a CurrentValueSubject. This caches the latest value, making it easy to share the data across your app without performing lots of additional networking.
Author’s note:
Frankly, using Combine to fetch and return a single object from the network is over-engineering a complex solution to a simple problem.
Basic async/await is almost certainly enough here. There are certainly use cases for AsyncChannel beyond this very contrived example.
AsyncChannel is a type heavily inspired by Combine subjects. It’s a special type of AsyncSequence which applies back pressure: rather than buffering values, each send() suspends until the value has been consumed downstream by a call to next() on its iterator.
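We begin by modifying the userSubject on our repository from a CurrentValueSubject to an AsyncThrowingChannel. The element type stays optional, matching the original subject and the compacted() call we’ll use in the view model.
var userChannel = AsyncThrowingChannel<User?, Error>()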
Since these concepts behave so similarly, we hardly need to modify loadUser() at all to get the repo to its final AsyncAlgorithms form.
func loadUser() {
    Task {
        do {
            let user = try await userAPI.getUser()
            await userChannel.send(user)
        } catch {
            userChannel.fail(error)
        }
    }
}
Apart from awaiting send(), the only difference is swapping out the unhappy path code branch, from send(completion: .failure) to the much more intuitive fail(error).
Migrating the View Model
Let’s return one last time to our view model to consume these User values.
private func subscribeToUser() {
    repo.userSubject
        .compactMap { $0 }
        .receive(on: RunLoop.main)
        .sink(receiveCompletion: { completion in
            if case .failure(let error) = completion {
                print(error)
            }
        }, receiveValue: { [weak self] user in
            self?.user = user
        })
        .store(in: &cancellables)
}
We can tweak our subscribeToUser function name into the more descriptive handleUserValues.
We’re listening to an AsyncChannel, which is a kind of AsyncSequence with an asynchronous iterator inside. Therefore, we can use the now-familiar for-await-in syntax to handle values as they pass through the channel.
private func handleUserValues() async {
    do {
        for try await user in repo.userChannel {
            // ...
        }
    } catch {
        print(error)
    }
}
Now we can apply the same transformations from our original Combine implementation.
First, the compactMap operation which filters out nil user values. From the Combine form…
.compactMap { $0 }
…to the AsyncAlgorithm version:
.compacted()
Again, we can replace .receive(on: RunLoop.main) with the @MainActor attribute, and our final code looks very similar to the original Combine implementation.
@MainActor
private func handleUserValues() async {
    do {
        for try await user in repo.userChannel.compacted() {
            self.user = user
        }
    } catch {
        print(error)
    }
}
But there’s one problem.
In our original Combine repository, we could cache the latest User value in the CurrentValueSubject and pass it around without re-fetching the user. Then, any other view model requesting the user data could immediately return the cached value when they first set up the pipeline.
However, AsyncChannel only behaves like a PassthroughSubject, not a CurrentValueSubject. Due to the implementation of its underlying async iterator, it will always await next() in the for-await-in loop when you set it up rather than consuming an old value.
If you need this caching behaviour, and are for some reason allergic to first-party semi-deprecated libraries like Combine, then the open-source AsyncExtensions package might have you covered.
Most importantly, as a Combine fanboy, I am finally vindicated. AsyncAlgorithms isn’t all that after all.
My fellow writer, Captain SwiftUI, helpfully pointed out an even more critical weakness in AsyncAlgorithms, one that has a huge impact when using it for a shared repository: AsyncSequence does not support broadcasting and multiplexing.
This means that if you tried to create multiple downstream for-await-in listeners to an AsyncSequence or AsyncStream, you’d get a crash or, depending on the sequence type and Swift version, each value delivered to only one of the listeners. The AsyncIterator inside each sequence isn’t designed to have multiple entities calling await next().
Future iterations of AsyncAlgorithms are intended to include some kind of Broadcaster entity.
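For a feel of what it takes to fill both gaps yourself, here’s a rough sketch of an actor that replays its latest value to new listeners and fans every update out to all of them. LatestValueBroadcaster is a hypothetical helper written for this scenario, not an API from AsyncAlgorithms or AsyncExtensions, and it assumes Swift 5.9 or later for AsyncStream.makeStream.

```swift
// Hypothetical helper: replays the latest value and broadcasts to every listener.
actor LatestValueBroadcaster<Value: Sendable> {
    private var latest: Value
    private var nextID = 0
    private var continuations: [Int: AsyncStream<Value>.Continuation] = [:]

    init(_ initial: Value) {
        latest = initial
    }

    // Each call returns an independent stream, so every listener gets its own iterator.
    func stream() -> AsyncStream<Value> {
        let id = nextID
        nextID += 1
        let (newStream, continuation) = AsyncStream<Value>.makeStream()
        continuation.yield(latest) // replay the cached value first
        continuations[id] = continuation
        continuation.onTermination = { [weak self] _ in
            Task { await self?.remove(id) }
        }
        return newStream
    }

    func send(_ value: Value) {
        latest = value
        for continuation in continuations.values {
            continuation.yield(value)
        }
    }

    func finish() {
        for continuation in continuations.values {
            continuation.finish()
        }
        continuations.removeAll()
    }

    private func remove(_ id: Int) {
        continuations[id] = nil
    }
}

func broadcastDemo() async -> (first: [Int], second: [Int]) {
    let broadcaster = LatestValueBroadcaster(0)
    await broadcaster.send(1)
    // Both listeners subscribe after 1 was sent, yet both still receive it.
    let firstStream = await broadcaster.stream()
    let secondStream = await broadcaster.stream()
    await broadcaster.send(2)
    await broadcaster.finish()

    var first: [Int] = []
    for await value in firstStream { first.append(value) }
    var second: [Int] = []
    for await value in secondStream { second.append(value) }
    return (first, second)
}
```

This trades AsyncChannel’s back pressure for unbounded buffering per listener, which is fine for a cached user object but worth rethinking for high-frequency streams.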
Migration Retrospective
We’ve come a long way in a short time.
Before, our repository interface looked like this:
import Combine
protocol ContentRepo {
    var userSubject: CurrentValueSubject<User?, Error> { get }
    var chatNotificationsSubject: CurrentValueSubject<[Notification], Never> { get }
    var friendNotificationsSubject: CurrentValueSubject<[Notification], Never> { get }
    var downloadSubject: PassthroughSubject<Double, Never> { get }
    func loadUser()
    func streamChatNotifications()
    func streamFriendNotifications()
    func performDownload()
}
Now, it looks more or less the same, and yet oh-so-different:
import AsyncAlgorithms
protocol ContentRepo {
    var userChannel: AsyncThrowingChannel<User?, Error> { get }
    var chatNotificationsSequence: AsyncPublisher<AnyPublisher<[Notification], Never>> { get }
    var friendNotificationsSequence: AsyncPublisher<AnyPublisher<[Notification], Never>> { get }
    func loadUser()
    func streamChatNotifications()
    func streamFriendNotifications()
    func performDownload() -> AsyncStream<Double>
}
Once our repository was wired up correctly to produce AsyncAlgorithms-compatible AsyncSequences, the view model migration was pretty interesting.
Combining values from multiple streams worked pretty well already in the Combine world, which we should expect of a framework named Combine.
private func subscribeToNotifications() {
    repo.chatNotificationsSubject
        .combineLatest(repo.friendNotificationsSubject)
        .map { $0.0.count + $0.1.count }
        .receive(on: RunLoop.main)
        .assign(to: \.notificationCount, on: self)
        .store(in: &cancellables)
}
When we use an AsyncAlgorithm, it still looks just as neat:
@MainActor
private func streamNotificationCount() async {
    for await notificationsCount in combineLatest(
        repo.chatNotificationsSequence,
        repo.friendNotificationsSequence
    ).map({
        $0.0.count + $0.1.count
    }) {
        self.notificationCount = notificationsCount
    }
}
Our downloader originally looked like this when updating the UI with the long-running task progress.
private func subscribeToDownloadTask() {
    repo.downloadSubject
        .throttle(for: .seconds(0.05), scheduler: RunLoop.main, latest: true)
        .receive(on: RunLoop.main)
        .assign(to: \.downloadPercentage, on: self)
        .store(in: &cancellables)
}
The AsyncAlgorithm version looks very similar, albeit using a hilarious underscored API.
@MainActor
private func performDownload() async {
    for await percentage in repo
        .performDownload()
        ._throttle(for: .seconds(0.05), latest: true) {
        self.downloadPercentage = percentage
    }
}
When listening to individual user objects, we went from this:
private func subscribeToUser() {
    repo.userSubject
        .compactMap { $0 }
        .receive(on: RunLoop.main)
        .sink(receiveCompletion: { completion in
            if case .failure(let error) = completion {
                print(error)
            }
        }, receiveValue: { [weak self] user in
            self?.user = user
        })
        .store(in: &cancellables)
}
To a much cleaner-looking AsyncAlgorithms version, with far simpler error handling:
@MainActor
private func handleUserValues() async {
    do {
        for try await user in repo.userChannel.compacted() {
            self.user = user
        }
    } catch {
        print(error)
    }
}
I haven’t mentioned it until now, because it wasn’t very interesting, but we’ve also converted the setup code in the view model; from this classic Combine setup:
private func configureSubscriptions() {
    subscribeToUser()
    subscribeToNotifications()
    subscribeToDownloadTask()
}
private func loadData() {
    repo.loadUser()
    repo.streamChatNotifications()
    repo.streamFriendNotifications()
    repo.performDownload()
}
To this approach more in line with Swift Concurrency:
private func configureSubscriptions() {
    Task { await handleUserValues() }
    Task { await performDownload() }
    Task { await streamNotificationCount() }
}
private func loadData() {
    repo.loadUser()
    repo.streamChatNotifications()
    repo.streamFriendNotifications()
}
You’ll notice performDownload() is gone from loadData(), since we now kick off the download at the same time as setting up the stream handler.
One last time, you’re welcome to check out the before and after in my open-source codebase from Combine to AsyncAlgorithms.
Conclusion
If you missed the boat back in 2016, this is what it felt like when we refactored all the Objective-C code to Swift. Old code bad, new shiny code good. Happy developers.
We can see our migration was a success because the AsyncAlgorithms project looks and behaves identically to the original version.
While it’s tremendously satisfying to replace a framework that has no future with a framework that has one, it’s important to remember that your users probably don’t care*.
*Disregard this statement if you’re an indie hacker building a dev-tools startup in public, LFG 🚀
However, code that uses AsyncAlgorithms is simpler to read than Combine code, and works seamlessly with the rest of Swift Concurrency. This reduces the code maintenance burden in the future.
If you want my measured advice, you should gradually migrate your public interfaces to future-proof your work, and let your Combine code retire with dignity, as private properties on your repos… at least until they replace CurrentValueSubject.