async def
is a magic footgun¶Recently I took a deep dive into asynchronous python libraries and found a lot of them that used async def
methods in places that make them lose a lot of their benefits.
See this example where we do I/O (e.g. reading or writing a file, using a socket, waiting for a server response, ...) asynchronously in the background while doing some other stuff with the thread. The time required for IO should be completely shadowed by our computation, without even using additional resources, because it is handled asynchronously by the OS, drivers and hardware. Brilliant!
import time
import asyncio
import gc
# Seconds each simulated CPU-bound step blocks the thread (passed to time.sleep).
WORK_TIME = 2
# Seconds each simulated write takes (used as the delay of the write timer).
WRITE_TIME = 1
# Reference timestamp for the elapsed-time prefix; set by log().
logstart = None


def log(*args, start: bool = False, color="0", **kwargs):
    """Print a message prefixed with the seconds elapsed since the log start.

    Args:
        *args: Values forwarded to ``print`` after the elapsed-time prefix.
        start: When true, reset the reference time so this line reads 0.00.
        color: ANSI SGR code used to colour the line (``"0"`` means default).
        **kwargs: Extra keyword arguments forwarded to ``print``.
    """
    global logstart
    current = time.time()
    # Start the clock on the very first call as well: otherwise logging
    # before any ``start=True`` call fails with ``float - None``.
    if start or logstart is None:
        logstart = current
    print(f"\x1b[{color}m{current - logstart:2.2f}", *args, "\x1b[0m", **kwargs)
# Shared lock that serialises the simulated writes when os_async_write() is
# called with lock=True.
write_lock = asyncio.Lock()
# Don't look at these functions too hard before reading the rest of the text. They contain spoilers.
def os_async_write(i, lock=False):
    """Simulate handing write ``i`` to the OS; return an awaitable for its end.

    The "started" line is logged synchronously, at call time.

    Without ``lock`` the WRITE_TIME timer is created immediately through
    async_sleep(); the returned coroutine waits for it and then logs the
    "done" line.

    With ``lock`` the returned coroutine first awaits the acquisition of
    ``write_lock``, then waits WRITE_TIME and releases the lock again. This
    path does not log a "done" line.
    """
    log(f"[W{i}] >>> OS started writing {i}", color="94")

    if lock:
        # The acquire awaitable is created here but only awaited inside the
        # returned coroutine.
        pending_acquire = write_lock.acquire()

        async def _locked_write(acquire):
            await acquire
            try:
                await async_sleep(WRITE_TIME)
            finally:
                write_lock.release()

        return _locked_write(pending_acquire)

    async def _timed_write(timer):
        await timer
        log(f"[W{i}] <<< OS done writing {i}", color="94")

    # async_sleep() is evaluated right now, so the write timer is already
    # running when the coroutine object is handed back to the caller.
    return _timed_write(async_sleep(WRITE_TIME))
def async_sleep(t):
    """Return a future that resolves to ``None`` after ``t`` seconds.

    Unlike ``asyncio.sleep()``, which is a coroutine whose delay only starts
    once it is awaited, the timer here starts as soon as this function is
    called. Must be called while an event loop is running.

    Args:
        t: Delay in seconds before the future is resolved.

    Returns:
        A future bound to the running loop.
    """
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def _resolve():
        # The future may have been cancelled while the timer was pending;
        # calling set_result() then would raise InvalidStateError.
        if not future.done():
            future.set_result(None)

    timer = loop.call_later(t, _resolve)
    # Drop the pending timer as soon as the future is finished or cancelled.
    future.add_done_callback(lambda _finished: timer.cancel())
    return future
async def write_output_async(i, lock=False):
    """Log the request for write ``i``, wait for the simulated OS write, log completion.

    Because this is an ``async def``, none of it (including the call to
    os_async_write) runs until the returned coroutine is awaited or scheduled.
    """
    log(f"[O{i}] >> Tell OS to write output {i}")
    pending_write = os_async_write(i, lock=lock)
    await pending_write
    log(f"[O{i}] << OS writing completed {i}")
def do_cpu_work(i):
    """Simulate CPU-bound step ``i`` by blocking the thread for WORK_TIME seconds.

    NOTE(review): the log text says "10s" although the block lasts WORK_TIME
    seconds; the wording is kept because the recorded outputs contain it.
    """
    yellow = "93"
    log(f"[X{i}] > Starting 10s CPU work {i}", color=yellow)
    time.sleep(WORK_TIME)  # blocking on purpose: the event loop gets no control here
    gc.collect()
    log(f"[X{i}] < Done with 10s CPU work {i}", color=yellow)
async def example1_await_immediately():
    """Run three work/write rounds, awaiting every write right after requesting it."""
    log("Starting", start=True)
    for step in range(3):
        do_cpu_work(step)
        await write_output_async(step)
    log("Done")
await example1_await_immediately()
0.00 Starting 0.00 [X0] > Starting 10s CPU work 0 2.02 [X0] < Done with 10s CPU work 0 2.02 [O0] >> Tell OS to write output 0 2.02 [W0] >>> OS started writing 0 3.03 [W0] <<< OS done writing 0 3.03 [O0] << OS writing completed 0 3.03 [X1] > Starting 10s CPU work 1 5.05 [X1] < Done with 10s CPU work 1 5.05 [O1] >> Tell OS to write output 1 5.05 [W1] >>> OS started writing 1 6.07 [W1] <<< OS done writing 1 6.07 [O1] << OS writing completed 1 6.07 [X2] > Starting 10s CPU work 2 8.09 [X2] < Done with 10s CPU work 2 8.09 [O2] >> Tell OS to write output 2 8.09 [W2] >>> OS started writing 2 9.10 [W2] <<< OS done writing 2 9.10 [O2] << OS writing completed 2 9.10 Done
Oh, never mind. There is an obvious error in this example. Because the method is awaited immediately, we cannot do any work during the IO.
Easy fix: Just call the function, but await
it before the next output is to be written. The IO is clearly finished then, so await will return immediately, saving us time.
async def example2_await_before_next_write():
    """Run three work steps, awaiting the previous write before requesting the next.

    The write is only a coroutine object here, so it does not begin until it
    is awaited.
    """
    log("Starting", start=True)
    previous_write = None
    for step in range(3):
        do_cpu_work(step)
        if previous_write is not None:
            await previous_write
        previous_write = write_output_async(step)
    # The write requested in the final iteration is still outstanding.
    await previous_write
    log("Done")
await example2_await_before_next_write()
0.00 Starting 0.00 [X0] > Starting 10s CPU work 0 2.02 [X0] < Done with 10s CPU work 0 2.02 [X1] > Starting 10s CPU work 1 4.04 [X1] < Done with 10s CPU work 1 4.04 [O0] >> Tell OS to write output 0 4.04 [W0] >>> OS started writing 0 5.04 [W0] <<< OS done writing 0 5.04 [O0] << OS writing completed 0 5.04 [X2] > Starting 10s CPU work 2 7.06 [X2] < Done with 10s CPU work 2 7.06 [O1] >> Tell OS to write output 1 7.06 [W1] >>> OS started writing 1 8.08 [W1] <<< OS done writing 1 8.08 [O1] << OS writing completed 1 8.08 [O2] >> Tell OS to write output 2 8.08 [W2] >>> OS started writing 2 9.08 [W2] <<< OS done writing 2 9.08 [O2] << OS writing completed 2 9.08 Done
Oh... Now the IO is running after our second CPU work instead of before, but we still haven't got any speed improvements.
asyncio.gather
and TaskGroup
¶Instead of waiting inside the loop for a write that could have happened during our other computations, let's move the waiting outside the loop with asyncio.gather
async def example3_gather():
    """Collect one write coroutine per work step and await them all with gather."""
    log("Starting", start=True)
    pending_writes = []
    for step in range(3):
        do_cpu_work(step)
        pending_writes.append(write_output_async(step))
    # Nothing has been scheduled so far; gather() is what finally runs the writes.
    await asyncio.gather(*pending_writes)
    log("Done")
await example3_gather()
0.00 Starting 0.00 [X0] > Starting 10s CPU work 0 2.02 [X0] < Done with 10s CPU work 0 2.02 [X1] > Starting 10s CPU work 1 4.04 [X1] < Done with 10s CPU work 1 4.04 [X2] > Starting 10s CPU work 2 6.07 [X2] < Done with 10s CPU work 2 6.07 [O0] >> Tell OS to write output 0 6.07 [W0] >>> OS started writing 0 6.07 [O1] >> Tell OS to write output 1 6.07 [W1] >>> OS started writing 1 6.07 [O2] >> Tell OS to write output 2 6.07 [W2] >>> OS started writing 2 7.07 [W0] <<< OS done writing 0 7.07 [O0] << OS writing completed 0 7.07 [W1] <<< OS done writing 1 7.07 [O1] << OS writing completed 1 7.07 [W2] <<< OS done writing 2 7.07 [O2] << OS writing completed 2 7.07 Done
This way we saved some time, because all our writes were executed in parallel.
But they weren't started until all processes were completed, so a lot of time was wasted.
The same can be achieved by using a TaskGroup
async def example4_taskgroup_create_task_outside_loop():
    """Create one write task per work step inside a TaskGroup.

    The loop body never awaits, so the tasks get no chance to run before the
    ``async with`` block is left.
    """
    log("Starting", start=True)
    async with asyncio.TaskGroup() as group:
        for step in range(3):
            do_cpu_work(step)
            group.create_task(write_output_async(step))
    log("Done")
await example4_taskgroup_create_task_outside_loop()
0.00 Starting 0.00 [X0] > Starting 10s CPU work 0 2.02 [X0] < Done with 10s CPU work 0 2.02 [X1] > Starting 10s CPU work 1 4.04 [X1] < Done with 10s CPU work 1 4.04 [X2] > Starting 10s CPU work 2 6.06 [X2] < Done with 10s CPU work 2 6.06 [O0] >> Tell OS to write output 0 6.06 [W0] >>> OS started writing 0 6.06 [O1] >> Tell OS to write output 1 6.06 [W1] >>> OS started writing 1 6.06 [O2] >> Tell OS to write output 2 6.06 [W2] >>> OS started writing 2 7.07 [W0] <<< OS done writing 0 7.07 [O0] << OS writing completed 0 7.07 [W1] <<< OS done writing 1 7.07 [O1] << OS writing completed 1 7.07 [W2] <<< OS done writing 2 7.07 [O2] << OS writing completed 2 7.07 Done
This left all writing to the very end and then wrote all output in parallel again, which might not be realistic, though.
If we write to a socket or append to a linear file, only a single write can happen at a time. Let's simulate that here by requiring a lock around the simulated writing.
async def example5_taskgroup_create_task_outside_loop_with_lock():
    """Same as the plain TaskGroup example, but every write takes the shared lock."""
    log("Starting", start=True)
    async with asyncio.TaskGroup() as group:
        for step in range(3):
            do_cpu_work(step)
            group.create_task(write_output_async(step, lock=True))
    log("Done")
await example5_taskgroup_create_task_outside_loop_with_lock()
0.00 Starting 0.00 [X0] > Starting 10s CPU work 0 2.02 [X0] < Done with 10s CPU work 0 2.02 [X1] > Starting 10s CPU work 1 4.04 [X1] < Done with 10s CPU work 1 4.04 [X2] > Starting 10s CPU work 2 6.06 [X2] < Done with 10s CPU work 2 6.06 [O0] >> Tell OS to write output 0 6.06 [W0] >>> OS started writing 0 6.06 [O1] >> Tell OS to write output 1 6.06 [W1] >>> OS started writing 1 6.06 [O2] >> Tell OS to write output 2 6.06 [W2] >>> OS started writing 2 7.07 [O0] << OS writing completed 0 8.08 [O1] << OS writing completed 1 9.09 [O2] << OS writing completed 2 9.09 Done
And all our speedup is gone again.
Why is this happening? The asyncio
documentation has the explanation:
Note that simply calling a coroutine will not schedule it to be executed:
>>> main() <coroutine object main at 0x1053bb7c8>
Even though Python provides the exact same async
/await
syntax as JavaScript, it works quite differently. In JavaScript:
The body of an async function can be thought of as being split by zero or more await expressions. Top-level code, up to and including the first await expression (if there is one), is run synchronously. In this way, an async function without an await expression will run synchronously. If there is an await expression inside the function body, however, the async function will always complete asynchronously.
This even applies to the JavaScript Promise
implementation:
This promise is already resolved at the time when it's created (because the
resolveOuter
is called synchronously)
So, simply calling an async
method does not start its execution. For it to be executed, it has to be awaited or turned into a task. Directly awaiting didn't help in the first attempts, so let's turn the coroutine into a task.
async def example6_await_before_next_write_with_task():
    """Wrap each write in a task, awaiting the previous task before creating the next.

    No await follows create_task() inside the loop, so the new task does not
    run before the next blocking work step.
    """
    log("Starting", start=True)
    previous_task = None
    for step in range(3):
        do_cpu_work(step)
        if previous_task is not None:
            await previous_task
        previous_task = asyncio.create_task(write_output_async(step))
    # The task created in the final iteration is still outstanding.
    await previous_task
    log("Done")
await example6_await_before_next_write_with_task()
0.00 Starting 0.00 [X0] > Starting 10s CPU work 0 2.02 [X0] < Done with 10s CPU work 0 2.02 [X1] > Starting 10s CPU work 1 4.04 [X1] < Done with 10s CPU work 1 4.04 [O0] >> Tell OS to write output 0 4.04 [W0] >>> OS started writing 0 5.05 [W0] <<< OS done writing 0 5.05 [O0] << OS writing completed 0 5.05 [X2] > Starting 10s CPU work 2 7.07 [X2] < Done with 10s CPU work 2 7.07 [O1] >> Tell OS to write output 1 7.07 [W1] >>> OS started writing 1 8.09 [W1] <<< OS done writing 1 8.09 [O1] << OS writing completed 1 8.09 [O2] >> Tell OS to write output 2 8.09 [W2] >>> OS started writing 2 9.10 [W2] <<< OS done writing 2 9.10 [O2] << OS writing completed 2 9.10 Done
Still no improvements. The same result as example2_await_before_next_write
: No asynchronous writing.
The task was created, but because nothing else was ever awaited, the event loop never started executing our task, so it couldn't start our IO asynchronously.
Let's force our event loop to switch to a different task after creating it. This can be achieved with await asyncio.sleep(0)
(or any other await
call).
The Tasks still have to be created, though.
async def example7_sleep_with_task():
    """Create a task per write and yield to the event loop right after creating it."""
    log("Starting", start=True)
    previous_task = None
    for step in range(3):
        do_cpu_work(step)
        if previous_task is not None:
            await previous_task
        previous_task = asyncio.create_task(write_output_async(step))
        # Hand control to the event loop once so the fresh task can start
        # its write before the next blocking work step.
        await asyncio.sleep(0)
    log("Awaiting last write")
    await previous_task
    log("Done")
await example7_sleep_with_task()
0.00 Starting 0.00 [X0] > Starting 10s CPU work 0 2.02 [X0] < Done with 10s CPU work 0 2.02 [O0] >> Tell OS to write output 0 2.02 [W0] >>> OS started writing 0 2.02 [X1] > Starting 10s CPU work 1 4.04 [X1] < Done with 10s CPU work 1 4.04 [W0] <<< OS done writing 0 4.04 [O0] << OS writing completed 0 4.04 [O1] >> Tell OS to write output 1 4.04 [W1] >>> OS started writing 1 4.04 [X2] > Starting 10s CPU work 2 6.06 [X2] < Done with 10s CPU work 2 6.06 [W1] <<< OS done writing 1 6.06 [O1] << OS writing completed 1 6.06 [O2] >> Tell OS to write output 2 6.06 [W2] >>> OS started writing 2 6.06 Awaiting last write 7.07 [W2] <<< OS done writing 2 7.07 [O2] << OS writing completed 2 7.07 Done
Finally! It took a lot of work to make this async
function actually do work asynchronously, and the resulting code is anything but pretty.
We have to create tasks now and call asyncio.sleep
and then hope that it switches to the coroutine we want.
Creating a lot of tasks can also become a significant overhead compared to just awaiting a coroutine. [citation needed]
Using an async for
loop with an async
iterator is also not enough to yield control flow to the task.
It only works if the iterator also uses await
.
We can also use a TaskGroup
to make sure all our tasks are awaited.
async def to_aiter(gen):
    """Expose the items of a synchronous iterable as an async generator.

    After handing out each item the generator awaits ``asyncio.sleep(0)``
    before producing the next one.
    """
    for item in gen:
        yield item
        # Without this we don't get asynchronous writes
        await asyncio.sleep(0)
async def example9_task_with_async_for():
    """Drive the work loop with ``async for`` over to_aiter inside a TaskGroup.

    Each write is a locked write task created in the group.
    """
    log("Starting", start=True)
    async with asyncio.TaskGroup() as group:
        async for step in to_aiter(range(3)):
            do_cpu_work(step)
            group.create_task(write_output_async(step, lock=True))
    log("Done")
await example9_task_with_async_for()
0.00 Starting 0.00 [X0] > Starting 10s CPU work 0 2.02 [X0] < Done with 10s CPU work 0 2.02 [O0] >> Tell OS to write output 0 2.02 [W0] >>> OS started writing 0 2.02 [X1] > Starting 10s CPU work 1 4.04 [X1] < Done with 10s CPU work 1 4.05 [O1] >> Tell OS to write output 1 4.05 [W1] >>> OS started writing 1 4.05 [X2] > Starting 10s CPU work 2 6.07 [X2] < Done with 10s CPU work 2 6.07 [O2] >> Tell OS to write output 2 6.07 [W2] >>> OS started writing 2 6.07 [O0] << OS writing completed 0 7.07 [O1] << OS writing completed 1 8.09 [O2] << OS writing completed 2 8.09 Done
This moves all the output completions to the end of the taskgroup, though, which might be undesirable and also increases the total runtime.
The problem is, that our async
function only creates a coroutine
object, but does not immediately execute the first synchronous part of it, which includes initiating the write.
Instead of an async
function, a normal function is created, that directly initiates the asynchronous write calls and then calls an internal coroutine passing the Awaitable
from the OS call.
Because the fixed function is not async
it cannot await
the internal coroutine. But it can return it as a result, so the caller can await it. Since the first statement is an await
it does not matter, that the coroutine isn't called immediately.
def write_output_awaitable(i, lock=False):
    """Request write ``i`` right away and return a coroutine that waits for it.

    This is a plain function, so the log line and the call to os_async_write
    happen at call time; only waiting for the result and logging completion
    are deferred to the returned coroutine.

    NOTE(review): the first log line uses "<<" where write_output_async uses
    ">>"; kept unchanged because the recorded output shows it.
    """
    log(f"[O{i}] << Tell OS to write output {i}")

    async def _await_completion(pending):
        await pending
        log(f"[O{i}] << OS writing completed {i}")

    return _await_completion(os_async_write(i, lock=lock))
Note that this only fixes it, if the called function also is not an async
function, but just returns an Awaitable
instead. The called function also has to initiate the write call immediately.
async def example2_await_before_next_write_fixed_function():
    """Repeat the await-before-next-write loop using write_output_awaitable."""
    log("Starting", start=True)
    previous_write = None
    for step in range(3):
        do_cpu_work(step)
        if previous_write is not None:
            await previous_write
        previous_write = write_output_awaitable(step)
    log("Awaiting last write")
    await previous_write
    log("Done")
await example2_await_before_next_write_fixed_function()
0.00 Starting 0.00 [X0] > Starting 10s CPU work 0 2.02 [X0] < Done with 10s CPU work 0 2.02 [O0] << Tell OS to write output 0 2.02 [W0] >>> OS started writing 0 2.02 [X1] > Starting 10s CPU work 1 4.04 [X1] < Done with 10s CPU work 1 4.04 [W0] <<< OS done writing 0 4.04 [O0] << OS writing completed 0 4.04 [O1] << Tell OS to write output 1 4.04 [W1] >>> OS started writing 1 4.04 [X2] > Starting 10s CPU work 2 6.06 [X2] < Done with 10s CPU work 2 6.06 [W1] <<< OS done writing 1 6.06 [O1] << OS writing completed 1 6.06 [O2] << Tell OS to write output 2 6.06 [W2] >>> OS started writing 2 6.06 Awaiting last write 7.08 [W2] <<< OS done writing 2 7.08 [O2] << OS writing completed 2 7.08 Done
Here is an attempt at writing a decorator that immediately invokes the coroutine to execute until its first await
:
(I'm sure someone more knowledgeable than me can come up with a much prettier/better implementation)
def js_like_await(coro):
    """Decorate a coroutine function so a call runs eagerly up to its first await.

    Calling the decorated function creates the coroutine object and
    immediately advances it to its first suspension point, the way a
    JavaScript ``async`` function starts executing when it is called.

    Args:
        coro: The coroutine function (``async def``) to wrap.

    Returns:
        A plain function. Calling it returns an awaitable that resumes the
        already-started coroutine, or an already-resolved ``asyncio.Future``
        when the coroutine finished without ever suspending.

    Raises:
        Any exception the coroutine raises before its first suspension is
        raised synchronously by the call itself.
    """
    import functools  # local import: only this decorator needs it

    class JSLikeCoroutine:
        """Awaitable that continues a coroutine object that was already started."""

        def __init__(self, awaitable, coro_obj):
            self.awaitable = awaitable  # value produced by the eager first step
            self.coro_obj = coro_obj  # coroutine suspended at its first await

        def __await__(self):
            inner = self.coro_obj
            pending = self.awaitable
            # Delegate the way ``yield from`` would: everything the awaiting
            # task sends, throws or closes is forwarded to the inner
            # coroutine, so its own try/except/finally blocks see failed
            # futures and cancellation.
            while True:
                try:
                    sent = yield pending
                except GeneratorExit:
                    inner.close()
                    raise
                except BaseException as exc:
                    try:
                        pending = inner.throw(exc)
                    except StopIteration as stop:
                        return stop.value
                else:
                    try:
                        pending = inner.send(sent)
                    except StopIteration as stop:
                        return stop.value

    @functools.wraps(coro)
    def coroutine_with_js_like_await(*args, **kwargs):
        coro_obj = coro(*args, **kwargs)
        try:
            # Run the synchronous prefix of the coroutine right now.
            first_yield = coro_obj.send(None)
        except StopIteration as stop:
            # The coroutine completed without suspending: hand back a future
            # that already holds its return value.
            result = asyncio.Future()
            result.set_result(stop.value)
            return result
        return JSLikeCoroutine(first_yield, coro_obj)

    return coroutine_with_js_like_await
With this decorator, we can fix our write_output_async
to be useful without requiring a task or event loop.
# Variant of write_output_async that, through js_like_await, already runs up to
# its first await when it is called.
fixed_write_output_async = js_like_await(write_output_async)
async def example10_fixed_write_with_gather():
    """Repeat the await-before-next-write loop using fixed_write_output_async."""
    log("Starting", start=True)
    previous_write = None
    for step in range(3):
        do_cpu_work(step)
        if previous_write is not None:
            await previous_write
        previous_write = fixed_write_output_async(step)
    log("Awaiting last write")
    await previous_write
    log("Done")
await example10_fixed_write_with_gather()
0.00 Starting 0.00 [X0] > Starting 10s CPU work 0 2.02 [X0] < Done with 10s CPU work 0 2.02 [O0] >> Tell OS to write output 0 2.02 [W0] >>> OS started writing 0 2.02 [X1] > Starting 10s CPU work 1 4.05 [X1] < Done with 10s CPU work 1 4.05 [W0] <<< OS done writing 0 4.05 [O0] << OS writing completed 0 4.05 [O1] >> Tell OS to write output 1 4.05 [W1] >>> OS started writing 1 4.05 [X2] > Starting 10s CPU work 2 6.07 [X2] < Done with 10s CPU work 2 6.07 [W1] <<< OS done writing 1 6.07 [O1] << OS writing completed 1 6.07 [O2] >> Tell OS to write output 2 6.07 [W2] >>> OS started writing 2 6.07 Awaiting last write 7.08 [W2] <<< OS done writing 2 7.08 [O2] << OS writing completed 2 7.08 Done