更改了远程环境下websocket会来回发送重复消息的问题

This commit is contained in:
fengjiayi
2024-10-28 10:25:57 +08:00
parent e2f1ec5810
commit f20cfb755c
20 changed files with 297 additions and 167 deletions

View File

@@ -2,6 +2,7 @@
using Newtonsoft.Json;
using Serein.Library;
using Serein.Library.Api;
using Serein.Library.Core.NodeFlow;
using Serein.Library.FlowNode;
using Serein.Library.Utils;
using Serein.Library.Utils.SereinExpression;
@@ -407,21 +408,18 @@ namespace Serein.NodeFlow.Env
/// <summary>
/// 单独运行一个节点
/// </summary>
/// <param name="context"></param>
/// <param name="nodeGuid"></param>
/// <returns></returns>
public async Task<object> InvokeNodeAsync(string nodeGuid)
{
if(this.NodeModels.TryGetValue(nodeGuid, out var model))
IDynamicContext context = new DynamicContext(this);
object result = true;
if (this.NodeModels.TryGetValue(nodeGuid, out var model))
{
return await model.InvokeAsync(this);
}
else
{
return null;
result = await model.InvokeAsync(context);
}
context.Exit();
return result;
}
/// <summary>

View File

@@ -364,9 +364,9 @@ namespace Serein.NodeFlow.Env
await currentFlowEnvironment.StartAsyncInSelectNode(startNodeGuid);
}
public async Task<object> InvokeNodeAsync(string nodeGuid)
public async Task<object> InvokeNodeAsync( string nodeGuid)
{
return await currentFlowEnvironment.InvokeNodeAsync(nodeGuid);
return await currentFlowEnvironment.InvokeNodeAsync( nodeGuid);
}
public async Task StartRemoteServerAsync(int port = 7525)

View File

@@ -78,7 +78,7 @@ namespace Serein.NodeFlow.Env
/// </summary>
/// <param name="msgId"></param>
/// <param name="flowEnvInfo"></param>
[AutoSocketHandle(ThemeValue = EnvMsgTheme.GetEnvInfo)]
[AutoSocketHandle(ThemeValue = EnvMsgTheme.GetEnvInfo, IsReturnValue = false)]
public void GetEnvInfo([UseMsgId] string msgId, [UseData] FlowEnvInfo flowEnvInfo)
{
remoteFlowEnvironment.TriggerSignal(msgId, flowEnvInfo);
@@ -90,19 +90,19 @@ namespace Serein.NodeFlow.Env
/// </summary>
/// <param name="msgId"></param>
/// <param name="sereinProjectData"></param>
[AutoSocketHandle(ThemeValue = EnvMsgTheme.GetProjectInfo)]
[AutoSocketHandle(ThemeValue = EnvMsgTheme.GetProjectInfo, IsReturnValue = false)]
public void GetProjectInfo([UseMsgId] string msgId, [UseData] SereinProjectData sereinProjectData)
{
remoteFlowEnvironment.TriggerSignal(msgId, sereinProjectData);
}
[AutoSocketHandle(ThemeValue = EnvMsgTheme.SetNodeInterrupt)]
[AutoSocketHandle(ThemeValue = EnvMsgTheme.SetNodeInterrupt, IsReturnValue = false)]
public void SetNodeInterrupt([UseMsgId] string msgId)
{
remoteFlowEnvironment.TriggerSignal(msgId, null);
}
[AutoSocketHandle(ThemeValue = EnvMsgTheme.AddInterruptExpression)]
[AutoSocketHandle(ThemeValue = EnvMsgTheme.AddInterruptExpression, IsReturnValue = false)]
public void AddInterruptExpression([UseMsgId] string msgId)
{
remoteFlowEnvironment.TriggerSignal(msgId, null);
@@ -110,37 +110,37 @@ namespace Serein.NodeFlow.Env
[AutoSocketHandle(ThemeValue = EnvMsgTheme.CreateNode)]
[AutoSocketHandle(ThemeValue = EnvMsgTheme.CreateNode, IsReturnValue = false)]
public void CreateNode([UseMsgId] string msgId, [UseData] NodeInfo nodeInfo)
{
remoteFlowEnvironment.TriggerSignal(msgId, nodeInfo);
}
[AutoSocketHandle(ThemeValue = EnvMsgTheme.RemoveNode)]
[AutoSocketHandle(ThemeValue = EnvMsgTheme.RemoveNode, IsReturnValue = false)]
public void RemoveNode([UseMsgId] string msgId, bool state)
{
remoteFlowEnvironment.TriggerSignal(msgId, state);
}
[AutoSocketHandle(ThemeValue = EnvMsgTheme.ConnectInvokeNode)]
[AutoSocketHandle(ThemeValue = EnvMsgTheme.ConnectInvokeNode, IsReturnValue = false)]
public void ConnectInvokeNode([UseMsgId] string msgId, bool state)
{
remoteFlowEnvironment.TriggerSignal(msgId, state);
}
[AutoSocketHandle(ThemeValue = EnvMsgTheme.RemoveInvokeConnect)]
[AutoSocketHandle(ThemeValue = EnvMsgTheme.RemoveInvokeConnect, IsReturnValue = false)]
public void RemoveInvokeConnect([UseMsgId] string msgId, bool state)
{
remoteFlowEnvironment.TriggerSignal(msgId, state);
}
[AutoSocketHandle(ThemeValue = EnvMsgTheme.ConnectArgSourceNode)]
[AutoSocketHandle(ThemeValue = EnvMsgTheme.ConnectArgSourceNode, IsReturnValue = false)]
public void ConnectArgSourceNode([UseMsgId] string msgId, bool state)
{
remoteFlowEnvironment.TriggerSignal(msgId, state);
}
[AutoSocketHandle(ThemeValue = EnvMsgTheme.RemoveArgSourceConnect)]
[AutoSocketHandle(ThemeValue = EnvMsgTheme.RemoveArgSourceConnect, IsReturnValue = false)]
public void RemoveArgSourceConnect([UseMsgId] string msgId, bool state)
{
remoteFlowEnvironment.TriggerSignal(msgId, state);

View File

@@ -358,33 +358,42 @@ namespace Serein.NodeFlow
/// <returns></returns>
private async Task FlipflopExecuteAsync(IFlowEnvironment env, SingleFlipflopNode singleFlipFlopNode, CancellationTokenSource cts)
{
var context = new DynamicContext(env); // 启动全局触发器时新建上下文
if(_flipFlopCts is null)
{
Console.WriteLine("flowStarter -> FlipflopExecuteAsync -> _flipFlopCts is null");
return;
}
while (!_flipFlopCts.IsCancellationRequested && !cts.IsCancellationRequested)
{
var context = new DynamicContext(env); // 启动全局触发器时新建上下文
try
{
var newFlowData = await singleFlipFlopNode.ExecutingAsync(context); // 获取触发器等待Task
await NodeModelBase.RefreshFlowDataAndExpInterrupt(context, singleFlipFlopNode, newFlowData); // 全局触发器触发后刷新该触发器的节点数据
if (context.NextOrientation != ConnectionInvokeType.None)
if (context.NextOrientation == ConnectionInvokeType.None)
{
var nextNodes = singleFlipFlopNode.SuccessorNodes[context.NextOrientation];
for (int i = nextNodes.Count - 1; i >= 0 && !_flipFlopCts.IsCancellationRequested; i--)
{
// 筛选出启用的节点
if (nextNodes[i].DebugSetting.IsEnable)
{
nextNodes[i].PreviousNode = singleFlipFlopNode;
if (nextNodes[i].DebugSetting.IsInterrupt) // 执行触发前
{
var cancelType = await nextNodes[i].DebugSetting.GetInterruptTask();
await Console.Out.WriteLineAsync($"[{nextNodes[i].MethodDetails.MethodName}]中断已{cancelType},开始执行后继分支");
}
await nextNodes[i].StartFlowAsync(context); // 启动执行触发器后继分支的节点
}
}
continue;
}
var nextNodes = singleFlipFlopNode.SuccessorNodes[context.NextOrientation];
for (int i = nextNodes.Count - 1; i >= 0 && !_flipFlopCts.IsCancellationRequested; i--)
{
// 筛选出启用的节点
if (!nextNodes[i].DebugSetting.IsEnable)
{
continue;
}
nextNodes[i].PreviousNode = singleFlipFlopNode;
if (nextNodes[i].DebugSetting.IsInterrupt) // 执行触发前
{
var cancelType = await nextNodes[i].DebugSetting.GetInterruptTask();
await Console.Out.WriteLineAsync($"[{nextNodes[i].MethodDetails.MethodName}]中断已{cancelType},开始执行后继分支");
}
await nextNodes[i].StartFlowAsync(context); // 启动执行触发器后继分支的节点
}
}
catch(FlipflopException ex)
catch (FlipflopException ex)
{
await Console.Out.WriteLineAsync($"触发器[{singleFlipFlopNode.MethodDetails.MethodName}]因非预期异常终止。"+ex.Message);
if (ex.Type == FlipflopException.CancelClass.Flow)
@@ -396,6 +405,10 @@ namespace Serein.NodeFlow
{
await Console.Out.WriteLineAsync(ex.Message);
}
finally
{
context.Exit();
}
}
}

View File

@@ -9,35 +9,7 @@ namespace Serein.NodeFlow.Tool;
public static class NodeMethodDetailsHelper
{
/// <summary>
/// 生成方法信息
/// </summary>
/// <param name="serviceContainer"></param>
/// <param name="type"></param>
/// <returns></returns>
//public static List<MethodDetails> GetList(Type type)
//{
// var methodDetailsDictionary = new List<MethodDetails>();
// var delegateDictionary = new List<Delegate>();
// var assemblyName = type.Assembly.GetName().Name;
// var methods = GetMethodsToProcess(type);
// foreach (var method in methods)
// {
// (var methodDetails,var methodDelegate) = CreateMethodDetails(type, method, assemblyName);
// methodDetailsDictionary.Add(methodDetails);
// delegateDictionary.Add(methodDelegate);
// }
// var mds = methodDetailsDictionary.OrderBy(it => it.MethodName).ToList();
// var dels = delegateDictionary;
// return mds;
//}
/// <summary>
/// 获取处理方法
/// </summary>