重构了运行逻辑。上下文使用对象池封装,节点方法调用时间传递CancellationTokenSource用来中止任务

This commit is contained in:
fengjiayi
2025-03-20 22:54:10 +08:00
parent 2168c5ec66
commit 9941f24c5d
27 changed files with 830 additions and 621 deletions

View File

@@ -4,6 +4,7 @@ using Serein.Library.Utils;
using Serein.Library.Utils.SereinExpression;
using Serein.NodeFlow.Model;
using Serein.NodeFlow.Tool;
using System;
using System.Reactive;
using System.Reflection;
using System.Text;
@@ -32,7 +33,7 @@ namespace Serein.NodeFlow.Env
{
this.sereinIOC = new SereinIOC();
this.IsGlobalInterrupt = false;
this.flowStarter = null;
this.flowTaskManagement = null;
this.sereinIOC.OnIOCMembersChanged += e =>
{
if (OperatingSystem.IsWindows())
@@ -357,9 +358,9 @@ namespace Serein.NodeFlow.Env
}
/// <summary>
/// 流程启动器每次运行时都会重新new一个
/// 流程任务管理
/// </summary>
private FlowStarter? flowStarter;
private FlowWorkManagement? flowTaskManagement;
#endregion
@@ -381,29 +382,44 @@ namespace Serein.NodeFlow.Env
}
/// <summary>
/// 异步运行
/// </summary>
/// <returns></returns>
public async Task<bool> StartFlowAsync()
{
flowStarter ??= new FlowStarter();
var nodes = NodeModels.Values.ToList();
List<MethodDetails> initMethods = this.FlowLibraryManagement.GetMdsOnFlowStart(NodeType.Init);
List<MethodDetails> loadMethods = this.FlowLibraryManagement.GetMdsOnFlowStart(NodeType.Loading);
List<MethodDetails> exitMethods = this.FlowLibraryManagement.GetMdsOnFlowStart(NodeType.Exit);
Dictionary<RegisterSequence, List<Type>> autoRegisterTypes = this.FlowLibraryManagement.GetaAutoRegisterType();
IOC.Reset();
await flowStarter.RunAsync(this, nodes, autoRegisterTypes, initMethods, loadMethods, exitMethods);
//_ = Task.Run(async () =>
//{
// //if (FlipFlopState == RunState.Completion)
// //{
// // await ExitFlowAsync(); // 未运行触发器时,才会调用结束方法
// //}
//});
IOC.Register<IScriptFlowApi, ScriptFlowApi>(); // 注册脚本接口
var flowTaskOptions = new FlowTaskLibrary
{
Environment = this,
FlowContextPool = new ObjectPool<IDynamicContext>(() => new DynamicContext(this)),
Nodes = NodeModels.Values.ToList(),
AutoRegisterTypes = this.FlowLibraryManagement.GetaAutoRegisterType(),
InitMds = this.FlowLibraryManagement.GetMdsOnFlowStart(NodeType.Init),
LoadMds = this.FlowLibraryManagement.GetMdsOnFlowStart(NodeType.Loading),
ExitMds = this.FlowLibraryManagement.GetMdsOnFlowStart(NodeType.Exit),
};
flowTaskManagement = new FlowWorkManagement(flowTaskOptions);
var cts = new CancellationTokenSource();
try
{
var t =await flowTaskManagement.RunAsync(cts.Token);
}
catch (Exception ex)
{
SereinEnv.WriteLine(ex);
}
finally
{
SereinEnv.WriteLine(InfoType.INFO, $"流程运行完毕{Environment.NewLine}"); ;
}
flowTaskOptions = null;
return true;
@@ -417,7 +433,7 @@ namespace Serein.NodeFlow.Env
public async Task<bool> StartAsyncInSelectNode(string startNodeGuid)
{
if (flowStarter is null)
if (flowTaskManagement is null)
{
SereinEnv.WriteLine(InfoType.ERROR, "没有启动流程,无法运行单个节点");
return false;
@@ -435,7 +451,7 @@ namespace Serein.NodeFlow.Env
//SerinExpressionEvaluator.Evaluate(setExp, nodeModel,out _);
//var getExpResult2 = SerinExpressionEvaluator.Evaluate(getExp, nodeModel, out _);
await flowStarter.StartFlowInSelectNodeAsync(this, nodeModel);
await flowTaskManagement.StartFlowInSelectNodeAsync(this, nodeModel);
return true;
}
else
@@ -454,7 +470,9 @@ namespace Serein.NodeFlow.Env
object result = new Unit();
if (this.NodeModels.TryGetValue(nodeGuid, out var model))
{
result = await model.ExecutingAsync(context);
CancellationTokenSource cts = new CancellationTokenSource();
result = await model.ExecutingAsync(context, cts.Token);
cts?.Cancel();
}
return result;
}
@@ -464,10 +482,10 @@ namespace Serein.NodeFlow.Env
/// </summary>
public Task<bool> ExitFlowAsync()
{
flowStarter?.Exit();
flowTaskManagement?.Exit();
UIContextOperation?.Invoke(() => OnFlowRunComplete?.Invoke(new FlowEventArgs()));
IOC.Reset();
flowStarter = null;
flowTaskManagement = null;
GC.Collect();
return Task.FromResult(true);
}
@@ -480,12 +498,12 @@ namespace Serein.NodeFlow.Env
{
var nodeModel = GuidToModel(nodeGuid);
if (nodeModel is null) return;
if (flowStarter is not null && nodeModel is SingleFlipflopNode flipflopNode) // 子节点为触发器
if (flowTaskManagement is not null && nodeModel is SingleFlipflopNode flipflopNode) // 子节点为触发器
{
if (FlowState != RunState.Completion
&& flipflopNode.NotExitPreviousNode()) // 正在运行,且该触发器没有上游节点
{
_ = flowStarter.RunGlobalFlipflopAsync(this, flipflopNode);// 被父节点移除连接关系的子节点若为触发器,且无上级节点,则当前流程正在运行,则加载到运行环境中
_ = flowTaskManagement.RunGlobalFlipflopAsync(this, flipflopNode);// 被父节点移除连接关系的子节点若为触发器,且无上级节点,则当前流程正在运行,则加载到运行环境中
}
}
@@ -499,9 +517,9 @@ namespace Serein.NodeFlow.Env
{
var nodeModel = GuidToModel(nodeGuid);
if (nodeModel is null) return;
if (flowStarter is not null && nodeModel is SingleFlipflopNode flipflopNode) // 子节点为触发器
if (flowTaskManagement is not null && nodeModel is SingleFlipflopNode flipflopNode) // 子节点为触发器
{
flowStarter.TerminateGlobalFlipflopRuning(flipflopNode);
flowTaskManagement.TerminateGlobalFlipflopRuning(flipflopNode);
}
}
@@ -793,7 +811,7 @@ namespace Serein.NodeFlow.Env
}
#endregion
var nodeModel = FlowFunc.CreateNode(this, controlType, methodDetails); // 加载项目时创建节点
var nodeModel = FlowNodeExtension.CreateNode(this, controlType, methodDetails); // 加载项目时创建节点
if (nodeModel is null)
{
nodeInfo.Guid = string.Empty;
@@ -926,7 +944,7 @@ namespace Serein.NodeFlow.Env
NodeModelBase? nodeModel;
if (methodDetailsInfo is null)
{
nodeModel = FlowFunc.CreateNode(this, nodeControlType); // 加载基础节点
nodeModel = FlowNodeExtension.CreateNode(this, nodeControlType); // 加载基础节点
}
else
{
@@ -934,7 +952,7 @@ namespace Serein.NodeFlow.Env
methodDetailsInfo.MethodName,
out var methodDetails))
{
nodeModel = FlowFunc.CreateNode(this, nodeControlType, methodDetails); // 一般的加载节点方法
nodeModel = FlowNodeExtension.CreateNode(this, nodeControlType, methodDetails); // 一般的加载节点方法
}
else
{
@@ -1032,7 +1050,7 @@ namespace Serein.NodeFlow.Env
if (remoteNode is SingleFlipflopNode flipflopNode)
{
flowStarter?.TerminateGlobalFlipflopRuning(flipflopNode); // 假设被移除的是全局触发器,尝试从启动器移除
flowTaskManagement?.TerminateGlobalFlipflopRuning(flipflopNode); // 假设被移除的是全局触发器,尝试从启动器移除
}
remoteNode.Remove(); // 调用节点的移除方法
@@ -1689,7 +1707,7 @@ namespace Serein.NodeFlow.Env
if (toNode is SingleFlipflopNode flipflopNode)
{
flowStarter?.TerminateGlobalFlipflopRuning(flipflopNode); // 假设被连接的是全局触发器,尝试移除
flowTaskManagement?.TerminateGlobalFlipflopRuning(flipflopNode); // 假设被连接的是全局触发器,尝试移除
}
var isPass = false;
@@ -1896,10 +1914,10 @@ namespace Serein.NodeFlow.Env
{
return (T)sereinIOC.Get(typeof(T));
}
T ISereinIOC.Get<T>(string key)
{
return sereinIOC.Get<T>(key);
}
//T ISereinIOC.Get<T>(string key)
//{
// return sereinIOC.Get<T>(key);
//}
bool ISereinIOC.RegisterPersistennceInstance(string key, object instance)
@@ -1908,10 +1926,10 @@ namespace Serein.NodeFlow.Env
return sereinIOC.RegisterPersistennceInstance(key, instance);
}
bool ISereinIOC.RegisterInstance(string key, object instance)
{
return sereinIOC.RegisterInstance(key, instance);
}
//bool ISereinIOC.RegisterInstance(string key, object instance)
//{
// return sereinIOC.RegisterInstance(key, instance);
//}
object ISereinIOC.Instantiate(Type type)

View File

@@ -102,7 +102,7 @@ namespace Serein.NodeFlow.Env
/// </summary>
public InfoClass InfoClass { get => currentFlowEnvironment.InfoClass; set => currentFlowEnvironment.InfoClass = value; }
public RunState FlowState { get => currentFlowEnvironment.FlowState; set => currentFlowEnvironment.FlowState = value; }
public RunState FlipFlopState { get => currentFlowEnvironment.FlipFlopState; set => currentFlowEnvironment.FlipFlopState = value; }
//public RunState FlipFlopState { get => currentFlowEnvironment.FlipFlopState; set => currentFlowEnvironment.FlipFlopState = value; }
public event LoadDllHandler OnDllLoad {
add { currentFlowEnvironmentEvent.OnDllLoad += value; }
@@ -607,10 +607,10 @@ namespace Serein.NodeFlow.Env
return IOC.RegisterPersistennceInstance(key, instance);
}
public bool RegisterInstance(string key, object instance)
{
return IOC.RegisterInstance(key, instance);
}
//public bool RegisterInstance(string key, object instance)
//{
// return IOC.RegisterInstance(key, instance);
//}
public object Get(Type type)
{
@@ -622,10 +622,10 @@ namespace Serein.NodeFlow.Env
return IOC.Get<T>();
}
public T Get<T>(string key)
{
return IOC.Get<T>(key);
}
//public T Get<T>(string key)
//{
// return IOC.Get<T>(key);
//}
public object Instantiate(Type type)
{

View File

@@ -784,7 +784,7 @@ namespace Serein.NodeFlow.Env
}
//MethodDetailss.TryGetValue(methodDetailsInfo.MethodName, out var methodDetails);// 加载项目时尝试获取方法信息
var nodeModel = FlowFunc.CreateNode(this, nodeControlType, methodDetails); // 远程环境下加载节点
var nodeModel = FlowNodeExtension.CreateNode(this, nodeControlType, methodDetails); // 远程环境下加载节点
nodeModel.LoadInfo(nodeInfo);
TryAddNode(nodeModel);
IsLoadingNode = false;
@@ -1098,7 +1098,7 @@ namespace Serein.NodeFlow.Env
}
#endregion
var nodeModel = FlowFunc.CreateNode(this, controlType, methodDetails); // 加载项目时创建节点
var nodeModel = FlowNodeExtension.CreateNode(this, controlType, methodDetails); // 加载项目时创建节点
if (nodeModel is null)
{
nodeInfo.Guid = string.Empty;

View File

@@ -12,7 +12,7 @@ namespace Serein.NodeFlow
/// <summary>
/// 流程环境需要的扩展方法
/// </summary>
public static class FlowFunc
public static class FlowNodeExtension
{
/// <summary>
/// 判断是否为基础节点
@@ -71,37 +71,6 @@ namespace Serein.NodeFlow
///// <summary>
///// 从节点信息读取节点类型
///// </summary>
///// <param name="nodeInfo"></param>
///// <returns></returns>
///// <exception cref="NotImplementedException"></exception>
//public static NodeControlType GetNodeControlType(NodeInfo nodeInfo)
//{
// if(!EnumHelper.TryConvertEnum<NodeControlType>(nodeInfo.Type, out var controlType))
// {
// return NodeControlType.None;
// }
// return controlType;
// // 创建控件实例
// //NodeControlType controlType = nodeInfo.Type switch
// //{
// // $"{NodeStaticConfig.NodeSpaceName}.{nameof(SingleActionNode)}" => NodeControlType.Action,// 动作节点控件
// // $"{NodeStaticConfig.NodeSpaceName}.{nameof(SingleFlipflopNode)}" => NodeControlType.Flipflop, // 触发器节点控件
// // $"{NodeStaticConfig.NodeSpaceName}.{nameof(SingleConditionNode)}" => NodeControlType.ExpCondition,// 条件表达式控件
// // $"{NodeStaticConfig.NodeSpaceName}.{nameof(SingleExpOpNode)}" => NodeControlType.ExpOp, // 操作表达式控件
// // $"{NodeStaticConfig.NodeSpaceName}.{nameof(CompositeConditionNode)}" => NodeControlType.ConditionRegion, // 条件区域控件
// // $"{NodeStaticConfig.NodeSpaceName}.{nameof(SingleGlobalDataNode)}" => NodeControlType.GlobalData, // 数据节点
// // $"{NodeStaticConfig.NodeSpaceName}.{nameof(SingleScriptNode)}" => NodeControlType.Script, // 数据节点
// // _ => NodeControlType.None,
// //};
// //return controlType;
//}
/// <summary>
/// 程序集封装依赖
/// </summary>

View File

@@ -1,413 +0,0 @@
using Serein.Library;
using Serein.Library.Api;
using Serein.Library.Utils;
using Serein.NodeFlow.Model;
using Serein.NodeFlow.Tool;
using System.Collections.Concurrent;
namespace Serein.NodeFlow
{
/// <summary>
/// 流程启动器
/// </summary>
public class FlowStarter
{
/// <summary>
/// 控制所有全局触发器的结束
/// </summary>
private CancellationTokenSource? _flipFlopCts;
/// <summary>
/// 是否停止启动
/// </summary>
private bool IsStopStart = false;
/// <summary>
/// 结束运行时需要执行的方法
/// </summary>
private Func<Task>? ExitAction { get; set; }
/// <summary>
/// 从选定的节点开始运行
/// </summary>
/// <param name="env"></param>
/// <param name="startNode"></param>
/// <returns></returns>
public async Task StartFlowInSelectNodeAsync(IFlowEnvironment env, NodeModelBase startNode)
{
IDynamicContext context;
context = new Serein.Library.DynamicContext(env); // 从起始节点启动流程时创建上下文
await startNode.StartFlowAsync(context); // 开始运行时从选定节点开始运行
context.Exit();
}
/// <summary>
/// 开始运行(需要准备好方法信息)
/// </summary>
/// <param name="env">运行环境</param>
/// <param name="nodes">环境中已加载的所有节点</param>
/// <param name="initMethods">初始化方法</param>
/// <param name="loadingMethods">加载时方法</param>
/// <param name="exitMethods">结束时方法</param>
/// <returns></returns>
public async Task RunAsync(IFlowEnvironment env,
List<NodeModelBase> nodes,
Dictionary<RegisterSequence, List<Type>> autoRegisterTypes,
List<MethodDetails> initMethods,
List<MethodDetails> loadingMethods,
List<MethodDetails> exitMethods)
{
#region
env.IOC.Register<IScriptFlowApi, ScriptFlowApi>(); // 注册脚本接口
#endregion
env.FlowState = RunState.Running; // 开始运行
NodeModelBase? startNode = nodes.FirstOrDefault(node => node.IsStart);
if (startNode is null) {
env.FlowState = RunState.Completion; // 不存在起点,退出流程
return;
}
#region
List<MethodDetails> runNodeMd;
List<SingleFlipflopNode> flipflopNodes;
flipflopNodes = nodes.Where(it => it.MethodDetails?.MethodDynamicType == NodeType.Flipflop && it.IsStart == false)
.Select(it => (SingleFlipflopNode)it)
.Where(node => node.DebugSetting.IsEnable && node is SingleFlipflopNode flipflopNode && flipflopNode.NotExitPreviousNode())
.ToList();// 获取需要再运行开始之前启动的触发器节点
runNodeMd = nodes.Select(item => item.MethodDetails).ToList(); // 获取环境中所有节点的方法信息
#endregion
#region
// 判断使用哪一种流程上下文
IDynamicContext Context = new Serein.Library.DynamicContext(env); // 从起始节点启动流程时创建上下文
#endregion
#region Ioc容器
// 清除节点使用的对象,筛选出需要初始化的方法描述
var thisRuningMds = new List<MethodDetails>();
thisRuningMds.AddRange(runNodeMd.Where(md => md?.ActingInstanceType is not null));
thisRuningMds.AddRange(initMethods.Where(md => md?.ActingInstanceType is not null));
thisRuningMds.AddRange(loadingMethods.Where(md => md?.ActingInstanceType is not null));
thisRuningMds.AddRange(exitMethods.Where(md => md?.ActingInstanceType is not null));
foreach (var nodeMd in thisRuningMds)
{
nodeMd.ActingInstance = null;
}
// 初始化ioc容器中的类型对象
foreach (var md in thisRuningMds)
{
if (md.ActingInstanceType != null)
{
env.IOC.Register(md.ActingInstanceType);
}
else
{
await Console.Out.WriteLineAsync($"{md.MethodName} - 没有类型声明");
IsStopStart = true;
}
}
if (IsStopStart) return;// 检查所有dll节点是否存在类型
env.IOC.Build(); // 流程启动前的初始化
foreach (var md in thisRuningMds)
{
md.ActingInstance = env.IOC.Get(md.ActingInstanceType);
if(md.ActingInstance is null)
{
await Console.Out.WriteLineAsync($"{md.MethodName} - 无法获取类型[{md.ActingInstanceType}]的实例");
IsStopStart = true;
}
}
if (IsStopStart)
{
return;// 调用节点初始化后检查状态
}
#endregion
#region IOC容器
if (autoRegisterTypes.TryGetValue(RegisterSequence.FlowInit, out var flowInitTypes))
{
foreach (var type in flowInitTypes)
{
env.IOC.Register(type); // 初始化前注册
}
}
Context.Env.IOC.Build(); // 绑定初始化时注册的类型
//object?[]? args = [Context];
foreach (var md in initMethods) // 初始化
{
if (!env.TryGetDelegateDetails(md.AssemblyName, md.MethodName, out var dd)) // 流程运行初始化
{
throw new Exception("不存在对应委托");
}
await dd.InvokeAsync(md.ActingInstance, [Context]);
//((Func<object, object[], object>)dd.EmitDelegate).Invoke(md.ActingInstance, [Context]);
}
Context.Env.IOC.Build(); // 绑定初始化时注册的类型
if(autoRegisterTypes.TryGetValue(RegisterSequence.FlowLoading,out var flowLoadingTypes))
{
foreach (var type in flowLoadingTypes)
{
env.IOC.Register(type); // 初始化前注册
}
}
Context.Env.IOC.Build(); // 绑定初始化时注册的类型
foreach (var md in loadingMethods) // 加载
{
//object?[]? data = [md.ActingInstance, args];
//md.MethodDelegate.DynamicInvoke(data);
if (!env.TryGetDelegateDetails(md.AssemblyName, md.MethodName, out var dd)) // 流程运行正在加载
{
throw new Exception("不存在对应委托");
}
await dd.InvokeAsync(md.ActingInstance, [Context]);
//((Action<object, object?[]?>)del).Invoke(md.ActingInstance, [Context]);
//((Func<object, object[], object>)dd.EmitDelegate).Invoke(md.ActingInstance, [Context]);
}
Context.Env.IOC.Build(); // 预防有人在加载时才注册类型,再绑定一次
#endregion
#region 退
ExitAction = async () =>
{
//env.IOC.Run<WebApiServer>(web => {
// web?.Stop();
//});
//env.IOC.Run<WebSocketServer>(server => {
// server?.Stop();
//});
foreach (MethodDetails? md in exitMethods)
{
if (!env.TryGetDelegateDetails(md.AssemblyName, md.MethodName, out var dd)) // 流程运行退出执行
{
throw new Exception("不存在对应委托");
}
await dd.InvokeAsync(md.ActingInstance, [Context]);
}
if (_flipFlopCts != null && !_flipFlopCts.IsCancellationRequested)
{
_flipFlopCts?.Cancel();
_flipFlopCts?.Dispose();
} // 通知所有流程上下文停止运行
TerminateAllGlobalFlipflop(); // 确保所有触发器不再运行
SereinEnv.ClearFlowGlobalData(); // 清空全局数据缓存
NativeDllHelper.FreeLibrarys(); // 卸载所有已加载的 Native Dll
env.IOC.Run<FlowInterruptTool>(fit => fit.CancelAllTrigger());// 取消所有中断
env.FlowState = RunState.Completion;
env.FlipFlopState = RunState.Completion;
};
#endregion
#region
try
{
//await TestScript(env);
await startNode.StartFlowAsync(Context); // 开始运行时从起始节点开始运行
if (flipflopNodes.Count > 0)
{
env.FlipFlopState = RunState.Running;
// 如果存在需要启动的触发器,则开始启动
_flipFlopCts = new CancellationTokenSource();
env.IOC.RegisterInstance(NodeStaticConfig.FlipFlopCtsName, _flipFlopCts);
// 使用 TaskCompletionSource 创建未启动的触发器任务
var tasks = flipflopNodes.Select(async node =>
{
await RunGlobalFlipflopAsync(env,node); // 启动流程时启动全局触发器
}).ToArray();
_ = Task.WhenAll(tasks);
}
// 等待结束
if(env.FlipFlopState == RunState.Running && _flipFlopCts is not null)
{
while (!_flipFlopCts.IsCancellationRequested)
{
await Task.Delay(100);
}
}
}
catch (Exception ex)
{
await Console.Out.WriteLineAsync(ex.ToString());
}
finally
{
env.FlowState = RunState.Completion;
SereinEnv.WriteLine(InfoType.INFO, $"流程运行完毕{Environment.NewLine}");;
}
#endregion
}
private ConcurrentDictionary<SingleFlipflopNode, CancellationTokenSource> dictGlobalFlipflop = [];
/// <summary>
/// 尝试添加全局触发器
/// </summary>
/// <param name="singleFlipFlopNode"></param>
/// <param name="env"></param>
public async Task RunGlobalFlipflopAsync(IFlowEnvironment env, SingleFlipflopNode singleFlipFlopNode)
{
if (dictGlobalFlipflop.TryAdd(singleFlipFlopNode, new CancellationTokenSource()))
{
singleFlipFlopNode.MethodDetails.ActingInstance ??= env.IOC.Get(singleFlipFlopNode.MethodDetails.ActingInstanceType);
await FlipflopExecuteAsync(env, singleFlipFlopNode, dictGlobalFlipflop[singleFlipFlopNode]);
}
}
/// <summary>
/// 尝试移除全局触发器
/// </summary>
/// <param name="singleFlipFlopNode"></param>
public void TerminateGlobalFlipflopRuning(SingleFlipflopNode singleFlipFlopNode)
{
if (dictGlobalFlipflop.TryRemove(singleFlipFlopNode, out var cts))
{
if (!cts.IsCancellationRequested)
{
cts.Cancel();
}
cts.Dispose();
}
}
/// <summary>
/// 终结所有全局触发器
/// </summary>
private void TerminateAllGlobalFlipflop()
{
foreach ((var node, var cts) in dictGlobalFlipflop)
{
if (!cts.IsCancellationRequested)
{
cts.Cancel();
}
cts.Dispose();
}
dictGlobalFlipflop.Clear();
}
/// <summary>
/// 启动全局触发器
/// </summary>
/// <param name="env">流程运行全局环境</param>
/// <param name="singleFlipFlopNode">需要全局监听信号的触发器</param>
/// <returns></returns>
private async Task FlipflopExecuteAsync(IFlowEnvironment env,
SingleFlipflopNode singleFlipFlopNode,
CancellationTokenSource cts)
{
if(_flipFlopCts is null)
{
SereinEnv.WriteLine(InfoType.INFO, "流程尚未启动flowStarter尚未创建无法启动该节点");
return;
}
while (!_flipFlopCts.IsCancellationRequested && !cts.IsCancellationRequested)
{
try
{
var context = new Library.DynamicContext(env); // 启动全局触发器时新建上下文
var newFlowData = await singleFlipFlopNode.ExecutingAsync(context); // 获取触发器等待Task
context.AddOrUpdate(singleFlipFlopNode.Guid, newFlowData);
await NodeModelBase.RefreshFlowDataAndExpInterrupt(context, singleFlipFlopNode, newFlowData); // 全局触发器触发后刷新该触发器的节点数据
if (context.NextOrientation == ConnectionInvokeType.None)
{
continue;
}
_ = Task.Run(async () => {
var nextNodes = singleFlipFlopNode.SuccessorNodes[context.NextOrientation];
for (int i = nextNodes.Count - 1; i >= 0 && !_flipFlopCts.IsCancellationRequested; i--)
{
// 筛选出启用的节点
if (!nextNodes[i].DebugSetting.IsEnable)
{
continue ;
}
context.SetPreviousNode(nextNodes[i], singleFlipFlopNode);
if (nextNodes[i].DebugSetting.IsInterrupt) // 执行触发前
{
await nextNodes[i].DebugSetting.GetInterruptTask.Invoke();
await Console.Out.WriteLineAsync($"[{nextNodes[i].MethodDetails.MethodName}]中断已取消,开始执行后继分支");
}
await nextNodes[i].StartFlowAsync(context); // 启动执行触发器后继分支的节点
}
nextNodes = singleFlipFlopNode.SuccessorNodes[ConnectionInvokeType.Upstream];
for (int i = nextNodes.Count - 1; i >= 0 && !_flipFlopCts.IsCancellationRequested; i--)
{
// 筛选出启用的节点
if (!nextNodes[i].DebugSetting.IsEnable)
{
continue;
}
context.SetPreviousNode(nextNodes[i], singleFlipFlopNode);
if (nextNodes[i].DebugSetting.IsInterrupt) // 执行触发前
{
await nextNodes[i].DebugSetting.GetInterruptTask.Invoke();
await Console.Out.WriteLineAsync($"[{nextNodes[i].MethodDetails.MethodName}]中断已取消,开始执行后继分支");
}
await nextNodes[i].StartFlowAsync(context); // 启动执行触发器后继分支的节点
}
context.Exit();
});
}
catch (FlipflopException ex)
{
SereinEnv.WriteLine(InfoType.ERROR, $"触发器[{singleFlipFlopNode.MethodDetails.MethodName}]因非预期异常终止。"+ex.Message);
if (ex.Type == FlipflopException.CancelClass.CancelFlow)
{
break;
}
}
catch (Exception ex)
{
SereinEnv.WriteLine(InfoType.ERROR, $"触发器[{singleFlipFlopNode.Guid}]异常。"+ ex.Message);
await Task.Delay(1000);
}
}
}
/// <summary>
/// 结束流程
/// </summary>
public void Exit()
{
ExitAction?.Invoke();
}
}
}

View File

@@ -0,0 +1,52 @@
using Microsoft.Extensions.ObjectPool;
using Serein.Library;
using Serein.Library.Api;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Serein.NodeFlow
{
public class FlowTaskLibrary()
{
/// <summary>
/// 流程运行环境
/// </summary>
public IFlowEnvironment Environment { get; set; }// = environment;
/// <summary>
/// 表示运行环境状态
/// </summary>
public CancellationTokenSource CancellationTokenSource { get; } = new CancellationTokenSource();
/// <summary>
/// 上下文线程池
/// </summary>
public Serein.Library.Utils.ObjectPool<IDynamicContext> FlowContextPool { get; set; }
/// <summary>
/// 当前任务加载的所有节点
/// </summary>
public List<NodeModelBase> Nodes { get; set; }// = nodes;
/// <summary>
/// 需要注册的类型
/// </summary>
public Dictionary<RegisterSequence, List<Type>> AutoRegisterTypes { get; set; } //= autoRegisterTypes;
/// <summary>
/// 初始化时需要的方法
/// </summary>
public List<MethodDetails> InitMds { get; set; }// = initMds;
/// <summary>
/// 加载时需要的方法
/// </summary>
public List<MethodDetails> LoadMds { get; set; }// = loadMds;
/// <summary>
/// 退出时需要调用的方法
/// </summary>
public List<MethodDetails> ExitMds { get; set; } //= exitMds;
}
}

View File

@@ -0,0 +1,386 @@
using Microsoft.CodeAnalysis;
using Serein.Library;
using Serein.Library.Api;
using Serein.Library.Utils;
using Serein.NodeFlow.Model;
using Serein.NodeFlow.Tool;
using System;
using System.Collections.Concurrent;
using System.Xml.Linq;
namespace Serein.NodeFlow
{
/// <summary>
/// 流程任务管理
/// </summary>
public class FlowWorkManagement
{
/// <summary>
/// 触发器对应的Cts
/// </summary>
private ConcurrentDictionary<SingleFlipflopNode, CancellationTokenSource> dictGlobalFlipflop = [];
/// <summary>
/// 结束运行时需要执行的方法
/// </summary>
private Func<Task>? ExitAction { get; set; }
/// <summary>
/// 初始化选项
/// </summary>
public FlowTaskLibrary WorkLibrary { get; }
/// <summary>
/// 流程任务管理
/// </summary>
/// <param name="library"></param>
public FlowWorkManagement(FlowTaskLibrary library)
{
WorkLibrary = library;
}
/// <summary>
/// 初始化啊
/// </summary>
/// <returns></returns>
public async Task<bool> RunAsync(CancellationToken token)
{
NodeModelBase? startNode = WorkLibrary.Nodes.FirstOrDefault(node => node.IsStart);
if (startNode is null)
{
return false;
}
if (!RegisterAllType())
{
return false;
};
var initState = await TryInit();
if (!initState)
{
return false;
};
var loadState = await TryLoadAsync();
if (!loadState)
{
return false;
};
var task = CallFlipflopNode();
await CallStartNode(startNode);
await task;
await CallExit();
return true;
}
#region
private bool RegisterAllType()
{
var env = WorkLibrary.Environment;
var nodeMds = WorkLibrary.Nodes.Select(item => item.MethodDetails).ToList(); // 获取环境中所有节点的方法信息
var allMds = new List<MethodDetails>();
allMds.AddRange(nodeMds.Where(md => md?.ActingInstanceType is not null));
allMds.AddRange(WorkLibrary.InitMds.Where(md => md?.ActingInstanceType is not null));
allMds.AddRange(WorkLibrary.LoadMds.Where(md => md?.ActingInstanceType is not null));
allMds.AddRange(WorkLibrary.ExitMds.Where(md => md?.ActingInstanceType is not null));
var isSuccessful = true;
foreach (var md in allMds)
{
if (md.ActingInstanceType != null)
{
env.IOC.Register(md.ActingInstanceType);
}
else
{
SereinEnv.WriteLine(InfoType.ERROR, "{md.MethodName} - 没有类型声明");
isSuccessful = false ;
}
}
env.IOC.Build(); // 绑定初始化时注册的类型
foreach (var md in allMds)
{
var instance = env.IOC.Get(md.ActingInstanceType);
if (instance is null)
{
SereinEnv.WriteLine(InfoType.ERROR, $"{md.MethodName} - 无法获取类型[{md.ActingInstanceType}]的实例");
isSuccessful = false;
}
}
return isSuccessful;
}
private async Task<bool> TryInit()
{
var env = WorkLibrary.Environment;
var initMds = WorkLibrary.InitMds;
var pool = WorkLibrary.FlowContextPool;
var ioc = WorkLibrary.Environment.IOC;
foreach (var md in initMds) // 初始化
{
if (!env.TryGetDelegateDetails(md.AssemblyName, md.MethodName, out var dd)) // 流程运行初始化
{
throw new Exception("不存在对应委托");
}
var context = pool.Allocate();
var instance = ioc.Get(md.ActingInstanceType);
await dd.InvokeAsync(instance, [context]);
context.Reset();
pool.Free(context);
}
env.IOC.Build(); // 绑定初始化时注册的类型
var isSuccessful = true;
return isSuccessful;
}
private async Task<bool> TryLoadAsync()
{
var env = WorkLibrary.Environment;
var loadMds = WorkLibrary.LoadMds;
var pool = WorkLibrary.FlowContextPool;
var ioc = WorkLibrary.Environment.IOC;
foreach (var md in loadMds) // 加载时
{
if (!env.TryGetDelegateDetails(md.AssemblyName, md.MethodName, out var dd)) // 流程运行初始化
{
throw new Exception("不存在对应委托");
}
var context = pool.Allocate();
var instance = ioc.Get(md.ActingInstanceType);
await dd.InvokeAsync(instance, [context]);
context.Reset();
pool.Free(context);
}
env.IOC.Build(); // 绑定初始化时注册的类型
var isSuccessful = true;
return isSuccessful;
}
private async Task<bool> CallExit()
{
var env = WorkLibrary.Environment;
var mds = WorkLibrary.ExitMds;
var pool = WorkLibrary.FlowContextPool;
var ioc = WorkLibrary.Environment.IOC;
ioc.Run<FlowInterruptTool>(fit => fit.CancelAllTrigger());// 取消所有中断
foreach (var md in mds) // 结束时
{
if (!env.TryGetDelegateDetails(md.AssemblyName, md.MethodName, out var dd)) // 流程运行初始化
{
throw new Exception("不存在对应委托");
}
var context = pool.Allocate();
var instance = ioc.Get(md.ActingInstanceType);
await dd.InvokeAsync(instance, [context]);
context.Reset();
pool.Free(context);
}
TerminateAllGlobalFlipflop(); // 确保所有触发器不再运行
SereinEnv.ClearFlowGlobalData(); // 清空全局数据缓存
NativeDllHelper.FreeLibrarys(); // 卸载所有已加载的 Native Dll
var isSuccessful = true;
return isSuccessful;
}
private Task CallFlipflopNode()
{
var env = WorkLibrary.Environment;
var flipflopNodes = WorkLibrary.Nodes.Where(item => item is SingleFlipflopNode node
&& !node.IsStart
&& node.DebugSetting.IsEnable
&& node.NotExitPreviousNode())
.Select(item => (SingleFlipflopNode)item);
//.ToList();// 获取需要再运行开始之前启动的触发器节点
if (flipflopNodes.Count() > 0)
{
var tasks = flipflopNodes.Select(async node =>
{
await RunGlobalFlipflopAsync(env, node); // 启动流程时启动全局触发器
});
Task.WhenAll(tasks);
}
return Task.CompletedTask;
}
private async Task CallStartNode(NodeModelBase startNode)
{
var pool = WorkLibrary.FlowContextPool;
var token = WorkLibrary.CancellationTokenSource.Token;
var context = pool.Allocate();
await startNode.StartFlowAsync(context, token);
context.Exit();
pool.Free(context);
return;
}
#endregion
/// <summary>
/// 从选定的节点开始运行
/// </summary>
/// <param name="env"></param>
/// <param name="startNode"></param>
/// <returns></returns>
public async Task StartFlowInSelectNodeAsync(IFlowEnvironment env, NodeModelBase startNode)
{
var pool = WorkLibrary.FlowContextPool;
var context = pool.Allocate();
var token = WorkLibrary.CancellationTokenSource.Token;
await startNode.StartFlowAsync(context, token); // 开始运行时从选定节点开始运行
context.Reset();
pool.Free(context);
}
/// <summary>
/// 尝试添加全局触发器
/// </summary>
/// <param name="singleFlipFlopNode"></param>
/// <param name="env"></param>
public async Task RunGlobalFlipflopAsync(IFlowEnvironment env, SingleFlipflopNode singleFlipFlopNode)
{
if (dictGlobalFlipflop.TryAdd(singleFlipFlopNode, new CancellationTokenSource()))
{
var cts = dictGlobalFlipflop[singleFlipFlopNode];
await FlipflopExecuteAsync(singleFlipFlopNode, cts.Token);
}
}
/// <summary>
/// 尝试移除全局触发器
/// </summary>
/// <param name="singleFlipFlopNode"></param>
public void TerminateGlobalFlipflopRuning(SingleFlipflopNode singleFlipFlopNode)
{
if (dictGlobalFlipflop.TryRemove(singleFlipFlopNode, out var cts))
{
if (!cts.IsCancellationRequested)
{
cts.Cancel();
}
cts.Dispose();
}
}
/// <summary>
/// 终结所有全局触发器
/// </summary>
private void TerminateAllGlobalFlipflop()
{
foreach ((var node, var cts) in dictGlobalFlipflop)
{
if (!cts.IsCancellationRequested)
{
cts.Cancel();
}
cts.Dispose();
}
dictGlobalFlipflop.Clear();
}
/// <summary>
/// 启动全局触发器
/// </summary>
/// <param name="singleFlipFlopNode">需要全局监听信号的触发器</param>
/// <param name="singleToken">单个触发器持有的</param>
/// <returns></returns>
private async Task FlipflopExecuteAsync(SingleFlipflopNode singleFlipFlopNode,
CancellationToken singleToken)
{
var pool = WorkLibrary.FlowContextPool;
while (!singleToken.IsCancellationRequested && !singleToken.IsCancellationRequested)
{
try
{
var context = pool.Allocate(); // 启动全局触发器时新建上下文
var newFlowData = await singleFlipFlopNode.ExecutingAsync(context, singleToken); // 获取触发器等待Task
context.AddOrUpdate(singleFlipFlopNode.Guid, newFlowData);
if (context.NextOrientation == ConnectionInvokeType.None)
{
continue;
}
_ = Task.Run(() => CallSubsequentNode(singleFlipFlopNode, singleToken, pool, context)); // 重新启动触发器
}
catch (FlipflopException ex)
{
SereinEnv.WriteLine(InfoType.ERROR, $"触发器[{singleFlipFlopNode.MethodDetails.MethodName}]因非预期异常终止。"+ex.Message);
if (ex.Type == FlipflopException.CancelClass.CancelFlow)
{
break;
}
}
catch (Exception ex)
{
SereinEnv.WriteLine(InfoType.ERROR, $"触发器[{singleFlipFlopNode.Guid}]异常。"+ ex.Message);
await Task.Delay(100);
}
}
}
private static async Task? CallSubsequentNode(SingleFlipflopNode singleFlipFlopNode, CancellationToken singleToken, ObjectPool<IDynamicContext> pool, IDynamicContext context)
{
var flowState = context.NextOrientation; // 记录一下流程状态
var nextNodes = singleFlipFlopNode.SuccessorNodes[ConnectionInvokeType.Upstream]; // 优先调用上游分支
for (int i = nextNodes.Count - 1; i >= 0 && !singleToken.IsCancellationRequested; i--)
{
// 筛选出启用的节点
if (!nextNodes[i].DebugSetting.IsEnable)
{
continue;
}
context.SetPreviousNode(nextNodes[i], singleFlipFlopNode); // 设置调用关系
if (nextNodes[i].DebugSetting.IsInterrupt) // 执行触发前检查终端
{
await nextNodes[i].DebugSetting.GetInterruptTask.Invoke();
await Console.Out.WriteLineAsync($"[{nextNodes[i].MethodDetails.MethodName}]中断已取消,开始执行后继分支");
}
await nextNodes[i].StartFlowAsync(context, singleToken); // 启动执行触发器后继分支的节点
}
nextNodes = singleFlipFlopNode.SuccessorNodes[flowState]; // 调用对应分支
for (int i = nextNodes.Count - 1; i >= 0 && !singleToken.IsCancellationRequested; i--)
{
// 筛选出启用的节点
if (!nextNodes[i].DebugSetting.IsEnable)
{
continue;
}
context.SetPreviousNode(nextNodes[i], singleFlipFlopNode);
if (nextNodes[i].DebugSetting.IsInterrupt) // 执行触发前
{
await nextNodes[i].DebugSetting.GetInterruptTask.Invoke();
await Console.Out.WriteLineAsync($"[{nextNodes[i].MethodDetails.MethodName}]中断已取消,开始执行后继分支");
}
await nextNodes[i].StartFlowAsync(context, singleToken); // 启动执行触发器后继分支的节点
}
context.Reset();
pool.Free(context);
}
/// <summary>
/// 结束流程
/// </summary>
public void Exit()
{
ExitAction?.Invoke();
}
}
}

View File

@@ -36,14 +36,18 @@ namespace Serein.NodeFlow.Model
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public override async Task<object?> ExecutingAsync(IDynamicContext context)
public override async Task<object?> ExecutingAsync(IDynamicContext context, CancellationToken token)
{
try
{
// 条件区域中遍历每个条件节点
foreach (SingleConditionNode? node in ConditionNodes)
{
var state = await node.ExecutingAsync(context);
if (token.IsCancellationRequested)
{
return null;
}
var state = await node.ExecutingAsync(context, token);
if (context.NextOrientation != ConnectionInvokeType.IsSucceed)
{
// 如果条件不通过,立刻推出循环

View File

@@ -110,8 +110,9 @@ namespace Serein.NodeFlow.Model
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public override async Task<object?> ExecutingAsync(IDynamicContext context)
public override async Task<object?> ExecutingAsync(IDynamicContext context, CancellationToken token)
{
if (token.IsCancellationRequested) return null;
// 接收上一节点参数or自定义参数内容
object? parameter;
object? result = null;

View File

@@ -1,4 +1,5 @@
using Serein.Library;
using Newtonsoft.Json.Linq;
using Serein.Library;
using Serein.Library.Api;
using Serein.Library.Utils;
using Serein.Library.Utils.SereinExpression;
@@ -91,8 +92,10 @@ namespace Serein.NodeFlow.Model
}
public override async Task<object?> ExecutingAsync(IDynamicContext context)
public override async Task<object?> ExecutingAsync(IDynamicContext context, CancellationToken token)
{
if(token.IsCancellationRequested) return null;
object? parameter = null;// context.TransmissionData(this); // 表达式节点使用上一节点数据
var pd = MethodDetails.ParameterDetailss[0];

View File

@@ -1,6 +1,7 @@
using Serein.Library.Api;
using Serein.Library;
using Serein.Library.Utils;
using System;
namespace Serein.NodeFlow.Model
{
@@ -21,7 +22,7 @@ namespace Serein.NodeFlow.Model
/// <param name="context"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public override async Task<object?> ExecutingAsync(IDynamicContext context)
public override async Task<object?> ExecutingAsync(IDynamicContext context, CancellationToken token)
{
#region
if (DebugSetting.IsInterrupt) // 执行触发前
@@ -37,13 +38,18 @@ namespace Serein.NodeFlow.Model
{
throw new Exception("不存在对应委托");
}
object instance = md.ActingInstance;
var args = await GetParametersAsync(context);
var instance = context.Env.IOC.Get(md.ActingInstanceType);
await dd.InvokeAsync(instance, [context]);
var args = await GetParametersAsync(context, token);
// 因为这里会返回不确定的泛型 IFlipflopContext<TRsult>
// 而我们只需要获取到 State 和 Value返回的数据
// 所以使用 dynamic 类型接收
dynamic dynamicFlipflopContext = await dd.InvokeAsync(md.ActingInstance, args);
if (token.IsCancellationRequested)
{
return null;
}
dynamic dynamicFlipflopContext = await dd.InvokeAsync(instance, args);
FlipflopStateType flipflopStateType = dynamicFlipflopContext.State;
context.NextOrientation = flipflopStateType.ToContentType();

View File

@@ -115,8 +115,9 @@ namespace Serein.NodeFlow.Model
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public override async Task<object> ExecutingAsync(IDynamicContext context)
public override async Task<object?> ExecutingAsync(IDynamicContext context, CancellationToken token)
{
if (token.IsCancellationRequested) return null;
if (string.IsNullOrEmpty(KeyName))
{
context.NextOrientation = ConnectionInvokeType.IsError;

View File

@@ -165,13 +165,15 @@ namespace Serein.NodeFlow.Model
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public override async Task<object?> ExecutingAsync(IDynamicContext context)
public override async Task<object?> ExecutingAsync(IDynamicContext context, CancellationToken token)
{
var @params = await GetParametersAsync(context);
if(token.IsCancellationRequested) return null;
var @params = await GetParametersAsync(context, token);
if(token.IsCancellationRequested) return null;
//context.AddOrUpdate($"{context.Guid}_{this.Guid}_Params", @params[0]); // 后面再改
ReloadScript();// 每次都重新解析
ReloadScript();// 每次都重新解析
IScriptInvokeContext scriptContext = new ScriptInvokeContext(context);
@@ -193,6 +195,9 @@ namespace Serein.NodeFlow.Model
var envEvent = (IFlowEnvironmentEvent)context.Env;
envEvent.OnFlowRunComplete += onFlowStop; // 防止运行后台流程
if (token.IsCancellationRequested) return null;
var result = await ScriptInterpreter.InterpretAsync(scriptContext, mainNode); // 从入口节点执行
envEvent.OnFlowRunComplete -= onFlowStop;
//SereinEnv.WriteLine(InfoType.INFO, "FlowContext Guid : " + context.Guid);

View File

@@ -15,12 +15,13 @@ namespace Serein.NodeFlow.Model
{
}
public override async Task<object> ExecutingAsync(IDynamicContext context)
public override async Task<object?> ExecutingAsync(IDynamicContext context, CancellationToken token)
{
if (token.IsCancellationRequested) return null;
if(Adapter is null)
{
var result = await base.ExecutingAsync(context);
var result = await base.ExecutingAsync(context, token);
if (result is IEmbeddedContent adapter)
{
this.Adapter = adapter;
@@ -39,7 +40,7 @@ namespace Serein.NodeFlow.Model
iflowContorl.OnExecuting(data);
}
return Task.FromResult<object?>(null);
return null;
}
}
}