Chapter 6.6 — Async / Await Verification
Coroutines as comonad coalgebras + runtime contracts on async functions.
await e compiles to
EffectWitness("await:e@epoch_M"), every resumption
to a TransportProof, and the cubical heap models
mutation between awaits. The runtime layer is
deppy.async_verify's decorators
(@async_requires, @async_ensures,
@async_yields, @async_invariant,
@terminates) which raise
ContractViolation on predicate failure; @terminates
wraps the coroutine in a real asyncio.wait_for
deadline and raises TerminationViolation when the
deadline elapses. Pulse has neither — it doesn't model
cooperative concurrency.
- @async_requires(pred) — pre-await check
- @async_ensures(pred) — post-await check (result available as result arg / kwarg)
- @async_yields(pred) — invariant on every yielded value of an async generator
- @async_invariant(pred) — entry + exit invariant on the coroutine's call
- @terminates(deadline=seconds) — bounds the runtime; raises on timeout
- pure("result > 0") — lazy predicate that carries its source for Z3 / Lean encoding
Import from deppy.async_verify or directly from
deppy. Predicates can be callables, strings, or
pure(...) wrappers — the runtime chooses the
right calling convention automatically.
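The decorator semantics described above can be approximated in plain Python. The sketch below is a toy stand-in, not deppy's actual implementation: it shows one plausible calling convention for callable predicates (arguments for the precondition, the result for the postcondition) and the ContractViolation failure mode the text describes.

```python
import asyncio
import functools

class ContractViolation(AssertionError):
    """Raised when a runtime contract predicate fails."""

def async_requires(pred):
    """Toy pre-await check: evaluate pred on the call arguments."""
    def deco(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            if not pred(*args, **kwargs):
                raise ContractViolation(f"precondition of {fn.__name__}")
            return await fn(*args, **kwargs)
        return wrapper
    return deco

def async_ensures(pred):
    """Toy post-await check: evaluate pred on the result."""
    def deco(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            result = await fn(*args, **kwargs)
            if not pred(result):
                raise ContractViolation(f"postcondition of {fn.__name__}")
            return result
        return wrapper
    return deco

@async_requires(lambda n: n >= 0)
@async_ensures(lambda r: r >= 1)
async def bump(n: int) -> int:
    await asyncio.sleep(0)
    return n + 1
```

Here asyncio.run(bump(3)) returns 4, while asyncio.run(bump(-1)) raises ContractViolation before the body runs.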
Python's asyncio Model
Python's asyncio is a cooperative concurrency framework. Unlike
threads (which are preempted by the OS), coroutines voluntarily
yield at await points. This gives us a powerful
verification advantage: between two await points, a coroutine
runs atomically — no other coroutine can interfere.
| Property | Threading | asyncio |
|---|---|---|
| Scheduling | Preemptive (OS) | Cooperative (event loop) |
| Yield points | Any bytecode boundary | Only at await |
| Data races | Possible between any bytecodes | Only across await boundaries |
| Lock needed? | Yes, for shared mutable state | Only if state mutated across await |
| Parallelism | Limited by GIL | None (single-threaded) |
Between two await expressions, a coroutine has exclusive
access to all Python state — no other coroutine is running. Deppy exploits this to simplify proofs: within an "atomic block" (code between
awaits), no separation-logic framing is needed.
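This atomicity guarantee is observable with plain asyncio, no verifier needed. In the sketch below, a read-modify-write with no intervening await never loses updates, while the same sequence split across an await collapses to a single increment:

```python
import asyncio

async def demo():
    counter = 0

    async def incr_atomic():
        nonlocal counter
        tmp = counter          # read-modify-write with no await in
        counter = tmp + 1      # between: atomic under asyncio

    async def incr_racy():
        nonlocal counter
        tmp = counter
        await asyncio.sleep(0)  # yields: every other coroutine reads
        counter = tmp + 1       # the same stale value and writes it back

    await asyncio.gather(*(incr_atomic() for _ in range(100)))
    atomic_total = counter
    counter = 0
    await asyncio.gather(*(incr_racy() for _ in range(100)))
    return atomic_total, counter
```

Running asyncio.run(demo()) returns (100, 1): the atomic version counts correctly, while the racy version loses 99 updates because all 100 coroutines read the counter before any of them writes it back.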
Coroutines as Suspended Computations
An async def function returns a coroutine object —
a suspended computation that runs only when awaited. Deppy models this as
an effect annotation:
from deppy import requires, ensures, pts_to, Ref
from deppy.async_verify import async_requires, async_ensures, Async

# Regular function: runs immediately, returns a value
@requires(pts_to(x, 'v'))
@ensures(lambda r: pts_to(x, 'v + 1'))
def incr(x: Ref[int]):
    x.value += 1

# Async function: returns a coroutine, runs when awaited
@async_requires(pts_to(x, 'v'))
@async_ensures(lambda r: pts_to(x, 'v + 1'))
async def async_incr(x: Ref[int]):
    x.value += 1

# Type perspective:
#   incr       : (Ref[int]) → unit {pts_to pre/post}
#   async_incr : (Ref[int]) → Async[unit] {pts_to pre/post}
The Async[T] type tells Deppy that this function may yield
control at await points. Between yields, the function has
exclusive ownership of its heap resources.
Verification of Async Code
Property 1: No Deadlock (Eventual Completion)
Deppy can verify that an async function eventually completes — it doesn't
await forever on something that never resolves:
@async_requires(emp)
@async_ensures(lambda result: pure(result == 42))
@terminates
async def always_completes() -> int:
    """This coroutine always finishes."""
    await asyncio.sleep(0.1)  # sleep resolves after 0.1s
    return 42
Verified — terminates, returns 42
@async_requires(emp)
async def might_deadlock(future: asyncio.Future):
    """Awaits an arbitrary future — may never complete."""
    await future  # ✗ Cannot prove this resolves!
Verification failed — cannot prove termination
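The runtime half of this check is easy to emulate. Below is a toy version of @terminates built on asyncio.wait_for, mirroring the behavior described at the top of the chapter; deppy's real decorator is assumed to differ in detail.

```python
import asyncio
import functools

class TerminationViolation(TimeoutError):
    """Raised when a coroutine exceeds its declared deadline."""

def terminates(deadline):
    """Toy stand-in: bound the coroutine's runtime with wait_for."""
    def deco(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            try:
                return await asyncio.wait_for(fn(*args, **kwargs), deadline)
            except asyncio.TimeoutError:
                raise TerminationViolation(fn.__name__) from None
        return wrapper
    return deco

@terminates(0.05)
async def always_completes():
    await asyncio.sleep(0.01)  # resolves well before the deadline
    return 42

@terminates(0.05)
async def never_completes():
    await asyncio.Event().wait()  # never set: would await forever
```

asyncio.run(always_completes()) returns 42; asyncio.run(never_completes()) raises TerminationViolation after 0.05 s instead of hanging.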
Property 2: Task Ordering Constraints
@async_requires(emp)
@async_ensures(lambda _: pure('step_a_before_step_b'))
async def ordered_pipeline():
    """Prove that A completes before B starts."""
    result_a = await step_a()          # A completes here
    result_b = await step_b(result_a)  # B starts after A
    # Sequential await ⟹ A happens-before B
Verified — sequential ordering guaranteed
Property 3: Resource Cleanup (async context managers)
@async_requires(emp)
@async_ensures(lambda _: pure('connection_closed'))
async def safe_query(url: str):
    """Async context manager ensures cleanup."""
    async with aiohttp.ClientSession() as session:
        # session is open here — owned by this coroutine
        async with session.get(url) as response:
            data = await response.json()
    # session.__aexit__ called — connection closed
    # Deppy verifies: __aexit__ always runs (even on exception)
    return data
Verified — resource cleanup guaranteed
asyncio.Lock vs threading.Lock
Python has two lock types: threading.Lock (for threads) and
asyncio.Lock (for coroutines). They have different semantics
and Deppy verifies each appropriately:
threading.Lock
# Blocks the OS thread
lock = threading.Lock()
with lock:
    # exclusive access
    # other threads sleep
    counter += 1
Verified via: separation logic + preemptive interleaving model
asyncio.Lock
async def _example():
    # Yields to event loop
    lock = asyncio.Lock()
    async with lock:
        # exclusive access
        # other coroutines run
        counter += 1
Verified via: separation logic + cooperative yield model
Never use threading.Lock in async code — it blocks the event loop.
Never use asyncio.Lock from threads — it's not thread-safe.
Deppy flags these misuses as type errors.
# ✗ Type error: threading.Lock used in async function
async def bad():
    lock = threading.Lock()
    with lock:             # blocks event loop!
        await something()  # can't yield while holding thread lock
Type error — threading.Lock in async context
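When state genuinely must be mutated across an await, asyncio.Lock restores atomicity by serializing the whole critical section. A minimal runnable sketch (plain asyncio, no deppy):

```python
import asyncio

async def locked_counter(n: int) -> int:
    counter = 0
    lock = asyncio.Lock()

    async def incr():
        nonlocal counter
        async with lock:            # serializes the whole read-await-write
            tmp = counter
            await asyncio.sleep(0)  # safe to yield: we still hold the lock
            counter = tmp + 1

    await asyncio.gather(*(incr() for _ in range(n)))
    return counter
```

asyncio.run(locked_counter(100)) returns 100: even though every increment yields mid-update, the lock prevents any interleaving of the read and the write.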
Example: Async Web Crawler
A verified web crawler that visits each URL at most once and respects a rate limit:
import asyncio
import aiohttp
from deppy.async_verify import async_requires, async_ensures
from deppy.ghost import ghost_var, ghost_set

@guarantee("visits each URL at most once")
@guarantee("respects rate limit")
@async_requires(emp)
@async_ensures(lambda results:
    pure('len(results) <= len(urls)') and
    pure('all_unique(visited_urls(results))'))
async def crawl(urls: list[str], rate_limit: int) -> list[dict]:
    """Crawl URLs with verified uniqueness and rate limiting."""
    visited: set[str] = set()
    _ghost_visited = ghost_set()  # ghost set for proof tracking
    results = []
    semaphore = asyncio.Semaphore(rate_limit)

    async def fetch_one(url: str):
        # Check-then-act is safe: cooperative scheduling means
        # no other coroutine runs between check and add
        if url in visited:
            return
        visited.add(url)
        ghost_set.add(_ghost_visited, url)
        # invariant: visited == _ghost_visited
        async with semaphore:  # at most rate_limit concurrent fetches
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as resp:
                    data = await resp.json()
                    results.append({'url': url, 'data': data})

    tasks = [fetch_one(url) for url in urls]
    await asyncio.gather(*tasks)
    return results
Verified — unique visits, rate limit respected
Why Uniqueness Holds
- The check if url in visited and the update visited.add(url) execute
in the same "atomic block" (no await between them).
- No other coroutine can mutate visited
between the check and the add.
- Once a URL is in visited, it's never fetched again.
This is the key advantage of asyncio: between
await points, a coroutine runs atomically. In a threaded
implementation, this same check-then-act pattern would be a TOCTOU race
and would require a lock.
Example: Async Producer-Consumer
import asyncio
from deppy.ghost import ghost_var, ghost_update

@async_requires(emp)
@async_ensures(lambda _:
    pure('all items produced == all items consumed'))
async def producer_consumer():
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=10)
    _produced = ghost_var(0)
    _consumed = ghost_var(0)

    async def producer():
        for i in range(100):
            await queue.put(i)
            ghost_update(_produced, lambda n: n + 1)
        await queue.put(None)  # sentinel

    async def consumer():
        while True:
            item = await queue.get()
            if item is None:
                break
            ghost_update(_consumed, lambda n: n + 1)
            process(item)

    await asyncio.gather(producer(), consumer())
    # _produced == 100, _consumed == 100
    # All items accounted for ✓
Verified — no items lost, no duplicate consumption
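A runnable version of the same pattern, with ordinary counters standing in for the ghost variables (plain asyncio, no deppy):

```python
import asyncio

async def producer_consumer_demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    produced = consumed = 0  # ordinary counters in place of ghost vars

    async def producer():
        nonlocal produced
        for i in range(100):
            await queue.put(i)  # blocks when the queue is full
            produced += 1
        await queue.put(None)   # sentinel: no more items

    async def consumer():
        nonlocal consumed
        while True:
            item = await queue.get()
            if item is None:
                break
            consumed += 1

    await asyncio.gather(producer(), consumer())
    return produced, consumed
```

asyncio.run(producer_consumer_demo()) returns (100, 100): the bounded queue applies backpressure, and the sentinel cleanly shuts the consumer down.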
Structured Concurrency (TaskGroup)
Python 3.11+ introduced asyncio.TaskGroup for structured
concurrency. Deppy verifies that:
- All tasks complete before the group exits
- Exceptions from any task cancel all siblings
- Resource ownership is properly transferred to/from tasks
@async_requires(
    array_pts_to(urls, 'url_list'))
@async_ensures(lambda results:
    array_pts_to(urls, 'url_list') and
    pure('len(results) == len(url_list)'))
async def fetch_all(urls: list[str]) -> list[str]:
    """Fetch all URLs concurrently with TaskGroup."""
    results = [None] * len(urls)
    async with asyncio.TaskGroup() as tg:
        for i, url in enumerate(urls):
            async def fetch(idx=i, u=url):
                # Each task owns results[idx] — no overlap
                async with aiohttp.ClientSession() as s:
                    async with s.get(u) as r:
                        results[idx] = await r.text()
            tg.create_task(fetch())
    # All tasks complete here — TaskGroup guarantees this
    return results
Verified — all tasks complete, no index conflicts
Each task writes only to its own slot results[idx].
Deppy verifies that different tasks write to different indices — a form of
spatial separation on array elements. This eliminates data races without locks.
Async Generators
Async generators combine async with yield, creating
lazy asynchronous streams. Deppy verifies them as a sequence of
yield-point contracts:
from deppy.async_verify import async_yields

@async_yields(int)
@guarantee("yields values in ascending order")
@guarantee("yields exactly n values")
async def async_range(start: int, stop: int):
    """Async generator with verified ordering."""
    current = start
    while current < stop:
        # invariant: current ≥ start ∧ all previous yields < current
        yield current
        current += 1

# Consumer verifies it receives sorted values
async def consume():
    prev = None
    async for val in async_range(0, 100):
        if prev is not None:
            assert val > prev  # guaranteed by generator contract
        prev = val
Verified — ascending order, 100 values yielded
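Stripped of the deppy decorators, the generator and its consumer run as ordinary Python; the consumer's assert plays the role of the ordering contract:

```python
import asyncio

async def async_range(start: int, stop: int):
    current = start
    while current < stop:
        yield current  # suspension point between values
        current += 1

async def consume() -> int:
    prev = None
    count = 0
    async for val in async_range(0, 100):
        if prev is not None:
            assert val > prev  # the ordering property, checked at runtime
        prev = val
        count += 1
    return count
```

asyncio.run(consume()) returns 100: all values arrive in ascending order and exactly stop - start of them are yielded.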
Proof Obligations for Async Generators
| Obligation | What Deppy Checks |
|---|---|
| Yield type | Each yield produces a value of the declared type |
| Ordering invariant | Values satisfy any ordering guarantee across yields |
| Termination | Generator eventually stops (if @terminates) |
| Cleanup | async finally blocks run on generator close |
Async Context Manager Verification
class VerifiedConnection:
    """Async context manager with verified resource lifecycle."""

    @async_ensures(lambda self:
        attr_to(self, '_conn', 'open_connection') **
        attr_to(self, '_closed', False))
    async def __aenter__(self):
        self._conn = await open_connection()
        self._closed = False
        return self

    @async_requires(lambda self:
        attr_to(self, '_conn', 'c') **
        attr_to(self, '_closed', False))
    @async_ensures(lambda _:
        attr_to(self, '_closed', True) and
        pure('connection_released'))
    async def __aexit__(self, *exc):
        await self._conn.close()
        self._closed = True
        return False

# Usage — Deppy verifies __aexit__ always runs
async def use_db():
    async with VerifiedConnection() as conn:
        await conn._conn.execute("SELECT 1")
    # conn._closed is True here — guaranteed
Verified — connection always closed
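The cleanup guarantee itself comes from Python's async with protocol. A minimal stand-in class (hypothetical, not the chapter's VerifiedConnection) shows __aexit__ running even when the body raises:

```python
import asyncio

class ToyConnection:
    """Hypothetical connection whose only state is a closed flag."""
    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        self.closed = False
        return self

    async def __aexit__(self, *exc):
        self.closed = True  # cleanup runs on normal exit and on exception
        return False        # do not swallow the exception

async def demo_cleanup() -> bool:
    conn = ToyConnection()
    try:
        async with conn:
            raise RuntimeError("query failed")
    except RuntimeError:
        pass
    return conn.closed
```

asyncio.run(demo_cleanup()) returns True: the exception propagated out of the block, yet the connection was still closed on the way.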
Comparison with Pulse's Extraction
F*'s Pulse extracts verified code to sequential C. There is no notion of async/await, event loops, or cooperative scheduling. Deppy fills this gap:
| Feature | Pulse (F*) | Deppy |
|---|---|---|
| Target | Extracted C code | Python (verified in-place) |
| Concurrency model | Parallel composition (Steel) | Threading + asyncio |
| Async support | ❌ None | ✅ Full async/await verification |
| Event loop | ❌ N/A | ✅ Cooperative scheduling model |
| Structured concurrency | ❌ N/A | ✅ TaskGroup verification |
| Lock types | Steel locks (preemptive) | threading.Lock + asyncio.Lock |
Timeout Verification
@async_requires(emp)
@async_ensures(lambda result:
    pure('result is not None or timeout_elapsed'))
async def fetch_with_timeout(url: str, timeout: float):
    """Fetch with verified timeout handling."""
    try:
        async with asyncio.timeout(timeout):
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as resp:
                    return await resp.text()
    except asyncio.TimeoutError:
        return None

# Deppy verifies: both paths produce valid results
# and all resources are cleaned up in both cases
Verified — handles timeout and success paths
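The same two-path contract can be exercised without aiohttp, and on Python versions before 3.11 (which introduced asyncio.timeout), using asyncio.wait_for; slow_fetch below is a stand-in for the network call:

```python
import asyncio

async def slow_fetch() -> str:
    await asyncio.sleep(10)  # stand-in for a slow network request
    return "data"

async def fetch_with_deadline(timeout: float):
    try:
        # wait_for cancels slow_fetch() once the deadline elapses,
        # so the pending sleep is cleaned up on the timeout path too
        return await asyncio.wait_for(slow_fetch(), timeout)
    except asyncio.TimeoutError:
        return None
```

asyncio.run(fetch_with_deadline(0.01)) returns None almost immediately: the ten-second sleep is cancelled, not awaited.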
Exercises
1. Write a function that starts two fetches concurrently (cf.
asyncio.gather) and returns whichever
completes first. Prove that the result comes from one of the two URLs.
2. Implement a rate limiter that allows at most
n operations per second. Use asyncio.Semaphore
and asyncio.sleep. Verify the rate constraint using a ghost variable
tracking operation timestamps.
3. The following function creates tasks but never awaits them:
   async def broken():
       results = []
       for url in urls:
           task = asyncio.create_task(fetch(url))
       return results  # results is empty!
   Fix it using TaskGroup and write a specification that catches
   the original bug.
4. Build a Pipeline class with .pipe(stage) that chains async
transformation stages. Verify that: (a) stages execute in order, (b) the
output type of each stage matches the input type of the next, and (c) the
pipeline handles backpressure via bounded queues between stages.
5. Prove mutual exclusion for a shared counter once with
threading.Lock and once with asyncio.Lock. Which proof is simpler?
Why? What does this tell us about cooperative vs. preemptive concurrency?