using Serein.Library.Api; using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; namespace Serein.Library.Utils { public class ChannelFlowTrigger : IFlowTrigger { // 使用并发字典管理每个枚举信号对应的 Channel private readonly ConcurrentDictionary>> _channels = new ConcurrentDictionary>>(); /// /// 获取或创建指定信号的 Channel /// /// 枚举信号标识符 /// 对应的 Channel private Channel> GetOrCreateChannel(TSignal signal) { return _channels.GetOrAdd(signal, _ => Channel.CreateUnbounded>()); } public async Task> WaitTriggerWithTimeoutAsync(TSignal signal, TimeSpan outTime) { var channel = GetOrCreateChannel(signal); var cts = new CancellationTokenSource(); // 异步任务:超时后自动触发信号 _ = Task.Run(async () => { try { await Task.Delay(outTime, cts.Token); var outResult = new TriggerResult() { Type = TriggerDescription.Overtime }; await channel.Writer.WriteAsync(outResult); } catch (OperationCanceledException) { // 超时任务被取消 } }, cts.Token); // 等待信号传入(超时或手动触发) var result = await WaitTriggerAsync(signal); // 返回一个可以超时触发的等待任务 return result; } public async Task> WaitTriggerAsync(TSignal signal) { var channel = GetOrCreateChannel(signal); // 等待信号传入(超时或手动触发) var result = await channel.Reader.ReadAsync(); if (result.Value is TResult data) { return new TriggerResult() { Value = data, Type = TriggerDescription.External, }; } else { return new TriggerResult() { Type = TriggerDescription.TypeInconsistency, }; } } public async Task InvokeTriggerAsync(TSignal signal, TResult value) { if (_channels.TryGetValue(signal, out var channel)) { // 手动触发信号 var result = new TriggerResult() { Type = TriggerDescription.External, Value = value }; await channel.Writer.WriteAsync(result); return true; } return false; } public void CancelAllTrigger() { foreach (var channel in _channels.Values) { channel.Writer.Complete(); } _channels.Clear(); } } }