I ran into a concurrency quirk while hacking on this library for BIP-324. The library is performing some network level encoding and decoding of bytes on the wire for version 2 of a protocol. The project publishes a small proxy application along side the library. The proxy app is a middleman for V1 clients and V2 clients, it simply listens on one side for V1 messages and writes them in V2 on the other side, and vice versa.
Not that it matters too much for understanding this issues I ran into, but the library is written in a “sans-io” approach where it makes no assumptions about the concurrency runtime of the caller. This makes it extremely flexible, but requires more boilerplate integration code from the caller. So the library also offers higher level interfaces which can plug into the most common runtimes. For example, the AsyncProtocol type has a very straight forward interface for the caller and does all the boilerplate under the hood. It just requires the caller to be using a compatible asynchronous runtime.
The BIP324 proxy application is using the high level AsyncProtocol interface of the library along with the popular Tokio asynchronous runtime. Now, there is nothing stopping the proxy app from instead using two threads, one for V1 -> V2 and the other for V2 -> V1, in a synchronous concurrency runtime. But the asynchronous runtime promises some ridiculous concise syntax for this use case and may be more performant for the use case (not a big deal here though either way).
loop {
  select {
    message = v1_reader.read_exact() {
      v2_writer.write(message).await;
    },
    message = v2_reader.read_exact() {
      v1_writer.write(message).await;
    }
  }
}
Proxy application pseudocode for an asynchronous concurrency runtime.
This pseudocode looks vaguely Rust-y, but boiled down to what the runtime offers. Instead of having to fire up and manage two threads, the application just selects across two different asynchronous tasks. In this case, reading from the V1 side and reading from the V2 side. When one of either of the reads completes, the work is scheduled to then write to the opposite side. The select operator is showing off the potential efficiency allowed by the granular control flow operations of an async runtime, which is possible here because the tasks will work cooperatively to hand off who is driving. To compose operations like this in a thread-based, preemptive, runtime would require a lot more overhead, like a queue to manage messages. Control flow with threads is coarse-grain, heavyweight, since it is usually just implicitly set by the function order on the call stack.
So I coded this up and tested it locally and was confused when it would appear to work for a bit, but inevitably at some point go off the rails. All of a sudden all clients would no longer understand each other and just disconnect. Something was throwing this off, but there is just so little code, what could it be?
I had to take another look at the select documentation for the Tokio implementation, along with the read calls I was using under the hood to pull bytes off of the sockets. The select docs have a section called Cancellation Safety with the following blurb.
When using select! in a loop to receive messages from multiple sources, you should make sure that the receive call is cancellation safe to avoid losing messages.
That sounded suspicious. I am a little new to some of these async control flow operations and had never ran into cancellation issues before (that I know of). But I wasn’t exactly sure what it was talking about so I took a look at the read operation I was using under the hood for the streams, it too has a section on cancellation safety.
This method is not cancellation safe. If the method is used as the event in a tokio::select! statement and some other branch completes first, then some data may already have been read into buf.
Well, damn. For some reason, I assumed that if one of the read operations returns data, the write work would be queue’d up and that is that. The loop would restart and do it all again. However, that is not the contract of select. Once one of the tasks being selected across returns, the rest are canceled. If the tasks are stateless this is not a big deal, they just drop their work and would re-do stuff on the next go around of the loop. But these tasks are reading off of streams! Those streams are not stateless, so when the task drops its work the read bytes are lost.
I am surprised I have never ran into this before. My theory is that in thread-based runtimes, cancellation is such a nuclear option you rarely deal with it. Threads are heavyweight and complex, you are probably only canceling them if you are shutting down the whole application. In an asynchronous runtime, tasks are generally lightweight and cheap, it’s not a big deal to nuke a few and restart them. So to run into cancellation safety issues, you will probably have to be using an asynchronous runtime plus be making calls which are not cancellation safe, like my stream reads.
Well now I know. But is there a way to make the functions in this loop cancellation safe? I was using the read_exact function under the hood which sounds pretty perfect for my use case. The protocol I am implementing encodes packets in a layout where the first three bytes contains the number of bytes remaining in the packet. So a read operation would be:
- Read first three bytes.
- Decode length.
- Read the decoded length number of bytes.
- Decode packet.
read_exact is perfect! Except that it is not cancellation safe…why is that?
A future is an instance of a stuct which has Future behavior (a.k.a. implements the trait). This instance can own some memory allocation like any other. The read_exact function is tasked with reading an exact amount of bytes asynchronously. So it is going to return a future, but it might have to be poll’d multiple times to reach the exact amount of bytes. A straightforward way for it to accomplish this is to pull as many bytes off the stream as it can, with the exact amount being the limit, and if there is not enough yet, store it in a local buffer. Next time around pull more bytes and check if the exact amount is reached. And so on. So the future is holding a memory allocation of the bytes it has pulled off of the stream. If it gets canceled before reaching the exact amount these bytes are going to get dropped. They will also probably no longer be in the stream since they have been passed up already. So data is just dropped.
The read_exact function is actually part of an extension trait, AsyncReadExt, defined in futures-rs (which is kind of the central hub of future trait development). The AsyncRead trait has a much smaller scope, but any implementation of it can then be extended with AsyncReadExt with no extra work required by the implementer, pretty neat. AsyncReadExt is comparable to the standard library’s std::io::Read. What is a little confusing though is that some runtimes, like Tokio, have implemented their own versions of AsyncReadExt along with AsyncRead to squeeze out performance. A caller can decide which to use through imports in their code.
Both the futures-rs and Tokio AsyncReadExt traits have a version of read_exact. The implementations are similar to each other, but not exactly what I described above. They take a mutable reference to write to instead of managing any memory themselves. This doesn’t fix the cancellation issues though. The definition for a future to be cancellation safe is that if it is not yet completed, it can be dropped at any point and safely re-created. These read_exact implementations are not passing back any sort of “number of bytes read” to the caller, since the whole point is to wait until all the bytes are read. If a caller simply recreated a canceled future they might drop bytes, or in this case, overwrite them in the buffer.
Any bytes pulled off of the underlying stream need to be held onto between re-creates. I think this could still be accomplished using the read_exacts, but it would require some non-obvious bookkeeping on the buffer. At that point, it may be more straight forward to use the simple read function, also part of AsyncReadExt, and track the number of bytes read per re-create. The wrapper stuct around the I/O read operations becomes its own little state machine, kind of like a future implementation, but doesn’t have to deal with all the pinning since it doesn’t have to do any fancy self referencing.
Channels are great concurrency primitive safeguards and naturally fit in the expressions for a select statement, try to use the structure when you can.
awaits in the branch
In the above example I focused on await being used under the hood in the async expression for branches. But what about awaits in the branch handlers?
loop {
    select! {
        result = potentially_cancellable_operation() => {
            // This handler code will NOT be cancelled by select!
            non_cancellation_safe_operation().await;
        },
        other_result = another_potentially_cancellable_operation() => {
            other_non_cancellation_safe_operation().await;
        }
    }
}
Await away in the handler logic.
While the select effectively creates a big future of the branch expressions, the returned handler is its own thing. This means it gets driven to completion, and not exposed to cancellation from the select. This assumes there isn’t a higher level cancellation context though, like if the handler is used as an expression in another select!