Files
serein-flow/NodeFlow/Env/FlowControl.cs

424 lines
16 KiB
C#
Raw Normal View History

2025-07-04 21:31:07 +08:00
using Serein.Library;
using Serein.Library.Api;
using Serein.Library.Utils;
using Serein.NodeFlow.Model;
using Serein.NodeFlow.Model.Nodes;
2025-07-04 21:31:07 +08:00
using Serein.NodeFlow.Services;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
2025-07-04 21:31:07 +08:00
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Serein.NodeFlow.Env
{
internal class FlowControl : IFlowControl
{
// Flow environment this controller works for; passed to pooled contexts and work options.
private readonly IFlowEnvironment flowEnvironment;
// Event sink used to raise run-complete, monitor-object and interrupt notifications.
private readonly IFlowEnvironmentEvent flowEnvironmentEvent;
// Supplies the auto-register types and the Init/Loading/Exit method details at flow start.
private readonly FlowLibraryService flowLibraryService;
// Injected operation service; stored here but not used by any member visible in this file.
private readonly FlowOperationService flowOperationService;
// Lookup service for canvas models and node models.
private readonly FlowModelService flowModelService;
// Used to invoke the run-complete callback (presumably on the UI context - confirm).
private readonly UIContextOperation UIContextOperation;
/// <summary>
/// Creates the flow controller: stores the injected services and prepares the
/// context pool, the shared work options and the work-management pool.
/// </summary>
/// <param name="flowEnvironment">Environment the flows run in.</param>
/// <param name="flowEnvironmentEvent">Event sink for flow notifications.</param>
/// <param name="flowLibraryService">Library service queried when a flow starts.</param>
/// <param name="flowOperationService">Flow operation service.</param>
/// <param name="flowModelService">Canvas/node model lookup service.</param>
/// <param name="UIContextOperation">Helper used to invoke UI-bound callbacks.</param>
public FlowControl(IFlowEnvironment flowEnvironment,
                   IFlowEnvironmentEvent flowEnvironmentEvent,
                   FlowLibraryService flowLibraryService,
                   FlowOperationService flowOperationService,
                   FlowModelService flowModelService,
                   UIContextOperation UIContextOperation)
{
    (this.flowEnvironment, this.flowEnvironmentEvent) = (flowEnvironment, flowEnvironmentEvent);
    (this.flowLibraryService, this.flowOperationService, this.flowModelService) =
        (flowLibraryService, flowOperationService, flowModelService);
    this.UIContextOperation = UIContextOperation;

    // Every pooled context is bound to the environment passed in here.
    contexts = new ObjectPool<IFlowContext>(() => new FlowContext(flowEnvironment));

    // Options shared by all pooled work-management instances.
    // FlowIOC is read from the IOC property once, at construction time.
    flowTaskOptions = new FlowWorkOptions
    {
        FlowIOC = IOC,
        Environment = flowEnvironment,
        FlowContextPool = contexts,
    };

    flowTaskManagementPool = new ObjectPool<FlowWorkManagement>(
        () => new FlowWorkManagement(flowTaskOptions));
}
// Pool of reusable flow contexts; assigned once in the constructor.
private readonly ObjectPool<IFlowContext> contexts;
// Pool of work-management instances used when a flow is started from a single node.
private readonly ObjectPool<FlowWorkManagement> flowTaskManagementPool;
// Options handed to every pooled work-management instance; assigned once in the constructor.
private readonly FlowWorkOptions flowTaskOptions;
// Work management of the current whole-canvas run; set by StartFlowAsync, cleared by ExitFlowAsync.
private FlowWorkManagement flowWorkManagement;
// Backing field of the IOC property; created lazily when it is still null on first read.
private ISereinIOC externalIOC;
// Optional callback invoked right after the container is reset when a flow starts.
private Action<ISereinIOC> setDefultMemberOnReset;
// Set to true by UseExternalIOC; not read by any member visible in this file.
private bool IsUseExternalIOC = false;
// Guards reads and writes of externalIOC. Read-only so the lock target can never be swapped.
private readonly object lockObj = new object();
/// <summary>
/// Is Running while the global flip-flops (triggers) are still running.
/// </summary>
private RunState FlipFlopState = RunState.NoStart;
/// <summary>
/// IOC container used at run time. Reading it creates a default
/// <c>SereinIOC</c> if no container has been assigned yet.
/// </summary>
public ISereinIOC IOC
{
    get
    {
        lock (lockObj)
        {
            // Lazily fall back to a default container.
            return externalIOC ??= new SereinIOC();
        }
    }
    private set
    {
        lock (lockObj)
        {
            externalIOC = value;
        }
    }
}
/// <inheritdoc/>
/// <remarks>
/// Validates every requested canvas (no duplicates, canvas exists, canvas has nodes and a
/// start node), resets the IOC container, then runs all canvases through a new
/// <c>FlowWorkManagement</c>. Returns <c>false</c> when validation fails; returns
/// <c>true</c> once the run has ended, even if the run itself threw (the exception is logged).
/// </remarks>
/// <exception cref="ArgumentNullException"><paramref name="canvasGuids"/> is null.</exception>
public async Task<bool> StartFlowAsync(string[] canvasGuids)
{
    if (canvasGuids is null)
    {
        throw new ArgumentNullException(nameof(canvasGuids));
    }

    #region
    // Check every guid (instead of stopping at the first problem) so all issues get reported.
    var guids = new HashSet<string>();
    var isBreak = false;
    foreach (var canvasGuid in canvasGuids)
    {
        if (guids.Contains(canvasGuid))
        {
            flowEnvironment.WriteLine(InfoType.WARN, $"画布重复,停止运行。{canvasGuid}");
            isBreak = true;
        }
        else if (!flowModelService.ContainsCanvasModel(canvasGuid))
        {
            SereinEnv.WriteLine(InfoType.WARN, $"画布不存在,停止运行。{canvasGuid}");
            isBreak = true;
        }
        else if (!flowModelService.IsExsitNodeOnCanvas(canvasGuid))
        {
            SereinEnv.WriteLine(InfoType.WARN, $"画布没有节点,停止运行。{canvasGuid}");
            isBreak = true;
        }
        else
        {
            guids.Add(canvasGuid);
        }
    }
    if (isBreak)
    {
        return false;
    }
    #endregion

    #region
    // Build one flow task per validated canvas.
    Dictionary<string, FlowTask> flowTasks = [];
    foreach (var guid in guids)
    {
        if (!flowModelService.TryGetCanvasModel(guid, out var canvasModel))
        {
            SereinEnv.WriteLine(InfoType.WARN, $"画布不存在,停止运行。{guid}");
            return false;
        }
        if (canvasModel.StartNode?.Guid is null)
        {
            SereinEnv.WriteLine(InfoType.WARN, $"画布不存在起始节点,将停止运行。{guid}");
            return false;
        }

        var ft = new FlowTask();
        ft.GetNodes = () => flowModelService.GetAllNodeModel(guid);
        ft.GetStartNode = () => canvasModel.StartNode;
        flowTasks.Add(guid, ft);
    }
    #endregion

    // Start from a clean container, then let the host re-register its defaults.
    IOC.Reset();
    setDefultMemberOnReset?.Invoke(IOC);
    IOC.Register<IFlowEnvironment>(() => flowEnvironment);
    // externalIOC.Register<IScriptFlowApi, ScriptFlowApi>(); // register the script API

    // Named differently from the flowTaskOptions field so the field is no longer shadowed.
    var runOptions = new FlowWorkOptions
    {
        FlowIOC = IOC,
        Environment = flowEnvironment,
        Flows = flowTasks,
        FlowContextPool = contexts, // shared context pool
        AutoRegisterTypes = flowLibraryService.GetaAutoRegisterType(), // types to instantiate automatically
        InitMds = flowLibraryService.GetMdsOnFlowStart(NodeType.Init),
        LoadMds = flowLibraryService.GetMdsOnFlowStart(NodeType.Loading),
        ExitMds = flowLibraryService.GetMdsOnFlowStart(NodeType.Exit),
    };

    // Run through a local so a concurrent ExitFlowAsync clearing the field cannot null the call target.
    var management = new FlowWorkManagement(runOptions);
    flowWorkManagement = management;

    // NOTE(review): this source is never cancelled or disposed here. Confirm that no work started
    // by RunAsync can outlive this call before wrapping it in a using.
    var cts = new CancellationTokenSource();
    try
    {
        await management.RunAsync(cts.Token);
    }
    catch (Exception ex)
    {
        SereinEnv.WriteLine(ex);
    }
    finally
    {
        SereinEnv.WriteLine(InfoType.INFO, $"流程运行完毕{Environment.NewLine}");
    }
    return true;
}
/// <inheritdoc/>
/// <remarks>
/// Runs the flow starting at the given node with a pooled work-management instance and a
/// pooled context, logs the elapsed time of each phase, and converts the flow result to
/// <typeparamref name="TResult"/>. Pooled objects are returned even when the flow throws.
/// </remarks>
/// <exception cref="Exception">The node does not exist, or it is a flip-flop node.</exception>
/// <exception cref="ArgumentNullException">
/// Neither the flow result value nor the flow result itself is a <typeparamref name="TResult"/>.
/// </exception>
public async Task<TResult> StartFlowAsync<TResult>(string startNodeGuid)
{
    var sw = Stopwatch.StartNew();
    // A list keeps the checkpoints in insertion order, which the delta computation below relies on.
    var checkpoints = new List<(string Name, TimeSpan Elapsed)>();

    // Validate before renting anything from a pool, so a failed check cannot leak a pooled instance.
    if (!flowModelService.TryGetNodeModel(startNodeGuid, out IFlowNode? nodeModel))
    {
        throw new Exception($"节点不存在【{startNodeGuid}】");
    }
    if (nodeModel is SingleFlipflopNode)
    {
        throw new Exception("不能从[Flipflop]节点开始");
    }

    var flowTaskManagement = flowTaskManagementPool.Allocate();
    try
    {
        var flowContextPool = flowTaskManagement.WorkOptions.FlowContextPool;
        var context = flowContextPool.Allocate();
        try
        {
            checkpoints.Add(("准备调用环境", sw.Elapsed));
            // Run the flow starting from the selected node.
            var flowResult = await nodeModel.StartFlowAsync(context, flowTaskManagement.WorkOptions.CancellationTokenSource.Token);
            checkpoints.Add(("调用节点流程", sw.Elapsed));

            var last = TimeSpan.Zero;
            foreach (var (name, elapsed) in checkpoints)
            {
                SereinEnv.WriteLine(InfoType.INFO, $"{name} 耗时: {(elapsed - last).TotalMilliseconds} ms");
                last = elapsed;
            }

            if (context.IsRecordInvokeInfo)
            {
                // Capture the records before the context is reset; report them shortly afterwards
                // without blocking the caller (fire-and-forget).
                var invokeInfos = context.GetAllInvokeInfos();
                _ = Task.Delay(100).ContinueWith(delayTask =>
                {
                    if (invokeInfos.Count < 255)
                    {
                        foreach (var info in invokeInfos)
                        {
                            SereinEnv.WriteLine(InfoType.INFO, info.ToString());
                        }
                    }
                    else
                    {
                        // Too many records to print individually: print a summary instead.
                        double total = 0;
                        for (int i = 0; i < invokeInfos.Count; i++)
                        {
                            total += invokeInfos[i].TS.TotalSeconds;
                        }
                        SereinEnv.WriteLine(InfoType.INFO, $"运行次数:{invokeInfos.Count}");
                        SereinEnv.WriteLine(InfoType.INFO, $"平均耗时:{total / invokeInfos.Count}");
                        SereinEnv.WriteLine(InfoType.INFO, $"总耗时:{total}");
                    }
                });
            }

            if (flowResult.Value is TResult result)
            {
                return result;
            }
            else if (flowResult is FlowResult && flowResult is TResult result2)
            {
                return result2;
            }
            else
            {
                // Value may be null here, so it must not be dereferenced unconditionally.
                var actualType = flowResult.Value?.GetType().FullName ?? "null";
                throw new ArgumentNullException(
                    paramName: null,
                    message: $"类型转换失败,流程返回数据与泛型不匹配,当前返回类型为[{actualType}]。");
            }
        }
        finally
        {
            context.Reset();
            flowContextPool.Free(context);
        }
    }
    finally
    {
        flowTaskManagementPool.Free(flowTaskManagement);
    }
}
/*/// <summary>
/// 单独运行一个节点
/// </summary>
/// <param name="nodeGuid"></param>
/// <returns></returns>
public async Task<object> InvokeNodeAsync(IDynamicContext context, string nodeGuid)
{
object result = Unit.Default;
if (this.NodeModels.TryGetValue(nodeGuid, out var model))
{
CancellationTokenSource cts = new CancellationTokenSource();
result = await model.ExecutingAsync(context, cts.Token);
cts?.Cancel();
}
return result;
}*/
/// <inheritdoc/>
/// <remarks>
/// Stops the current run (if any), raises the run-complete event, resets the IOC container
/// and drops the reference to the finished work management. Always completes with <c>true</c>.
/// </remarks>
public Task<bool> ExitFlowAsync()
{
    // Read the field once; there may be no run in progress.
    if (flowWorkManagement is { } running)
    {
        running.Exit();
    }

    UIContextOperation?.Invoke(() => flowEnvironmentEvent.OnFlowRunComplete(new FlowEventArgs()));
    IOC.Reset();
    flowWorkManagement = null;

    // NOTE(review): forced collection kept as in the original; confirm it is still needed.
    GC.Collect();
    return Task.FromResult(true);
}
/// <inheritdoc/>
2025-07-04 21:31:07 +08:00
// Currently a no-op: the previous implementation below is disabled and kept for reference only.
public void ActivateFlipflopNode(string nodeGuid)
{
/*if (!TryGetNodeModel(nodeGuid, out var nodeModel))
{
return;
}
if (nodeModel is null) return;
if (flowTaskManagement is not null && nodeModel is SingleFlipflopNode flipflopNode) // the child node is a flip-flop
{
if (FlowState != RunState.Completion
&& flipflopNode.NotExitPreviousNode()) // the flow is running and this flip-flop has no upstream node
{
_ = flowTaskManagement.RunGlobalFlipflopAsync(this, flipflopNode);// a child detached from its parent that is a flip-flop with no upstream node is loaded into the running environment while the flow is running
}
}*/
}
/// <inheritdoc/>
2025-07-04 21:31:07 +08:00
// Currently a no-op: the previous implementation below is disabled and kept for reference only.
public void TerminateFlipflopNode(string nodeGuid)
{
/* if (!TryGetNodeModel(nodeGuid, out var nodeModel))
{
return;
}
if (nodeModel is null) return;
if (flowTaskManagement is not null && nodeModel is SingleFlipflopNode flipflopNode) // the child node is a flip-flop
{
flowTaskManagement.TerminateGlobalFlipflopRuning(flipflopNode);
}*/
}
/// <inheritdoc/>
/// <remarks>
/// Replaces the run-time container with <paramref name="ioc"/> and remembers the optional
/// callback that is invoked after the container is reset at flow start.
/// </remarks>
public void UseExternalIOC(ISereinIOC ioc, Action<ISereinIOC> setDefultMemberOnReset = null)
{
    IsUseExternalIOC = true;
    this.setDefultMemberOnReset = setDefultMemberOnReset;

    // Swap in the caller's container (the setter takes the lock).
    IOC = ioc;
}
/// <inheritdoc/>
/// <remarks>Wraps the arguments in a <c>MonitorObjectEventArgs</c> and forwards it to the event sink.</remarks>
public void MonitorObjectNotification(string nodeGuid, object monitorData, MonitorObjectEventArgs.ObjSourceType sourceType)
{
    var eventArgs = new MonitorObjectEventArgs(nodeGuid, monitorData, sourceType);
    flowEnvironmentEvent.OnMonitorObjectChanged(eventArgs);
}
/// <inheritdoc/>
/// <remarks>Wraps the arguments in an <c>InterruptTriggerEventArgs</c> and forwards it to the event sink.</remarks>
public void TriggerInterrupt(string nodeGuid, string expression, InterruptTriggerEventArgs.InterruptTriggerType type)
{
    var eventArgs = new InterruptTriggerEventArgs(nodeGuid, expression, type);
    flowEnvironmentEvent.OnInterruptTriggered(eventArgs);
}
#region
/// <inheritdoc/>
/// <remarks>Untyped convenience overload; delegates to the generic overload with <c>object</c>.</remarks>
public async Task<object> InvokeAsync(string apiGuid, Dictionary<string, object> dict)
    => await InvokeAsync<object>(apiGuid, dict);
/// <inheritdoc/>
/// <remarks>
/// Looks up the flow-call node identified by <paramref name="apiGuid"/>, binds the entries of
/// <paramref name="dict"/> to the node's parameters by name, runs the flow on a pooled context
/// and converts the result to <typeparamref name="TResult"/>. The context is reset and returned
/// to the pool when the call ends, whether it succeeds or throws.
/// </remarks>
/// <exception cref="ArgumentNullException">
/// <paramref name="dict"/> is null; the node does not exist or is not a flow-call node; the
/// argument count differs from the parameter count; or the flow result cannot be converted
/// to <typeparamref name="TResult"/>.
/// </exception>
public async Task<TResult> InvokeAsync<TResult>(string apiGuid, Dictionary<string, object> dict)
{
    if (dict is null)
    {
        throw new ArgumentNullException(nameof(dict));
    }
    // The exception type is kept for callers; the text is now passed as the message, not as paramName.
    if (!flowModelService.TryGetNodeModel(apiGuid, out var nodeModel))
    {
        throw new ArgumentNullException(paramName: null, message: $"不存在流程接口:{apiGuid}");
    }
    if (nodeModel is not SingleFlowCallNode flowCallNode)
    {
        throw new ArgumentNullException(paramName: null, message: $"目标节点并非流程接口:{apiGuid}");
    }

    var pds = flowCallNode.MethodDetails.ParameterDetailss;
    if (dict.Count != pds.Length)
    {
        throw new ArgumentNullException(
            paramName: null,
            message: $"参数数量不一致。传入参数数量:{dict.Count}。接口入参数量:{pds.Length}。");
    }

    IFlowContext context = contexts.Allocate();
    try
    {
        // Bind arguments by parameter name; entries whose key matches no parameter are ignored.
        for (int index = 0; index < pds.Length; index++)
        {
            ParameterDetails pd = pds[index];
            if (dict.TryGetValue(pd.Name, out var value))
            {
                context.SetParamsTempData(flowCallNode.Guid, index, value);
            }
        }

        using var cts = new CancellationTokenSource();
#if DEBUG
        FlowResult flowResult = await BenchmarkHelpers.BenchmarkAsync(async () =>
        {
            var measuredResult = await flowCallNode.StartFlowAsync(context, cts.Token);
            return measuredResult;
        });
#else
        var flowResult = await flowCallNode.StartFlowAsync(context, cts.Token);
#endif
        // Signal anything still observing the token that this invocation is over.
        cts.Cancel();

        if (flowResult.Value is TResult result)
        {
            return result;
        }
        else if (flowResult is FlowResult && flowResult is TResult result2)
        {
            return result2;
        }
        else
        {
            // Value may be null here, so it must not be dereferenced unconditionally.
            var actualType = flowResult.Value?.GetType().FullName ?? "null";
            throw new ArgumentNullException(
                paramName: null,
                message: $"类型转换失败,流程返回数据与泛型不匹配,当前返回类型为[{actualType}]。");
        }
    }
    finally
    {
        // Same cleanup StartFlowAsync<TResult> performs for its pooled context.
        context.Reset();
        contexts.Free(context);
    }
}
/// <summary>
/// Tells whether the node has at least one successor on any of its outgoing
/// connection kinds (Upstream, IsSucceed, IsFail, IsError).
/// </summary>
/// <param name="nodeModel">Node whose successor table is inspected.</param>
/// <returns><c>true</c> if any of the four kinds has a non-empty successor list.</returns>
private bool IsHasSuccessorNodes(IFlowNode nodeModel)
{
    ConnectionInvokeType[] outgoingKinds =
    [
        ConnectionInvokeType.Upstream,
        ConnectionInvokeType.IsSucceed,
        ConnectionInvokeType.IsFail,
        ConnectionInvokeType.IsError,
    ];

    return outgoingKinds.Any(kind =>
        nodeModel.SuccessorNodes.TryGetValue(kind, out var successors) && successors.Count > 0);
}
#endregion
}
}