🌊 异步流速查 Async Streams
C# 8 引入 · IAsyncEnumerable<T> · await foreach · 📖 完整课程
接口对照
| 同步 | 异步 (C# 8) |
| 可枚举接口 | IEnumerable<T> | IAsyncEnumerable<T> |
| 枚举器接口 | IEnumerator<T> | IAsyncEnumerator<T> |
| 取下一个 | MoveNext() → bool | MoveNextAsync() → ValueTask<bool> |
| 当前元素 | Current | Current |
| 消费语法 | foreach (var x in src) | await foreach (var x in src) |
| 释放 | IDisposable | IAsyncDisposable |
MoveNextAsync 返回 ValueTask<bool>(非 Task<bool>)。迭代循环调用极其频繁,ValueTask 在同步完成时避免堆分配。
基本用法
生产者:yield return
async IAsyncEnumerable<Order> GetOrders()
{
while (true)
{
var page = await FetchPageAsync();
if (!page.Any()) yield break;
foreach (var o in page)
yield return o;
}
}
消费者:await foreach
await foreach (var order in GetOrders())
{
Process(order);
// 第一条到达即处理,不等全部加载
}
IAsyncEnumerable<T> 没有实现 IEnumerable<T>——不能用于普通 foreach。反过来也不行,IEnumerable<T> 不能用于 await foreach。
CancellationToken 传递模式
方式一:参数传入(显式)
async IAsyncEnumerable<T> Source(CancellationToken ct = default)
{
while (...) { ct.ThrowIfCancellationRequested(); ... }
}
// 调用方
await foreach (var x in Source(cts.Token)) { }
方式二:WithCancellation + [EnumeratorCancellation](惯用)
async IAsyncEnumerable<T> Source(
[EnumeratorCancellation] CancellationToken ct = default)
{
// ct 自动来自调用方的 WithCancellation
}
// 调用方——token 自动路由到标记了 [EnumeratorCancellation] 的参数
await foreach (var x in Source().WithCancellation(cts.Token)) { }
[EnumeratorCancellation]:告诉编译器"这个参数的值从 WithCancellation() 取"。必须配合 .WithCancellation(token) 使用。
实战模式
数据库游标读取
async IAsyncEnumerable<Customer> ReadCustomers(
DbConnection conn,
[EnumeratorCancellation] CancellationToken ct = default)
{
await using var cmd = conn.CreateCommand();
cmd.CommandText = "SELECT Id, Name FROM Customers";
await using var reader = await cmd.ExecuteReaderAsync(ct);
while (await reader.ReadAsync(ct))
yield return new Customer { Id = reader.GetInt32(0), Name = reader.GetString(1) };
// reader → cmd 依次 DisposeAsync
}
分页 API
async IAsyncEnumerable<Issue> GetIssues(string repo,
[EnumeratorCancellation] CancellationToken ct = default)
{
var page = 1;
while (true)
{
var resp = await _http.GetAsync($"api/...?page={page++}", ct);
if (!resp.IsSuccessStatusCode) yield break;
var items = await resp.Content.ReadFromJsonAsync<List<Issue>>(cancellationToken: ct);
if (items == null || !items.Any()) yield break;
foreach (var item in items) yield return item;
}
}
文件逐行读取(.NET 6+)
await foreach (var line in File.ReadLinesAsync("huge.log"))
{
if (line.Contains("ERROR")) Console.WriteLine(line);
}
Async LINQ System.Linq.Async
// NuGet: System.Linq.Async
using System.Linq.Async;
// 流式:Where → OrderBy → Take
await foreach (var x in GetOrders()
.Where(o => o.Total > 1000)
.OrderByDescending(o => o.Total)
.Take(10))
{ }
// 聚合:CountAsync, MaxAsync, SumAsync, ToListAsync...
var count = await GetOrders().CountAsync(o => o.Total > 1000);
var max = await GetOrders().MaxAsync(o => o.Total);
不要滥用 ToListAsync()——异步流的初衷是流式处理。全量加载到 List 不如直接用 Task<List<T>>。
何时使用 / 何时不用
| 场景 | 用异步流? | 理由 |
| 数据库轮询大表 | ✅ | 逐条产出,不占内存 |
| 分页 API 翻页 | ✅ | 隐藏翻页细节,统一为流 |
| 事件流 / 消息队列 | ✅ | 天然就是流 |
| 文件逐行异步读取 | ✅ | .NET 6+ 原生支持 |
| 单次 HTTP 返回小 JSON | ❌ | 数据量小,Task<T> 更简单 |
| CPU 密集计算(无 I/O) | ❌ | 没有异步操作,IEnumerable<T> 足够 |
| 需要全量排序/分组 | ⚠️ 谨慎 | 排序必须加载全量,省不了内存 |
经验法则:迭代体内有 await + 数据量可能很大 → 用异步流。数据总是小的,或体内无 I/O → 普通 IEnumerable<T>。
三个相近类型的区别
| 返回类型 | 语义 | 数据何时到达 | 内存 |
Task<List<T>> | 一次性全拉、一次性返回 | 全部就绪后才能遍历 | 全量 |
Task<IEnumerable<T>> | 任务完成后返回迭代器 | 迭代器异步获取,迭代过程同步 | 取决于实现 |
IAsyncEnumerable<T> | 每次 MoveNext 可异步 | 边拉边产,流式到达 | 一次一条(或一批) |
限制
async IAsyncEnumerable<T> 方法不能有 out/ref 参数(状态机实现限制,和 async Task 一致)。
相关速查