I told OpenAI to generate an increasingly complex stream… here is the result.

Working with Asynchronous Streams in Python

How to add complexity, layer by layer, without creating a messy codebase.

VidaVolta
4 min read · Dec 26, 2023


Recently, I’ve explored the world of asynchronous Python while developing some FastAPI applications for a project. While unintuitive at first, asynchronous programming can be very powerful, mainly because of how quickly it can be adopted in an existing codebase.

In general, you don’t really need to restructure your application logic, or even its architecture, to reap the benefits of async. As long as your program is not CPU bound, you just need to locate the expensive non-CPU-bound bottlenecks, change some function signatures from `def` to `async def`, add `await` here and there, and your program will “magically” speed up (subject to Amdahl’s law, of course).
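To make that concrete, here is a minimal sketch of such a conversion. The function names and delays are made up for illustration; the point is that only the signature and the wait change, and `asyncio.gather` then lets the waits overlap:

```python
import asyncio
import time


# Synchronous version: each call blocks the whole program.
def fetch_sync(delay: float) -> str:
    time.sleep(delay)  # stand-in for a network call
    return "done"


# Async version: same logic, only the signature and the wait change.
async def fetch_async(delay: float) -> str:
    await asyncio.sleep(delay)  # yields control to other tasks while waiting
    return "done"


async def main():
    # Two awaited calls now run concurrently instead of back to back,
    # so this takes roughly 0.1s rather than 0.2s.
    return await asyncio.gather(fetch_async(0.1), fetch_async(0.1))


results = asyncio.run(main())
print(results)  # ['done', 'done']
```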

Once you are working in the async paradigm, new components that resemble their synchronous counterparts appear. Python generators are a good example — having both synchronous and asynchronous versions.

Asynchronous generators are a frequently used component in FastAPI applications (or any async webserver), because they can be used to stream data in a familiar way (e.g. to implement server-sent events), like so:

import asyncio


async def number_generator(n):
    for i in range(n):
        yield i
        await asyncio.sleep(1)


async def main():
    async for number in number_generator(5):
        print(number)


asyncio.run(main())

# Prints 0 1 2 3 4 with a one second delay between numbers

If you want to know precisely how to implement streaming in FastAPI, I wrote a more in-depth guide on this topic.

While intuitive, the above example is a bit constraining. Allow me to demonstrate a common use-case where we want to wrap a generator with our own functionality. Here is an example of a third-party generator:

import asyncio
import random


async def third_party_generator(n):
    for i in range(n):
        # sometimes, this generator throws exceptions...
        if random.choice([True, False]):
            raise Exception("My Service is down!")

        yield i
        await asyncio.sleep(1)

We can’t modify this; we can only work with its results. That might be enough: you can wrap the iteration in a try/except and handle the exceptions:

async def main():
    # the generator raises during iteration, so the
    # try must wrap the loop itself, not its body
    try:
        async for number in third_party_generator(5):
            print(number)
    except Exception as e:
        print("I handled the exception... but now what?")

You can get more sophisticated with your error handling. How about falling back to a secondary async generator if the primary one fails?

generated = []
exception = None
try:
    async for number in third_party_generator(5):
        generated.append(number)
        print(number)
except Exception as e:
    exception = e

if exception is not None:
    async for number in some_other_generator(generated):
        print(number)

So now, if we fail mid-stream, we fall back to another generator, which takes the already-generated data as a constructor argument, so it knows how to resume the stream.
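The article leaves some_other_generator abstract. As a purely hypothetical sketch, a fallback that resumes the count from the items already received might look like this:

```python
import asyncio


# Hypothetical fallback: resume the sequence from where the primary stream stopped.
async def some_other_generator(generated):
    start = generated[-1] + 1 if generated else 0
    for i in range(start, 5):
        yield i
        await asyncio.sleep(0)  # stay cooperative without adding real delay


async def main():
    # pretend the primary stream died after yielding 0 and 1
    return [n async for n in some_other_generator([0, 1])]


resumed = asyncio.run(main())
print(resumed)  # [2, 3, 4]
```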

This works, but I found it quickly became unwieldy when I wanted to add layers and layers of functionality to an asynchronous generator.

I wanted retries, I wanted to fallback to other streams if one failed, I wanted to modify contents before passing them to the next layer, and the list went on.

Luckily, I found a much better way to wrap an async generator in our business logic — I call it the “wrapper pattern” and it looks like this:

from typing import Any, AsyncGenerator


class StreamWrapper:
    def __init__(self, stream: AsyncGenerator[int, Any]):
        self._stream = stream

    def __aiter__(self):
        return self

    async def __anext__(self):
        # add some special logic here
        async for data in self._stream:
            return data

        raise StopAsyncIteration

When the caller iterates the StreamWrapper, it calls `__anext__` until it receives a `StopAsyncIteration` exception.

Now, we can hide all of the retry logic in StreamWrapper. Our code that needs this stream is properly shielded from the logic we’ve added — and can continue using the simple interface:

async for number in StreamWrapper(third_party_generator(5)):
    print(number)
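As a sketch of that retry logic, here is a hypothetical RetryingStreamWrapper. One caveat worth knowing: a plain async generator closes permanently once it raises, so per-item retries only help with streams that can survive a failed attempt. The FlakyCounter class below is an invented stand-in for such a stream:

```python
import asyncio


class FlakyCounter:
    """Hypothetical async iterator that fails the first attempt at every item."""

    def __init__(self, n):
        self._n = n
        self._i = 0
        self._failed_once = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._i >= self._n:
            raise StopAsyncIteration
        if not self._failed_once:
            self._failed_once = True
            raise RuntimeError("transient failure")
        self._failed_once = False
        self._i += 1
        return self._i - 1


class RetryingStreamWrapper:
    """Sketch: retry each item a few times before giving up."""

    def __init__(self, stream, retries=3):
        self._stream = stream
        self._retries = retries

    def __aiter__(self):
        return self

    async def __anext__(self):
        last_error = None
        for _ in range(self._retries):
            try:
                return await self._stream.__anext__()
            except StopAsyncIteration:
                raise  # the stream genuinely finished
            except Exception as e:
                last_error = e
                await asyncio.sleep(0)  # yield control before retrying
        raise last_error  # retries exhausted: surface the failure


async def main():
    return [n async for n in RetryingStreamWrapper(FlakyCounter(3))]


numbers = asyncio.run(main())
print(numbers)  # [0, 1, 2]
```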

What’s great about this is how easy it is to compose a complicated stream by adding layers of wrappers. As a quick example to finish up, I will add a data validation layer, and a “stream recovery” layer that falls back to a different async generator.

class StreamRecoveringWrapper:
    def __init__(self, stream, fallback_streamer):
        self._stream = stream
        self._fallback_streamer = fallback_streamer
        self._streamed = []

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            async for data in self._stream:
                self._streamed.append(data)
                return data
        except Exception as e:
            self._stream = self._fallback_streamer.get_new_stream(self._streamed)
            # uh oh, returning None! Doesn't matter, we will
            # add another layer that can handle this!
            return None

        raise StopAsyncIteration

If the primary stream fails, we fall back to another stream, supplied by the fallback_streamer. Let’s add another wrapper that transforms our data. For now, it will just handle the “issue” from the previous layer, where we return None for the first data chunk when we switch streams.

class TransformingWrapper:
    def __init__(self, stream):
        self._stream = stream

    def __aiter__(self):
        return self

    async def __anext__(self):
        async for data in self._stream:
            # check for None explicitly: `data or 123` would also replace 0
            return 123 if data is None else data  # some default value

        raise StopAsyncIteration

That was easy. The best part is how easy it is to chain our wrappers together:

async for number in TransformingWrapper(
    StreamRecoveringWrapper(third_party_generator(5), fallback_streamer)
):
    print(number)
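Here is the whole chain end to end as a runnable sketch. The flaky_primary generator and FallbackStreamer class are invented stand-ins for the third-party stream and fallback_streamer above, and the wrappers are repeated so the example is self-contained:

```python
import asyncio


# Hypothetical primary stream that dies mid-way.
async def flaky_primary():
    yield 0
    yield 1
    raise RuntimeError("My Service is down!")


# Hypothetical fallback_streamer: builds a stream that resumes the count.
class FallbackStreamer:
    def get_new_stream(self, streamed):
        async def resume():
            for i in range(len(streamed), 5):
                yield i
        return resume()


class StreamRecoveringWrapper:
    def __init__(self, stream, fallback_streamer):
        self._stream = stream
        self._fallback_streamer = fallback_streamer
        self._streamed = []

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            async for data in self._stream:
                self._streamed.append(data)
                return data
        except Exception:
            self._stream = self._fallback_streamer.get_new_stream(self._streamed)
            return None  # sentinel handled by the next layer

        raise StopAsyncIteration


class TransformingWrapper:
    def __init__(self, stream):
        self._stream = stream

    def __aiter__(self):
        return self

    async def __anext__(self):
        async for data in self._stream:
            return 123 if data is None else data  # swap the sentinel for a default

        raise StopAsyncIteration


async def main():
    stream = TransformingWrapper(
        StreamRecoveringWrapper(flaky_primary(), FallbackStreamer())
    )
    return [n async for n in stream]


numbers = asyncio.run(main())
print(numbers)  # [0, 1, 123, 2, 3, 4]
```

The 123 in the middle marks where the primary stream died and the fallback took over, exactly the hand-off the two layers were built to hide.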

For trivial applications, the strategies shown in this article may be overkill. After all, you can probably think of simpler ways to implement the code shown here. However, the pattern may serve you well when you need to significantly change the properties of a stream, perhaps even keeping track of state in the stream wrapper.

If you enjoyed this article, please subscribe to my newsletter for more like it.
