Files
serein-flow/NodeFlow/Env/FlowControl.cs
fengjiayi 48289dae11 1. 修改了FlowResult的创建方式,新增了IsSuccess与Message,为后续进行流程日志追踪准备。
2. 修复了删除节点时,没有正确消除与之相关的参数获取关系。
3. IFlowNode新增了StartFlowAsync方法。
4. Library项目中FlowNodeExtension拓展类转移到了NodeFlow项目中。
5.  修改了Script自定义参数保存,对参数类型、返回值类型进行保存。
6. Library.Utils.BenchmarkHelpter中添加了Task、Task<>的重载方法。
7. NodeFlow项目中,FlowEnvironment默认使用通过NewtonsoftJson实现的JSON门户类。
8. NodeFlow项目中,FlowControl缓存了 FlowWorkOptions 选项实体,并且对于 FlowWorkManagement 进行了池化管理(但后续的项目中考虑重构流程任务运行时)。
2025-07-30 11:29:12 +08:00

424 lines
16 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Serein.Library;
using Serein.Library.Api;
using Serein.Library.Utils;
using Serein.NodeFlow.Model;
using Serein.NodeFlow.Model.Nodes;
using Serein.NodeFlow.Services;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Serein.NodeFlow.Env
{
internal class FlowControl : IFlowControl
{
/// <summary>
/// Flow environment supplied by the host. It is handed to every new <c>FlowContext</c>,
/// placed in the work options, and registered in the IOC container when a flow starts.
/// </summary>
private readonly IFlowEnvironment flowEnvironment;
/// <summary>
/// Event hub used by this class to raise the flow-run-complete, monitored-object-changed
/// and interrupt-triggered notifications.
/// </summary>
private readonly IFlowEnvironmentEvent flowEnvironmentEvent;
/// <summary>
/// Supplies the auto-register types and the Init / Loading / Exit method details
/// that are collected when a canvas-based flow is started.
/// </summary>
private readonly FlowLibraryService flowLibraryService;
/// <summary>
/// Stored by the constructor; not referenced anywhere else in the visible part of this class.
/// </summary>
private readonly FlowOperationService flowOperationService;
/// <summary>
/// Lookup service for canvas models and node models.
/// </summary>
private readonly FlowModelService flowModelService;
/// <summary>
/// Used to invoke the flow-run-complete notification when a flow is exited.
/// NOTE(review): presumably marshals the callback onto the UI context - confirm against its implementation.
/// </summary>
private readonly UIContextOperation UIContextOperation;
/// <summary>
/// Creates the flow controller, stores its collaborators and builds the object pools
/// (flow contexts and work-management instances) together with the cached work options.
/// </summary>
/// <param name="flowEnvironment">Flow environment passed to every pooled context and to the work options.</param>
/// <param name="flowEnvironmentEvent">Event hub used to publish flow notifications.</param>
/// <param name="flowLibraryService">Library service queried when a canvas-based flow starts.</param>
/// <param name="flowOperationService">Operation service (only stored here).</param>
/// <param name="flowModelService">Canvas / node model lookup service.</param>
/// <param name="UIContextOperation">Helper used to invoke the flow-run-complete notification.</param>
public FlowControl(
    IFlowEnvironment flowEnvironment,
    IFlowEnvironmentEvent flowEnvironmentEvent,
    FlowLibraryService flowLibraryService,
    FlowOperationService flowOperationService,
    FlowModelService flowModelService,
    UIContextOperation UIContextOperation)
{
    // Services first, then the environment handles; these assignments are independent of each other.
    this.flowLibraryService = flowLibraryService;
    this.flowOperationService = flowOperationService;
    this.flowModelService = flowModelService;
    this.UIContextOperation = UIContextOperation;
    this.flowEnvironment = flowEnvironment;
    this.flowEnvironmentEvent = flowEnvironmentEvent;

    // Pool of flow contexts; each new context is bound to the environment.
    this.contexts = new ObjectPool<IFlowContext>(() => new FlowContext(flowEnvironment));

    // Cached options shared by every pooled FlowWorkManagement.
    this.flowTaskOptions = new()
    {
        FlowIOC = IOC,
        Environment = flowEnvironment,
        FlowContextPool = this.contexts,
    };

    // The factory reads the options field when an instance is actually created.
    this.flowTaskManagementPool = new ObjectPool<FlowWorkManagement>(
        () => new FlowWorkManagement(this.flowTaskOptions));
}
/// <summary>
/// Pool of flow contexts; created in the constructor and shared through the work options.
/// </summary>
private ObjectPool<IFlowContext> contexts;
/// <summary>
/// Pool of <c>FlowWorkManagement</c> instances built from <c>flowTaskOptions</c>;
/// used when a flow is started from a single node.
/// </summary>
private ObjectPool<FlowWorkManagement> flowTaskManagementPool;
/// <summary>
/// Cached work options created once in the constructor.
/// </summary>
private FlowWorkOptions flowTaskOptions;
/// <summary>
/// Work management of the canvas-based run created by <c>StartFlowAsync(string[])</c>;
/// set back to null by <c>ExitFlowAsync</c>.
/// </summary>
private FlowWorkManagement flowWorkManagement;
/// <summary>
/// Backing field of <c>IOC</c>; replaced by <c>UseExternalIOC</c> and lazily created when still null.
/// </summary>
private ISereinIOC externalIOC;
/// <summary>
/// Optional callback invoked right after the IOC container is reset at flow start.
/// </summary>
private Action<ISereinIOC> setDefultMemberOnReset;
/// <summary>
/// Set to true by <c>UseExternalIOC</c>; not read anywhere in the visible part of this class.
/// </summary>
private bool IsUseExternalIOC = false;
/// <summary>
/// Lock object guarding reads and writes of <c>externalIOC</c>.
/// </summary>
private object lockObj = new object();
/// <summary>
/// Is Running while the global flip-flops are still running.
/// </summary>
private RunState FlipFlopState = RunState.NoStart;
/// <summary>
/// IOC container used at run time. A default <c>SereinIOC</c> is created on first read
/// when no container has been assigned yet; access is serialized through <c>lockObj</c>.
/// </summary>
public ISereinIOC IOC
{
    get
    {
        lock (lockObj)
        {
            // Lazily fall back to a default container.
            return externalIOC ??= new SereinIOC();
        }
    }
    private set
    {
        lock (lockObj)
        {
            externalIOC = value;
        }
    }
}
/// <inheritdoc/>
/// <remarks>
/// Validates every requested canvas, builds one <c>FlowTask</c> per canvas, resets the IOC
/// container and then runs the flow through a new <c>FlowWorkManagement</c>.
/// Returns <c>false</c> when validation fails. Returns <c>true</c> once the run has ended;
/// an exception thrown by the run is logged, not rethrown.
/// </remarks>
/// <exception cref="ArgumentNullException"><paramref name="canvasGuids"/> is null.</exception>
public async Task<bool> StartFlowAsync(string[] canvasGuids)
{
    if (canvasGuids is null)
    {
        throw new ArgumentNullException(nameof(canvasGuids));
    }

    #region Validate the requested canvases
    // No early exit: every canvas is inspected so that all problems are reported in one pass.
    HashSet<string> guids = new HashSet<string>();
    bool isBreak = false;
    foreach (var canvasGuid in canvasGuids)
    {
        if (guids.Contains(canvasGuid))
        {
            flowEnvironment.WriteLine(InfoType.WARN, $"画布重复,停止运行。{canvasGuid}");
            isBreak = true;
        }
        else if (!flowModelService.ContainsCanvasModel(canvasGuid))
        {
            SereinEnv.WriteLine(InfoType.WARN, $"画布不存在,停止运行。{canvasGuid}");
            isBreak = true;
        }
        else if (!flowModelService.IsExsitNodeOnCanvas(canvasGuid))
        {
            SereinEnv.WriteLine(InfoType.WARN, $"画布没有节点,停止运行。{canvasGuid}");
            isBreak = true;
        }
        else
        {
            guids.Add(canvasGuid);
        }
    }
    if (isBreak)
    {
        return false;
    }
    #endregion

    #region Build one flow task per canvas
    Dictionary<string, FlowTask> flowTasks = [];
    foreach (var guid in guids)
    {
        if (!flowModelService.TryGetCanvasModel(guid, out var canvasModel))
        {
            SereinEnv.WriteLine(InfoType.WARN, $"画布不存在,停止运行。{guid}");
            return false;
        }
        // Check the start node before building the task so nothing is created for an unusable canvas.
        if (canvasModel.StartNode?.Guid is null)
        {
            SereinEnv.WriteLine(InfoType.WARN, $"画布不存在起始节点,将停止运行。{guid}");
            return false;
        }
        var flowTask = new FlowTask();
        flowTask.GetNodes = () => flowModelService.GetAllNodeModel(guid);
        flowTask.GetStartNode = () => canvasModel.StartNode;
        flowTasks.Add(guid, flowTask);
    }
    #endregion

    // Start from a clean container, let the host re-register its defaults, then expose the environment.
    IOC.Reset();
    setDefultMemberOnReset?.Invoke(IOC);
    IOC.Register<IFlowEnvironment>(() => flowEnvironment);
    // externalIOC.Register<IScriptFlowApi, ScriptFlowApi>(); // register the script API (disabled)

    // Options for this run only; named differently from the cached flowTaskOptions field,
    // which backs the pooled FlowWorkManagement instances and must stay untouched.
    var runOptions = new FlowWorkOptions
    {
        FlowIOC = IOC,
        Environment = flowEnvironment, // flow environment
        Flows = flowTasks,
        FlowContextPool = contexts, // context object pool
        AutoRegisterTypes = flowLibraryService.GetaAutoRegisterType(), // types that must be instantiated automatically
        InitMds = flowLibraryService.GetMdsOnFlowStart(NodeType.Init),
        LoadMds = flowLibraryService.GetMdsOnFlowStart(NodeType.Loading),
        ExitMds = flowLibraryService.GetMdsOnFlowStart(NodeType.Exit),
    };
    flowWorkManagement = new FlowWorkManagement(runOptions);

    // NOTE(review): this token source is never cancelled here. It is deliberately not disposed
    // because RunAsync may hand the token to work that outlives the await - confirm.
    var cts = new CancellationTokenSource();
    try
    {
        _ = await flowWorkManagement.RunAsync(cts.Token);
    }
    catch (Exception ex)
    {
        SereinEnv.WriteLine(ex);
    }
    finally
    {
        SereinEnv.WriteLine(InfoType.INFO, $"流程运行完毕{Environment.NewLine}");
    }
    return true;
}
/// <inheritdoc/>
/// <remarks>
/// Runs the flow starting at the given node using a pooled <c>FlowWorkManagement</c> and a
/// pooled flow context, logs the elapsed time of each phase, and converts the flow result
/// to <typeparamref name="TResult"/>. Both pooled objects are returned to their pools even
/// when the flow throws.
/// </remarks>
/// <exception cref="ArgumentException">No node exists for <paramref name="startNodeGuid"/>.</exception>
/// <exception cref="InvalidOperationException">The node is a flip-flop node, which cannot be a start node.</exception>
/// <exception cref="ArgumentNullException">
/// Neither the result value nor the result object can be converted to <typeparamref name="TResult"/>
/// (exception type kept for backward compatibility).
/// </exception>
public async Task<TResult> StartFlowAsync<TResult>(string startNodeGuid)
{
    var sw = Stopwatch.StartNew();
    // Ordered list: the log below relies on insertion order, which Dictionary does not guarantee.
    var checkpoints = new List<(string Name, TimeSpan Elapsed)>();

    // Validate before touching the pools so a rejected request cannot strand a pooled instance.
    if (!flowModelService.TryGetNodeModel(startNodeGuid, out IFlowNode? nodeModel))
    {
        throw new ArgumentException($"节点不存在【{startNodeGuid}】");
    }
    if (nodeModel is SingleFlipflopNode)
    {
        throw new InvalidOperationException("不能从[Flipflop]节点开始");
    }

    var flowTaskManagement = flowTaskManagementPool.Allocate();
    try
    {
        var flowContextPool = flowTaskManagement.WorkOptions.FlowContextPool;
        var context = flowContextPool.Allocate();
        try
        {
            checkpoints.Add(("准备调用环境", sw.Elapsed));
            // Run the flow beginning at the selected node.
            var flowResult = await nodeModel.StartFlowAsync(context, flowTaskManagement.WorkOptions.CancellationTokenSource.Token);
            checkpoints.Add(("调用节点流程", sw.Elapsed));

            var last = TimeSpan.Zero;
            foreach (var (name, elapsed) in checkpoints)
            {
                SereinEnv.WriteLine(InfoType.INFO, $"{name} 耗时: {(elapsed - last).TotalMilliseconds} ms");
                last = elapsed;
            }

            if (context.IsRecordInvokeInfo)
            {
                var invokeInfos = context.GetAllInvokeInfos();
                // Fire-and-forget: the invocation log is written roughly 100 ms later.
                // NOTE(review): the context is reset right after this method continues; if
                // GetAllInvokeInfos returns the context's own list instead of a copy, the
                // delayed log may observe cleared data - confirm.
                _ = Task.Delay(100).ContinueWith(delayTask =>
                {
                    if (invokeInfos.Count < 255)
                    {
                        foreach (var info in invokeInfos)
                        {
                            SereinEnv.WriteLine(InfoType.INFO, info.ToString());
                        }
                    }
                    else
                    {
                        // Too many entries to print one by one; log aggregate figures instead.
                        double total = 0;
                        for (int i = 0; i < invokeInfos.Count; i++)
                        {
                            total += invokeInfos[i].TS.TotalSeconds;
                        }
                        SereinEnv.WriteLine(InfoType.INFO, $"运行次数:{invokeInfos.Count}");
                        SereinEnv.WriteLine(InfoType.INFO, $"平均耗时:{total / invokeInfos.Count}");
                        SereinEnv.WriteLine(InfoType.INFO, $"总耗时:{total}");
                    }
                }, TaskScheduler.Default);
            }

            // Prefer the wrapped value; fall back to the result object itself.
            if (flowResult.Value is TResult result)
            {
                return result;
            }
            if (flowResult is TResult wholeResult)
            {
                return wholeResult;
            }

            // Value may be null here, so the type name is resolved defensively.
            var actualType = flowResult.Value?.GetType().FullName ?? "null";
            throw new ArgumentNullException(
                paramName: null,
                message: $"类型转换失败,流程返回数据与泛型不匹配,当前返回类型为[{actualType}]。");
        }
        finally
        {
            context.Reset();
            flowContextPool.Free(context);
        }
    }
    finally
    {
        flowTaskManagementPool.Free(flowTaskManagement);
    }
}
/*/// <summary>
/// 单独运行一个节点
/// </summary>
/// <param name="nodeGuid"></param>
/// <returns></returns>
public async Task<object> InvokeNodeAsync(IDynamicContext context, string nodeGuid)
{
object result = Unit.Default;
if (this.NodeModels.TryGetValue(nodeGuid, out var model))
{
CancellationTokenSource cts = new CancellationTokenSource();
result = await model.ExecutingAsync(context, cts.Token);
cts?.Cancel();
}
return result;
}*/
/// <inheritdoc/>
/// <remarks>
/// Exits the current canvas-based run (if any), publishes the flow-run-complete notification,
/// resets the IOC container and drops the reference to the finished run. Always completes with <c>true</c>.
/// </remarks>
public Task<bool> ExitFlowAsync()
{
    // Nothing to exit when no canvas-based run was started.
    if (flowWorkManagement is not null)
    {
        flowWorkManagement.Exit();
    }

    // The notification is published even when no run was active.
    UIContextOperation?.Invoke(() => flowEnvironmentEvent.OnFlowRunComplete(new FlowEventArgs()));

    IOC.Reset();
    flowWorkManagement = null;

    // NOTE(review): forced collection kept as in the original; confirm it is still required.
    GC.Collect();

    return Task.FromResult(true);
}
/// <inheritdoc/>
/// <remarks>
/// Currently a no-op: the body contains only the previous implementation, commented out.
/// </remarks>
public void ActivateFlipflopNode(string nodeGuid)
{
/* Disabled previous implementation, kept for reference:
if (!TryGetNodeModel(nodeGuid, out var nodeModel))
{
return;
}
if (nodeModel is null) return;
if (flowTaskManagement is not null && nodeModel is SingleFlipflopNode flipflopNode) // the child node is a flip-flop
{
if (FlowState != RunState.Completion
&& flipflopNode.NotExitPreviousNode()) // the flow is running and this flip-flop has no upstream node
{
_ = flowTaskManagement.RunGlobalFlipflopAsync(this, flipflopNode);// a child detached from its parent that is a flip-flop without an upstream node is loaded into the runtime while the flow is running
}
}*/
}
/// <inheritdoc/>
/// <remarks>
/// Currently a no-op: the body contains only the previous implementation, commented out.
/// </remarks>
public void TerminateFlipflopNode(string nodeGuid)
{
/* Disabled previous implementation, kept for reference:
if (!TryGetNodeModel(nodeGuid, out var nodeModel))
{
return;
}
if (nodeModel is null) return;
if (flowTaskManagement is not null && nodeModel is SingleFlipflopNode flipflopNode) // the child node is a flip-flop
{
flowTaskManagement.TerminateGlobalFlipflopRuning(flipflopNode);
}*/
}
/// <inheritdoc/>
/// <remarks>
/// Replaces the run-time IOC container with <paramref name="ioc"/> and remembers the optional
/// callback that is invoked after the container is reset at flow start.
/// </remarks>
public void UseExternalIOC(ISereinIOC ioc, Action<ISereinIOC> setDefultMemberOnReset = null)
{
    // Mark that the container now comes from the caller.
    IsUseExternalIOC = true;

    // Swap the container (the setter takes the lock).
    IOC = ioc;

    // Callback used to re-register default members after each reset.
    this.setDefultMemberOnReset = setDefultMemberOnReset;
}
/// <inheritdoc/>
/// <remarks>
/// Wraps the arguments in a <c>MonitorObjectEventArgs</c> and publishes it through the environment event hub.
/// </remarks>
public void MonitorObjectNotification(string nodeGuid, object monitorData, MonitorObjectEventArgs.ObjSourceType sourceType)
{
    var changedArgs = new MonitorObjectEventArgs(nodeGuid, monitorData, sourceType);
    flowEnvironmentEvent.OnMonitorObjectChanged(changedArgs);
}
/// <inheritdoc/>
/// <remarks>
/// Wraps the arguments in an <c>InterruptTriggerEventArgs</c> and publishes it through the environment event hub.
/// </remarks>
public void TriggerInterrupt(string nodeGuid, string expression, InterruptTriggerEventArgs.InterruptTriggerType type)
{
    var interruptArgs = new InterruptTriggerEventArgs(nodeGuid, expression, type);
    flowEnvironmentEvent.OnInterruptTriggered(interruptArgs);
}
#region
/// <inheritdoc/>
/// <remarks>
/// Untyped convenience overload; forwards to the generic overload with <c>object</c> as the result type.
/// </remarks>
public async Task<object> InvokeAsync(string apiGuid, Dictionary<string, object> dict)
    => await InvokeAsync<object>(apiGuid, dict);
/// <inheritdoc/>
/// <remarks>
/// Looks up the flow-call node identified by <paramref name="apiGuid"/>, binds the supplied
/// arguments by parameter name into a pooled flow context, runs the flow and converts the
/// result to <typeparamref name="TResult"/>. The pooled context is reset and returned to the
/// pool, and the cancellation source is cancelled and disposed, on every exit path.
/// </remarks>
/// <exception cref="ArgumentNullException">
/// <paramref name="dict"/> is null; the node does not exist or is not a flow-call node;
/// the number of arguments differs from the number of parameters; or the flow result cannot
/// be converted to <typeparamref name="TResult"/> (exception type kept for backward compatibility).
/// </exception>
public async Task<TResult> InvokeAsync<TResult>(string apiGuid, Dictionary<string, object> dict)
{
    if (dict is null)
    {
        throw new ArgumentNullException(nameof(dict));
    }
    if (!flowModelService.TryGetNodeModel(apiGuid, out var nodeModel))
    {
        throw new ArgumentNullException(nameof(apiGuid), $"不存在流程接口:{apiGuid}");
    }
    if (nodeModel is not SingleFlowCallNode flowCallNode)
    {
        throw new ArgumentNullException(nameof(apiGuid), $"目标节点并非流程接口:{apiGuid}");
    }
    var pds = flowCallNode.MethodDetails.ParameterDetailss;
    if (dict.Count != pds.Length)
    {
        throw new ArgumentNullException(nameof(dict), $"参数数量不一致。传入参数数量:{dict.Count}。接口入参数量:{pds.Length}。");
    }

    IFlowContext context = contexts.Allocate();
    CancellationTokenSource cts = new CancellationTokenSource();
    try
    {
        // Bind arguments by parameter name; a parameter without a matching key is left unset.
        for (int index = 0; index < pds.Length; index++)
        {
            ParameterDetails pd = pds[index];
            if (dict.TryGetValue(pd.Name, out var value))
            {
                context.SetParamsTempData(flowCallNode.Guid, index, value); // bind the input argument
            }
        }

#if DEBUG
        // Debug builds route the call through the benchmark helper.
        FlowResult flowResult = await BenchmarkHelpers.BenchmarkAsync(async () =>
        {
            var benchmarkedResult = await flowCallNode.StartFlowAsync(context, cts.Token);
            return benchmarkedResult;
        });
#else
        var flowResult = await flowCallNode.StartFlowAsync(context, cts.Token);
#endif

        // Prefer the wrapped value; fall back to the result object itself.
        if (flowResult.Value is TResult result)
        {
            return result;
        }
        if (flowResult is TResult wholeResult)
        {
            return wholeResult;
        }

        // Value may be null here, so the type name is resolved defensively.
        var actualType = flowResult.Value?.GetType().FullName ?? "null";
        throw new ArgumentNullException(
            paramName: null,
            message: $"类型转换失败,流程返回数据与泛型不匹配,当前返回类型为[{actualType}]。");
    }
    finally
    {
        cts.Cancel();
        cts.Dispose();
        // Same clean-up as StartFlowAsync<TResult>: reset the context and hand it back to the pool.
        context.Reset();
        contexts.Free(context);
    }
}
/// <summary>
/// Reports whether the node has at least one successor under any of the
/// Upstream, IsSucceed, IsFail or IsError connection types.
/// </summary>
/// <param name="nodeModel">Node whose successor table is inspected.</param>
/// <returns><c>true</c> when any of those connection types has a non-empty successor list.</returns>
private bool IsHasSuccessorNodes(IFlowNode nodeModel)
{
    ConnectionInvokeType[] candidateTypes =
    [
        ConnectionInvokeType.Upstream,
        ConnectionInvokeType.IsSucceed,
        ConnectionInvokeType.IsFail,
        ConnectionInvokeType.IsError,
    ];

    // Stops at the first connection type that has a successor.
    return candidateTypes.Any(invokeType =>
        nodeModel.SuccessorNodes.TryGetValue(invokeType, out var successors)
        && successors.Count > 0);
}
#endregion
}
}