In my previous post Swift Actors: A practical example, part 1 I covered how I got a Swift.org toolchain from trunk, enabled support for the currently experimental support for structured concurrency, and created a thread-safe cache in few lines of Swift code.

If you missed part 1, definitely do read it first as this post builds upon the code I already covered there. »

When I wrapped up writing part 1, I was looking at my new cache actor and I started wondering “What about Combine?”.

What about Swift actors and Combine?

With all the concurrency advancements like async/await, the Task APIs, and actors - is Combine going to be still needed?

Some simple use cases would be easier with the new APIs. What’s more the new concurrency APIs are fully integrated with Swift’s error handling which makes the code much simpler to write and read. For example:

  1. I believe I won’t use Future much more since an async function is much simpler to write.
  2. Reimplementing publishers that only emit series of values and then complete as an AsyncSequence will be easier and more importantly much simpler at the point-of-use.

In any case I think the Combine APIs to manage multiple streams of events over time offer a unique angle to asycnhronous value processing. In fact, I think it’s great we’re getting a simpler way of async/concurrent for simple tasks and still have the option to solve more complex problems with Combine.

So let’s add some Combine to the ActorTest app from part 1!

Converting an async function to synchronous

Huge disclaimer: This post is purely exploratory and is using the experimental Swift concurrency feature which is a work in progress.

The compute() function from part 1 which precomputes a bunch of hashes and adds them to the cache currently looks like so:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func compute() async {
  addHash(for: 42)
    
  await withTaskGroup(of: Bool.self) { group in
    for number in 0 ... 15_000 {
      group.spawn {
        await self.addHash(for: number)
        return true
      }
    }
  }
}

I’d very much like to mix-in some Combine in here to provide a way for an observer to track the progress of precomputing the hashes.

Usually I’d return a publisher from the function and let observers subscribe it. However, this function is async so it doesn’t return a result until it’s finished all its work.

On the other hand, in the structured concurrency proposal I noticed a new function called async(...) that lets you run async code in synchronous contexts.

Let’s try that and let compute() return a publisher emitting its progress:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func compute() -> AnyPublisher<Int, Never> {
  let progress = CurrentValueSubject<Int, Never>(0)
  
  async {
    await withTaskGroup(of: Bool.self) { group in

    }
    progress.send(completion: .finished)
  }
  
  return progress.eraseToAnyPublisher()
}

It took some trial and error to come up with this code but here’s the play-by-play:

  1. compute() is not async anymore and returns an AnyPublisher<Int, Never> so consumers can subscribe and observe the progress,
  2. async(...) creates an asynchronous task to run withTaskGroup(...) so you can actually use await inside the closure body,
  3. Since execution is waiting for withTaskGroup(...) to complete all the group tasks, I can simply emit the .finished publisher event on the next line after withTaskGroup(...).
  4. Finally (and that’s a bit of an assumption) Swift’s structured concurrency allows me to return from compute() before the code in async completes. async() spawns a child task which runs asynchronously and upon completion also releases the execution of compute(). So in fact my progress subject is alive and well until group completes all its tasks 🤯🤯🤯.

Great! ⌘+B compiles the code just fine and I’m ready to look into fleshing out the code that emits the progress.

Task groups are lazy sequences

As I mentioned in part 1, the task group API, in its current state, allows you to easily add tasks to the group for concurrent execution by calling spawn(...). Before completing, withTaskGroup(...) waits until all spawned tasks complete.

There is however a group API that also allows you to control and track the completion of tasks. In my code I’d like to send a value via my progress subject each time a task completes.

This is trivial - peaking in the headers shows me that TaskGroup conforms to AsyncSequence and I can get the result of each task when it completes by calling the sequence’s next() method:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
await withTaskGroup(of: Bool.self) { group in
  // Schedule all the tasks.
  for number in 0 ... 15_000 {
    group.spawn {
      await self.addHash(for: number)
      return true
    }
  }
  
  // Consume the tasks as they complete.
  var counter = 0
  while let _ = await group.next() {
    counter += 1
    progress.send(Int(Double(counter) / 15_000.0 * 100))
  }
}

The first half of the body now spawns all the tasks in the group and the while loop consumes the results of the concurrent tasks as soon as they complete (but still synchronously).

I love this design because with just few lines of code you create both:

  • the concurrent exection that does the heavy work, and
  • the synchronized handler where you can safely process the results 🥰.

The code, emitting the progress, counts the completed tasks and calls into progress.send(...) to emit the current progress value.

Subscribing the progress publisher

The rewrite with Combine is essentially completed. I still need to subscribe compute() and print the progress to the console.

That’s the easiest task so far:

1
2
3
4
5
6
7
8
9
static var sub: AnyCancellable?

static func main() async {
  sub = await cache.compute()
    .removeDuplicates()
    .sink(receiveValue: { progress in
      print("Computing: \(progress)%")
    })
}

compute() is not async anymore but since it’s an actor method I still had to use await. The rest was easy as pie - removeDuplicates() emits only the unique progress values, and the code in sink(...) prints out the progress.

When I build and run - the progress shows up in Xcode’s console:

...
Computing: 96%
Computing: 97%
Computing: 98%
Computing: 99%
Computing: 100%
Program ended with exit code: 0

The complete ActorTest app with Combine

Here’s how the complete App.swift developed in this post look like:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import Foundation
import CryptoKit
import Combine

@available(macOS 9999, *)
@main
struct App {
  static let cache = HashCache()
  static var sub: AnyCancellable?
  
  static func main() async {
    sub = await cache.compute()
      .removeDuplicates()
      .sink(receiveValue: { progress in
        print("Computing: \(progress)%")
      })
  }
}

@available(macOS 9999, *)
actor HashCache {
  private(set) var hashes = [Int: String]()
  
  func addHash(for number: Int) {
    let string = SHA512.hash(data: 
      Data(String(number).utf8)
    ).description
        
    hashes[number] = string
  }
  
  func compute() -> AnyPublisher<Int, Never> {
    let progress = CurrentValueSubject<Int, Never>(0)
    
    async {
      await withTaskGroup(of: Bool.self) { group in
        // Schedule all the tasks.
        for number in 0 ... 15_000 {
          group.spawn {
            await self.addHash(for: number)
            return true
          }
        }
        
        // Consume the tasks as they complete.
        var counter = 0.0
        while let _ = await group.next() {
          counter += 1
          progress.send(Int(counter / 15_000.0 * 100))
        }
      }
      
      progress.send(100)
      progress.send(completion: .finished)
    }
    
    return progress.eraseToAnyPublisher()
  }
}

Final disclaimer: Developed using a Swift toolchain from the Swift.org trunk branch. The concurrency feature is a work-in-progress. This code might not work at a later moment.


Where to go from here?

All of the concurrency features in Swift are still work in progress. You don’t need to be learning them as of right now as syntax or behavior might change, so I hope this post does not put any unnecessary pressure on you.

That’s also the reason why I’ve also put so many disclaimers in the post - this write-up is purely exploratory.

Hit me up with your ideas, replies, and feedback on Twitter at https://twitter.com/icanzilb.