using Serein.Library;
using Serein.Library.Api;
using Serein.Library.Utils;
using System.Diagnostics;
namespace Serein.NodeFlow.Model.Nodes
{
/// <summary>
/// Base class for flow nodes.
/// </summary>
public abstract partial class NodeModelBase : ISereinFlow
{
/// <summary>
/// Hook invoked after the concrete node has been created. The base implementation does nothing.
/// </summary>
/// <remarks>
/// According to the original author's note, this is called earlier than <c>LoadInfo()</c>.
/// </remarks>
public virtual void OnCreating() { }
/// <summary>
/// Saves node-specific custom data. The base implementation stores nothing and
/// hands back <paramref name="nodeInfo"/> unchanged.
/// </summary>
/// <param name="nodeInfo">Node information being saved.</param>
/// <returns>The same <paramref name="nodeInfo"/> instance that was passed in.</returns>
public virtual NodeInfo SaveCustomData(NodeInfo nodeInfo) => nodeInfo;
/// <summary>
/// Loads node-specific custom data. The base implementation does nothing.
/// </summary>
/// <param name="nodeInfo">Node information to read custom data from.</param>
public virtual void LoadCustomData(NodeInfo nodeInfo)
{
    // Nothing to load in the base class.
}
/// <summary>
/// Executes the method bound to this node and wraps its return value in a <see cref="FlowResult"/>.
/// </summary>
/// <param name="context">Flow context of the current run.</param>
/// <param name="token">
/// Cancellation token; it is checked once the node has been released from an interrupt and is
/// forwarded to parameter resolution.
/// </param>
/// <returns>
/// The value produced by <c>FlowResult.OK</c> for the invoked method, or <c>null</c> when
/// cancellation was requested while the node was held at an interrupt.
/// </returns>
/// <exception cref="InvalidOperationException">
/// The node has no method details, or the environment has no delegate registered for the method.
/// </exception>
public virtual async Task<FlowResult> ExecutingAsync(IFlowContext context, CancellationToken token)
{
    // When the node is flagged for interruption, notify the runtime environment and
    // wait on the interrupt task before doing anything else.
    if (DebugSetting.IsInterrupt)
    {
        context.Env.FlowControl.TriggerInterrupt(Guid, "", InterruptTriggerEventArgs.InterruptTriggerType.Monitor);
        await DebugSetting.GetInterruptTask.Invoke();
        SereinEnv.WriteLine(InfoType.INFO, $"[{this.MethodDetails?.MethodName}]中断已取消,开始执行后继分支");
        if (token.IsCancellationRequested)
        {
            return null;
        }
    }

    MethodDetails? md = MethodDetails;
    if (md is null)
    {
        throw new InvalidOperationException($"节点{Guid}不存在方法信息,请检查是否需要重写节点的ExecutingAsync");
    }

    if (!context.Env.TryGetDelegateDetails(md.AssemblyName, md.MethodName, out var dd))
    {
        throw new InvalidOperationException($"节点{this.Guid}不存在对应委托");
    }

    // Static methods are invoked without a target. Instance methods resolve their target
    // from the IOC container, registering and building the type first if it is not there yet.
    object? instance = null;
    if (!md.IsStatic)
    {
        instance = Env.FlowControl.IOC.Get(md.ActingInstanceType);
        if (instance is null)
        {
            Env.FlowControl.IOC.Register(md.ActingInstanceType).Build();
            instance = Env.FlowControl.IOC.Get(md.ActingInstanceType);
        }
    }

    // Arguments are resolved after the target so the order of side effects matches both
    // of the previously duplicated branches.
    object[] args = await this.GetParametersAsync(context, token);
    var result = await dd.InvokeAsync(instance, args);
    return FlowResult.OK(this.Guid, context, result);
}
/// <summary>
/// Pool of node stacks reused by <see cref="StartFlowAsync"/>.
/// </summary>
private static readonly ObjectPool<Stack<IFlowNode>> flowStackPool = new ObjectPool<Stack<IFlowNode>>(() => new Stack<IFlowNode>());

/// <summary>
/// Runs the flow starting at this node. Nodes are taken from a stack one at a time; after each
/// node executes, its enabled successor nodes for the resulting orientation and its enabled
/// upstream nodes are pushed for later execution.
/// </summary>
/// <param name="context">Flow context that carries run state, orientation and per-node data.</param>
/// <param name="token">Cancellation token, checked after each node has been processed.</param>
/// <returns>
/// The result of the last executed node, or <c>null</c> when the run stopped early because the
/// context reported completion or cancellation was requested while nodes were still pending.
/// </returns>
public async Task<FlowResult> StartFlowAsync(IFlowContext context, CancellationToken token)
{
    IFlowNode? previousNode = null;
    IFlowNode? currentNode = null;
    FlowResult? flowResult = null;

    Stack<IFlowNode> flowStack = flowStackPool.Allocate();
    flowStack.Push(this);

    try
    {
        while (true)
        {
            // Pop the next node to process.
            previousNode = currentNode;
            currentNode = flowStack.Pop();

            // Create an invoke record only when the context asks for it.
            FlowInvokeInfo? invokeInfo = null;
            if (context.IsRecordInvokeInfo)
            {
                FlowInvokeInfo.InvokeType invokeType = context.NextOrientation switch
                {
                    ConnectionInvokeType.IsSucceed => FlowInvokeInfo.InvokeType.IsSucceed,
                    ConnectionInvokeType.IsFail => FlowInvokeInfo.InvokeType.IsFail,
                    ConnectionInvokeType.IsError => FlowInvokeInfo.InvokeType.IsError,
                    ConnectionInvokeType.Upstream => FlowInvokeInfo.InvokeType.Upstream,
                    _ => FlowInvokeInfo.InvokeType.None
                };
                invokeInfo = context.NewInvokeInfo(previousNode, currentNode, invokeType);
            }

            // Assume success by default; a thrown exception switches the orientation to error below.
            context.NextOrientation = ConnectionInvokeType.IsSucceed;
            try
            {
                flowResult = await currentNode.ExecutingAsync(context, token);
            }
            catch (Exception ex)
            {
                flowResult = FlowResult.Fail(currentNode.Guid, context, ex.Message);
                context.Env.WriteLine(InfoType.ERROR, $"节点[{currentNode.Guid}]异常:" + ex);
                context.NextOrientation = ConnectionInvokeType.IsError;
                context.ExceptionOfRuning = ex;
            }

            // Update the invoke record, if one was created.
            if (invokeInfo is not null)
            {
                var state = context.NextOrientation switch
                {
                    ConnectionInvokeType.IsFail => FlowInvokeInfo.RunState.Failed,
                    ConnectionInvokeType.IsError => FlowInvokeInfo.RunState.Error,
                    _ => FlowInvokeInfo.RunState.Succeed
                };
                invokeInfo.UploadState(state);
                // ExecutingAsync may return null (e.g. cancelled while interrupted), so guard the access.
                invokeInfo.UploadResultValue(flowResult?.Value);
            }

            // Store this node's result in the context.
            context.AddOrUpdateFlowData(currentNode.Guid, flowResult);

            // Push enabled successors of the chosen orientation in reverse order, so the first
            // one in the list is popped first.
            var nextNodes = currentNode.SuccessorNodes[context.NextOrientation];
            for (int index = nextNodes.Count - 1; index >= 0; index--)
            {
                var node = nextNodes[index];
                if (node.DebugSetting.IsEnable)
                {
                    context.SetPreviousNode(node.Guid, currentNode.Guid);
                    flowStack.Push(node);
                }
            }

            // Push enabled upstream nodes afterwards; being on top of the stack, they are
            // popped before the successors pushed above.
            var upstreamNodes = currentNode.SuccessorNodes[ConnectionInvokeType.Upstream];
            for (int index = upstreamNodes.Count - 1; index >= 0; index--)
            {
                var node = upstreamNodes[index];
                if (node.DebugSetting.IsEnable)
                {
                    context.SetPreviousNode(node.Guid, currentNode.Guid);
                    flowStack.Push(node);
                }
            }

            if (flowStack.Count == 0)
            {
                break; // Nothing left to run: the flow reached its end.
            }

            if (context.RunState == RunState.Completion)
            {
                flowResult = null;
                currentNode.Env.WriteLine(InfoType.INFO, $"流程执行到节点[{currentNode.Guid}]时提前结束,未能获取到流程结果。");
                break;
            }

            if (token.IsCancellationRequested)
            {
                flowResult = null;
                currentNode.Env.WriteLine(InfoType.INFO, $"流程执行到节点[{currentNode.Guid}]时被取消,未能获取到流程结果。");
                break;
            }
        }
    }
    finally
    {
        // An early exit or an exception can leave nodes on the stack. Clear it before returning
        // it to the pool so a later run never pops nodes left over from this one.
        // NOTE(review): harmless if ObjectPool.Free already resets items - confirm.
        flowStack.Clear();
        flowStackPool.Free(flowStack);
    }

    return flowResult;
}
///
/// 获取所有参数
///
///
///
///
public async Task