using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Reactive.Subjects; using System.Text; using System.Threading.Tasks; namespace Serein.Library.Utils { /// /// 流程运行中断工具 /// public class FlowInterruptTool { // 使用并发字典管理每个信号对应的广播列表 private readonly ConcurrentDictionary> _subscribers = new ConcurrentDictionary>(); /// /// 获取或创建指定信号的 Subject(消息广播者) /// /// 枚举信号标识符 /// 对应的 Subject private Subject GetOrCreateSubject(string signal) { return _subscribers.GetOrAdd(signal, _ => new Subject()); } /// /// 订阅指定信号的消息 /// /// 枚举信号标识符 /// 订阅者 /// 取消订阅的句柄 private IDisposable Subscribe(string signal, Action action) { IObserver observer = new Observer(action); var subject = GetOrCreateSubject(signal); return subject.Subscribe(observer); // 返回取消订阅的句柄 } /// /// 等待触发 /// /// /// public async Task WaitTriggerAsync(string signal) { var taskCompletionSource = new TaskCompletionSource(); var subscription = Subscribe(signal, taskCompletionSource.SetResult); var result = await taskCompletionSource.Task; subscription.Dispose(); // 取消订阅 return result; } /// /// 手动触发信号,并广播给所有订阅者 /// /// 枚举信号标识符 /// 是否成功触发 public bool InvokeTrigger(string signal) { if (_subscribers.TryGetValue(signal, out var subject)) { subject.OnNext(true); // 广播给所有订阅者 subject.OnCompleted(); // 通知订阅结束 return true; } return false; } /// /// 取消所有任务 /// public void CancelAllTrigger() { foreach (var subject in _subscribers.Values) { subject.OnCompleted(); // 通知所有订阅者结束 } _subscribers.Clear(); } } }