using Newtonsoft.Json.Linq; using Serein.Library.Api; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Reactive.Subjects; using System.Text; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; namespace Serein.Library.Utils { /// /// 同步的单体消息触发器 /// /// public class SingleSyncFlowTrigger : IFlowTrigger { private readonly ConcurrentDictionary>>> _syncChannel = new ConcurrentDictionary>>>(); public void CancelAllTrigger() { foreach (var triggers in _syncChannel.Values) { foreach (var trigger in triggers) { trigger.SetCanceled(); } } } public Task InvokeTriggerAsync(TSingle signal, TResult value) { if(_syncChannel.TryGetValue(signal, out var tcss)) { var tcs = tcss.Dequeue(); var result = new TriggerResult { Type = TriggerDescription.External, Value = value, }; tcs.SetResult(result); return Task.FromResult(true); } return Task.FromResult(false); } public async Task> WaitTriggerAsync(TSingle signal) { if (!_syncChannel.TryGetValue(signal,out var tcss)) { tcss = new Queue>>(); _syncChannel.TryAdd(signal, tcss); } var taskCompletionSource = new TaskCompletionSource>(); tcss.Enqueue(taskCompletionSource); var result = await taskCompletionSource.Task; if (result.Value is TResult result2) { return new TriggerResult { Type = TriggerDescription.External, Value = result2, }; } else { return new TriggerResult { Type = TriggerDescription.TypeInconsistency, }; } } public async Task> WaitTriggerWithTimeoutAsync(TSingle signal, TimeSpan outTime) { if (!_syncChannel.TryGetValue(signal, out var tcss)) { tcss = new Queue>>(); _syncChannel.TryAdd(signal, tcss); } var taskCompletionSource = new TaskCompletionSource>(); tcss.Enqueue(taskCompletionSource); var cts = new CancellationTokenSource(); // 异步任务:超时后自动触发信号 _ = Task.Run(async () => { try { await Task.Delay(outTime, cts.Token); if (!cts.IsCancellationRequested) // 如果还没有被取消 { var outResult = new TriggerResult() { Type = TriggerDescription.Overtime }; taskCompletionSource.SetResult(outResult); // 超时触发 } } catch (OperationCanceledException) { // 超时任务被取消 } finally { cts?.Dispose(); // 确保 cts 被释放 } }, cts.Token); var result = await taskCompletionSource.Task; cts?.Cancel(); if (result.Value is TResult result2) { return new TriggerResult { Type = result.Type, Value = result2, }; } else { return new TriggerResult { Type = result.Type, }; } } } }