修复了全局节点连接异常异常。

This commit is contained in:
fengjiayi
2025-07-29 14:25:31 +08:00
parent acb15c323e
commit 77160feaeb
66 changed files with 1719 additions and 1342 deletions

View File

@@ -2,6 +2,7 @@
using Serein.Library.Utils;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -196,6 +197,10 @@ namespace Serein.Library
}
}
private static ObjectPool<Stack<IFlowNode>> flowStackPool = new ObjectPool<Stack<IFlowNode>>(()=> new Stack<IFlowNode>());
//private static ObjectPool<HashSet<IFlowNode>> processedNodesPool = new ObjectPool<HashSet<IFlowNode>>(()=> new HashSet<IFlowNode>());
private static ObjectPool<HashSet<IFlowNode>> checkpoints = new ObjectPool<HashSet<IFlowNode>>(()=> new HashSet<IFlowNode>());
/// <summary>
/// 开始执行
/// </summary>
@@ -203,133 +208,186 @@ namespace Serein.Library
/// <param name="context"></param>
/// <param name="token">流程运行</param>
/// <returns></returns>
#nullable enable
public static async Task<FlowResult> StartFlowAsync(this IFlowNode nodeModel, IFlowContext context, CancellationToken token)
{
Stack<IFlowNode> stack = new Stack<IFlowNode>();
HashSet<IFlowNode> processedNodes = new HashSet<IFlowNode>(); // 用于记录已处理上游节点的节点
stack.Push(nodeModel);
#nullable enable
Stack<IFlowNode> flowStack = flowStackPool.Allocate() ;
//HashSet<IFlowNode> processedNodes = processedNodesPool.Allocate() ; // 用于记录已处理上游节点的节点
flowStack.Push(nodeModel);
IFlowNode? previousNode = null;
IFlowNode? currentNode = null;
while (true)
try
{
if (token.IsCancellationRequested)
{
throw new Exception($"流程执行被取消,未能获取到流程结果。");
}
#region
// 从栈中弹出一个节点作为当前节点进行处理
previousNode = currentNode;
currentNode = stack.Pop();
#region
FlowInvokeInfo? invokeInfo = null;
var isRecordInvokeInfo = context.IsRecordInvokeInfo;
if (!isRecordInvokeInfo) goto Label_NotRecordInvoke;
FlowInvokeInfo.InvokeType invokeType = context.NextOrientation switch
{
ConnectionInvokeType.IsSucceed => FlowInvokeInfo.InvokeType.IsSucceed,
ConnectionInvokeType.IsFail => FlowInvokeInfo.InvokeType.IsFail,
ConnectionInvokeType.IsError => FlowInvokeInfo.InvokeType.IsError,
ConnectionInvokeType.Upstream => FlowInvokeInfo.InvokeType.Upstream,
_ => FlowInvokeInfo.InvokeType.None
};
invokeInfo = context.NewInvokeInfo(previousNode, currentNode, invokeType);
#endregion
Label_NotRecordInvoke:
context.NextOrientation = ConnectionInvokeType.None; // 重置上下文状态
FlowResult flowResult = null;
try
{
flowResult = await currentNode.ExecutingAsync(context, token);
if (context.NextOrientation == ConnectionInvokeType.None) // 没有手动设置时,进行自动设置
{
context.NextOrientation = ConnectionInvokeType.IsSucceed;
}
}
catch (Exception ex)
{
flowResult = new FlowResult(currentNode.Guid, context);
context.Env.WriteLine(InfoType.ERROR, $"节点[{currentNode.Guid}]异常:" + ex);
context.NextOrientation = ConnectionInvokeType.IsError;
context.ExceptionOfRuning = ex;
}
#endregion
#region
var state = context.NextOrientation switch
{
ConnectionInvokeType.IsFail => FlowInvokeInfo.RunState.Failed,
ConnectionInvokeType.IsError => FlowInvokeInfo.RunState.Error,
_ => FlowInvokeInfo.RunState.Succeed
};
if (isRecordInvokeInfo)
{
invokeInfo.UploadState(state);
invokeInfo.UploadResultValue(flowResult.Value);
}
#endregion
#region
context.AddOrUpdateFlowData(currentNode.Guid, flowResult); // 上下文中更新数据
// 首先将指定类别后继分支的所有节点逆序推入栈中
var nextNodes = currentNode.SuccessorNodes[context.NextOrientation];
for (int index = nextNodes.Count - 1; index >= 0; index--)
{
// 筛选出启用的节点的节点
if (nextNodes[index].DebugSetting.IsEnable)
{
//if (!ignodeState)
context.SetPreviousNode(nextNodes[index].Guid, currentNode.Guid);
stack.Push(nextNodes[index]);
}
}
// 然后将指上游分支的所有节点逆序推入栈中
var upstreamNodes = currentNode.SuccessorNodes[ConnectionInvokeType.Upstream];
for (int index = upstreamNodes.Count - 1; index >= 0; index--)
{
// 筛选出启用的节点的节点
if (upstreamNodes[index].DebugSetting.IsEnable)
{
context.SetPreviousNode(upstreamNodes[index].Guid, currentNode.Guid);
stack.Push(upstreamNodes[index]);
}
}
#endregion
#region
if (stack.Count == 0)
{
return flowResult; // 说明流程到了终点
}
if (context.RunState == RunState.Completion)
{
currentNode.Env.WriteLine(InfoType.INFO, $"流程执行到节点[{currentNode.Guid}]时提前结束,将返回当前执行结果。");
return flowResult; // 流程执行完成,返回结果
}
if (token.IsCancellationRequested)
{
throw new Exception($"流程执行到节点[{currentNode.Guid}]时被取消,未能获取到流程结果。");
}
#endregion
#if DEBUG
//await Task.Delay(1);
/*
var sw = Stopwatch.StartNew();
var checkpoints = new Dictionary<string, TimeSpan>();
checkpoints["创建调用信息"] = sw.Elapsed;
var last = TimeSpan.Zero;
foreach (var kv in checkpoints)
{
SereinEnv.WriteLine(InfoType.INFO, $"{kv.Key} 耗时: {(kv.Value - last).TotalMilliseconds} ms");
last = kv.Value;
}
*/
var sw = Stopwatch.StartNew();
var checkpoints = new Dictionary<string, TimeSpan>();
#endif
while (true)
{
#if DEBUG
sw.Restart();
var last = TimeSpan.Zero;
checkpoints.Clear();
#endif
#region
// 从栈中弹出一个节点作为当前节点进行处理
previousNode = currentNode;
currentNode = flowStack.Pop();
#region
FlowInvokeInfo? invokeInfo = null;
var isRecordInvokeInfo = context.IsRecordInvokeInfo;
if (!isRecordInvokeInfo) goto Label_NotRecordInvoke;
FlowInvokeInfo.InvokeType invokeType = context.NextOrientation switch
{
ConnectionInvokeType.IsSucceed => FlowInvokeInfo.InvokeType.IsSucceed,
ConnectionInvokeType.IsFail => FlowInvokeInfo.InvokeType.IsFail,
ConnectionInvokeType.IsError => FlowInvokeInfo.InvokeType.IsError,
ConnectionInvokeType.Upstream => FlowInvokeInfo.InvokeType.Upstream,
_ => FlowInvokeInfo.InvokeType.None
};
invokeInfo = context.NewInvokeInfo(previousNode, currentNode, invokeType);
#endregion
#if DEBUG
checkpoints[$"[{currentNode.Guid}]\t创建调用信息"] = sw.Elapsed;
#endif
Label_NotRecordInvoke:
context.NextOrientation = ConnectionInvokeType.IsSucceed; // 默认执行成功
FlowResult? flowResult = null;
try
{
flowResult = await currentNode.ExecutingAsync(context, token);
}
catch (Exception ex)
{
flowResult = new FlowResult(currentNode.Guid, context);
context.Env.WriteLine(InfoType.ERROR, $"节点[{currentNode.Guid}]异常:" + ex);
context.NextOrientation = ConnectionInvokeType.IsError;
context.ExceptionOfRuning = ex;
}
#if DEBUG
finally
{
checkpoints[$"[{currentNode.Guid}]\t方法调用"] = sw.Elapsed;
}
#endif
#endregion
#region
var state = context.NextOrientation switch
{
ConnectionInvokeType.IsFail => FlowInvokeInfo.RunState.Failed,
ConnectionInvokeType.IsError => FlowInvokeInfo.RunState.Error,
_ => FlowInvokeInfo.RunState.Succeed
};
if (isRecordInvokeInfo)
{
invokeInfo.UploadState(state);
invokeInfo.UploadResultValue(flowResult.Value);
}
#endregion
#if DEBUG
checkpoints[$"[{currentNode.Guid}]\t更新调用信息"] = sw.Elapsed;
#endif
#region
context.AddOrUpdateFlowData(currentNode.Guid, flowResult); // 上下文中更新数据
#if DEBUG
checkpoints["[{currentNode.Guid}]\t执行完成时更新栈"] = sw.Elapsed;
#endif
// 首先将指定类别后继分支的所有节点逆序推入栈中
var nextNodes = currentNode.SuccessorNodes[context.NextOrientation];
for (int index = nextNodes.Count - 1; index >= 0; index--)
{
// 筛选出启用的节点的节点
if (nextNodes[index].DebugSetting.IsEnable)
{
var node = nextNodes[index];
context.SetPreviousNode(node.Guid, currentNode.Guid);
flowStack.Push(node);
}
}
#if DEBUG
checkpoints[$"[{currentNode.Guid}]\t后继分支推入栈中"] = sw.Elapsed;
#endif
// 然后将指上游分支的所有节点逆序推入栈中
var upstreamNodes = currentNode.SuccessorNodes[ConnectionInvokeType.Upstream];
for (int index = upstreamNodes.Count - 1; index >= 0; index--)
{
// 筛选出启用的节点的节点
if (upstreamNodes[index].DebugSetting.IsEnable)
{
var node = upstreamNodes[index];
context.SetPreviousNode(node.Guid, currentNode.Guid);
flowStack.Push(node);
}
}
#if DEBUG
checkpoints[$"[{currentNode.Guid}]\t上游分支推入栈中"] = sw.Elapsed;
#endif
#endregion
#if DEBUG
foreach (var kv in checkpoints)
{
SereinEnv.WriteLine(InfoType.INFO, $"{kv.Key} 耗时: {(kv.Value - last).TotalMilliseconds} ms");
last = kv.Value;
}
SereinEnv.WriteLine(InfoType.INFO, $"------");
#endif
#region
if (flowStack.Count == 0)
{
return flowResult; // 说明流程到了终点
}
if (context.RunState == RunState.Completion)
{
currentNode.Env.WriteLine(InfoType.INFO, $"流程执行到节点[{currentNode.Guid}]时提前结束,将返回当前执行结果。");
return flowResult; // 流程执行完成,返回结果
}
if (token.IsCancellationRequested)
{
throw new Exception($"流程执行到节点[{currentNode.Guid}]时被取消,未能获取到流程结果。");
}
#endregion
#if DEBUG
//await Task.Delay(1);
#endif
}
}
finally
{
flowStackPool.Free(flowStack);
//processedNodesPool.Free(processedNodes);
}
}

View File

@@ -58,3 +58,5 @@ namespace Serein.Library
public double ScaleY;
}
}

View File

@@ -173,34 +173,6 @@ namespace Serein.Library
dictPreviousNodes.AddOrUpdate(currentNodeModel, (_) => PreviousNode, (o, n) => PreviousNode);
}
/// <summary>
/// 忽略处理该节点流程
/// </summary>
/// <param name="node"></param>
public void IgnoreFlowHandle(string node)
{
dictIgnoreNodeFlow.AddOrUpdate(node, (o) => true, (o, n) => true);
}
/// <summary>
/// 获取此次流程处理状态
/// </summary>
/// <param name="node"></param>
/// <returns></returns>
public bool GetIgnodeFlowStateUpload(string node)
{
return dictIgnoreNodeFlow.TryGetValue(node, out var state) ? state : false;
}
/// <summary>
/// 恢复流程处理状态
/// </summary>
/// <param name="node"></param>
/// <returns></returns>
public void RecoverIgnodeFlowStateUpload(string node)
{
dictIgnoreNodeFlow.AddOrUpdate(node, (o) => false, (o, n) => false);
}
/// <summary>
/// 获取当前节点的运行时上一节点
/// </summary>
@@ -242,7 +214,6 @@ namespace Serein.Library
/// <param name="flowData">新的数据</param>
public void AddOrUpdateFlowData(string nodeModel, FlowResult flowData)
{
// this.dictNodeFlowData.TryGetValue(nodeGuid, out var oldFlowData);
dictNodeFlowData.AddOrUpdate(nodeModel, _ => flowData, (o,n ) => flowData);
}
@@ -274,10 +245,6 @@ namespace Serein.Library
throw new InvalidOperationException($"透传{nodeModel}节点数据时发生异常:上一节点不存在数据");
}
/// <summary>
/// 开始
/// </summary>
/// <summary>
/// 重置
/// </summary>

View File

@@ -3,6 +3,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading.Tasks;
@@ -23,7 +24,7 @@ namespace Serein.Library
/// <summary>
/// 流程返回值的包装
/// </summary>
public class FlowResult
public sealed class FlowResult
{
/// <summary>
/// 实例化返回值

View File

@@ -151,14 +151,21 @@ namespace Serein.Library
}
}
private enum ActionType
{
Action,
Task,
}
private ActionType actionType = ActionType.Action;
public void SetAction(Action<IFlowContext> action)
{
this.action = action;
actionType = ActionType.Action;
}
public void SetAction(Func<IFlowContext, Task> taskFunc)
{
this.taskFunc = taskFunc;
actionType = ActionType.Task;
}
@@ -177,12 +184,14 @@ namespace Serein.Library
public Dictionary<ConnectionInvokeType, List<CallNode>> SuccessorNodes { get; private set; }
public CallNode[][] ChildNodes { get; private set; } = new CallNode[][]
{
new CallNode[32],
new CallNode[32],
new CallNode[32],
new CallNode[32]
new CallNode[MaxChildNodeCount],
new CallNode[MaxChildNodeCount],
new CallNode[MaxChildNodeCount],
new CallNode[MaxChildNodeCount]
};
private const int MaxChildNodeCount = 16; // 每个分支最多支持16个子节点
public int GetCount(ConnectionInvokeType type)
{
if (type == ConnectionInvokeType.Upstream) return UpstreamNodeCount;
@@ -245,13 +254,13 @@ namespace Serein.Library
{
return;
}
if (action is not null)
if (actionType == ActionType.Action)
{
action(context);
action.Invoke(context);
}
else if (taskFunc is not null)
else if (actionType == ActionType.Task)
{
await taskFunc(context);
await taskFunc.Invoke(context);
}
else
{
@@ -286,11 +295,8 @@ namespace Serein.Library
FlowResult flowResult = null;
try
{
context.NextOrientation = ConnectionInvokeType.IsSucceed; // 默认执行成功
await currentNode.InvokeAsync(context, token);
if (context.NextOrientation == ConnectionInvokeType.None) // 没有手动设置时,进行自动设置
{
context.NextOrientation = ConnectionInvokeType.IsSucceed;
}
}
catch (Exception ex)
{
@@ -306,17 +312,18 @@ namespace Serein.Library
var nextNodes = currentNode.SuccessorNodes[context.NextOrientation];
for (int index = nextNodes.Count - 1; index >= 0; index--)
{
//if (!ignodeState)
context.SetPreviousNode(nextNodes[index].Guid, currentNode.Guid);
stack.Push(nextNodes[index]);
var node = nextNodes[index];
context.SetPreviousNode(node.Guid, currentNode.Guid);
stack.Push(node);
}
// 然后将指上游分支的所有节点逆序推入栈中
var upstreamNodes = currentNode.SuccessorNodes[ConnectionInvokeType.Upstream];
for (int index = upstreamNodes.Count - 1; index >= 0; index--)
{
context.SetPreviousNode(upstreamNodes[index].Guid, currentNode.Guid);
stack.Push(upstreamNodes[index]);
var node = upstreamNodes[index];
context.SetPreviousNode(node.Guid, currentNode.Guid);
stack.Push(node);
}
#endregion
@@ -334,7 +341,7 @@ namespace Serein.Library
_stackPool.Return(stack);
context.Env.WriteLine(InfoType.INFO, $"流程执行到节点[{currentNode.Guid}]时提前结束,将返回当前执行结果。");
flowResult = context.GetFlowData(currentNode.Guid); _stackPool.Return(stack);
flowResult = context.GetFlowData(currentNode.Guid);
return flowResult; // 流程执行完成,返回结果
}

View File

@@ -306,7 +306,7 @@ namespace Serein.Library
MethodName = this.MethodName, // 拷贝
MethodLockName = this.MethodLockName, // 拷贝
ParamsArgIndex = this.ParamsArgIndex, // 拷贝
ParameterDetailss = this.ParameterDetailss?.Select(p => p?.CloneOfModel(nodeModel)).ToArray(), // 拷贝属于节点方法的新入参描述
ParameterDetailss = this.ParameterDetailss?.Select(p => p?.CloneOfModel(nodeModel)).ToArray() , // 拷贝属于节点方法的新入参描述
IsAsync = this.IsAsync, // 拷贝
IsStatic = this.IsStatic, // 拷贝
};

View File

@@ -10,8 +10,28 @@ namespace Serein.Library.Utils
/// <summary>
/// 对于 linq 的异步扩展方法
/// </summary>
public static class LinqAsyncHelper
public static class LinqHelper
{
/// <summary>
/// 根据条件筛选,只保留第一个满足条件的元素,其余的不包含。
/// </summary>
public static IEnumerable<T> DistinctByCondition<T>(
this IEnumerable<T> source,
Func<T, bool> predicate)
{
var seenKeys = new HashSet<T>();
foreach (var item in source)
{
if (!predicate(item))
continue;
/*var key = keySelector(item);
if (seenKeys.Add(key)) // 如果是新键*/
yield return item;
}
}
public static async Task<IEnumerable<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source,
Func<TSource, Task<TResult>> method)
{

View File

@@ -42,7 +42,8 @@ namespace Serein.Library.Utils
public ObjectPool(Factory factory)
: this(factory, Environment.ProcessorCount * 2)
{ }
{
}
public ObjectPool(Factory factory, int size)
{