当"拿到所有数据再返回"太慢时,你需要的是一条一条异步喂给调用方的管道
async/await。IAsyncEnumerable<T> 写出低延迟、省内存的异步数据流处理代码。
Task<IEnumerable<T>> 不够用假设你要从数据库读取 100 万条记录,或者从分页 API 拉取多页数据。在 C# 7.3 里你有两个选择,都不理想:
// 问题:100 万条全进内存,等最慢那页完成才开始处理
async Task<List<Order>> GetAllOrders()
{
var all = new List<Order>();
while (true)
{
var page = await FetchPageAsync();
if (!page.Any()) break;
all.AddRange(page);
}
return all; // 等到这里,调用方才拿到第一条数据
}
代价:内存爆炸 + 首字节延迟 = 最慢那页的耗时 × 数据总量。
// 问题:回调地狱,无法用 using / try-catch 包裹整个迭代生命周期
async Task ReadOrders(Func<Order, Task> onNext)
{
foreach (var batch in batches)
{
foreach (var order in batch)
await onNext(order);
}
}
代价:资源释放、异常处理、取消操作——全得手动管理。回调不是 C# 的惯用模式。
同步世界里我们有 yield return——调用方"拉"一条,生产者"产"一条,用多少取多少。异步流把这个模型加上 await:
// C# 8:IAsyncEnumerable<T>——异步的 yield return
async IAsyncEnumerable<Order> GetAllOrders()
{
while (true)
{
var page = await FetchPageAsync();
if (!page.Any()) yield break;
foreach (var order in page)
yield return order;
}
}
// 调用方:await foreach——来一条处理一条
await foreach (var order in GetAllOrders())
{
Process(order); // 第一条数据到达时就开始处理,不等人齐
}
await foreach 和 yield return 的组合,跟同步迭代器一样自然using / try-finally 正常工作
await foreach vs foreach + awaitawait 放在不同位置,语义完全不同——这不是风格差别,而是两种不同的返回类型:
// 写法 A:await 在 foreach 里面 → 流式(本课主角)
await foreach (var x in GetStream()) // 方法返回 IAsyncEnumerable<T>
Process(x); // 逐条异步拉取,来一条处理一条
// 写法 B:await 在 foreach 外面 → 等人齐(传统 C# 7.3 做法)
foreach (var x in await GetAllAsync()) // 方法返回 Task<IEnumerable<T>>
Process(x); // await 等全部数据就绪,然后同步遍历
写法 A:await foreach | 写法 B:foreach await | |
|---|---|---|
| 方法返回 | IAsyncEnumerable<T> | Task<IEnumerable<T>> |
| 首条延迟 | 第一条数据到达即可处理 | 全部数据就绪才开始 |
| 内存 | 一次一条/一批 | 整个集合 |
| 编译保护 | 对 IAsyncEnumerable 用普通 foreach → 编译报错 | — |
就像同步迭代器依赖 IEnumerable<T> 和 IEnumerator<T>,异步流有对应的异步版本:
// 异步可枚举——只需要实现这一个(通常编译器帮你干)
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken ct = default);
}
// 异步枚举器——真正做 MoveNext + Current 的
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
T Current { get; }
ValueTask<bool> MoveNextAsync();
}
MoveNextAsync 返回 ValueTask<bool>,不是 Task<bool>。异步流的迭代循环非常频繁(每条数据至少一次 MoveNext),ValueTask 在同步完成时不会分配堆内存——这是性能敏感的设计。
| 同步 | 异步 (C# 8) | |
|---|---|---|
| 可枚举接口 | IEnumerable<T> | IAsyncEnumerable<T> |
| 枚举器接口 | IEnumerator<T> | IAsyncEnumerator<T> |
| 取下一个 | MoveNext() | MoveNextAsync() |
| 当前元素 | Current | Current |
| 消费语法 | foreach | await foreach |
| 释放 | IDisposable | IAsyncDisposable |
这两个接口的区别是理解 foreach(无论是同步还是异步版本)的基础。一句话:
IEnumerable<T> = "我是一批可以遍历的数据"(工厂)IEnumerator<T> = "我是那个正在遍历的光标"(工人)IEnumerable 是图书馆,IEnumerator 是你手上的借书卡。同一个图书馆可以发出多张借书卡,每张卡各自记录独立的阅读进度——互不影响。之所以拆成两个接口,就是为了支持多次、同时、独立遍历同一个集合。
// 同一个 List,两个独立的光标——各自有自己的 Current 位置
var library = new List<int> { 10, 20, 30, 40 };
var card1 = library.GetEnumerator(); // IEnumerable.GetEnumerator() → 返回一个新光标
var card2 = library.GetEnumerator(); // 再调一次,拿到另一个独立光标
card1.MoveNext(); card1.MoveNext(); // card1 走到 20
card2.MoveNext(); // card2 还在 10——互不影响
Console.WriteLine(card1.Current); // 20
Console.WriteLine(card2.Current); // 10
异步流的两兄弟结构完全一样——只是把"工厂"和"光标"都变成了异步版本:
| 角色 | 同步 | 异步 (C# 8) |
|---|---|---|
| 工厂 | IEnumerable<T> | IAsyncEnumerable<T> |
| 光标 | IEnumerator<T> | IAsyncEnumerator<T> |
| 往前走 | MoveNext() | MoveNextAsync() |
当你写一个 async IAsyncEnumerable<T> 方法,编译器把它变成状态机——和你熟悉的 async Task 方法类似,但状态机里多了一个迭代器状态。
// 你写的:
async IAsyncEnumerable<int> Generate()
{
yield return 1;
await Task.Delay(100);
yield return 2;
}
// 编译器生成的(伪代码简化):
// 一个实现了 IAsyncEnumerable<int> 的类
// + 一个实现了 IAsyncEnumerator<int> 的状态机
// 状态:-2=初始, -1=运行中, 0=在yield1之后, 1=在await之后, -3=结束
// await 和 yield 混在一起时,状态机会在两者之间切换
async IAsyncEnumerable 方法里不能同时用 out/ref 参数。这是状态机的实现限制,和 async Task 方法一致。
异步操作必须有取消支持。异步流有三种传递 CancellationToken 的方式:
// 调用方把 token 作为参数传进去
async IAsyncEnumerable<Order> GetOrders(
CancellationToken ct = default)
{
while (true)
{
ct.ThrowIfCancellationRequested(); // 每轮循环检查一次
var page = await FetchPageAsync(ct); // 传给下游
if (!page.Any()) yield break;
foreach (var o in page) yield return o;
}
}
// 调用方:
var cts = new CancellationTokenSource();
await foreach (var order in GetOrders(cts.Token))
{
if (ShouldStop(order)) cts.Cancel();
}
ThrowIfCancellationRequested() 会抛异常吗?会——它抛出 OperationCanceledException。本质上是语法糖:if (token.IsCancellationRequested) throw new OperationCanceledException(token);Task 能识别 OperationCanceledException,把自身状态设为 Canceled(而非 Faulted),await 也会正确处理这个异常。如果只是想检查而不抛,用 IsCancellationRequested 属性即可。
// WithCancellation 把 token 塞进 GetAsyncEnumerator 的 ct 参数
var cts = new CancellationTokenSource();
await foreach (var order in GetOrders().WithCancellation(cts.Token))
{
Process(order);
}
// 迭代器这边用 [EnumeratorCancellation] 接收:
async IAsyncEnumerable<Order> GetOrders(
[EnumeratorCancellation] CancellationToken ct = default)
{
// ct 现在来自调用方的 WithCancellation 传进来的 token
...
}
[EnumeratorCancellation] 的作用:告诉编译器"这个参数的值从调用方的 WithCancellation 取,不要让它和其他参数混在一起"。这样调用方写 .WithCancellation(token) 时,token 会自动路由到这个参数。没有这个特性时,你得手动把它当成普通参数传——方式一的做法。
如果你的场景确实不需要取消(比如控制台小工具),可以不处理 token。但任何涉及网络/数据库的异步流都应该支持取消——这是 .NET 生态的约定。
await 默认会捕获当前的 SynchronizationContext(同步上下文)SynchronizationContext,I/O 完成后尝试调度回那个上下文继续执行。这在 UI 应用中是必须的:
// WinForms / WPF 场景:UI 线程上执行
async Task FetchDataAsync()
{
var data = await httpClient.GetStringAsync(url);
// ↑ 默认回到 UI 线程
label1.Text = data; // 改控件——必须在 UI 线程
}
async Task<string> FetchDataAsync()
{
var data = await httpClient.GetStringAsync(url)
.ConfigureAwait(false);
// ↑ "不用回 UI 线程了,随便哪个线程池线程继续就行"
return data; // 在随便哪个线程上执行——但这里不碰 UI,无所谓
}
1. 防止死锁(.NET Framework 尤其常见)
经典死锁场景——用 .Result / .Wait() 同步阻塞等待一个 async 方法:
// 🚫 经典死锁(UI 线程 / ASP.NET 非 Core)
var result = FetchDataAsync().Result; // 阻塞了当前线程!
// UI 线程卡在 .Result,等 Task 完成
// → Task 完成后想回到 UI 线程执行 await 后面的代码
// → 但 UI 线程还被 .Result 卡着
// → **死锁**
// ✅ 加了 ConfigureAwait(false) 的版本:
async Task<string> FetchDataAsync()
{
var data = await httpClient.GetStringAsync(url)
.ConfigureAwait(false); // 不用回 UI 线程
return data; // 线程池线程执行 → Task 正常完成 → .Result 解除
}
2. 节省上下文切换开销
每个 await 后调度回原线程是有开销的。库代码(数据访问层、业务逻辑层)普遍在每个 await 加 .ConfigureAwait(false)——不碰 UI 就不需要回原线程。
这是你在 Framework 项目里应该非常熟悉的模式——每个非 UI 层的 async 方法都像这样:
async Task<List<Order>> GetOrdersAsync()
{
using var conn = new SqlConnection(connectionString);
await conn.OpenAsync().ConfigureAwait(false);
using var cmd = conn.CreateCommand();
cmd.CommandText = "SELECT ...";
using var reader = await cmd.ExecuteReaderAsync().ConfigureAwait(false);
var orders = new List<Order>();
while (await reader.ReadAsync().ConfigureAwait(false))
orders.Add(...);
return orders;
}
SynchronizationContext。经典的 .Result 死锁在 Core 环境下不会发生。但库代码里加 .ConfigureAwait(false) 仍然是好习惯——性能上有小收益。ConfigureAwait 决定的是 await 状态机恢复时跳到哪个线程,而不是代码执行顺序。这和 Lesson 03 的状态机模型完全对应——只是额外加了一个"恢复目标线程"的配置。
// 单个 Task 的 ConfigureAwait:
var result = await SomeTask().ConfigureAwait(false);
// 异步流的 ConfigureAwait(.NET 6 新增扩展方法):
await foreach (var item in GetItems().ConfigureAwait(false))
{
// 每次 MoveNextAsync 都会用 ConfigureAwait(false)
}
IAsyncEnumerable<T> 的 ConfigureAwait 扩展方法是 .NET 6 才加入的。在 .NET Core 3.x 中,只能在异步迭代器内部的每个 await 上加 .ConfigureAwait(false)。
async IAsyncEnumerable<Customer> ReadCustomers(
DbConnection conn,
[EnumeratorCancellation] CancellationToken ct = default)
{
await using var cmd = conn.CreateCommand();
cmd.CommandText = "SELECT Id, Name FROM Customers";
await using var reader = await cmd.ExecuteReaderAsync(ct);
while (await reader.ReadAsync(ct))
{
yield return new Customer
{
Id = reader.GetInt32(0),
Name = reader.GetString(1),
};
}
// reader.DisposeAsync → cmd.DisposeAsync 自动执行
}
注意 await using var——异步流内部支持 IAsyncDisposable 模式,迭代器结束或中断时自动释放资源。
await using var reader = await cmd.ExecuteReaderAsync(ct) 里有两个 await?await cmd.ExecuteReaderAsync(ct):异步等待数据库返回 DbDataReader。await using var:声明这个 reader 用完后要异步释放(调 DisposeAsync() 而非同步的 Dispose())。DbDataReader 的 Dispose 可能涉及网络 I/O——比如通知 SQL Server "释放服务器端游标"。同步 Dispose 会阻塞线程等这个网络 round-trip;异步 Dispose 不阻塞。var reader = await cmd.ExecuteReaderAsync(ct); // 异步获取 reader
try { ... } finally { await reader.DisposeAsync(); } // 异步释放
async IAsyncEnumerable<Issue> GetGitHubIssues(
string repo,
[EnumeratorCancellation] CancellationToken ct = default)
{
var page = 1;
while (true)
{
var url = $"https://api.github.com/repos/{repo}/issues?page={page}";
var response = await _http.GetAsync(url, ct);
if (!response.IsSuccessStatusCode) yield break;
var issues = await response.Content
.ReadFromJsonAsync<List<Issue>>(cancellationToken: ct);
if (issues == null || issues.Count == 0) yield break;
foreach (var issue in issues)
yield return issue;
page++;
}
}
// 消费——拿到第一条就开始展示,不等全部翻页
await foreach (var issue in GetGitHubIssues("dotnet/runtime"))
{
Console.WriteLine($"#{issue.Number}: {issue.Title}");
}
// .NET 6+ 自带:System.IO.File.ReadLinesAsync
await foreach (var line in File.ReadLinesAsync("huge.log"))
{
if (line.Contains("ERROR"))
Console.WriteLine(line);
}
// C# 8 / .NET Core 3.x 中可以自己封装:
async IAsyncEnumerable<string> ReadLinesAsync(string path)
{
using var reader = new StreamReader(path);
while (true)
{
var line = await reader.ReadLineAsync();
if (line == null) yield break;
yield return line;
}
}
引入 System.Linq.Async 包后,可以像操作同步集合一样操作异步流:
using System.Linq.Async; // NuGet: System.Linq.Async
await foreach (var order in GetOrders()
.Where(o => o.Total > 1000)
.OrderByDescending(o => o.Total)
.Take(10))
{
Console.WriteLine($"大额订单: {order.Total}");
}
// 也可以聚合到单个值:
var count = await GetOrders().CountAsync(o => o.Total > 1000);
var max = await GetOrders().MaxAsync(o => o.Total);
var list = await GetOrders().ToListAsync(); // 全量——注意内存
ToListAsync()。异步流的初衷是流式处理。如果最终还是要全量加载到 List<T>,那你用 Task<List<T>> 就够了,不需要异步流。
| 场景 | 用异步流? | 理由 |
|---|---|---|
| 数据库轮询大表 | ✅ 用 | 逐条产出,不占内存 |
| 分页 API 翻页 | ✅ 用 | 隐藏翻页细节,给调用方一个统一的流 |
| 事件流 / 消息队列消费 | ✅ 用 | 天然就是流(Channel<T> 的 ReadAllAsync) |
| 文件逐行异步读取 | ✅ 用 | .NET 6+ 原生支持 |
| 单次 HTTP 请求返回小 JSON | ❌ 不用 | 数据量小,Task<T> 更简单 |
| CPU 密集型数据计算(无 I/O) | ❌ 不用 | 没有异步操作,普通 IEnumerable<T> 足够 |
| 最终需要对全量数据排序 / 分组 | ⚠️ 谨慎 | 排序必须加载全量,此时异步流只是延迟了加载,没省内存 |
await,而且数据量可能很大——用异步流。如果数据总是小的(几十条),或者循环体内没有 I/O——普通 IEnumerable<T> 就够了。
| 返回类型 | 语义 | 数据何时到达 | 内存 |
|---|---|---|---|
Task<List<T>> |
一次性全拉、一次性返回 | 全部就绪后才能遍历 | 全量 |
Task<IEnumerable<T>> |
任务完成后返回一个迭代器 | 迭代器本身异步获取,但迭代过程同步 | 取决于迭代器实现 |
IAsyncEnumerable<T> |
每次 MoveNext 都可以异步 | 边拉边产,流式到达 | 一次一条(或一批) |
// Task<List<T>>:启动 → 全部拉完 → 再遍历
var list = await GetAllAsync(); // 阻塞等全部数据就绪
foreach (var x in list) { ... } // 然后才遍历
// IAsyncEnumerable<T>:启动 → 拉一条 → 处理 → 拉下一条 → ...
await foreach (var x in GetAllAsync()) // 拿到第一条就开始
{
Process(x); // 处理完第一条才去拉第二条
}
IAsyncEnumerable<T> 相比 Task<List<T>> 的核心优势?async IAsyncEnumerable<int> GetNumbers()
{
yield return 1;
await Task.Delay(100);
yield return 2;
}
foreach (var n in GetNumbers()) // 注意:不是 await foreach
{
Console.WriteLine(n);
}
IAsyncEnumerator<T>.MoveNextAsync(),以下哪句正确?IAsyncEnumerable<T>?Lesson 04 · C# 8 Async Streams · 下一课预告:Lesson 05 — SynchronizationContext 与 await 续延调度