Lesson 04: C# 8 Async Streams · 异步流

当"拿到所有数据再返回"太慢时,你需要的是一条一条异步喂给调用方的管道

前置 / Prerequisite:已完成 Lesson 03(yield return 同步迭代器),理解 async/await
本课目标:能用 IAsyncEnumerable<T> 写出低延迟、省内存的异步数据流处理代码。

一、痛点:为什么 Task<IEnumerable<T>> 不够用

假设你要从数据库读取 100 万条记录,或者从分页 API 拉取多页数据。在 C# 7.3 里你有两个选择,都不理想:

方案 A:一次性返回所有数据

// 问题:100 万条全进内存,等最慢那页完成才开始处理
async Task<List<Order>> GetAllOrders()
{
    var all = new List<Order>();
    while (true)
    {
        var page = await FetchPageAsync();
        if (!page.Any()) break;
        all.AddRange(page);
    }
    return all; // 等到这里,调用方才拿到第一条数据
}

代价:内存爆炸 + 首字节延迟 = 最慢那页的耗时 × 数据总量。

方案 B:回调

// 问题:回调地狱,无法用 using / try-catch 包裹整个迭代生命周期
async Task ReadOrders(Func<Order, Task> onNext)
{
    foreach (var batch in batches)
    {
        foreach (var order in batch)
            await onNext(order);
    }
}

代价:资源释放、异常处理、取消操作——全得手动管理。回调不是 C# 的惯用模式。

理想方案:异步迭代器

同步世界里我们有 yield return——调用方"拉"一条,生产者"产"一条,用多少取多少。异步流把这个模型加上 await

// C# 8:IAsyncEnumerable<T>——异步的 yield return
async IAsyncEnumerable<Order> GetAllOrders()
{
    while (true)
    {
        var page = await FetchPageAsync();
        if (!page.Any()) yield break;
        foreach (var order in page)
            yield return order;
    }
}

// 调用方:await foreach——来一条处理一条
await foreach (var order in GetAllOrders())
{
    Process(order);  // 第一条数据到达时就开始处理,不等人齐
}
关键优势:低延迟——第一条数据到达即可处理,不用等全部加载完
省内存——一次只在内存中保留当前批次,而不是 100 万条
惯用语法——await foreachyield return 的组合,跟同步迭代器一样自然
资源安全——迭代器里的 using / try-finally 正常工作
⚠️ 常见困惑:await foreach vs foreach + await

await 放在不同位置,语义完全不同——这不是风格差别,而是两种不同的返回类型
// 写法 A:await 在 foreach 里面 → 流式(本课主角)
await foreach (var x in GetStream())  // 方法返回 IAsyncEnumerable<T>
    Process(x);                         // 逐条异步拉取,来一条处理一条

// 写法 B:await 在 foreach 外面 → 等人齐(传统 C# 7.3 做法)
foreach (var x in await GetAllAsync())  // 方法返回 Task<IEnumerable<T>>
    Process(x);                              // await 等全部数据就绪,然后同步遍历
写法 A:await foreach写法 B:foreach await
方法返回IAsyncEnumerable<T>Task<IEnumerable<T>>
首条延迟第一条数据到达即可处理全部数据就绪才开始
内存一次一条/一批整个集合
编译保护IAsyncEnumerable 用普通 foreach → 编译报错
记忆口诀:await 在 foreach 里面 = 边拉边处理。await 在 foreach 外面 = 等人齐再遍历。决定用哪种的不是偏好,而是方法的返回类型——编译器会替你检查。

二、接口与编译产物

两个核心接口

就像同步迭代器依赖 IEnumerable<T>IEnumerator<T>,异步流有对应的异步版本:

// 异步可枚举——只需要实现这一个(通常编译器帮你干)
public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken ct = default);
}

// 异步枚举器——真正做 MoveNext + Current 的
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    T Current { get; }
    ValueTask<bool> MoveNextAsync();
}
注意 MoveNextAsync 返回 ValueTask<bool>,不是 Task<bool>异步流的迭代循环非常频繁(每条数据至少一次 MoveNext),ValueTask 在同步完成时不会分配堆内存——这是性能敏感的设计。
同步异步 (C# 8)
可枚举接口IEnumerable<T>IAsyncEnumerable<T>
枚举器接口IEnumerator<T>IAsyncEnumerator<T>
取下一个MoveNext()MoveNextAsync()
当前元素CurrentCurrent
消费语法foreachawait foreach
释放IDisposableIAsyncDisposable

回顾:IEnumerable<T> vs IEnumerator<T> —— 工厂与光标

这两个接口的区别是理解 foreach(无论是同步还是异步版本)的基础。一句话:

IEnumerable<T> = "我是一批可以遍历的数据"(工厂
IEnumerator<T> = "我是那个正在遍历的光标"(工人

类比:IEnumerable 是图书馆,IEnumerator 是你手上的借书卡。同一个图书馆可以发出多张借书卡,每张卡各自记录独立的阅读进度——互不影响。之所以拆成两个接口,就是为了支持多次、同时、独立遍历同一个集合。
// 同一个 List,两个独立的光标——各自有自己的 Current 位置
var library = new List<int> { 10, 20, 30, 40 };

var card1 = library.GetEnumerator();  // IEnumerable.GetEnumerator() → 返回一个新光标
var card2 = library.GetEnumerator();  // 再调一次,拿到另一个独立光标

card1.MoveNext(); card1.MoveNext();  // card1 走到 20
card2.MoveNext();                     // card2 还在 10——互不影响

Console.WriteLine(card1.Current);  // 20
Console.WriteLine(card2.Current);  // 10

异步流的两兄弟结构完全一样——只是把"工厂"和"光标"都变成了异步版本:

角色同步异步 (C# 8)
工厂IEnumerable<T>IAsyncEnumerable<T>
光标IEnumerator<T>IAsyncEnumerator<T>
往前走MoveNext()MoveNextAsync()

编译器生成什么

当你写一个 async IAsyncEnumerable<T> 方法,编译器把它变成状态机——和你熟悉的 async Task 方法类似,但状态机里多了一个迭代器状态。

// 你写的:
async IAsyncEnumerable<int> Generate()
{
    yield return 1;
    await Task.Delay(100);
    yield return 2;
}

// 编译器生成的(伪代码简化):
// 一个实现了 IAsyncEnumerable<int> 的类
// + 一个实现了 IAsyncEnumerator<int> 的状态机
// 状态:-2=初始, -1=运行中, 0=在yield1之后, 1=在await之后, -3=结束
// await 和 yield 混在一起时,状态机会在两者之间切换
限制:async IAsyncEnumerable 方法里不能同时用 out/ref 参数。这是状态机的实现限制,和 async Task 方法一致。

三、CancellationToken —— 如何取消异步流

异步操作必须有取消支持。异步流有三种传递 CancellationToken 的方式:

方式一:参数传入(推荐,C# 8 手动模式)

// 调用方把 token 作为参数传进去
async IAsyncEnumerable<Order> GetOrders(
    CancellationToken ct = default)
{
    while (true)
    {
        ct.ThrowIfCancellationRequested();  // 每轮循环检查一次
        var page = await FetchPageAsync(ct); // 传给下游
        if (!page.Any()) yield break;
        foreach (var o in page) yield return o;
    }
}

// 调用方:
var cts = new CancellationTokenSource();
await foreach (var order in GetOrders(cts.Token))
{
    if (ShouldStop(order)) cts.Cancel();
}
ThrowIfCancellationRequested() 会抛异常吗?会——它抛出 OperationCanceledException。本质上是语法糖:if (token.IsCancellationRequested) throw new OperationCanceledException(token);

为什么用抛异常来取消?这是 .NET 异步体系的设计约定——Task 能识别 OperationCanceledException,把自身状态设为 Canceled(而非 Faulted),await 也会正确处理这个异常。如果只是想检查而不抛,用 IsCancellationRequested 属性即可。

方式二:WithCancellation 扩展方法(调用方传入)

// WithCancellation 把 token 塞进 GetAsyncEnumerator 的 ct 参数
var cts = new CancellationTokenSource();
await foreach (var order in GetOrders().WithCancellation(cts.Token))
{
    Process(order);
}

// 迭代器这边用 [EnumeratorCancellation] 接收:
async IAsyncEnumerable<Order> GetOrders(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    // ct 现在来自调用方的 WithCancellation 传进来的 token
    ...
}
[EnumeratorCancellation] 的作用:告诉编译器"这个参数的值从调用方的 WithCancellation 取,不要让它和其他参数混在一起"。这样调用方写 .WithCancellation(token) 时,token 会自动路由到这个参数。没有这个特性时,你得手动把它当成普通参数传——方式一的做法。

方式三:直接用 CancellationToken.None(不推荐)

如果你的场景确实不需要取消(比如控制台小工具),可以不处理 token。但任何涉及网络/数据库的异步流都应该支持取消——这是 .NET 生态的约定。

四、ConfigureAwait —— "await 之后回不回原来的地盘"

没有 ConfigureAwait:默认回到原始上下文

await 默认会捕获当前的 SynchronizationContext(同步上下文)SynchronizationContext,I/O 完成后尝试调度回那个上下文继续执行。这在 UI 应用中是必须的:

// WinForms / WPF 场景:UI 线程上执行
async Task FetchDataAsync()
{
    var data = await httpClient.GetStringAsync(url);
    // ↑ 默认回到 UI 线程
    label1.Text = data;  // 改控件——必须在 UI 线程
}

ConfigureAwait(false):别回来了,随便哪个线程都行

async Task<string> FetchDataAsync()
{
    var data = await httpClient.GetStringAsync(url)
        .ConfigureAwait(false);
    // ↑ "不用回 UI 线程了,随便哪个线程池线程继续就行"
    return data;  // 在随便哪个线程上执行——但这里不碰 UI,无所谓
}

为什么要用 false?两个原因

1. 防止死锁(.NET Framework 尤其常见)

经典死锁场景——用 .Result / .Wait() 同步阻塞等待一个 async 方法:

// 🚫 经典死锁(UI 线程 / ASP.NET 非 Core)
var result = FetchDataAsync().Result;  // 阻塞了当前线程!
// UI 线程卡在 .Result,等 Task 完成
// → Task 完成后想回到 UI 线程执行 await 后面的代码
// → 但 UI 线程还被 .Result 卡着
// → **死锁**

// ✅ 加了 ConfigureAwait(false) 的版本:
async Task<string> FetchDataAsync()
{
    var data = await httpClient.GetStringAsync(url)
        .ConfigureAwait(false);  // 不用回 UI 线程
    return data;  // 线程池线程执行 → Task 正常完成 → .Result 解除
}

2. 节省上下文切换开销

每个 await 后调度回原线程是有开销的。库代码(数据访问层、业务逻辑层)普遍在每个 await.ConfigureAwait(false)——不碰 UI 就不需要回原线程。

.NET Framework 4.8 时代的典型写法

这是你在 Framework 项目里应该非常熟悉的模式——每个非 UI 层的 async 方法都像这样:

async Task<List<Order>> GetOrdersAsync()
{
    using var conn = new SqlConnection(connectionString);
    await conn.OpenAsync().ConfigureAwait(false);

    using var cmd = conn.CreateCommand();
    cmd.CommandText = "SELECT ...";
    using var reader = await cmd.ExecuteReaderAsync().ConfigureAwait(false);

    var orders = new List<Order>();
    while (await reader.ReadAsync().ConfigureAwait(false))
        orders.Add(...);
    return orders;
}
好消息:ASP.NET Core 没有 SynchronizationContext经典的 .Result 死锁在 Core 环境下不会发生。但库代码里加 .ConfigureAwait(false) 仍然是好习惯——性能上有小收益。

核心理解:ConfigureAwait 决定的是 await 状态机恢复时跳到哪个线程,而不是代码执行顺序。这和 Lesson 03 的状态机模型完全对应——只是额外加了一个"恢复目标线程"的配置。

异步流的 ConfigureAwait(.NET 6+)

// 单个 Task 的 ConfigureAwait:
var result = await SomeTask().ConfigureAwait(false);

// 异步流的 ConfigureAwait(.NET 6 新增扩展方法):
await foreach (var item in GetItems().ConfigureAwait(false))
{
    // 每次 MoveNextAsync 都会用 ConfigureAwait(false)
}
C# 8 时代(.NET Core 3.x)的限制: IAsyncEnumerable<T>ConfigureAwait 扩展方法是 .NET 6 才加入的。在 .NET Core 3.x 中,只能在异步迭代器内部的每个 await 上加 .ConfigureAwait(false)

五、实战模式

场景一:数据库游标读取

async IAsyncEnumerable<Customer> ReadCustomers(
    DbConnection conn,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    await using var cmd = conn.CreateCommand();
    cmd.CommandText = "SELECT Id, Name FROM Customers";

    await using var reader = await cmd.ExecuteReaderAsync(ct);
    while (await reader.ReadAsync(ct))
    {
        yield return new Customer
        {
            Id = reader.GetInt32(0),
            Name = reader.GetString(1),
        };
    }
    // reader.DisposeAsync → cmd.DisposeAsync 自动执行
}

注意 await using var——异步流内部支持 IAsyncDisposable 模式,迭代器结束或中断时自动释放资源。

为什么 await using var reader = await cmd.ExecuteReaderAsync(ct) 里有两个 await

这行包含两个完全不同的 await,各干各的事
后面的 await cmd.ExecuteReaderAsync(ct):异步等待数据库返回 DbDataReader
前面的 await using var:声明这个 reader 用完后要异步释放(调 DisposeAsync() 而非同步的 Dispose())。

为什么 Dispose 也要异步?因为 DbDataReaderDispose 可能涉及网络 I/O——比如通知 SQL Server "释放服务器端游标"。同步 Dispose 会阻塞线程等这个网络 round-trip;异步 Dispose 不阻塞。

展开后:
var reader = await cmd.ExecuteReaderAsync(ct);  // 异步获取 reader
try { ... } finally { await reader.DisposeAsync(); }  // 异步释放

场景二:分页 API 流式消费

async IAsyncEnumerable<Issue> GetGitHubIssues(
    string repo,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    var page = 1;
    while (true)
    {
        var url = $"https://api.github.com/repos/{repo}/issues?page={page}";
        var response = await _http.GetAsync(url, ct);

        if (!response.IsSuccessStatusCode) yield break;

        var issues = await response.Content
            .ReadFromJsonAsync<List<Issue>>(cancellationToken: ct);

        if (issues == null || issues.Count == 0) yield break;

        foreach (var issue in issues)
            yield return issue;

        page++;
    }
}

// 消费——拿到第一条就开始展示,不等全部翻页
await foreach (var issue in GetGitHubIssues("dotnet/runtime"))
{
    Console.WriteLine($"#{issue.Number}: {issue.Title}");
}

场景三:文件逐行异步读取

// .NET 6+ 自带:System.IO.File.ReadLinesAsync
await foreach (var line in File.ReadLinesAsync("huge.log"))
{
    if (line.Contains("ERROR"))
        Console.WriteLine(line);
}

// C# 8 / .NET Core 3.x 中可以自己封装:
async IAsyncEnumerable<string> ReadLinesAsync(string path)
{
    using var reader = new StreamReader(path);
    while (true)
    {
        var line = await reader.ReadLineAsync();
        if (line == null) yield break;
        yield return line;
    }
}

六、Async LINQ —— 不自己写 foreach

引入 System.Linq.Async 包后,可以像操作同步集合一样操作异步流:

using System.Linq.Async;  // NuGet: System.Linq.Async

await foreach (var order in GetOrders()
    .Where(o => o.Total > 1000)
    .OrderByDescending(o => o.Total)
    .Take(10))
{
    Console.WriteLine($"大额订单: {order.Total}");
}

// 也可以聚合到单个值:
var count = await GetOrders().CountAsync(o => o.Total > 1000);
var max   = await GetOrders().MaxAsync(o => o.Total);
var list  = await GetOrders().ToListAsync();  // 全量——注意内存
不要滥用 ToListAsync()异步流的初衷是流式处理。如果最终还是要全量加载到 List<T>,那你用 Task<List<T>> 就够了,不需要异步流。

七、什么时候用异步流(什么时候不用)

场景用异步流?理由
数据库轮询大表 ✅ 用 逐条产出,不占内存
分页 API 翻页 ✅ 用 隐藏翻页细节,给调用方一个统一的流
事件流 / 消息队列消费 ✅ 用 天然就是流(Channel<T> 的 ReadAllAsync)
文件逐行异步读取 ✅ 用 .NET 6+ 原生支持
单次 HTTP 请求返回小 JSON ❌ 不用 数据量小,Task<T> 更简单
CPU 密集型数据计算(无 I/O) ❌ 不用 没有异步操作,普通 IEnumerable<T> 足够
最终需要对全量数据排序 / 分组 ⚠️ 谨慎 排序必须加载全量,此时异步流只是延迟了加载,没省内存
经验法则:如果你在迭代循环体内有 await,而且数据量可能很大——用异步流。如果数据总是小的(几十条),或者循环体内没有 I/O——普通 IEnumerable<T> 就够了。

八、对比:三个相近概念的区别

返回类型语义数据何时到达内存
Task<List<T>> 一次性全拉、一次性返回 全部就绪后才能遍历 全量
Task<IEnumerable<T>> 任务完成后返回一个迭代器 迭代器本身异步获取,但迭代过程同步 取决于迭代器实现
IAsyncEnumerable<T> 每次 MoveNext 都可以异步 边拉边产,流式到达 一次一条(或一批)
// Task<List<T>>:启动 → 全部拉完 → 再遍历
var list = await GetAllAsync();    // 阻塞等全部数据就绪
foreach (var x in list) { ... }  // 然后才遍历

// IAsyncEnumerable<T>:启动 → 拉一条 → 处理 → 拉下一条 → ...
await foreach (var x in GetAllAsync()) // 拿到第一条就开始
{
    Process(x);  // 处理完第一条才去拉第二条
}

📝 小测验

Q1. 以下哪项是 IAsyncEnumerable<T> 相比 Task<List<T>> 的核心优势?

选择正确答案:

Q2. 以下代码有什么问题?

async IAsyncEnumerable<int> GetNumbers()
{
    yield return 1;
    await Task.Delay(100);
    yield return 2;
}

foreach (var n in GetNumbers())  // 注意:不是 await foreach
{
    Console.WriteLine(n);
}
选择正确答案:

Q3. 关于 IAsyncEnumerator<T>.MoveNextAsync(),以下哪句正确?

选择正确答案:

Q4. 在异步流方法中,以下哪个写法是错误的?

选择正确答案:

Q5. 以下哪个场景不适合IAsyncEnumerable<T>

选择正确答案:

Lesson 04 · C# 8 Async Streams · 下一课预告:Lesson 05 — SynchronizationContext 与 await 续延调度