using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; namespace Serein.Library.Utils { public class ChannelFlowInterrupt { /// /// 中断取消类型 /// public enum CancelType { Manual, Overtime } // 使用并发字典管理每个信号对应的 Channel private readonly ConcurrentDictionary> _channels = new ConcurrentDictionary>(); /// /// 创建信号并指定超时时间,到期后自动触发(异步方法) /// /// 信号标识符 /// 超时时间 /// 等待任务 public async Task CreateChannelWithTimeoutAsync(string signal, TimeSpan outTime) { var channel = GetOrCreateChannel(signal); var cts = new CancellationTokenSource(); // 异步任务:超时后自动触发信号 _ = Task.Run(async () => { try { await Task.Delay(outTime, cts.Token); await channel.Writer.WriteAsync(CancelType.Overtime); } catch (OperationCanceledException) { // 超时任务被取消 } }, cts.Token); // 等待信号传入(超时或手动触发) var result = await channel.Reader.ReadAsync(); return result; } /// /// 创建信号并指定超时时间,到期后自动触发(同步阻塞方法) /// /// 信号标识符 /// 超时时间 public CancelType CreateChannelWithTimeoutSync(string signal, TimeSpan timeout) { var channel = GetOrCreateChannel(signal); var cts = new CancellationTokenSource(); CancellationToken token = cts.Token; // 异步任务:超时后自动触发信号 _ = Task.Run(async () => { try { await Task.Delay(timeout, token); await channel.Writer.WriteAsync(CancelType.Overtime); } catch (OperationCanceledException) { // 任务被取消 } }); // 同步阻塞直到信号触发或超时 var result = channel.Reader.ReadAsync().AsTask().GetAwaiter().GetResult(); return result; } /// /// 触发信号 /// /// 信号字符串 /// 是否成功触发 public bool TriggerSignal(string signal) { if (_channels.TryGetValue(signal, out var channel)) { // 手动触发信号 channel.Writer.TryWrite(CancelType.Manual); return true; } return false; } /// /// 取消所有任务 /// public void CancelAllTasks() { foreach (var channel in _channels.Values) { channel.Writer.Complete(); } _channels.Clear(); } /// /// 获取或创建指定信号的 Channel /// /// 信号字符串 /// 对应的 Channel private Channel GetOrCreateChannel(string signal) { return _channels.GetOrAdd(signal, _ => Channel.CreateUnbounded()); } } }