增加流程运行特性:支持异步节点

This commit is contained in:
fengjiayi
2024-10-07 22:52:10 +08:00
parent 878b1c5893
commit 9529b9e19d
13 changed files with 190 additions and 96 deletions

View File

@@ -6,8 +6,10 @@ using Serein.Library.Entity;
using Serein.Library.Enums;
using Serein.Library.Ex;
using Serein.Library.Utils;
using Serein.NodeFlow.Tool;
using Serein.NodeFlow.Tool.SereinExpression;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
@@ -101,48 +103,48 @@ namespace Serein.NodeFlow.Base
{
Stack<NodeModelBase> stack = new Stack<NodeModelBase>();
stack.Push(this);
var cts = context.Env.IOC.Get<CancellationTokenSource>(FlowStarter.FlipFlopCtsName);
var flowCts = context.Env.IOC.Get<CancellationTokenSource>(FlowStarter.FlipFlopCtsName);
while (stack.Count > 0 ) // 循环中直到栈为空才会退出循环
{
if(cts is not null)
if(flowCts is not null)
{
if (cts.IsCancellationRequested)
if (flowCts.IsCancellationRequested)
break;
}
// 节点执行异常时跳过执行
// 从栈中弹出一个节点作为当前节点进行处理
var currentNode = stack.Pop();
//// 设置方法执行的对象
//if (currentNode.MethodDetails?.ActingInstance is not null && currentNode.MethodDetails?.ActingInstanceType is not null)
//{
// currentNode.MethodDetails.ActingInstance = context.Env.IOC.GetOrRegisterInstantiate(currentNode.MethodDetails.ActingInstanceType);
//}
#region
// 首先执行上游分支
var upstreamNodes = currentNode.SuccessorNodes[ConnectionType.Upstream];
for (int i = upstreamNodes.Count - 1; i >= 0; i--)
// 筛选出上游分支
var upstreamNodes = currentNode.SuccessorNodes[ConnectionType.Upstream].Where(
node => node.DebugSetting.IsEnable
).ToArray();
// 执行上游分支
foreach (var upstreamNode in upstreamNodes)
{
// 筛选出启用的节点
if (upstreamNodes[i].DebugSetting.IsEnable)
if (upstreamNode.DebugSetting.IsEnable)
{
if (upstreamNodes[i].DebugSetting.InterruptClass != InterruptClass.None) // 执行触发前
if (upstreamNode.DebugSetting.InterruptClass != InterruptClass.None) // 执行触发前
{
var cancelType = await upstreamNodes[i].DebugSetting.GetInterruptTask();
await Console.Out.WriteLineAsync($"[{upstreamNodes[i]?.MethodDetails?.MethodName}]中断已{cancelType},开始执行后继分支");
var cancelType = await upstreamNode.DebugSetting.GetInterruptTask();
await Console.Out.WriteLineAsync($"[{upstreamNode.MethodDetails?.MethodName}]中断已{cancelType},开始执行后继分支");
}
upstreamNode.PreviousNode = currentNode;
await upstreamNode.StartExecute(context); // 执行流程节点的上游分支
if (upstreamNode.NextOrientation == ConnectionType.IsError)
{
// 如果上游分支执行失败,不再继续执行
// 使上游节点(仅上游节点本身,不包含上游节点的后继节点)
// 具备通过抛出异常中断流程的能力
break;
}
upstreamNodes[i].PreviousNode = currentNode;
await upstreamNodes[i].StartExecute(context); // 执行流程节点的上游分支
}
}
// 执行当前节点
// 上游分支执行完成,才执行当前节点
object? newFlowData = await currentNode.ExecutingAsync(context);
if (cts is null || cts.IsCancellationRequested || currentNode.NextOrientation == ConnectionType.None)
if (flowCts is null || flowCts.IsCancellationRequested || currentNode.NextOrientation == ConnectionType.None)
{
// 不再执行
break;
@@ -159,14 +161,9 @@ namespace Serein.NodeFlow.Base
// 将下一个节点集合中的所有节点逆序推入栈中
for (int i = nextNodes.Count - 1; i >= 0; i--)
{
// 筛选出启用的节点、未被中断的节点
if (nextNodes[i].DebugSetting.IsEnable /*&& nextNodes[i].DebugSetting.InterruptClass == InterruptClass.None*/)
// 筛选出启用的节点的节点
if (nextNodes[i].DebugSetting.IsEnable)
{
if (nextNodes[i].DebugSetting.InterruptClass != InterruptClass.None) // 执行触发前
{
var cancelType = await nextNodes[i].DebugSetting.GetInterruptTask();
await Console.Out.WriteLineAsync($"[{nextNodes[i]?.MethodDetails?.MethodName}]中断已{cancelType},开始执行后继分支");
}
nextNodes[i].PreviousNode = currentNode;
stack.Push(nextNodes[i]);
}
@@ -176,7 +173,7 @@ namespace Serein.NodeFlow.Base
}
}
/// <summary>
/// 执行节点对应的方法
/// </summary>
@@ -186,14 +183,9 @@ namespace Serein.NodeFlow.Base
{
#region
if (DebugSetting.InterruptClass != InterruptClass.None) // 执行触发
if (DebugSetting.InterruptClass != InterruptClass.None) // 执行触发检查是否需要中断
{
var cancelType = await this.DebugSetting.GetInterruptTask();
//if(cancelType == CancelType.Discard)
//{
// this.NextOrientation = ConnectionType.None;
// return null;
//}
var cancelType = await this.DebugSetting.GetInterruptTask(); // 等待中断结束
await Console.Out.WriteLineAsync($"[{this.MethodDetails?.MethodName}]中断已{cancelType},开始执行后继分支");
}
@@ -212,19 +204,41 @@ namespace Serein.NodeFlow.Base
md.ActingInstance ??= context.Env.IOC.Get(md.ActingInstanceType);
object instance = md.ActingInstance;
var haveParameter = md.ExplicitDatas.Length > 0;
var haveResult = md.ReturnType != typeof(void);
bool haveParameter = md.ExplicitDatas.Length > 0;
bool haveResult = md.ReturnType != typeof(void);
Type? taskResult = null;
bool isTask = md.ReturnType is not null && MethodDetailsHelperTmp.IsGenericTask(md.ReturnType, out taskResult);
bool isTaskHaveResult = taskResult is not null;
object? result;
Console.WriteLine($"(isTask, isTaskHaveResult):{(isTask, isTaskHaveResult)}");
try
{
// Action/Func([方法作用的实例],[可能的参数值],[可能的返回值])
object?[]? parameters = GetParameters(context, this, md);
object? result = (haveParameter, haveResult) switch
if (isTask)
{
(false, false) => Execution((Action<object>)del, instance), // 调用节点方法返回null
(true, false) => Execution((Action<object, object?[]?>)del, instance, parameters), // 调用节点方法返回null
(false, true) => Execution((Func<object, object?>)del, instance), // 调用节点方法,返回方法传回类型
(true, true) => Execution((Func<object, object?[]?, object?>)del, instance, parameters), // 调用节点方法,获取入参参数,返回方法忏悔类型
};
// 异步方法因为返回了Task所以排除Action<>委托的可能)
result = (haveParameter, isTaskHaveResult) switch
{
(false, false) => await ExecutionAsync((Func<object, Task>)del, instance), // 调用节点方法,返回方法传回类型
(true, false) => await ExecutionAsync((Func<object, object?[]?, Task>)del, instance, parameters), // 调用节点方法,获取入参参数,返回方法返回类型
(false, true) => await ExecutionAsync((Func<object, Task<object?>>)del, instance), // 调用节点方法,返回方法传回类型
(true, true) => await ExecutionAsync((Func<object, object?[]?, Task<object?>>)del, instance, parameters), // 调用节点方法,获取入参参数,返回方法返回类型
};
}
else
{
// 非异步方法
result = (haveParameter, haveResult) switch
{
(false, false) => Execution((Action<object>)del, instance), // 调用节点方法返回null
(true, false) => Execution((Action<object, object?[]?>)del, instance, parameters), // 调用节点方法返回null
(false, true) => Execution((Func<object, object?>)del, instance), // 调用节点方法,返回方法传回类型
(true, true) => Execution((Func<object, object?[]?, object?>)del, instance, parameters), // 调用节点方法,获取入参参数,返回方法返回类型
};
}
NextOrientation = ConnectionType.IsSucceed;
return result;
@@ -242,21 +256,41 @@ namespace Serein.NodeFlow.Base
#region
public static object? Execution(Action<object> del, object instance)
{
del?.Invoke(instance);
del.Invoke(instance);
return null;
}
public static object? Execution(Action<object, object?[]?> del, object instance, object?[]? parameters)
{
del?.Invoke(instance, parameters);
del.Invoke(instance, parameters);
return null;
}
public static object? Execution(Func<object, object?> del, object instance)
{
return del?.Invoke(instance);
return del.Invoke(instance);
}
public static object? Execution(Func<object, object?[]?, object?> del, object instance, object?[]? parameters)
{
return del?.Invoke(instance, parameters);
return del.Invoke(instance, parameters);
}
public static async Task<object?> ExecutionAsync(Func<object, Task> del, object instance)
{
await del.Invoke(instance);
return null;
}
public static async Task<object?> ExecutionAsync(Func<object, object?[]?, Task> del, object instance, object?[]? parameters)
{
await del.Invoke(instance, parameters);
return null;
}
public static async Task<object?> ExecutionAsync(Func<object, Task<object?>> del, object instance)
{
return await del.Invoke(instance);
}
public static async Task<object?> ExecutionAsync(Func<object, object?[]?, Task<object?>> del, object instance, object?[]? parameters)
{
return await del.Invoke(instance, parameters);
}
#endregion