mirror of
https://gitee.com/langsisi_admin/serein-flow
synced 2026-03-03 00:00:49 +08:00
1. 修改了FlowResult的创建方式,新增了IsSuccess与Message,为后续进行流程日志追踪准备。
2. 修复了删除节点时,没有正确消除与之相关的参数获取关系。 3. IFlowNode新增了StartFlowAsync方法。 4. Library项目中FlowNodeExtension拓展类转移到了NodeFlow项目中。 5. 修改了Script自定义参数保存,对参数类型、返回值类型进行保存。 6. Library.Utils.BenchmarkHelpter中添加了Task、Task<>的重载方法。 7. NodeFlow项目中,FlowEnvironment默认使用通过NewtonsoftJson实现的JSON门户类。 8. NodeFlow项目中,FlowControl缓存了 FlowWorkOptions 选项实体,并且对于 FlowWorkManagement 进行了池化管理(但后续的项目中考虑重构流程任务运行时)。
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
using Serein.Library;
|
||||
using Serein.Library.Api;
|
||||
using Serein.Library.Utils;
|
||||
using System.Diagnostics;
|
||||
|
||||
namespace Serein.NodeFlow.Model.Nodes
|
||||
{
|
||||
@@ -72,7 +74,7 @@ namespace Serein.NodeFlow.Model.Nodes
|
||||
{
|
||||
object[] args = await this.GetParametersAsync(context, token);
|
||||
var result = await dd.InvokeAsync(null, args);
|
||||
var flowReslt = new FlowResult(this.Guid, context, result);
|
||||
var flowReslt = FlowResult.OK(this.Guid, context, result);
|
||||
return flowReslt;
|
||||
}
|
||||
else
|
||||
@@ -85,7 +87,7 @@ namespace Serein.NodeFlow.Model.Nodes
|
||||
}
|
||||
object[] args = await this.GetParametersAsync(context, token);
|
||||
var result = await dd.InvokeAsync(instance, args);
|
||||
var flowReslt = new FlowResult(this.Guid, context, result);
|
||||
var flowReslt = FlowResult.OK(this.Guid, context, result);
|
||||
return flowReslt;
|
||||
}
|
||||
|
||||
@@ -93,6 +95,285 @@ namespace Serein.NodeFlow.Model.Nodes
|
||||
}
|
||||
|
||||
|
||||
private readonly static ObjectPool<Stack<IFlowNode>> flowStackPool = new ObjectPool<Stack<IFlowNode>>(() => new Stack<IFlowNode>());
|
||||
|
||||
/// <summary>
|
||||
/// 开始执行
|
||||
/// </summary>
|
||||
/// <param name="context"></param>
|
||||
/// <param name="token">流程运行</param>
|
||||
/// <returns></returns>
|
||||
public async Task<FlowResult> StartFlowAsync(IFlowContext context, CancellationToken token)
|
||||
{
|
||||
#if false
|
||||
|
||||
/*
|
||||
var sw = Stopwatch.StartNew();
|
||||
var checkpoints = new Dictionary<string, TimeSpan>();
|
||||
checkpoints["创建调用信息"] = sw.Elapsed;
|
||||
var last = TimeSpan.Zero;
|
||||
foreach (var kv in checkpoints)
|
||||
{
|
||||
SereinEnv.WriteLine(InfoType.INFO, $"{kv.Key} 耗时: {(kv.Value - last).TotalMilliseconds} ms");
|
||||
last = kv.Value;
|
||||
}
|
||||
*/
|
||||
var sw = Stopwatch.StartNew();
|
||||
var checkpoints = new Dictionary<string, TimeSpan>();
|
||||
sw.Restart();
|
||||
checkpoints.Clear();
|
||||
#endif
|
||||
IFlowNode? previousNode = null;
|
||||
IFlowNode? currentNode = null;
|
||||
FlowResult? flowResult = null;
|
||||
|
||||
IFlowNode nodeModel = this;
|
||||
Stack<IFlowNode> flowStack = flowStackPool.Allocate();
|
||||
flowStack.Push(nodeModel);
|
||||
//HashSet<IFlowNode> processedNodes = processedNodesPool.Allocate() ; // 用于记录已处理上游节点的节点
|
||||
#if false
|
||||
checkpoints[$"[{nodeModel?.Guid}]\t流程开始准备"] = sw.Elapsed;
|
||||
#endif
|
||||
try
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
|
||||
#region 执行相关
|
||||
// 从栈中弹出一个节点作为当前节点进行处理
|
||||
previousNode = currentNode;
|
||||
currentNode = flowStack.Pop();
|
||||
|
||||
#region 新增调用信息
|
||||
FlowInvokeInfo? invokeInfo = null;
|
||||
var isRecordInvokeInfo = context.IsRecordInvokeInfo;
|
||||
if (!isRecordInvokeInfo) goto Label_NotRecordInvoke;
|
||||
|
||||
FlowInvokeInfo.InvokeType invokeType = context.NextOrientation switch
|
||||
{
|
||||
ConnectionInvokeType.IsSucceed => FlowInvokeInfo.InvokeType.IsSucceed,
|
||||
ConnectionInvokeType.IsFail => FlowInvokeInfo.InvokeType.IsFail,
|
||||
ConnectionInvokeType.IsError => FlowInvokeInfo.InvokeType.IsError,
|
||||
ConnectionInvokeType.Upstream => FlowInvokeInfo.InvokeType.Upstream,
|
||||
_ => FlowInvokeInfo.InvokeType.None
|
||||
};
|
||||
invokeInfo = context.NewInvokeInfo(previousNode, currentNode, invokeType);
|
||||
#endregion
|
||||
#if false
|
||||
checkpoints[$"[{currentNode.Guid}]\t创建调用信息"] = sw.Elapsed;
|
||||
#endif
|
||||
|
||||
Label_NotRecordInvoke:
|
||||
context.NextOrientation = ConnectionInvokeType.IsSucceed; // 默认执行成功
|
||||
|
||||
try
|
||||
{
|
||||
flowResult = await currentNode.ExecutingAsync(context, token);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
flowResult = FlowResult.Fail(currentNode.Guid, context, ex.Message);
|
||||
context.Env.WriteLine(InfoType.ERROR, $"节点[{currentNode.Guid}]异常:" + ex);
|
||||
context.NextOrientation = ConnectionInvokeType.IsError;
|
||||
context.ExceptionOfRuning = ex;
|
||||
}
|
||||
#if false
|
||||
finally
|
||||
{
|
||||
checkpoints[$"[{currentNode.Guid}]\t方法调用"] = sw.Elapsed;
|
||||
}
|
||||
#endif
|
||||
#endregion
|
||||
|
||||
#region 更新调用信息
|
||||
var state = context.NextOrientation switch
|
||||
{
|
||||
ConnectionInvokeType.IsFail => FlowInvokeInfo.RunState.Failed,
|
||||
ConnectionInvokeType.IsError => FlowInvokeInfo.RunState.Error,
|
||||
_ => FlowInvokeInfo.RunState.Succeed
|
||||
};
|
||||
if (isRecordInvokeInfo)
|
||||
{
|
||||
invokeInfo.UploadState(state);
|
||||
invokeInfo.UploadResultValue(flowResult.Value);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#if false
|
||||
checkpoints[$"[{currentNode.Guid}]\t更新调用信息"] = sw.Elapsed;
|
||||
#endif
|
||||
|
||||
#region 执行完成时更新栈
|
||||
context.AddOrUpdateFlowData(currentNode.Guid, flowResult); // 上下文中更新数据
|
||||
#if false
|
||||
checkpoints[$"[{currentNode.Guid}]\t执行完成时更新栈"] = sw.Elapsed;
|
||||
#endif
|
||||
|
||||
// 首先将指定类别后继分支的所有节点逆序推入栈中
|
||||
var nextNodes = currentNode.SuccessorNodes[context.NextOrientation];
|
||||
for (int index = nextNodes.Count - 1; index >= 0; index--)
|
||||
{
|
||||
// 筛选出启用的节点的节点
|
||||
if (nextNodes[index].DebugSetting.IsEnable)
|
||||
{
|
||||
var node = nextNodes[index];
|
||||
context.SetPreviousNode(node.Guid, currentNode.Guid);
|
||||
flowStack.Push(node);
|
||||
}
|
||||
}
|
||||
|
||||
#if false
|
||||
checkpoints[$"[{currentNode.Guid}]\t后继分支推入栈中"] = sw.Elapsed;
|
||||
#endif
|
||||
|
||||
// 然后将指上游分支的所有节点逆序推入栈中
|
||||
var upstreamNodes = currentNode.SuccessorNodes[ConnectionInvokeType.Upstream];
|
||||
for (int index = upstreamNodes.Count - 1; index >= 0; index--)
|
||||
{
|
||||
// 筛选出启用的节点的节点
|
||||
if (upstreamNodes[index].DebugSetting.IsEnable)
|
||||
{
|
||||
var node = upstreamNodes[index];
|
||||
context.SetPreviousNode(node.Guid, currentNode.Guid);
|
||||
flowStack.Push(node);
|
||||
}
|
||||
}
|
||||
|
||||
#if false
|
||||
checkpoints[$"[{currentNode.Guid}]\t上游分支推入栈中"] = sw.Elapsed;
|
||||
#endif
|
||||
#endregion
|
||||
|
||||
|
||||
|
||||
|
||||
#region 执行完成后检查
|
||||
if (flowStack.Count == 0)
|
||||
{
|
||||
break; // 说明流程到了终点
|
||||
}
|
||||
|
||||
if (context.RunState == RunState.Completion)
|
||||
{
|
||||
flowResult = null;
|
||||
currentNode.Env.WriteLine(InfoType.INFO, $"流程执行到节点[{currentNode.Guid}]时提前结束,未能获取到流程结果。");
|
||||
break; // 流程执行完成,返回结果
|
||||
}
|
||||
|
||||
if (token.IsCancellationRequested)
|
||||
{
|
||||
flowResult = null;
|
||||
currentNode.Env.WriteLine(InfoType.INFO, "流程执行到节点[{currentNode.Guid}]时被取消,未能获取到流程结果。");
|
||||
break;
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
flowStackPool.Free(flowStack);
|
||||
|
||||
}
|
||||
|
||||
#if false
|
||||
var theTS = sw.Elapsed;
|
||||
checkpoints[$"[{nodeModel?.Guid}]\t流程完毕"] = theTS;
|
||||
var last = TimeSpan.Zero;
|
||||
foreach (var kv in checkpoints)
|
||||
{
|
||||
SereinEnv.WriteLine(InfoType.INFO, $"{kv.Key} 耗时: {(kv.Value - last).TotalMilliseconds} ms");
|
||||
last = kv.Value;
|
||||
}
|
||||
|
||||
SereinEnv.WriteLine(InfoType.INFO, $"总耗时:{theTS.TotalSeconds} ms");
|
||||
SereinEnv.WriteLine(InfoType.INFO, $"------");
|
||||
|
||||
#endif
|
||||
return flowResult;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 获取所有参数
|
||||
/// </summary>
|
||||
/// <param name="context"></param>
|
||||
/// <param name="token"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<object[]> GetParametersAsync(IFlowContext context, CancellationToken token)
|
||||
{
|
||||
IFlowNode nodeModel = this;
|
||||
var md = nodeModel.MethodDetails;
|
||||
var pds = md.ParameterDetailss;
|
||||
|
||||
if (pds.Length == 0)
|
||||
return [];
|
||||
|
||||
object[] args;
|
||||
object[] paramsArgs = null; // 改为强类型数组 object[]
|
||||
|
||||
int paramsArgIndex = md.ParamsArgIndex;
|
||||
|
||||
if (paramsArgIndex >= 0)
|
||||
{
|
||||
// 可变参数数量
|
||||
int paramsLength = pds.Length - paramsArgIndex;
|
||||
|
||||
// 用 object[] 表示可变参数数组(如果类型固定也可以用 int[] 等)
|
||||
paramsArgs = new object[paramsLength];
|
||||
|
||||
// 方法参数中占位,最后一项是 object[]
|
||||
args = new object[paramsArgIndex + 1];
|
||||
args[paramsArgIndex] = paramsArgs;
|
||||
}
|
||||
else
|
||||
{
|
||||
args = new object[pds.Length];
|
||||
}
|
||||
|
||||
// 并发处理常规参数
|
||||
Task<object>[] mainArgTasks = new Task<object>[paramsArgIndex >= 0 ? paramsArgIndex : pds.Length];
|
||||
|
||||
for (int i = 0; i < mainArgTasks.Length; i++)
|
||||
{
|
||||
var pd = pds[i];
|
||||
mainArgTasks[i] = pd.ToMethodArgData(context);
|
||||
}
|
||||
|
||||
await Task.WhenAll(mainArgTasks);
|
||||
|
||||
for (int i = 0; i < mainArgTasks.Length; i++)
|
||||
{
|
||||
args[i] = mainArgTasks[i].Result;
|
||||
}
|
||||
|
||||
// 并发处理 params 类型的入参参数
|
||||
if (paramsArgs != null)
|
||||
{
|
||||
int paramsLength = paramsArgs.Length;
|
||||
Task<object>[] paramTasks = new Task<object>[paramsLength];
|
||||
|
||||
for (int i = 0; i < paramsLength; i++)
|
||||
{
|
||||
var pd = pds[paramsArgIndex + i];
|
||||
paramTasks[i] = pd.ToMethodArgData(context);
|
||||
}
|
||||
|
||||
await Task.WhenAll(paramTasks);
|
||||
|
||||
for (int i = 0; i < paramsLength; i++)
|
||||
{
|
||||
paramsArgs[i] = paramTasks[i].Result;
|
||||
}
|
||||
|
||||
args[args.Length - 1] = paramsArgs;
|
||||
}
|
||||
|
||||
return args;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user