I told OpenAI to generate an increasingly complex stream… here is the result.

Working with Asynchronous Streams in Python

VidaVolta
4 min read · Dec 26, 2023

--

async def number_generator(n, *, delay=1):
    """Asynchronously yield the integers ``0`` through ``n - 1``.

    Args:
        n: How many values to produce.
        delay: Seconds to sleep after each value has been yielded. Defaults
            to one second.

    Yields:
        Each integer of ``range(n)``, in order.
    """
    for i in range(n):
        yield i
        # Pause after handing out the value so consumers see a paced stream.
        await asyncio.sleep(delay)


async def main():
    """Print every value produced by ``number_generator(5)``, one per line."""
    async for number in number_generator(5):
        print(number)


# Prints 0 1 2 3 4 with a one second delay between numbers
asyncio.run(main())
async def third_party_generator(n, *, delay=1, failure_probability=0.5):
    """Asynchronously yield ``0`` through ``n - 1``, failing at random.

    Simulates an unreliable upstream service: before each value is produced
    the generator may raise instead of yielding.

    Args:
        n: How many values to produce if no failure occurs.
        delay: Seconds to sleep after each value has been yielded.
        failure_probability: Chance, per item, that the generator raises
            instead of yielding. ``0`` never fails, ``1`` always fails.

    Yields:
        Each integer of ``range(n)``, in order, until a failure occurs.

    Raises:
        Exception: With the message ``"My Service is down!"`` when the
            simulated failure triggers.
    """
    for i in range(n):
        # sometimes, this generator throws exceptions...
        # random.random() is in [0.0, 1.0), so this is a real coin flip; a
        # non-empty list from random.choices() would always be truthy.
        if random.random() < failure_probability:
            raise Exception("My Service is down!")

        yield i
        await asyncio.sleep(delay)
async def main():
    """Print values from ``third_party_generator(5)`` and report a failure.

    The ``try`` wraps the whole ``async for`` because the generator raises
    while the loop is fetching the next item; a ``try`` placed inside the
    loop body only guards ``print`` and never sees that exception.
    """
    try:
        async for number in third_party_generator(5):
            print(number)
    except Exception:
        # The generator is finished once it has raised, so the stream
        # cannot simply be resumed from here.
        print("I handled the exception.. but now what?")
    generated = []
exception = None
async for number in third_party_generator(5):
try:
generated.append(number)
print(number)
except Exception as e:
exception = e

if exception is not None:
async for number in some_other_generator(generated):
print(number)
class StreamWrapper:
    """Async iterator that forwards every item of an underlying stream.

    This is a pass-through layer: it is the place to hook extra per-item
    logic without touching the wrapped stream.

    Args:
        stream: The async generator (or any async iterable) to wrap.
    """

    def __init__(self, stream: AsyncGenerator[int, Any]):
        # Resolve the iterator once. Calling __aiter__ again on every
        # __anext__ would restart any iterable that returns a fresh iterator.
        self._stream = stream.__aiter__()

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next item; raises ``StopAsyncIteration`` when done."""
        # add some special logic here
        # Delegating directly lets the source's StopAsyncIteration (or any
        # other exception) propagate unchanged to the consumer.
        return await self._stream.__anext__()
async for number in StreamWrapper(third_party_generator(5)):
print(number)
class StreamRecoveringWrapper:
    """Async iterator that switches to a fallback stream when the source fails.

    Every item delivered so far is remembered. When the wrapped stream
    raises, a replacement stream is requested from ``fallback_streamer`` and
    iteration continues from it on the next call.

    Args:
        stream: The primary async stream to read from.
        fallback_streamer: Object exposing ``get_new_stream(streamed)``,
            which is passed the list of items already delivered and returns
            the replacement async stream.
    """

    def __init__(self, stream, fallback_streamer):
        # Resolve the iterator once so each __anext__ call resumes the stream.
        self._stream = stream.__aiter__()
        self._fallback_streamer = fallback_streamer
        self._streamed = []

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next item, or ``None`` for the step where recovery ran.

        Raises:
            StopAsyncIteration: When the current stream is exhausted.
        """
        try:
            data = await self._stream.__anext__()
        except StopAsyncIteration:
            # StopAsyncIteration subclasses Exception; normal exhaustion must
            # end iteration rather than trigger the fallback.
            raise
        except Exception:
            replacement = self._fallback_streamer.get_new_stream(self._streamed)
            self._stream = replacement.__aiter__()
            # uh oh, returning None! Doesn't matter, an outer layer is
            # expected to replace this placeholder.
            return None

        self._streamed.append(data)
        return data
class TransformingWrapper:
    """Async iterator that replaces ``None`` items with a default value.

    Args:
        stream: The async stream to read from.
        default: Value substituted for every ``None`` item. Defaults to 123.
    """

    def __init__(self, stream, default=123):
        # Resolve the iterator once so each __anext__ call resumes the stream.
        self._stream = stream.__aiter__()
        self._default = default

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next item, with ``None`` swapped for the default.

        Raises:
            StopAsyncIteration: When the wrapped stream is exhausted.
        """
        data = await self._stream.__anext__()
        # Compare against None explicitly: `data or default` would also
        # clobber legitimate falsy items such as 0.
        return self._default if data is None else data
async for number in TransformingWrapper(StreamRecoveringWrapper(third_party_generator(5), fallback_streamer)):
print(number)

--

--

VidaVolta
VidaVolta

Written by VidaVolta

Software engineering articles.

No responses yet