重写了节点主动中断功能,修改了运行环境持久化注册已有实例的逻辑。

This commit is contained in:
fengjiayi
2024-12-26 22:24:44 +08:00
parent 7a6f8c407b
commit 3a7a8483e8
24 changed files with 428 additions and 563 deletions

View File

@@ -69,6 +69,16 @@ namespace Serein.NodeFlow.Env
NodeMVVMManagement.RegisterModel(NodeControlType.GlobalData, typeof(SingleGlobalDataNode)); // 全局数据节点
NodeMVVMManagement.RegisterModel(NodeControlType.Script, typeof(SingleScriptNode)); // 脚本节点
#endregion
#region
PersistennceInstance.Add(typeof(FlowInterruptTool).FullName, new FlowInterruptTool()); // 缓存流程实例
PersistennceInstance.Add(typeof(IFlowEnvironment).FullName, (FlowEnvironment)this); // 缓存流程实例
PersistennceInstance.Add(typeof(ISereinIOC).FullName, this); // 缓存容器服务
PersistennceInstance.Add(typeof(UIContextOperation).FullName, uiContextOperation); // 缓存封装好的UI线程上下文
ReRegisterPersistennceInstance();
#endregion
}
#region
@@ -303,6 +313,11 @@ namespace Serein.NodeFlow.Env
/// </summary>
private readonly SereinIOC sereinIOC;
/// <summary>
/// 本地运行环境缓存的持久化实例
/// </summary>
private Dictionary<string, object> PersistennceInstance { get; } = new Dictionary<string, object>();
/// <summary>
/// 环境加载的节点集合
/// Node Guid - Node Model
@@ -376,34 +391,23 @@ namespace Serein.NodeFlow.Env
/// <returns></returns>
public async Task<bool> StartFlowAsync()
{
flowStarter = new FlowStarter();
flowStarter ??= new FlowStarter();
var nodes = NodeModels.Values.ToList();
List<MethodDetails> initMethods = this.FlowLibraryManagement.GetMdsOnFlowStart(NodeType.Init);
List<MethodDetails> loadMethods = this.FlowLibraryManagement.GetMdsOnFlowStart(NodeType.Loading);
List<MethodDetails> exitMethods = this.FlowLibraryManagement.GetMdsOnFlowStart(NodeType.Exit);
Dictionary<RegisterSequence, List<Type>> autoRegisterTypes = this.FlowLibraryManagement.GetaAutoRegisterType();
IOC.Reset(); // 开始运行时清空ioc中注册的实例
IOC.Register<IScriptFlowApi, ScriptFlowApi>(); // 注册脚本接口
IOC.CustomRegisterInstance(typeof(IFlowEnvironment).FullName, this); // 注册流程实例
if (this.UIContextOperation is not null)
{
// 注册封装好的UI线程上下文
IOC.CustomRegisterInstance(typeof(UIContextOperation).FullName, this.UIContextOperation, false);
}
_ = Task.Run(async () =>
{
await flowStarter.RunAsync(this, nodes, autoRegisterTypes, initMethods, loadMethods, exitMethods);
if (FlipFlopState == RunState.Completion)
{
await ExitFlowAsync(); // 未运行触发器时,才会调用结束方法
}
});
IOC.Reset();
await flowStarter.RunAsync(this, nodes, autoRegisterTypes, initMethods, loadMethods, exitMethods);
//_ = Task.Run(async () =>
//{
// //if (FlipFlopState == RunState.Completion)
// //{
// // await ExitFlowAsync(); // 未运行触发器时,才会调用结束方法
// //}
//});
return true;
@@ -466,6 +470,7 @@ namespace Serein.NodeFlow.Env
{
flowStarter?.Exit();
UIContextOperation?.Invoke(() => OnFlowRunComplete?.Invoke(new FlowEventArgs()));
IOC.Reset();
flowStarter = null;
GC.Collect();
return Task.FromResult(true);
@@ -829,7 +834,10 @@ namespace Serein.NodeFlow.Env
foreach (var toNodeGuid in item.toNodeGuids)
{
var toNodeModel = GuidToModel(toNodeGuid);
if (toNodeModel is null) continue;
if (toNodeModel is null) {
// 防御性代码,加载正常保存的项目文件不会进入这里
continue;
};
var isSuccessful = ConnectInvokeOfNode(fromNodeModel, toNodeModel, item.connectionType); // 加载时确定节点间的连接关系
}
}
@@ -1396,29 +1404,6 @@ namespace Serein.NodeFlow.Env
#region
/// <summary>
/// 添加或更新全局数据
/// </summary>
/// <param name="keyName">数据名称</param>
/// <param name="data">数据集</param>
/// <returns></returns>
public object AddOrUpdateGlobalData(string keyName, object data)
{
SereinEnv.AddOrUpdateFlowGlobalData(keyName, data);
return data;
}
/// <summary>
/// 获取全局数据
/// </summary>
/// <param name="keyName">数据名称</param>
/// <returns></returns>
public object? GetGlobalData(string keyName)
{
return SereinEnv.GetFlowGlobalData(keyName);
}
/// <summary>
/// 运行时加载
/// </summary>
@@ -1796,6 +1781,20 @@ namespace Serein.NodeFlow.Env
//}
/// <summary>
/// 向容器登记缓存的持久化实例
/// </summary>
private void ReRegisterPersistennceInstance()
{
lock (PersistennceInstance)
{
foreach (var kvp in PersistennceInstance)
{
IOC.RegisterPersistennceInstance(kvp.Key, kvp.Value);
}
}
}
#endregion
#region
@@ -1819,6 +1818,7 @@ namespace Serein.NodeFlow.Env
ISereinIOC ISereinIOC.Reset()
{
sereinIOC.Reset();
ReRegisterPersistennceInstance(); // 重置后重新登记
return this;
}
@@ -1866,10 +1866,17 @@ namespace Serein.NodeFlow.Env
}
bool ISereinIOC.CustomRegisterInstance(string key, object instance, bool needInjectProperty)
bool ISereinIOC.RegisterPersistennceInstance(string key, object instance)
{
return sereinIOC.CustomRegisterInstance(key, instance, needInjectProperty);
PersistennceInstance.TryAdd(key, instance); // 记录需要持久化的实例
return sereinIOC.RegisterPersistennceInstance(key, instance);
}
bool ISereinIOC.RegisterInstance(string key, object instance)
{
return sereinIOC.RegisterInstance(key, instance);
}
object ISereinIOC.Instantiate(Type type)
{

View File

@@ -544,27 +544,7 @@ namespace Serein.NodeFlow.Env
}
#region
/// <summary>
/// 添加或更新全局数据
/// </summary>
/// <param name="keyName">数据名称</param>
/// <param name="data">数据集</param>
/// <returns></returns>
public object AddOrUpdateGlobalData(string keyName, object data)
{
return currentFlowEnvironment.AddOrUpdateGlobalData(keyName, data);
}
/// <summary>
/// 获取全局数据
/// </summary>
/// <param name="keyName">数据名称</param>
/// <returns></returns>
public object GetGlobalData(string keyName)
{
return currentFlowEnvironment.GetGlobalData(keyName);
}
/// <summary>
/// 运行时加载
@@ -594,9 +574,14 @@ namespace Serein.NodeFlow.Env
return IOC.Build();
}
public bool CustomRegisterInstance(string key, object instance, bool needInjectProperty = true)
public bool RegisterPersistennceInstance(string key, object instance)
{
return IOC.CustomRegisterInstance(key, instance, needInjectProperty);
return IOC.RegisterPersistennceInstance(key, instance);
}
public bool RegisterInstance(string key, object instance)
{
return IOC.RegisterInstance(key, instance);
}
public object Get(Type type)

View File

@@ -1297,29 +1297,6 @@ namespace Serein.NodeFlow.Env
/// <summary>
/// 添加或更新全局数据
/// </summary>
/// <param name="keyName">数据名称</param>
/// <param name="data">数据集</param>
/// <returns></returns>
public object AddOrUpdateGlobalData(string keyName, object data)
{
this.WriteLine(InfoType.INFO, "远程环境尚未实现的接口AddOrUpdateGlobalData");
return null;
}
/// <summary>
/// 获取全局数据
/// </summary>
/// <param name="keyName">数据名称</param>
/// <returns></returns>
public object GetGlobalData(string keyName)
{
this.WriteLine(InfoType.INFO, "远程环境尚未实现的接口GetGlobalData");
return null;
}
#endregion

View File

@@ -0,0 +1,13 @@
using Serein.Library.Utils;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
namespace Serein.NodeFlow
{
}

View File

@@ -15,9 +15,8 @@ namespace Serein.NodeFlow
/// </summary>
public class FlowStarter
{
/// <summary>
/// 控制全局触发器的结束
/// 控制所有全局触发器的结束
/// </summary>
private CancellationTokenSource? _flipFlopCts;
@@ -31,15 +30,6 @@ namespace Serein.NodeFlow
/// </summary>
private Func<Task>? ExitAction { get; set; }
private void CheckStartState()
{
if (IsStopStart)
{
throw new Exception("停止启动");
}
}
/// <summary>
/// 从选定的节点开始运行
/// </summary>
@@ -76,6 +66,10 @@ namespace Serein.NodeFlow
List<MethodDetails> exitMethods)
{
#region
env.IOC.Register<IScriptFlowApi, ScriptFlowApi>(); // 注册脚本接口
#endregion
env.FlowState = RunState.Running; // 开始运行
NodeModelBase? startNode = nodes.FirstOrDefault(node => node.IsStart);
if (startNode is null) {
@@ -116,13 +110,12 @@ namespace Serein.NodeFlow
thisRuningMds.AddRange(loadingMethods.Where(md => md?.ActingInstanceType is not null));
thisRuningMds.AddRange(exitMethods.Where(md => md?.ActingInstanceType is not null));
// .AddRange(initMethods).AddRange(loadingMethods).a
foreach (var nodeMd in thisRuningMds)
{
nodeMd.ActingInstance = null;
}
env.IOC.CustomRegisterInstance(typeof(ISereinIOC).FullName, env);
// 初始化ioc容器中的类型对象
foreach (var md in thisRuningMds)
{
@@ -206,12 +199,12 @@ namespace Serein.NodeFlow
#region 退
ExitAction = async () =>
{
env.IOC.Run<WebApiServer>(web => {
web?.Stop();
});
env.IOC.Run<WebSocketServer>(server => {
server?.Stop();
});
//env.IOC.Run<WebApiServer>(web => {
// web?.Stop();
//});
//env.IOC.Run<WebSocketServer>(server => {
// server?.Stop();
//});
foreach (MethodDetails? md in exitMethods)
{
@@ -230,7 +223,7 @@ namespace Serein.NodeFlow
TerminateAllGlobalFlipflop(); // 确保所有触发器不再运行
SereinEnv.ClearFlowGlobalData(); // 清空全局数据缓存
NativeDllHelper.FreeLibrarys(); // 卸载所有已加载的 Native Dll
env.IOC.Run<FlowInterruptTool>(fit => fit.CancelAllTrigger());// 取消所有中断
env.FlowState = RunState.Completion;
env.FlipFlopState = RunState.Completion;
@@ -249,7 +242,7 @@ namespace Serein.NodeFlow
env.FlipFlopState = RunState.Running;
// 如果存在需要启动的触发器,则开始启动
_flipFlopCts = new CancellationTokenSource();
env.IOC.CustomRegisterInstance(NodeStaticConfig.FlipFlopCtsName, _flipFlopCts,false);
env.IOC.RegisterInstance(NodeStaticConfig.FlipFlopCtsName, _flipFlopCts);
// 使用 TaskCompletionSource 创建未启动的触发器任务
var tasks = flipflopNodes.Select(async node =>
@@ -281,39 +274,6 @@ namespace Serein.NodeFlow
#endregion
}
#if false
public async Task TestScript(IFlowEnvironment environment)
{
SingleScriptNode singleScriptNode = new SingleScriptNode(environment);
string script =
"""
//let argData1 = flow.GetArgIndex(0); // 通过索引的方式,获取当前节点入参第一个参数
//let argData2 = flow.GetArgName("name"); // 通过名称的方式,获取当前节点入参的第二个参数
//let nodeData = flow.GetFlowData(); // 获取上一个节点的数据
//let state = flow.GetGlobalData("key name"); // 获取全局数据
//let result1 = flow.CallNode("node guid",); // 立即调用某个节点,获取数据
//let result2 = flow.CallFunc();
class User{
int ID;
string Name;
}
let user = new User();
user.ID = 12345;
user.Name = "张三";
return user;
""";
singleScriptNode.Script = script;
singleScriptNode.LoadScript();
var result = await singleScriptNode.ExecutingAsync(new DynamicContext(environment));
SereinEnv.WriteLine(InfoType.INFO, result?.ToString());
}
#endif
private ConcurrentDictionary<SingleFlipflopNode, CancellationTokenSource> dictGlobalFlipflop = [];
/// <summary>
@@ -403,7 +363,7 @@ namespace Serein.NodeFlow
context.SetPreviousNode(nextNodes[i], singleFlipFlopNode);
if (nextNodes[i].DebugSetting.IsInterrupt) // 执行触发前
{
await nextNodes[i].DebugSetting.GetInterruptTask();
await nextNodes[i].DebugSetting.GetInterruptTask.Invoke();
await Console.Out.WriteLineAsync($"[{nextNodes[i].MethodDetails.MethodName}]中断已取消,开始执行后继分支");
}
await nextNodes[i].StartFlowAsync(context); // 启动执行触发器后继分支的节点
@@ -421,7 +381,7 @@ namespace Serein.NodeFlow
context.SetPreviousNode(nextNodes[i], singleFlipFlopNode);
if (nextNodes[i].DebugSetting.IsInterrupt) // 执行触发前
{
await nextNodes[i].DebugSetting.GetInterruptTask();
await nextNodes[i].DebugSetting.GetInterruptTask.Invoke();
await Console.Out.WriteLineAsync($"[{nextNodes[i].MethodDetails.MethodName}]中断已取消,开始执行后继分支");
}
await nextNodes[i].StartFlowAsync(context); // 启动执行触发器后继分支的节点
@@ -448,12 +408,14 @@ namespace Serein.NodeFlow
}
/// <summary>
/// 结束流程
/// </summary>
public void Exit()
{
ExitAction?.Invoke();
}
}
}
}

View File

@@ -27,7 +27,7 @@ namespace Serein.NodeFlow.Model
if (DebugSetting.IsInterrupt) // 执行触发前
{
string guid = this.Guid.ToString();
await this.DebugSetting.GetInterruptTask();
await this.DebugSetting.GetInterruptTask.Invoke();
await Console.Out.WriteLineAsync($"[{this.MethodDetails.MethodName}]中断已取消,开始执行后继分支");
}
#endregion

View File

@@ -141,12 +141,23 @@ namespace Serein.NodeFlow.Model
public override async Task<object?> ExecutingAsync(IDynamicContext context)
{
var @params = await GetParametersAsync(context);
ScriptFlowApi.Context= context;
//dynamic obj = ((object[])@params[0])[0];
//try
//{
// SereinEnv.WriteLine(InfoType.INFO, "Dynamic Object Value " + obj.VarInfo);
//}
//catch (Exception ex)
//{
// SereinEnv.WriteLine(ex);
//}
//ScriptFlowApi.Context = context; // 并发破坏了数据状态
context.AddOrUpdate($"{context.Guid}_{this.Guid}_Params", @params[0]); // 后面再改
mainNode ??= new SereinScriptParser(Script).Parse();
IScriptInvokeContext scriptContext = new ScriptInvokeContext();
IScriptInvokeContext scriptContext = new ScriptInvokeContext(context);
var result = await ScriptInterpreter.InterpretAsync(scriptContext, mainNode); // 从入口节点执行
//SereinEnv.WriteLine(InfoType.INFO, "FlowContext Guid : " + context.Guid);
return result;
}

View File

@@ -24,11 +24,7 @@ namespace Serein.NodeFlow
/// </summary>
public NodeModelBase NodeModel { get; private set; }
public IDynamicContext? Context{ get; set; }
private string _paramsKey => $"{Context?.Guid}_{NodeModel.Guid}_Params";
/// <summary>
/// 创建流程脚本接口
@@ -46,9 +42,10 @@ namespace Serein.NodeFlow
throw new NotImplementedException();
}
public object? GetArgData(int index)
public object? GetArgData(IDynamicContext context, int index)
{
var obj = Context?.GetFlowData(_paramsKey);
var _paramsKey = $"{context?.Guid}_{NodeModel.Guid}_Params";
var obj = context?.GetFlowData(_paramsKey);
if (obj is object[] @params && index < @params.Length)
{
return @params[index];
@@ -57,9 +54,9 @@ namespace Serein.NodeFlow
}
public object? GetFlowData()
public object? GetFlowData(IDynamicContext context)
{
return Context?.GetFlowData(NodeModel.Guid);
return context?.GetFlowData(NodeModel.Guid);
}
public object? GetGlobalData(string keyName)