优化了中断功能,增加了节点变量的查看。

This commit is contained in:
fengjiayi
2024-09-22 14:10:13 +08:00
parent 3537a49784
commit c930c870a6
26 changed files with 1686 additions and 494 deletions

View File

@@ -81,7 +81,7 @@ namespace Serein.NodeFlow.Base
/// <summary>
/// 当前传递数据(执行了节点对应的方法,才会存在值)
/// </summary>
public object? FlowData { get; set; } = null;
protected object? FlowData { get; set; } = null;
}

View File

@@ -8,6 +8,7 @@ using Serein.NodeFlow.Tool.SereinExpression;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
@@ -34,7 +35,6 @@ namespace Serein.NodeFlow.Base
public void Interrupt()
{
this.DebugSetting.InterruptClass = InterruptClass.Branch;
this.DebugSetting.IsInterrupt = true;
}
/// <summary>
@@ -43,7 +43,6 @@ namespace Serein.NodeFlow.Base
public void CancelInterrupt()
{
this.DebugSetting.InterruptClass = InterruptClass.None;
this.DebugSetting.IsInterrupt = false;
CancelInterruptCallback?.Invoke();
CancelInterruptCallback = null;
}
@@ -106,93 +105,64 @@ namespace Serein.NodeFlow.Base
/// <returns></returns>
public async Task StartExecute(IDynamicContext context)
{
CancellationTokenSource cts = null;
try
Stack<NodeModelBase> stack = new Stack<NodeModelBase>();
stack.Push(this);
var cts = context.SereinIoc.Get<CancellationTokenSource>(FlowStarter.FlipFlopCtsName);
while (stack.Count > 0 && !cts.IsCancellationRequested) // 循环中直到栈为空才会退出循环
{
cts = context.SereinIoc.Get<CancellationTokenSource>(FlowStarter.FlipFlopCtsName);
// 从栈中弹出一个节点作为当前节点进行处理
var currentNode = stack.Pop();
Stack<NodeModelBase> stack = new Stack<NodeModelBase>();
stack.Push(this);
while (stack.Count > 0 && !cts.IsCancellationRequested) // 循环中直到栈为空才会退出循环
// 设置方法执行的对象
if (currentNode.MethodDetails?.ActingInstance == null && currentNode.MethodDetails?.ActingInstanceType is not null)
{
// 从栈中弹出一个节点作为当前节点进行处理
var currentNode = stack.Pop();
// 设置方法执行的对象
if (currentNode.MethodDetails?.ActingInstance == null && currentNode.MethodDetails?.ActingInstanceType is not null)
{
currentNode.MethodDetails.ActingInstance ??= context.SereinIoc.GetOrRegisterInstantiate(currentNode.MethodDetails.ActingInstanceType);
}
#region
// 首先执行上游分支
#if false
var upstreamNodes = currentNode.SuccessorNodes[ConnectionType.Upstream];
for (int i = upstreamNodes.Count - 1; i >= 0; i--)
{
if (upstreamNodes[i].DebugSetting.IsEnable) // 排除未启用的上游节点
{
upstreamNodes[i].PreviousNode = currentNode;
await upstreamNodes[i].StartExecute(context); // 执行流程节点的上游分支
}
}
#endif
currentNode.FlowData = await currentNode.ExecutingAsync(context); // 流程中正常执行
#endregion
#region
if (currentNode.NextOrientation == ConnectionType.None) break; // 不再执行
// 选择后继分支
var nextNodes = currentNode.SuccessorNodes[currentNode.NextOrientation];
// 将下一个节点集合中的所有节点逆序推入栈中
for (int i = nextNodes.Count - 1; i >= 0; i--)
{
// 排除未启用的节点
if (nextNodes[i].DebugSetting.IsEnable)
{
nextNodes[i].PreviousNode = currentNode;
stack.Push(nextNodes[i]);
}
}
#endregion
currentNode.MethodDetails.ActingInstance ??= context.SereinIoc.GetOrRegisterInstantiate(currentNode.MethodDetails.ActingInstanceType);
}
}
finally
{
cts?.Dispose();
}
}
public static bool TryCreateInterruptTask(IDynamicContext context, NodeModelBase currentNode, out Task<CancelType>? task)
{
bool haveTask;
Console.WriteLine($"[{currentNode.MethodDetails.MethodName}]在当前分支中断");
#region
if (currentNode.DebugSetting.InterruptClass == InterruptClass.None)
{
haveTask = false;
task = null;
currentNode.DebugSetting.IsInterrupt = false; // 纠正设置
}
else if (currentNode.DebugSetting.InterruptClass == InterruptClass.Branch) // 中断当前分支
{
currentNode.DebugSetting.IsInterrupt = true;
haveTask = true;
task = context.FlowEnvironment.ChannelFlowInterrupt.CreateChannelWithTimeoutAsync(currentNode.Guid, TimeSpan.FromSeconds(60 * 30)); // 中断30分钟
}
else
{
haveTask = false;
task = null;
}
// 首先执行上游分支
var upstreamNodes = currentNode.SuccessorNodes[ConnectionType.Upstream];
for (int i = upstreamNodes.Count - 1; i >= 0; i--)
{
if (upstreamNodes[i].DebugSetting.IsEnable) // 排除未启用的上游节点
{
upstreamNodes[i].PreviousNode = currentNode;
var upNewFlowData = await upstreamNodes[i].ExecutingAsync(context); // 执行流程节点的上游分支
await FlowRefreshDataOrInterrupt(context, upstreamNodes[i], upNewFlowData); // 执行上游分支后刷新上游节点数据
}
}
return haveTask;
// 执行当前节点
var newFlowData = await currentNode.ExecutingAsync(context);
await FlowRefreshDataOrInterrupt(context, currentNode, newFlowData); // 执行当前节点后刷新数据
#endregion
#region
if (cts == null || cts.IsCancellationRequested || currentNode.NextOrientation == ConnectionType.None)
{
// 不再执行
break;
}
// 选择后继分支
var nextNodes = currentNode.SuccessorNodes[currentNode.NextOrientation];
// 将下一个节点集合中的所有节点逆序推入栈中
for (int i = nextNodes.Count - 1; i >= 0; i--)
{
// 排除未启用的节点
if (nextNodes[i].DebugSetting.IsEnable)
{
nextNodes[i].PreviousNode = currentNode;
stack.Push(nextNodes[i]);
}
}
#endregion
}
}
/// <summary>
@@ -203,12 +173,11 @@ namespace Serein.NodeFlow.Base
public virtual async Task<object?> ExecutingAsync(IDynamicContext context)
{
#region
if (DebugSetting.IsInterrupt && TryCreateInterruptTask(context, this, out Task<CancelType>? task)) // 执行节点前检查中断
if (DebugSetting.InterruptClass != InterruptClass.None && TryCreateInterruptTask(context, this, out Task<CancelType>? task)) // 执行节点前检查中断
{
string guid = this.Guid.ToString();
this.CancelInterruptCallback ??= () => context.FlowEnvironment.ChannelFlowInterrupt.TriggerSignal(guid);
var cancelType = await task!;
task?.ToString();
await Console.Out.WriteLineAsync($"[{this.MethodDetails.MethodName}]中断已{(cancelType == CancelType.Manual ? "" : "")},开始执行后继分支");
}
@@ -223,7 +192,7 @@ namespace Serein.NodeFlow.Base
try
{
// Action/Func([方法作用的实例],[可能的参数值],[可能的返回值])
object?[]? parameters = GetParameters(context, md);
object?[]? parameters = GetParameters(context,this, md);
object? result = (haveParameter, haveResult) switch
{
(false, false) => Execution((Action<object>)del, instance), // 调用节点方法返回null
@@ -232,33 +201,6 @@ namespace Serein.NodeFlow.Base
(true, true) => Execution((Func<object, object?[]?, object?>)del, instance, parameters), // 调用节点方法,获取入参参数,返回方法忏悔类型
};
//object?[]? parameters;
//object? result = null;
//if ( haveParameter )
//{
// var data = GetParameters(context, md);
// if (data[0] is Int32 count && count > 1)
// {
// }
// parameters = [instance, data];
//}
//else
//{
// parameters = [instance];
//}
//if (haveResult)
//{
// result = del.DynamicInvoke(parameters);
//}
//else
//{
// del.DynamicInvoke(parameters);
//}
NextOrientation = ConnectionType.IsSucceed;
return result;
}
@@ -270,13 +212,6 @@ namespace Serein.NodeFlow.Base
}
}
/// <summary>
/// 执行等待触发器的方法
/// </summary>
/// <param name="context"></param>
/// <returns>节点传回数据对象</returns>
/// <exception cref="RuningException"></exception>
#region
@@ -304,7 +239,7 @@ namespace Serein.NodeFlow.Base
/// <summary>
/// 获取对应的参数数组
/// </summary>
public object?[]? GetParameters(IDynamicContext context, MethodDetails md)
public static object?[]? GetParameters(IDynamicContext context, NodeModelBase nodeModel, MethodDetails md)
{
// 用正确的大小初始化参数数组
if (md.ExplicitDatas.Length == 0)
@@ -313,7 +248,7 @@ namespace Serein.NodeFlow.Base
}
object?[]? parameters = new object[md.ExplicitDatas.Length];
var flowData = PreviousNode?.FlowData; // 当前传递的数据
var flowData = nodeModel.PreviousNode?.FlowData; // 当前传递的数据
var previousDataType = flowData?.GetType();
for (int i = 0; i < parameters.Length; i++)
@@ -349,7 +284,7 @@ namespace Serein.NodeFlow.Base
//Type t when t == previousDataType => inputParameter, // 上下文
Type t when t == typeof(IDynamicContext) => context, // 上下文
Type t when t == typeof(MethodDetails) => md, // 节点方法描述
Type t when t == typeof(NodeModelBase) => this, // 节点实体类
Type t when t == typeof(NodeModelBase) => nodeModel, // 节点实体类
Type t when t == typeof(Guid) => new Guid(inputParameter?.ToString()),
Type t when t == typeof(DateTime) => DateTime.Parse(inputParameter?.ToString()),
Type t when t == typeof(string) => inputParameter?.ToString(),
@@ -387,6 +322,88 @@ namespace Serein.NodeFlow.Base
return parameters;
}
/// <summary>
/// 更新节点数据,并检查监视表达式
/// </summary>
/// <param name="newData"></param>
public static async Task FlowRefreshDataOrInterrupt(IDynamicContext context , NodeModelBase nodeModel, object? newData = null)
{
string guid = nodeModel.Guid;
if (newData is not null)
{
// 判断是否存在表达式
bool isInterrupt = false;
// 判断监视表达式
for (int i = 0; i < nodeModel.DebugSetting.InterruptExpression.Count && !isInterrupt; i++)
{
string? exp = nodeModel.DebugSetting.InterruptExpression[i];
isInterrupt = SereinConditionParser.To(newData, exp);
}
if (isInterrupt) // 触发中断
{
nodeModel.Interrupt();
if(TryCreateInterruptTask(context, nodeModel, out Task<CancelType>? task))
{
nodeModel.CancelInterruptCallback ??= () => context.FlowEnvironment.ChannelFlowInterrupt.TriggerSignal(guid);
var cancelType = await task!;
await Console.Out.WriteLineAsync($"[{nodeModel.MethodDetails.MethodName}]中断已{(cancelType == CancelType.Manual ? "" : "")},开始执行后继分支");
}
}
}
nodeModel.FlowData = newData;
// 节点是否监视了数据,如果是,调用环境接口触发其相关事件。
if (nodeModel.DebugSetting.IsMonitorFlowData)
{
context.FlowEnvironment.FlowDataUpdateNotification(guid, newData);
}
}
public static bool TryCreateInterruptTask(IDynamicContext context, NodeModelBase currentNode, out Task<CancelType>? task)
{
bool haveTask;
Console.WriteLine($"[{currentNode.MethodDetails.MethodName}]在当前分支中断");
if (currentNode.DebugSetting.InterruptClass == InterruptClass.None)
{
haveTask = false;
task = null;
}
else if (currentNode.DebugSetting.InterruptClass == InterruptClass.Branch) // 中断当前分支
{
haveTask = true;
task = context.FlowEnvironment.ChannelFlowInterrupt.CreateChannelWithTimeoutAsync(currentNode.Guid, TimeSpan.FromSeconds(60 * 30)); // 中断30分钟
}
else
{
haveTask = false;
task = null;
}
return haveTask;
}
/// <summary>
/// 释放对象
/// </summary>
public void ReleaseFlowData()
{
if (typeof(IDisposable).IsAssignableFrom(FlowData?.GetType()) && FlowData is IDisposable disposable)
{
disposable?.Dispose();
}
this.FlowData = null;
}
/// <summary>
/// 获取节点数据
/// </summary>
/// <returns></returns>
public object? GetFlowData()
{
return this.FlowData ;
}
#endregion
}

View File

@@ -1,4 +1,5 @@
using Newtonsoft.Json.Linq;
using Serein.Library.Api;
using Serein.Library.Attributes;
using Serein.Library.Entity;
@@ -90,10 +91,25 @@ namespace Serein.NodeFlow
public event StartNodeChangeHandler OnStartNodeChange;
/// <summary>
/// 流程运行完成时间
/// 流程运行完成事件
/// </summary>
public event FlowRunCompleteHandler OnFlowRunComplete;
/// <summary>
/// 被监视的对象改变事件
/// </summary>
public event MonitorObjectChangeHandler OnMonitorObjectChange;
/// <summary>
/// 节点中断状态改变事件
/// </summary>
public event NodeInterruptStateChangeHandler OnNodeInterruptStateChange;
/// <summary>
/// 节点触发了中断
/// </summary>
public event NodeInterruptTriggerHandler OnNodeInterruptTrigger;
#endregion
@@ -171,23 +187,25 @@ namespace Serein.NodeFlow
{
ChannelFlowInterrupt?.CancelAllTasks();
flowStarter = new FlowStarter();
List<SingleFlipflopNode> flipflopNodes = Nodes.Values.Where(it => it.MethodDetails?.MethodDynamicType == NodeType.Flipflop && it.IsStart == false)
.Select(it => (SingleFlipflopNode)it)
.Where(node => node is SingleFlipflopNode flipflopNode && flipflopNode.NotExitPreviousNode())
.ToList();// 获取需要再运行开始之前启动的触发器节点
var runMethodDetailess = Nodes.Values.Select(item => item.MethodDetails).ToList(); // 获取环境中所有节点的方法信息
var initMethods = MethodDetailss.Where(it => it.MethodDynamicType == NodeType.Init).ToList();
var loadingMethods = MethodDetailss.Where(it => it.MethodDynamicType == NodeType.Loading).ToList();
var exitMethods = MethodDetailss.Where(it => it.MethodDynamicType == NodeType.Exit).ToList();
var nodes = Nodes.Values.ToList();
List<MethodDetails> initMethods;
List<MethodDetails> loadingMethods;
List<MethodDetails> exitMethods;
initMethods = MethodDetailss.Where(it => it.MethodDynamicType == NodeType.Init).ToList();
loadingMethods = MethodDetailss.Where(it => it.MethodDynamicType == NodeType.Loading).ToList();
exitMethods = MethodDetailss.Where(it => it.MethodDynamicType == NodeType.Exit).ToList();
await flowStarter.RunAsync(this, nodes, initMethods, loadingMethods, exitMethods);
await flowStarter.RunAsync(StartNode,
this,
runMethodDetailess,
initMethods,
loadingMethods,
exitMethods,
flipflopNodes);
//await flowStarter.RunAsync(StartNode,
// this,
// runMethodDetailess,
// initMethods,
// loadingMethods,
// exitMethods,
// flipflopNodes);
if(flowStarter?.FlipFlopState == RunState.NoStart)
{
@@ -195,19 +213,24 @@ namespace Serein.NodeFlow
}
flowStarter = null;
}
/// <summary>
/// 退出
/// </summary>
public void Exit()
{
foreach (var node in Nodes.Values)
{
if (typeof(IDisposable).IsAssignableFrom(node?.FlowData?.GetType()) && node.FlowData is IDisposable disposable)
{
disposable?.Dispose();
}
node!.FlowData = null;
}
ChannelFlowInterrupt?.CancelAllTasks();
flowStarter?.Exit();
foreach (var node in Nodes.Values)
{
if(node is not null)
{
node.ReleaseFlowData(); // 退出时释放对象计数
}
}
OnFlowRunComplete?.Invoke(new FlowEventArgs());
GC.Collect();
@@ -443,20 +466,12 @@ namespace Serein.NodeFlow
/// <exception cref="NotImplementedException"></exception>
public void RemoteNode(string nodeGuid)
{
if (!Nodes.TryGetValue(nodeGuid, out NodeModelBase? remoteNode))
{
return;
}
if (remoteNode is null)
{
return;
}
NodeModelBase remoteNode = GuidToModel(nodeGuid);
if (remoteNode.IsStart)
{
return;
}
// 遍历所有父节点,从那些父节点中的子节点集合移除该节点
foreach (var pnc in remoteNode.PreviousNodes)
{
@@ -507,14 +522,8 @@ namespace Serein.NodeFlow
public void ConnectNode(string fromNodeGuid, string toNodeGuid, ConnectionType connectionType)
{
// 获取起始节点与目标节点
if (!Nodes.TryGetValue(fromNodeGuid, out NodeModelBase? fromNode) || !Nodes.TryGetValue(toNodeGuid, out NodeModelBase? toNode))
{
return;
}
if (fromNode is null || toNode is null)
{
return;
}
NodeModelBase fromNode = GuidToModel(fromNodeGuid);
NodeModelBase toNode = GuidToModel(toNodeGuid);
// 开始连接
ConnectNode(fromNode, toNode, connectionType); // 外部调用连接方法
@@ -530,14 +539,8 @@ namespace Serein.NodeFlow
public void RemoteConnect(string fromNodeGuid, string toNodeGuid, ConnectionType connectionType)
{
// 获取起始节点与目标节点
if (!Nodes.TryGetValue(fromNodeGuid, out NodeModelBase? fromNode) || !Nodes.TryGetValue(toNodeGuid, out NodeModelBase? toNode))
{
return;
}
if (fromNode is null || toNode is null)
{
return;
}
NodeModelBase fromNode = GuidToModel(fromNodeGuid);
NodeModelBase toNode = GuidToModel(toNodeGuid);
RemoteConnect(fromNode, toNode, connectionType);
//fromNode.SuccessorNodes[connectionType].Remove(toNode);
@@ -603,28 +606,83 @@ namespace Serein.NodeFlow
/// <param name="newNodeGuid"></param>
public void SetStartNode(string newNodeGuid)
{
if (string.IsNullOrEmpty(newNodeGuid))
{
return;
}
if (Nodes.TryGetValue(newNodeGuid, out NodeModelBase? newStartNodeModel))
{
if (newStartNodeModel != null)
{
SetStartNode(newStartNodeModel);
//var oldNodeGuid = "";
//if(StartNode != null)
//{
// oldNodeGuid = StartNode.Guid;
// StartNode.IsStart = false;
//}
//newStartNodeModel.IsStart = true;
//StartNode = newStartNodeModel;
//OnStartNodeChange?.Invoke(new StartNodeChangeEventArgs(oldNodeGuid, newNodeGuid));
}
}
NodeModelBase newStartNodeModel = GuidToModel(newNodeGuid);
SetStartNode(newStartNodeModel);
//if (string.IsNullOrEmpty(newNodeGuid))
//{
// return;
//}
//if (Nodes.TryGetValue(newNodeGuid, out NodeModelBase? newStartNodeModel))
//{
// if (newStartNodeModel != null)
// {
// SetStartNode(newStartNodeModel);
// //var oldNodeGuid = "";
// //if(StartNode != null)
// //{
// // oldNodeGuid = StartNode.Guid;
// // StartNode.IsStart = false;
// //}
// //newStartNodeModel.IsStart = true;
// //StartNode = newStartNodeModel;
// //OnStartNodeChange?.Invoke(new StartNodeChangeEventArgs(oldNodeGuid, newNodeGuid));
// }
//}
}
/// <summary>
/// 中断指定节点,并指定中断等级。
/// </summary>
/// <param name="nodeGuid">被中断的目标节点Guid</param>
/// <param name="interruptClass">中断级别</param>
/// <returns>操作是否成功</returns>
public bool NodeInterruptChange(string nodeGuid, InterruptClass interruptClass)
{
NodeModelBase nodeModel = GuidToModel(nodeGuid);
nodeModel.DebugSetting.InterruptClass = interruptClass;
OnNodeInterruptStateChange.Invoke(new NodeInterruptStateChangeEventArgs(nodeGuid, interruptClass));
return true;
}
/// <summary>
/// 监视节点的数据
/// </summary>
/// <param name="nodeGuid">需要监视的节点Guid</param>
public void SetNodeFLowDataMonitorState(string nodeGuid, bool isMonitor)
{
NodeModelBase nodeModel = GuidToModel(nodeGuid);
nodeModel.DebugSetting.IsMonitorFlowData = isMonitor;
}
/// <summary>
/// 节点数据更新通知
/// </summary>
/// <param name="nodeGuid"></param>
public void FlowDataUpdateNotification(string nodeGuid, object flowData)
{
OnMonitorObjectChange?.Invoke(new MonitorObjectEventArgs(nodeGuid, flowData));
}
/// <summary>
/// Guid 转 NodeModel
/// </summary>
/// <param name="nodeGuid">节点Guid</param>
/// <returns>节点Model</returns>
/// <exception cref="ArgumentNullException">无法获取节点、Guid/节点为null时报错</exception>
private NodeModelBase GuidToModel(string nodeGuid)
{
if (string.IsNullOrEmpty(nodeGuid))
{
throw new ArgumentNullException("not contains - Guid没有对应节点:" + (nodeGuid));
}
if (!Nodes.TryGetValue(nodeGuid, out NodeModelBase? nodeModel) || nodeModel is null)
{
throw new ArgumentNullException("null - Guid存在对应节点,但节点为null:" + (nodeGuid));
}
return nodeModel;
}
#endregion
#region

View File

@@ -8,6 +8,7 @@ using Serein.NodeFlow.Base;
using Serein.NodeFlow.Model;
using System.ComponentModel.Design;
using System.Runtime.CompilerServices;
using System.Xml.Linq;
using static Serein.Library.Utils.ChannelFlowInterrupt;
namespace Serein.NodeFlow
@@ -46,7 +47,7 @@ namespace Serein.NodeFlow
/// <summary>
/// 控制触发器
/// </summary>
private CancellationTokenSource _flipFlopCts = null;
public const string FlipFlopCtsName = "<>.FlowFlipFlopCts";
public bool IsStopStart = false;
@@ -55,22 +56,18 @@ namespace Serein.NodeFlow
/// </summary>
public RunState FlowState { get; private set; } = RunState.NoStart;
public RunState FlipFlopState { get; private set; } = RunState.NoStart;
/// <summary>
/// 运行时的IOC容器
/// </summary>
private ISereinIOC SereinIOC { get; } = null;
/// <summary>
/// 结束运行时需要执行的方法
/// </summary>
private Action ExitAction { get; set; } = null;
/// <summary>
/// 运行的上下文
/// </summary>
private IDynamicContext Context { get; set; } = null;
private void CheckStartState()
{
if (IsStopStart)
@@ -81,30 +78,52 @@ namespace Serein.NodeFlow
}
// <summary>
// 开始运行
// </summary>
// <param name="startNode">起始节点</param>
// <param name="env">运行环境</param>
// <param name="runNodeMd">环境中已加载的所有节点方法</param>
// <param name="flipflopNodes">触发器节点</param>
// <returns></returns>
/// <summary>
/// 开始运行
/// </summary>
/// <param name="startNode">起始节点</param>
/// <param name="env">运行环境</param>
/// <param name="runNodeMd">环境中已加载的所有节点方法</param>
/// <param name="flipflopNodes">触发器节点</param>
/// <param name="nodes">环境中已加载的所有节点</param>
/// <param name="initMethods">初始化方法</param>
/// <param name="loadingMethods">加载时方法</param>
/// <param name="exitMethods">结束时方法</param>
/// <returns></returns>
public async Task RunAsync(NodeModelBase startNode,
IFlowEnvironment env,
List<MethodDetails> runNodeMd,
public async Task RunAsync(IFlowEnvironment env,
List<NodeModelBase> nodes,
List<MethodDetails> initMethods,
List<MethodDetails> loadingMethods,
List<MethodDetails> exitMethods,
List<SingleFlipflopNode> flipflopNodes)
List<MethodDetails> exitMethods)
{
FlowState = RunState.Running; // 开始运行
if (startNode == null) {
NodeModelBase? startNode = nodes.FirstOrDefault(node => node.IsStart);
if (startNode is null) {
FlowState = RunState.Completion; // 不存在起点,退出流程
return;
}
#region
List<MethodDetails> runNodeMd;
List<SingleFlipflopNode> flipflopNodes;
flipflopNodes = nodes.Where(it => it.MethodDetails?.MethodDynamicType == NodeType.Flipflop && it.IsStart == false)
.Select(it => (SingleFlipflopNode)it)
.Where(node => node is SingleFlipflopNode flipflopNode && flipflopNode.NotExitPreviousNode())
.ToList();// 获取需要再运行开始之前启动的触发器节点
runNodeMd = nodes.Select(item => item.MethodDetails).ToList(); // 获取环境中所有节点的方法信息
#endregion
#region
// 判断使用哪一种流程上下文
@@ -146,7 +165,7 @@ namespace Serein.NodeFlow
IsStopStart = true;
}
}
CheckStartState();
CheckStartState(); // 初始化IOC后检查状态
SereinIOC.Build(); // 流程启动前的初始化
@@ -161,12 +180,8 @@ namespace Serein.NodeFlow
}
}
CheckStartState();
CheckStartState();// 调用节点初始化后检查状态
//foreach (var md in flipflopNodes.Select(it => it.MethodDetails).ToArray())
//{
// md.ActingInstance = SereinIoc.GetOrCreateServiceInstance(md.ActingInstanceType);
//}
#endregion
#region 退
@@ -204,7 +219,6 @@ namespace Serein.NodeFlow
#region 退
ExitAction = () =>
{
SereinIOC.Run<WebServer>(web => {
web?.Stop();
});
@@ -214,23 +228,18 @@ namespace Serein.NodeFlow
((Action<object, object?[]?>)md.MethodDelegate).Invoke(md.ActingInstance, [Context]);
}
//if (Context != null && Context.NodeRunCts != null && !Context.NodeRunCts.IsCancellationRequested)
//{
// Context.NodeRunCts.Cancel();
//}
//if (FlipFlopCts != null && !FlipFlopCts.IsCancellationRequested)
//{
// FlipFlopCts?.Cancel();
// FlipFlopCts?.Dispose();
//}
if (_flipFlopCts != null && !_flipFlopCts.IsCancellationRequested)
{
_flipFlopCts?.Cancel();
_flipFlopCts?.Dispose();
}
FlowState = RunState.Completion;
FlipFlopState = RunState.Completion;
};
#endregion
#region
CancellationTokenSource FlipFlopCts = null;
try
{
@@ -238,8 +247,8 @@ namespace Serein.NodeFlow
{
FlipFlopState = RunState.Running;
// 如果存在需要启动的触发器,则开始启动
FlipFlopCts = new CancellationTokenSource();
SereinIOC.CustomRegisterInstance(FlipFlopCtsName, FlipFlopCts,false);
_flipFlopCts = new CancellationTokenSource();
SereinIOC.CustomRegisterInstance(FlipFlopCtsName, _flipFlopCts,false);
// 使用 TaskCompletionSource 创建未启动的触发器任务
var tasks = flipflopNodes.Select(async node =>
@@ -250,15 +259,13 @@ namespace Serein.NodeFlow
}
await startNode.StartExecute(Context); // 开始运行时从起始节点开始运行
// 等待结束
if(FlipFlopState == RunState.Running && FlipFlopCts is not null)
if(FlipFlopState == RunState.Running && _flipFlopCts is not null)
{
while (!FlipFlopCts.IsCancellationRequested)
while (!_flipFlopCts.IsCancellationRequested)
{
await Task.Delay(100);
}
}
//FlipFlopCts?.Dispose();
}
catch (Exception ex)
{
@@ -266,7 +273,6 @@ namespace Serein.NodeFlow
}
finally
{
FlipFlopCts?.Dispose();
FlowState = RunState.Completion;
}
#endregion
@@ -290,72 +296,72 @@ namespace Serein.NodeFlow
/// <returns></returns>
private async Task FlipflopExecute(IFlowEnvironment flowEnvironment,SingleFlipflopNode singleFlipFlopNode)
{
CancellationTokenSource cts = null;
var context = new DynamicContext(SereinIOC, flowEnvironment); // 启动全局触发器时新建上下文
MethodDetails md = singleFlipFlopNode.MethodDetails;
var del = md.MethodDelegate;
object?[]? parameters = singleFlipFlopNode.GetParameters(context, singleFlipFlopNode.MethodDetails); // 启动全局触发器时获取入参参数
// 设置委托对象
var func = md.ExplicitDatas.Length == 0 ?
(Func<object, object, Task<IFlipflopContext>>)del :
(Func<object, object[], Task<IFlipflopContext>>)del;
bool t = md.ExplicitDatas.Length == 0;
try
{
cts = Context.SereinIoc.Get<CancellationTokenSource>(FlipFlopCtsName);
while (!cts.IsCancellationRequested)
while (!_flipFlopCts.IsCancellationRequested)
{
singleFlipFlopNode.FlowData = await singleFlipFlopNode.ExecutingAsync(context);
var newFlowData = await singleFlipFlopNode.ExecutingAsync(context);
await NodeModelBase.FlowRefreshDataOrInterrupt(context, singleFlipFlopNode, newFlowData); // 全局触发器触发后刷新该触发器的节点数据
if (singleFlipFlopNode.NextOrientation != ConnectionType.None)
{
var nextNodes = singleFlipFlopNode.SuccessorNodes[singleFlipFlopNode.NextOrientation];
for (int i = nextNodes.Count - 1; i >= 0; i--)
for (int i = nextNodes.Count - 1; i >= 0 && !_flipFlopCts.IsCancellationRequested; i--)
{
if (nextNodes[i].DebugSetting.IsEnable) // 排除未启用的后继节点
{
nextNodes[i].PreviousNode = singleFlipFlopNode;
await nextNodes[i].StartExecute(context); // 执行流程节点的后继分支
await nextNodes[i].StartExecute(context); // 启动执行触发器后继分支的节点
}
}
}
//if(t)
//{
// IFlipflopContext flipflopContext = await ((Func<object, Task<IFlipflopContext>>)del.Clone()).Invoke(md.ActingInstance);// 开始等待全局触发器的触发
// var connectionType = flipflopContext.State.ToContentType();
// if (connectionType != ConnectionType.None)
// {
// await GlobalFlipflopExecute(context, singleFlipFlopNode, connectionType, cts);
// }
//}
//else
//{
// IFlipflopContext flipflopContext = await ((Func<object, object[], Task<IFlipflopContext>>)del.Clone()).Invoke(md.ActingInstance, parameters);// 开始等待全局触发器的触发
// var connectionType = flipflopContext.State.ToContentType();
// if (connectionType != ConnectionType.None)
// {
// await GlobalFlipflopExecute(context, singleFlipFlopNode, connectionType, cts);
// }
//}
}
}
catch (Exception ex)
{
await Console.Out.WriteLineAsync(ex.ToString());
}
finally
{
cts?.Cancel();
}
//MethodDetails md = singleFlipFlopNode.MethodDetails;
//var del = md.MethodDelegate;
//object?[]? parameters = singleFlipFlopNode.GetParameters(context, singleFlipFlopNode.MethodDetails); // 启动全局触发器时获取入参参数
//// 设置委托对象
//var func = md.ExplicitDatas.Length == 0 ?
// (Func<object, object, Task<IFlipflopContext>>)del :
// (Func<object, object[], Task<IFlipflopContext>>)del;
//if(t)
//{
// IFlipflopContext flipflopContext = await ((Func<object, Task<IFlipflopContext>>)del.Clone()).Invoke(md.ActingInstance);// 开始等待全局触发器的触发
// var connectionType = flipflopContext.State.ToContentType();
// if (connectionType != ConnectionType.None)
// {
// await GlobalFlipflopExecute(context, singleFlipFlopNode, connectionType, cts);
// }
//}
//else
//{
// IFlipflopContext flipflopContext = await ((Func<object, object[], Task<IFlipflopContext>>)del.Clone()).Invoke(md.ActingInstance, parameters);// 开始等待全局触发器的触发
// var connectionType = flipflopContext.State.ToContentType();
// if (connectionType != ConnectionType.None)
// {
// await GlobalFlipflopExecute(context, singleFlipFlopNode, connectionType, cts);
// }
//}
}
public void Exit()
{
ExitAction?.Invoke();
}
#if false
/// <summary>
/// 全局触发器开始执行相关分支
/// </summary>
@@ -364,9 +370,10 @@ namespace Serein.NodeFlow
/// <param name="connectionType">分支类型</param>
/// <returns></returns>
public async Task GlobalFlipflopExecute(IDynamicContext context, SingleFlipflopNode singleFlipFlopNode,
ConnectionType connectionType, CancellationTokenSource cts)
ConnectionType connectionType, CancellationTokenSource cts)
{
bool skip = true;
Stack<NodeModelBase> stack = new Stack<NodeModelBase>();
@@ -400,9 +407,9 @@ namespace Serein.NodeFlow
else
{
currentNode.FlowData = await currentNode.ExecutingAsync(context);
if (currentNode.NextOrientation == ConnectionType.None)
if (currentNode.NextOrientation == ConnectionType.None)
{
break; // 不再执行
}
@@ -418,13 +425,13 @@ namespace Serein.NodeFlow
nextNodes[i].PreviousNode = currentNode;
stack.Push(nextNodes[i]);
}
}
}
}}
#endif
public void Exit()
{
ExitAction?.Invoke();
}
}
}

View File

@@ -38,7 +38,7 @@ namespace Serein.NodeFlow.Model
break;
}
}
return Task.FromResult( PreviousNode?.FlowData);
return Task.FromResult( PreviousNode?.GetFlowData());
}

View File

@@ -39,7 +39,7 @@ namespace Serein.NodeFlow.Model
}
else
{
result = PreviousNode?.FlowData;
result = PreviousNode?.GetFlowData();
}
try
{

View File

@@ -21,7 +21,7 @@ namespace Serein.NodeFlow.Model
//public override async Task<object?> Executing(IDynamicContext context)
public override Task<object?> ExecutingAsync(IDynamicContext context)
{
var data = PreviousNode?.FlowData;
var data = PreviousNode?.GetFlowData();
try
{
@@ -34,7 +34,7 @@ namespace Serein.NodeFlow.Model
}
else
{
result = PreviousNode?.FlowData;
result = data;
}
NextOrientation = ConnectionType.IsSucceed;
@@ -44,7 +44,7 @@ namespace Serein.NodeFlow.Model
{
NextOrientation = ConnectionType.IsError;
RuningException = ex;
return Task.FromResult(PreviousNode?.FlowData);
return Task.FromResult(data);
}
}

View File

@@ -22,12 +22,11 @@ namespace Serein.NodeFlow.Model
public override async Task<object?> ExecutingAsync(IDynamicContext context)
{
#region
if (DebugSetting.IsInterrupt && TryCreateInterruptTask(context, this, out Task<CancelType>? task)) // 执行触发前
if (DebugSetting.InterruptClass != InterruptClass.None && TryCreateInterruptTask(context, this, out Task<CancelType>? task)) // 执行触发前
{
string guid = this.Guid.ToString();
this.CancelInterruptCallback ??= () => context.FlowEnvironment.ChannelFlowInterrupt.TriggerSignal(guid);
var cancelType = await task!;
task?.ToString();
await Console.Out.WriteLineAsync($"[{this.MethodDetails.MethodName}]中断已{(cancelType == CancelType.Manual ? "" : "")},开始执行后继分支");
}
#endregion
@@ -42,24 +41,9 @@ namespace Serein.NodeFlow.Model
Task<IFlipflopContext> flipflopTask = md.ExplicitDatas.Length switch
{
0 => ((Func<object, Task<IFlipflopContext>>)del).Invoke(md.ActingInstance),
_ => ((Func<object, object?[]?, Task<IFlipflopContext>>)del).Invoke(md.ActingInstance, GetParameters(context, md)), // 执行流程中的触发器方法时获取入参参数
_ => ((Func<object, object?[]?, Task<IFlipflopContext>>)del).Invoke(md.ActingInstance, GetParameters(context, this, md)), // 执行流程中的触发器方法时获取入参参数
};
//object?[]? parameters;
//object? result = null;
//if (haveParameter)
//{
// var data = GetParameters(context, md);
// parameters = [instance, data];
//}
//else
//{
// parameters = [instance];
//}
//flipflopTask = del.DynamicInvoke(parameters) as Task<IFlipflopContext>;
//if (flipflopTask == null)
//{
// throw new FlipflopException(base.MethodDetails.MethodName + "触发器返回值非 Task<IFlipflopContext> 类型");
//}
IFlipflopContext flipflopContext = (await flipflopTask) ?? throw new FlipflopException("没有返回上下文");
NextOrientation = flipflopContext.State.ToContentType();
if(flipflopContext.TriggerData is null || flipflopContext.TriggerData.Type == Library.NodeFlow.Tool.TriggerType.Overtime)
@@ -72,7 +56,7 @@ namespace Serein.NodeFlow.Model
{
NextOrientation = ConnectionType.None;
RuningException = ex;
throw;
return null;
}
catch (Exception ex)
{