🌊 异步流速查 Async Streams

C# 8 引入 · IAsyncEnumerable<T> · await foreach · 📖 完整课程

接口对照

同步异步 (C# 8)
可枚举接口IEnumerable<T>IAsyncEnumerable<T>
枚举器接口IEnumerator<T>IAsyncEnumerator<T>
取下一个MoveNext() → boolMoveNextAsync() → ValueTask<bool>
当前元素CurrentCurrent
消费语法foreach (var x in src)await foreach (var x in src)
释放IDisposableIAsyncDisposable
MoveNextAsync 返回 ValueTask<bool>(非 Task<bool>)。迭代循环调用极其频繁,ValueTask 在同步完成时避免堆分配。

基本用法

生产者:yield return

async IAsyncEnumerable<Order> GetOrders()
{
    while (true)
    {
        var page = await FetchPageAsync();
        if (!page.Any()) yield break;
        foreach (var o in page)
            yield return o;
    }
}

消费者:await foreach

await foreach (var order in GetOrders())
{
    Process(order);
    // 第一条到达即处理,不等全部加载
}
IAsyncEnumerable<T> 没有实现 IEnumerable<T>——不能用于普通 foreach。反过来也不行,IEnumerable<T> 不能用于 await foreach

CancellationToken 传递模式

方式一:参数传入(显式)

async IAsyncEnumerable<T> Source(CancellationToken ct = default)
{
    while (...) { ct.ThrowIfCancellationRequested(); ... }
}

// 调用方
await foreach (var x in Source(cts.Token)) { }

方式二:WithCancellation + [EnumeratorCancellation](惯用)

async IAsyncEnumerable<T> Source(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    // ct 自动来自调用方的 WithCancellation
}

// 调用方——token 自动路由到标记了 [EnumeratorCancellation] 的参数
await foreach (var x in Source().WithCancellation(cts.Token)) { }
[EnumeratorCancellation]告诉编译器"这个参数的值从 WithCancellation() 取"。必须配合 .WithCancellation(token) 使用。

实战模式

数据库游标读取

async IAsyncEnumerable<Customer> ReadCustomers(
    DbConnection conn,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    await using var cmd = conn.CreateCommand();
    cmd.CommandText = "SELECT Id, Name FROM Customers";
    await using var reader = await cmd.ExecuteReaderAsync(ct);
    while (await reader.ReadAsync(ct))
        yield return new Customer { Id = reader.GetInt32(0), Name = reader.GetString(1) };
    // reader → cmd 依次 DisposeAsync
}

分页 API

async IAsyncEnumerable<Issue> GetIssues(string repo,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    var page = 1;
    while (true)
    {
        var resp = await _http.GetAsync($"api/...?page={page++}", ct);
        if (!resp.IsSuccessStatusCode) yield break;
        var items = await resp.Content.ReadFromJsonAsync<List<Issue>>(cancellationToken: ct);
        if (items == null || !items.Any()) yield break;
        foreach (var item in items) yield return item;
    }
}

文件逐行读取(.NET 6+)

await foreach (var line in File.ReadLinesAsync("huge.log"))
{
    if (line.Contains("ERROR")) Console.WriteLine(line);
}

Async LINQ System.Linq.Async

// NuGet: System.Linq.Async
using System.Linq.Async;

// 流式:Where → OrderBy → Take
await foreach (var x in GetOrders()
    .Where(o => o.Total > 1000)
    .OrderByDescending(o => o.Total)
    .Take(10))
{ }

// 聚合:CountAsync, MaxAsync, SumAsync, ToListAsync...
var count = await GetOrders().CountAsync(o => o.Total > 1000);
var max   = await GetOrders().MaxAsync(o => o.Total);
不要滥用 ToListAsync()——异步流的初衷是流式处理。全量加载到 List 不如直接用 Task<List<T>>

何时使用 / 何时不用

场景用异步流?理由
数据库轮询大表逐条产出,不占内存
分页 API 翻页隐藏翻页细节,统一为流
事件流 / 消息队列天然就是流
文件逐行异步读取.NET 6+ 原生支持
单次 HTTP 返回小 JSON数据量小,Task<T> 更简单
CPU 密集计算(无 I/O)没有异步操作,IEnumerable<T> 足够
需要全量排序/分组⚠️ 谨慎排序必须加载全量,省不了内存
经验法则:迭代体内有 await + 数据量可能很大 → 用异步流。数据总是小的,或体内无 I/O → 普通 IEnumerable<T>

三个相近类型的区别

返回类型语义数据何时到达内存
Task<List<T>>一次性全拉、一次性返回全部就绪后才能遍历全量
Task<IEnumerable<T>>任务完成后返回迭代器迭代器异步获取,迭代过程同步取决于实现
IAsyncEnumerable<T>每次 MoveNext 可异步边拉边产,流式到达一次一条(或一批)

限制

async IAsyncEnumerable<T> 方法不能有 out/ref 参数(状态机实现限制,和 async Task 一致)。

相关速查