在使用 C# 编写异步迭代器时,您可能会遇到如下警告:
warning CS8425: 异步迭代器“TestConversationService.ChatStreamed(IReadOnlyList<ChatMessage>, ChatCompletionOptions, CancellationToken)”具有一个或多个类型为 "CancellationToken" 的参数,但它们都未用 "EnumeratorCancellation" 属性修饰,因此将不使用所生成的 "IAsyncEnumerable<>.GetAsyncEnumerator" 中的取消令牌参数。
看到这样的警告,您可能会困惑:究竟需要在异步迭代器的方法参数上添加 [EnumeratorCancellation]
属性吗?如果不添加,会有什么区别? 让我们深入探讨一下这个问题,揭示其背后的真相。
正常调用时,[EnumeratorCancellation] 的影响
如果您只是简单地在异步迭代器方法中传递一个普通的 CancellationToken
,无论是否使用 [EnumeratorCancellation]
,方法的行为似乎并没有显著区别。例如:
public async IAsyncEnumerable<int> GenerateNumbersAsync(CancellationToken cancellationToken = default)
{
for (int i = 0; i < 10; i++)
{
cancellationToken.ThrowIfCancellationRequested();
yield return i;
await Task.Delay(1000, cancellationToken);
}
}
public async Task ConsumeNumbersAsync()
{
CancellationTokenSource cts = new CancellationTokenSource();
Task cancelTask = Task.Run(async () =>
{
await Task.Delay(3000);
cts.Cancel();
});
try
{
await foreach (var number in GenerateNumbersAsync(cts.Token))
{
Console.WriteLine(number);
}
}
catch (OperationCanceledException)
{
Console.WriteLine("枚举已被取消");
}
await cancelTask;
}
输出如下:
0
1
2
枚举已被取消
在上述代码中,即使没有使用 [EnumeratorCancellation]
,取消令牌 cts.Token
依然会生效,导致迭代过程被取消。这可能会让开发者误以为 [EnumeratorCancellation]
没有实际作用,进而引发更多的困惑。
揭开真相:生产者与消费者的职责分离
实际上,[EnumeratorCancellation]
的核心作用在于 实现生产者与消费者的职责分离。具体来说:
通过这种设计,生产者不需要知道取消请求是由谁或何时发起的,简化了生产者的设计,同时赋予消费者更大的控制权。这不仅提高了代码的可维护性和可复用性,还避免了取消逻辑的混乱。
示例说明
下面通过一个示例,直观地展示 [EnumeratorCancellation]
如何实现职责分离。
1. 定义异步迭代器方法
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
public class DataProducer
{
public async IAsyncEnumerable<int> ProduceData(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
int i = 0;
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
Console.WriteLine($"[Iterator] 生成数字: {i}");
yield return i++;
await Task.Delay(1000, cancellationToken);
}
}
}
在这个 DataProducer
类中,ProduceData
方法使用 [EnumeratorCancellation]
标注了 cancellationToken
参数。这意味着,当消费者通过 WithCancellation
传递取消令牌时,编译器会自动将该取消令牌传递给 ProduceData
方法的 cancellationToken
参数。
2. 定义消费者方法
using System;
using System.Threading;
using System.Threading.Tasks;
public class DataConsumer
{
public async Task ConsumeDataAsync(IAsyncEnumerable<int> producer)
{
using CancellationTokenSource cts = new CancellationTokenSource();
_ = Task.Run(async () =>
{
await Task.Delay(5000);
cts.Cancel();
Console.WriteLine("[Trigger] 已发出取消请求");
});
try
{
await foreach (var data in producer.WithCancellation(cts.Token))
{
Console.WriteLine($"[Consumer] 接收到数据: {data}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine("[Consumer] 数据接收已被取消");
}
}
}
在 DataConsumer
类中,ConsumeDataAsync
方法创建了一个 CancellationTokenSource
,并在5秒后取消它。通过 WithCancellation
方法,将取消令牌传递给 ProduceData
方法。这样,消费者完全控制了取消逻辑,而生产者只需响应取消请求。
3. 执行示例
public class Program
{
public static async Task Main(string[] args)
{
var producer = new DataProducer();
var consumer = new DataConsumer();
await consumer.ConsumeDataAsync(producer.ProduceData());
}
}
预期输出:
[Iterator] 生成数字: 0
[Consumer] 接收到数据: 0
[Iterator] 生成数字: 1
[Consumer] 接收到数据: 1
[Iterator] 生成数字: 2
[Consumer] 接收到数据: 2
[Iterator] 生成数字: 3
[Consumer] 接收到数据: 3
[Iterator] 生成数字: 4
[Consumer] 接收到数据: 4
[Trigger] 已发出取消请求
[Consumer] 数据接收已被取消
在5秒后,取消请求被触发,迭代器检测到取消并抛出 OperationCanceledException
,导致迭代过程被中断。请注意DataConsumer在接收生产出来的数据 IAsyncEnumerable<int>
时,已经错过了在生产函数中传入 cancellationToken
的机会,但作为消费者,仍然可以通过 .WithCancellation
方法进行优雅取消。
这展示了生产者与消费者如何通过 WithCancellation
和 [EnumeratorCancellation]
实现职责分离,消费者能够独立地控制取消逻辑,而生产者只需响应取消请求。
CancellationToken 与 WithCancellation 同时作用时的行为
那么,如果在异步迭代器方法中同时传递了 CancellationToken
参数,并通过 WithCancellation
指定了不同的取消令牌,取消操作会听哪个的?还是都会监听?
结论是:两者都会生效,只要其中任意一个取消令牌被触发,迭代器都会检测到取消请求并中断迭代过程。这取决于方法内部如何处理多个取消令牌。
示例演示
以下是一个详细的示例,展示当同时传递 CancellationToken
参数和使用不同的 WithCancellation
时的行为。
1. 定义异步迭代器方法
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
public class EnumeratorCancellationDemo
{
public async IAsyncEnumerable<int> GenerateNumbersAsync(
[EnumeratorCancellation] CancellationToken cancellationToken,
CancellationToken externalCancellationToken = default)
{
int i = 0;
try
{
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
externalCancellationToken.ThrowIfCancellationRequested();
Console.WriteLine($"[Iterator] 生成数字: {i}");
yield return i++;
await Task.Delay(1000, cancellationToken);
}
}
finally
{
Console.WriteLine("[Iterator] 迭代器已退出。");
}
}
}
2. 定义消费者方法
public class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("启动枚举取消示例...\n");
var demo = new EnumeratorCancellationDemo();
Console.WriteLine("=== 测试1: 先取消方法参数 ===\n");
await TestCancellation(demo, cancelParamFirst: true);
Console.WriteLine("\n=== 测试2: 先取消 WithCancellation ===\n");
await TestCancellation(demo, cancelParamFirst: false);
Console.WriteLine("\n演示结束。");
Console.ReadLine();
}
static async Task TestCancellation(EnumeratorCancellationDemo demo, bool cancelParamFirst)
{
using CancellationTokenSource ctsParam = new CancellationTokenSource();
using CancellationTokenSource ctsWith = new CancellationTokenSource();
if (cancelParamFirst)
{
_ = Task.Run(async () =>
{
await Task.Delay(3000);
ctsParam.Cancel();
Console.WriteLine("[Trigger] 已取消 ctsParam (方法参数)");
});
_ = Task.Run(async () =>
{
await Task.Delay(5000);
ctsWith.Cancel();
Console.WriteLine("[Trigger] 已取消 ctsWith (WithCancellation)");
});
}
else
{
_ = Task.Run(async () =>
{
await Task.Delay(3000);
ctsWith.Cancel();
Console.WriteLine("[Trigger] 已取消 ctsWith (WithCancellation)");
});
_ = Task.Run(async () =>
{
await Task.Delay(5000);
ctsParam.Cancel();
Console.WriteLine("[Trigger] 已取消 ctsParam (方法参数)");
});
}
try
{
await foreach (var number in demo.GenerateNumbersAsync(ctsWith.Token, ctsParam.Token).WithCancellation(ctsWith.Token))
{
Console.WriteLine($"[Consumer] 接收到数字: {number}");
}
}
catch (OperationCanceledException ex)
{
string reason = ex.CancellationToken == ctsWith.Token ? "WithCancellation" : "方法参数";
Console.WriteLine($"[Iterator] 迭代器检测到取消。原因: {reason}");
Console.WriteLine("[Consumer] 枚举已被取消。");
}
}
}
3. 运行示例并观察结果
启动程序后,控制台输出可能如下所示:
启动枚举取消示例...
=== 测试1: 先取消方法参数 ===
[Iterator] 生成数字: 0
[Consumer] 接收到数字: 0
[Iterator] 生成数字: 1
[Consumer] 接收到数字: 1
[Iterator] 生成数字: 2
[Consumer] 接收到数字: 2
[Trigger] 已取消 ctsParam (方法参数)
[Iterator] 迭代器已退出。
[Iterator] 迭代器检测到取消。原因: 方法参数
[Consumer] 枚举已被取消。
=== 测试2: 先取消 WithCancellation ===
[Iterator] 生成数字: 0
[Consumer] 接收到数字: 0
[Iterator] 生成数字: 1
[Consumer] 接收到数字: 1
[Trigger] 已取消 ctsWith (WithCancellation)
[Iterator] 生成数字: 2
[Consumer] 接收到数字: 2
[Trigger] 已取消 ctsWith (WithCancellation)
[Iterator] 迭代器已退出。
[Iterator] 迭代器检测到取消。原因: WithCancellation
[Consumer] 枚举已被取消。
演示结束。
解释:
测试1:先取消方法参数 (ctsParam
)
- 在第3秒时,
ctsParam
被取消。 - 迭代器检测到
externalCancellationToken
被取消,抛出 OperationCanceledException
。 - 终止迭代过程,即使
ctsWith
还未被取消。
测试2:先取消 WithCancellation
(ctsWith
)
- 在第3秒时,
ctsWith
被取消。 - 迭代器检测到
cancellationToken
被取消,抛出 OperationCanceledException
。 - 终止迭代过程,即使
ctsParam
还未被取消。
关键点:
总结
通过上述示例,我们深入了解了 [EnumeratorCancellation]
的必要性及其在异步迭代器中的核心作用。简要回顾:
消除警告:使用 [EnumeratorCancellation]
可以消除 Visual Studio 提示的警告,确保取消请求能够正确传递给异步迭代器方法。
职责分离:它实现了生产者与消费者的职责分离,使生产者专注于数据生成,消费者控制取消逻辑,从而提升代码的可维护性和可复用性。
灵活的取消机制:即使同时传递多个取消令牌,只要任意一个被取消,迭代器就会终止,提供了灵活而强大的取消控制能力。
转自https://www.cnblogs.com/sdcb/p/18551982/why-we-need-enumerator-cancellation
该文章在 2024/11/19 8:44:55 编辑过