Chapter 6.6 — Async / Await Verification

Coroutines as comonad coalgebras + runtime contracts on async functions.

Two layers of async verification. The static layer is the coalgebraic one from Part 3.4: every await e compiles to EffectWitness("await:e@epoch_M"), every resumption to a TransportProof, and the cubical heap models mutation between awaits. The runtime layer is deppy.async_verify's decorators (@async_requires, @async_ensures, @async_yields, @async_invariant, @terminates) which raise ContractViolation on predicate failure. Pulse has neither — it doesn't model cooperative concurrency.
Real APIs (no stubs). Every decorator and helper below is implemented and runtime-enforced. Pre/post/yield/invariant checks all call the supplied predicate at the relevant moment of the coroutine's lifecycle and raise ContractViolation on failure; @terminates wraps the coroutine in a real asyncio.wait_for deadline and raises TerminationViolation when the deadline elapses.
  • @async_requires(pred) — pre-await check
  • @async_ensures(pred) — post-await check (result available as result arg / kwarg)
  • @async_yields(pred) — invariant on every yielded value of an async generator
  • @async_invariant(pred) — entry + exit invariant on the coroutine's call
  • @terminates(deadline=seconds) — bounds the runtime; raises on timeout
  • pure("result > 0") — lazy predicate that carries its source for Z3 / Lean encoding

Import from deppy.async_verify or directly from deppy. Predicates can be callables, strings, or pure(...) wrappers — the runtime chooses the right calling convention automatically.
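To make the lifecycle of a post-condition check concrete, here is a minimal sketch of how such a decorator could be written on top of plain asyncio. It is not Deppy's implementation: sketch_async_ensures, SketchContractViolation, and double are hypothetical names introduced only for this illustration, and the predicate is assumed to be a plain callable.

```python
import asyncio
import functools


class SketchContractViolation(Exception):
    """Hypothetical stand-in for Deppy's ContractViolation."""


def sketch_async_ensures(pred):
    """Check pred(result) once the wrapped coroutine has finished (illustrative only)."""
    def decorate(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            result = await fn(*args, **kwargs)
            if not pred(result):
                raise SketchContractViolation(
                    f"{fn.__name__}: postcondition failed for result {result!r}"
                )
            return result
        return wrapper
    return decorate


@sketch_async_ensures(lambda result: result > 0)
async def double(n: int) -> int:
    await asyncio.sleep(0)  # a real await point before the result exists
    return 2 * n


async def main():
    ok = await double(21)
    try:
        await double(-1)  # result is -2, so the predicate fails
    except SketchContractViolation:
        violated = True
    else:
        violated = False
    return ok, violated


ok, violated = asyncio.run(main())
print(ok, violated)
```

The check runs after the final resumption of the coroutine, which is why the predicate sees the returned value rather than any intermediate state.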

This chapter is unique to Deppy. F*'s Pulse does not model async/await, event loops, or cooperative multitasking. The concepts here are original to the Deppy framework.

Python's asyncio Model

Python's asyncio is a cooperative concurrency framework. Unlike threads (which are preempted by the OS), coroutines voluntarily yield at await points. This gives us a powerful verification advantage: between two await points, a coroutine runs atomically — no other coroutine can interfere.

Property | Threading | asyncio
Scheduling | Preemptive (OS) | Cooperative (event loop)
Yield points | Any bytecode boundary | Only at await
Data races | Possible between any bytecodes | Only across await boundaries
Lock needed? | Yes, for shared mutable state | Only if state mutated across await
Parallelism | Limited by GIL | None (single-threaded)

Between two await expressions, a coroutine has exclusive access to all Python state — no other coroutine is running. Deppy exploits this to simplify proofs: within an "atomic block" (code between awaits), no separation-logic framing is needed.
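The atomic-block property can be observed directly with plain asyncio. The sketch below runs the same read-modify-write twice: once with no await between the read and the write, and once with an await in between. The helper names (atomic_incr, split_incr, run_many) and the shared state dictionary are assumptions made for this example.

```python
import asyncio


async def atomic_incr(state: dict) -> None:
    """Read-modify-write with no await in between: a single atomic block."""
    tmp = state["counter"]
    state["counter"] = tmp + 1


async def split_incr(state: dict) -> None:
    """Same update, but an await separates the read from the write."""
    tmp = state["counter"]
    await asyncio.sleep(0)  # other coroutines may run here
    state["counter"] = tmp + 1


async def run_many(worker, n: int) -> int:
    state = {"counter": 0}
    await asyncio.gather(*(worker(state) for _ in range(n)))
    return state["counter"]


atomic_total = asyncio.run(run_many(atomic_incr, 10))
split_total = asyncio.run(run_many(split_incr, 10))
print(atomic_total, split_total)
```

The first total always equals the number of workers. The second loses updates, because every worker reads the counter before any of them writes it back.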

Coroutines as Suspended Computations

Calling an async def function does not run its body. It returns a coroutine object, a suspended computation that runs only when awaited or scheduled as a task. Deppy models this as an effect annotation:

from deppy.async_verify import async_requires, async_ensures, Async

# Regular function: runs immediately, returns a value
@requires(pts_to(x, 'v'))
@ensures(lambda r: pts_to(x, 'v + 1'))
def incr(x: Ref[int]):
    x.value += 1

# Async function: returns a coroutine, runs when awaited
@async_requires(pts_to(x, 'v'))
@async_ensures(lambda r: pts_to(x, 'v + 1'))
async def async_incr(x: Ref[int]):
    x.value += 1

# Type perspective:
# incr  : (Ref[int]) → unit   {pts_to pre/post}
# async_incr : (Ref[int]) → Async[unit]  {pts_to pre/post}

The Async[T] type tells Deppy that this function may yield control at await points. Between yields, the function has exclusive ownership of its heap resources.
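The "suspended computation" reading can be checked with the standard library alone. In this small sketch (the log list and the box argument are illustrative stand-ins for Ref[int]), creating the coroutine object has no effect until something drives it.

```python
import asyncio
import inspect

log = []


async def async_incr(box: list) -> None:
    log.append("body ran")
    box[0] += 1


box = [0]
coro = async_incr(box)  # creates the coroutine object; the body has not run yet
created_is_coroutine = inspect.iscoroutine(coro)
log_before = list(log)
value_before = box[0]

asyncio.run(coro)  # driving the coroutine executes the body
log_after = list(log)
value_after = box[0]
print(log_before, value_before, log_after, value_after)
```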

Verification of Async Code

Property 1: No Deadlock (Eventual Completion)

Deppy can verify that an async function eventually completes — it doesn't await forever on something that never resolves:

import asyncio

@async_requires(emp)
@async_ensures(lambda result: pure('result == 42'))
@terminates(deadline=1.0)
async def always_completes() -> int:
    """This coroutine always finishes."""
    await asyncio.sleep(0.1)  # sleep resolves after 0.1s
    return 42
Verified — terminates, returns 42
@async_requires(emp)
@terminates(deadline=1.0)
async def might_deadlock(future: asyncio.Future):
    """Awaits an arbitrary future — may never complete."""
    await future  # ✗ Cannot prove this resolves!
Verification failed — cannot prove termination
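At runtime, a termination contract can only be approximated by a deadline. The following sketch shows one way a deadline decorator could be built on asyncio.wait_for, which the chapter names as the underlying mechanism. The names sketch_terminates and SketchTerminationViolation are hypothetical and the deadlines are arbitrary; this bounds running time but does not prove termination statically.

```python
import asyncio
import functools


class SketchTerminationViolation(Exception):
    """Hypothetical stand-in for Deppy's TerminationViolation."""


def sketch_terminates(deadline: float):
    """Fail if the wrapped coroutine has not finished within `deadline` seconds."""
    def decorate(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            try:
                return await asyncio.wait_for(fn(*args, **kwargs), timeout=deadline)
            except asyncio.TimeoutError as exc:
                raise SketchTerminationViolation(fn.__name__) from exc
        return wrapper
    return decorate


@sketch_terminates(deadline=1.0)
async def always_completes() -> int:
    await asyncio.sleep(0.01)
    return 42


@sketch_terminates(deadline=0.05)
async def might_deadlock(future: asyncio.Future) -> None:
    await future  # nothing ever resolves this future


async def main():
    done = await always_completes()
    never = asyncio.get_running_loop().create_future()
    try:
        await might_deadlock(never)
    except SketchTerminationViolation:
        timed_out = True
    else:
        timed_out = False
    return done, timed_out


done, timed_out = asyncio.run(main())
print(done, timed_out)
```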

Property 2: Task Ordering Constraints

@async_requires(emp)
@async_ensures(lambda _: pure('step_a_before_step_b'))
async def ordered_pipeline():
    """Prove that A completes before B starts."""
    result_a = await step_a()   # A completes here
    result_b = await step_b(result_a)  # B starts after A
    # Sequential await ⟹ A happens-before B
Verified — sequential ordering guaranteed
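The happens-before claim for sequential awaits can be observed with an event log. In this sketch, step_a and step_b are placeholder coroutines with assumed bodies; the log shows that step_b does not start until step_a has returned.

```python
import asyncio

events = []


async def step_a() -> int:
    events.append("a:start")
    await asyncio.sleep(0.01)  # A suspends, but B has not been created yet
    events.append("a:end")
    return 1


async def step_b(x: int) -> int:
    events.append("b:start")
    await asyncio.sleep(0)
    events.append("b:end")
    return x + 1


async def ordered_pipeline() -> int:
    result_a = await step_a()      # A completes here
    return await step_b(result_a)  # B starts only after A has returned


result = asyncio.run(ordered_pipeline())
print(result, events)
```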

Property 3: Resource Cleanup (async context managers)

@async_requires(emp)
@async_ensures(lambda _: pure('connection_closed'))
async def safe_query(url: str):
    """Async context manager ensures cleanup."""
    async with aiohttp.ClientSession() as session:
        # session is open here — owned by this coroutine
        async with session.get(url) as response:
            data = await response.json()
    # session.__aexit__ called — connection closed
    # Deppy verifies: __aexit__ always runs (even on exception)
    return data
Verified — resource cleanup guaranteed
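The cleanup guarantee rests on the semantics of async with: the exit path runs whether the body returns or raises. A small stand-in for the HTTP session makes this visible without a network. fake_session and query are hypothetical names for this sketch, built on contextlib.asynccontextmanager.

```python
import asyncio
from contextlib import asynccontextmanager

closed = []


@asynccontextmanager
async def fake_session(name: str):
    """Stand-in for an HTTP session; records when it is closed."""
    try:
        yield name
    finally:
        await asyncio.sleep(0)  # cleanup may itself await
        closed.append(name)


async def query(name: str, fail: bool) -> str:
    async with fake_session(name) as session:
        await asyncio.sleep(0)
        if fail:
            raise RuntimeError("request failed")
        return f"data from {session}"


async def main():
    ok = await query("s1", fail=False)
    try:
        await query("s2", fail=True)
    except RuntimeError:
        raised = True
    else:
        raised = False
    return ok, raised


ok, raised = asyncio.run(main())
print(ok, raised, closed)
```

Both sessions appear in closed, including the one whose body raised.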

asyncio.Lock vs threading.Lock

Python has two lock types: threading.Lock (for threads) and asyncio.Lock (for coroutines). They have different semantics and Deppy verifies each appropriately:

threading.Lock

# Blocks the OS thread
lock = threading.Lock()
with lock:
    # exclusive access
    # other threads sleep
    counter += 1

Verified via: separation logic + preemptive interleaving model

asyncio.Lock

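counter = 0

async def _example(lock: asyncio.Lock):
    global counter  # without this, counter += 1 raises UnboundLocalError
    # Waiting for the lock yields to the event loop
    async with lock:
        # exclusive access
        # waiters are suspended; the event loop keeps running other coroutines
        counter += 1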

Verified via: separation logic + cooperative yield model

Never use threading.Lock in async code — it blocks the event loop. Never use asyncio.Lock from threads — it's not thread-safe. Deppy flags these misuses as type errors.
# ✗ Type error: threading.Lock used in async function
async def bad():
    lock = threading.Lock()
    with lock:      # blocks event loop!
        await something()  # can't yield while holding thread lock
Type error — threading.Lock in async context
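Where asyncio.Lock earns its place is a critical section that contains an await, since the atomic-block argument no longer covers it. The sketch below (locked_incr and the state dictionary are illustrative names) holds the lock across an await and still counts correctly.

```python
import asyncio


async def locked_incr(state: dict, lock: asyncio.Lock) -> None:
    async with lock:
        tmp = state["counter"]
        await asyncio.sleep(0)  # yields while still holding the lock
        state["counter"] = tmp + 1


async def main(n: int) -> int:
    state = {"counter": 0}
    lock = asyncio.Lock()  # created inside the running event loop
    await asyncio.gather(*(locked_incr(state, lock) for _ in range(n)))
    return state["counter"]


total = asyncio.run(main(10))
print(total)
```

Other coroutines do run during the sleep, but each one is parked waiting for the lock, so no update is lost.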

Example: Async Web Crawler

A verified web crawler that visits each URL at most once and respects a rate limit, enforced here as a cap of rate_limit concurrent fetches:

from deppy.async_verify import async_requires, async_ensures
from deppy.ghost import ghost_var, ghost_set

@guarantee("visits each URL at most once")
@guarantee("respects rate limit")
@async_requires(emp)
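@async_ensures(lambda results:
    # ** (separating conjunction) keeps both conjuncts; Python's `and`
    # would evaluate to just one of its operands
    pure('len(results) <= len(urls)') **
    pure('all_unique(visited_urls(results))'))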
async def crawl(urls: list[str], rate_limit: int) -> list[dict]:
    """Crawl URLs with verified uniqueness and rate limiting."""
    visited: set[str] = set()
    _ghost_visited = ghost_set()  # ghost set for proof tracking
    results = []
    semaphore = asyncio.Semaphore(rate_limit)

    async def fetch_one(url: str):
        # Check-then-act is safe: cooperative scheduling means
        # no other coroutine runs between check and add
        if url in visited:
            return
        visited.add(url)
        ghost_set.add(_ghost_visited, url)
        # invariant: visited == _ghost_visited

        async with semaphore:  # at most rate_limit concurrent fetches
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as resp:
                    data = await resp.json()
                    results.append({'url': url, 'data': data})

    tasks = [fetch_one(url) for url in urls]
    await asyncio.gather(*tasks)
    return results
Verified — unique visits, rate limit respected

Why Uniqueness Holds

  1. if url in visited and visited.add(url) execute in the same "atomic block" (no await between them).
  2. In cooperative scheduling, no other coroutine can modify visited between the check and the add.
  3. Therefore, once a URL is in visited, it's never fetched again.

This proof exploits a key property of asyncio: between await points, a coroutine runs atomically. In a threaded implementation, this same check-then-act pattern would be a TOCTOU race and would require a lock.
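The argument can be exercised without a network by replacing the HTTP call with a short sleep and feeding the crawler a list with duplicates. This is a sketch under those assumptions: fetch_counts, in_flight, and peak are instrumentation added for the example, not part of the crawler above.

```python
import asyncio


async def crawl(urls, limit):
    visited = set()
    fetch_counts = {}
    in_flight = 0
    peak = 0
    semaphore = asyncio.Semaphore(limit)

    async def fetch_one(url):
        nonlocal in_flight, peak
        if url in visited:  # check ...
            return
        visited.add(url)    # ... then act, with no await in between
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.001)  # stands in for the HTTP request
            fetch_counts[url] = fetch_counts.get(url, 0) + 1
            in_flight -= 1

    await asyncio.gather(*(fetch_one(u) for u in urls))
    return fetch_counts, peak


fetch_counts, peak = asyncio.run(crawl(["a", "b", "a", "c", "b", "a"], limit=2))
print(fetch_counts, peak)
```

Every URL is fetched exactly once despite the duplicates, and the number of fetches in flight never exceeds the semaphore's limit.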

Example: Async Producer-Consumer

import asyncio

@async_requires(emp)
@async_ensures(lambda _:
    pure('all items produced == all items consumed'))
async def producer_consumer():
    queue: asyncio.Queue[int | None] = asyncio.Queue(maxsize=10)  # None is the end-of-stream sentinel
    _produced = ghost_var(0)
    _consumed = ghost_var(0)

    async def producer():
        for i in range(100):
            await queue.put(i)
            ghost_update(_produced, lambda n: n + 1)
        await queue.put(None)  # sentinel

    async def consumer():
        while True:
            item = await queue.get()
            if item is None:
                break
            ghost_update(_consumed, lambda n: n + 1)
            process(item)

    await asyncio.gather(producer(), consumer())
    # _produced == 100, _consumed == 100
    # All items accounted for ✓
Verified — no items lost, no duplicate consumption
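A runnable approximation of this example replaces the ghost variables with ordinary counters, which lets the final accounting be checked by assertion rather than by proof. The counts dictionary and seen list are assumptions of this sketch.

```python
import asyncio
from typing import Optional


async def producer_consumer(n: int):
    queue: "asyncio.Queue[Optional[int]]" = asyncio.Queue(maxsize=10)
    counts = {"produced": 0, "consumed": 0}  # plain counters in place of ghost vars
    seen = []

    async def producer():
        for i in range(n):
            await queue.put(i)  # suspends while the queue is full
            counts["produced"] += 1
        await queue.put(None)   # sentinel: no more items

    async def consumer():
        while True:
            item = await queue.get()
            if item is None:
                break
            counts["consumed"] += 1
            seen.append(item)

    await asyncio.gather(producer(), consumer())
    return counts, seen


counts, seen = asyncio.run(producer_consumer(100))
print(counts)
```

The bounded queue forces the producer to suspend when it gets ten items ahead, yet the counters still agree at the end and the items arrive in order.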

Structured Concurrency (TaskGroup)

Python 3.11+ introduced asyncio.TaskGroup for structured concurrency. Deppy verifies that:

  1. All tasks complete before the group exits
  2. Exceptions from any task cancel all siblings
  3. Resource ownership is properly transferred to/from tasks

@async_requires(
    array_pts_to(urls, 'url_list'))
@async_ensures(lambda results:
    array_pts_to(urls, 'url_list') **
    pure('len(results) == len(url_list)'))
async def fetch_all(urls: list[str]) -> list[str]:
    """Fetch all URLs concurrently with TaskGroup."""
    results = [None] * len(urls)

    async with asyncio.TaskGroup() as tg:
        for i, url in enumerate(urls):
            async def fetch(idx=i, u=url):
                # Each task owns results[idx] — no overlap
                async with aiohttp.ClientSession() as s:
                    async with s.get(u) as r:
                        results[idx] = await r.text()
            tg.create_task(fetch())
    # All tasks complete here — TaskGroup guarantees this
    return results
Verified — all tasks complete, no index conflicts
Each task writes to a unique index results[idx]. Deppy verifies that different tasks write to different indices — a form of spatial separation on array elements. This eliminates data races without locks.
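The disjoint-index discipline can be tried out with a fake fetch. This sketch uses asyncio.gather in place of TaskGroup so that it also runs on Python versions before 3.11; fake_fetch and the writes counter are hypothetical additions that record how often each slot is written.

```python
import asyncio


async def fake_fetch(url: str) -> str:
    await asyncio.sleep(0.001 * (len(url) % 3))  # tasks finish out of input order
    return f"body of {url}"


async def fetch_all(urls):
    results = [None] * len(urls)
    writes = [0] * len(urls)  # instrumentation: writes per slot

    async def fetch(idx: int, url: str) -> None:
        # Each task owns results[idx]; no other task touches this slot
        results[idx] = await fake_fetch(url)
        writes[idx] += 1

    await asyncio.gather(*(fetch(i, u) for i, u in enumerate(urls)))
    return results, writes


urls = ["u1", "u22", "u333", "u4444"]
results, writes = asyncio.run(fetch_all(urls))
print(results, writes)
```

Although the tasks complete in a different order than they were started, each slot is written exactly once and ends up holding the response for its own URL.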

Async Generators

Async generators combine async with yield, creating lazy asynchronous streams. Deppy verifies them as a sequence of yield-point contracts:

from deppy.async_verify import async_yields

@async_yields(lambda v: isinstance(v, int))  # a predicate, checked on every yielded value
@guarantee("yields values in ascending order")
@guarantee("yields exactly max(0, stop - start) values")
async def async_range(start: int, stop: int):
    """Async generator with verified ordering."""
    current = start
    while current < stop:
        # invariant: current ≥ start ∧ all previous yields < current
        yield current
        current += 1

# Consumer verifies it receives sorted values
async def consume():
    prev = None
    async for val in async_range(0, 100):
        if prev is not None:
            assert val > prev  # guaranteed by generator contract
        prev = val
Verified — ascending order, 100 values yielded
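A per-yield contract can be sketched as a wrapper async generator that checks a predicate before passing each value on. The names sketch_async_yields, SketchContractViolation, and countdown are hypothetical, and the wrapper is deliberately simplified: it does not forward asend or athrow to the wrapped generator.

```python
import asyncio
import functools


class SketchContractViolation(Exception):
    """Hypothetical stand-in for Deppy's ContractViolation."""


def sketch_async_yields(pred):
    """Check pred(value) on every value the async generator yields."""
    def decorate(agen_fn):
        @functools.wraps(agen_fn)
        async def wrapper(*args, **kwargs):
            agen = agen_fn(*args, **kwargs)
            try:
                async for value in agen:
                    if not pred(value):
                        raise SketchContractViolation(f"bad yield: {value!r}")
                    yield value
            finally:
                await agen.aclose()  # close the inner generator on every exit path
        return wrapper
    return decorate


@sketch_async_yields(lambda v: isinstance(v, int))
async def async_range(start: int, stop: int):
    current = start
    while current < stop:
        yield current
        current += 1


@sketch_async_yields(lambda v: v >= 0)
async def countdown(n: int):
    while True:  # eventually yields a negative number
        yield n
        n -= 1


async def main():
    values = [v async for v in async_range(0, 5)]
    try:
        async for _ in countdown(1):
            pass
    except SketchContractViolation:
        violated = True
    else:
        violated = False
    return values, violated


values, violated = asyncio.run(main())
print(values, violated)
```

Note that the predicate is written so that a legitimate yield of 0 still passes; a predicate that merely returns the value itself would reject it as falsy.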

Proof Obligations for Async Generators

Obligation | What Deppy Checks
Yield type | Each yield produces a value of the declared type
Ordering invariant | Values satisfy any ordering guarantee across yields
Termination | Generator eventually stops (if @terminates)
Cleanup | async finally blocks run on generator close

Async Context Manager Verification

class VerifiedConnection:
    """Async context manager with verified resource lifecycle."""

    @async_ensures(lambda self:
        attr_to(self, '_conn', 'open_connection') **
        attr_to(self, '_closed', False))
    async def __aenter__(self):
        self._conn = await open_connection()
        self._closed = False
        return self

    @async_requires(
        attr_to(self, '_conn', 'c') **
        attr_to(self, '_closed', False))
    @async_ensures(lambda _:
        attr_to(self, '_closed', True) **
        pure('connection_released'))
    async def __aexit__(self, *exc):
        await self._conn.close()
        self._closed = True
        return False

# Usage — Deppy verifies __aexit__ always runs
async def use_db():
    async with VerifiedConnection() as conn:
        await conn._conn.execute("SELECT 1")
    # conn._closed is True here — guaranteed
Verified — connection always closed

Comparison with Pulse's Extraction

F*'s Pulse extracts verified code to C. It has no notion of async/await, event loops, or cooperative scheduling. Deppy fills this gap:

Feature | Pulse (F*) | Deppy
Target | Extracted C code | Python (verified in-place)
Concurrency model | Parallel composition (Steel) | Threading + asyncio
Async support | ❌ None | ✅ Full async/await verification
Event loop | ❌ N/A | ✅ Cooperative scheduling model
Structured concurrency | ❌ N/A | ✅ TaskGroup verification
Lock types | Steel locks (preemptive) | threading.Lock + asyncio.Lock

The absence of async/await in Pulse is not a limitation of F* — it's because Pulse targets C, which has no coroutines. Deppy's ability to verify async Python is a direct consequence of targeting a language with native coroutine support.

Timeout Verification

@async_requires(emp)
@async_ensures(lambda result:
    pure('result is not None or timeout_elapsed'))
async def fetch_with_timeout(url: str, timeout: float):
    """Fetch with verified timeout handling."""
    try:
        async with asyncio.timeout(timeout):
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as resp:
                    return await resp.text()
    except asyncio.TimeoutError:
        return None
    # Deppy verifies: both paths produce valid results
    # and all resources are cleaned up in both cases
Verified — handles timeout and success paths
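Both claims, a valid result on each path and cleanup on each path, can be checked against a fake session. This sketch uses asyncio.wait_for rather than asyncio.timeout so that it also runs before Python 3.11; FakeSession and its get_text method are hypothetical stand-ins for the aiohttp calls.

```python
import asyncio

closed = []


class FakeSession:
    """Stand-in for an HTTP session; records when it is closed."""

    def __init__(self, name: str):
        self.name = name

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        closed.append(self.name)  # runs on success and on cancellation
        return False

    async def get_text(self, delay: float) -> str:
        await asyncio.sleep(delay)  # stands in for network latency
        return f"response via {self.name}"


async def fetch_with_timeout(name: str, delay: float, timeout: float):
    async def fetch():
        async with FakeSession(name) as session:
            return await session.get_text(delay)

    try:
        return await asyncio.wait_for(fetch(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def main():
    fast = await fetch_with_timeout("fast", delay=0.001, timeout=1.0)
    slow = await fetch_with_timeout("slow", delay=5.0, timeout=0.05)
    return fast, slow


fast, slow = asyncio.run(main())
print(fast, slow, closed)
```

On the timeout path the inner coroutine is cancelled at its await, and the session's exit handler still runs before the caller sees None.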
Exercise 6.6.1: Write a verified async function that fetches two URLs concurrently and returns whichever completes first. Note that asyncio.gather waits for both; use asyncio.wait with return_when=asyncio.FIRST_COMPLETED instead. Prove that the result comes from one of the two URLs.
Exercise 6.6.2: Implement an async rate limiter that allows at most n operations per second. Use asyncio.Semaphore and asyncio.sleep. Verify the rate constraint using a ghost variable tracking operation timestamps.
Exercise 6.6.3: Write an async producer that generates Fibonacci numbers as an async generator. Verify: (a) each yielded value is the sum of the previous two, (b) values are strictly increasing after the first two.
Exercise 6.6.4: Explain why this code has a subtle bug:
async def broken():
    results = []
    for url in urls:
        task = asyncio.create_task(fetch(url))
    return results  # results is empty!
Fix it using TaskGroup and write a specification that catches the original bug.
Exercise 6.6.5 (Challenge): Implement a verified async Pipeline class with .pipe(stage) that chains async transformation stages. Verify that: (a) stages execute in order, (b) the output type of each stage matches the input type of the next, and (c) the pipeline handles backpressure via bounded queues between stages.
Exercise 6.6.6 (Challenge): Compare the proof effort for verifying the parallel increment from Chapter 6.4 (threading + Lock) with an async version using asyncio.Lock. Which proof is simpler? Why? What does this tell us about cooperative vs. preemptive concurrency?