mirror of
https://gitee.com/langsisi_admin/serein-flow
synced 2026-03-03 00:00:49 +08:00
整理了Serein.Library项目类文件,IDynamicContext、DynamicContext接口名称及实现类改为IFlowContext、FlowContext,使其与流程其它接口类命名风格统一。
This commit is contained in:
@@ -179,7 +179,7 @@ namespace Serein.Library.Utils
|
||||
{
|
||||
if (data == null)
|
||||
{
|
||||
return Activator.CreateInstance(type);
|
||||
return null;
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -308,7 +308,7 @@ namespace Serein.Library.Utils
|
||||
}
|
||||
public static object ToValueData(this string valueStr, Type type)
|
||||
{
|
||||
if (string.IsNullOrEmpty(valueStr))
|
||||
if (string.IsNullOrWhiteSpace(valueStr))
|
||||
{
|
||||
return Activator.CreateInstance(type);
|
||||
}
|
||||
|
||||
@@ -9,8 +9,6 @@ using System.Threading.Tasks;
|
||||
namespace Serein.Library.Utils
|
||||
{
|
||||
|
||||
|
||||
|
||||
public class ChannelFlowTrigger<TSignal> : IFlowTrigger<TSignal>
|
||||
{
|
||||
// 使用并发字典管理每个枚举信号对应的 Channel
|
||||
|
||||
@@ -23,7 +23,10 @@ namespace Serein.Library.Utils
|
||||
{
|
||||
// 使用并发字典管理每个信号对应的广播列表
|
||||
private readonly ConcurrentDictionary<TSignal, Subject<TriggerResult<object>>> _subscribers = new ConcurrentDictionary<TSignal, Subject<TriggerResult<object>>>();
|
||||
private readonly TriggerResultPool<object> _triggerResultPool = new TriggerResultPool<object>();
|
||||
private readonly ObjectPool<TriggerResult<object>> _triggerResultPool = new ObjectPool<TriggerResult<object>>(() => new TriggerResult<object>());
|
||||
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 获取或创建指定信号的 Subject(消息广播者)
|
||||
/// </summary>
|
||||
@@ -66,7 +69,7 @@ namespace Serein.Library.Utils
|
||||
{
|
||||
if (!cts.Token.IsCancellationRequested)
|
||||
{
|
||||
var outResult = _triggerResultPool.Get();
|
||||
var outResult = _triggerResultPool.Allocate();
|
||||
outResult.Type = TriggerDescription.Overtime;
|
||||
subject.OnNext(outResult);
|
||||
subject.OnCompleted();
|
||||
@@ -96,7 +99,7 @@ namespace Serein.Library.Utils
|
||||
var result2 = result.Value is TResult data
|
||||
? new TriggerResult<TResult> { Value = data, Type = TriggerDescription.External }
|
||||
: new TriggerResult<TResult> { Type = TriggerDescription.TypeInconsistency };
|
||||
_triggerResultPool.Return(result); // 将结果归还池中
|
||||
_triggerResultPool.Free(result); // 将结果归还池中
|
||||
return result2;
|
||||
}
|
||||
|
||||
@@ -112,7 +115,7 @@ namespace Serein.Library.Utils
|
||||
{
|
||||
if (_subscribers.TryGetValue(signal, out var subject))
|
||||
{
|
||||
var result = _triggerResultPool.Get();
|
||||
var result = _triggerResultPool.Allocate();
|
||||
result.Type = TriggerDescription.External;
|
||||
result.Value = value;
|
||||
subject.OnNext(result); // 广播给所有订阅者
|
||||
|
||||
21
Library/Utils/FlowTrigger/TriggerDescription.cs
Normal file
21
Library/Utils/FlowTrigger/TriggerDescription.cs
Normal file
@@ -0,0 +1,21 @@
|
||||
namespace Serein.Library.Utils
|
||||
{
|
||||
/// <summary>
|
||||
/// 触发类型
|
||||
/// </summary>
|
||||
public enum TriggerDescription
|
||||
{
|
||||
/// <summary>
|
||||
/// 外部触发
|
||||
/// </summary>
|
||||
External,
|
||||
/// <summary>
|
||||
/// 超时触发
|
||||
/// </summary>
|
||||
Overtime,
|
||||
/// <summary>
|
||||
/// 触发了,但类型不一致
|
||||
/// </summary>
|
||||
TypeInconsistency
|
||||
}
|
||||
}
|
||||
@@ -12,100 +12,4 @@ namespace Serein.Library.Utils
|
||||
public TriggerDescription Type { get; set; }
|
||||
public TResult Value { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 对象池队列
|
||||
/// </summary>
|
||||
public class ConcurrentExpandingObjectPool<T> where T : class, new()
|
||||
{
|
||||
private readonly ConcurrentQueue<T> _pool; // 存储池中对象的队列
|
||||
|
||||
public ConcurrentExpandingObjectPool(int initialCapacity)
|
||||
{
|
||||
// 初始化对象池,初始容量为 initialCapacity
|
||||
_pool = new ConcurrentQueue<T>();
|
||||
|
||||
// 填充初始对象
|
||||
for (int i = 0; i < initialCapacity; i++)
|
||||
{
|
||||
_pool.Enqueue(new T());
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取一个对象,如果池中没有对象,则动态创建新的对象
|
||||
/// </summary>
|
||||
/// <returns>池中的一个对象</returns>
|
||||
public T Get()
|
||||
{
|
||||
// 尝试从池中获取一个对象
|
||||
if (!_pool.TryDequeue(out var item))
|
||||
{
|
||||
// 如果池为空,则创建一个新的对象
|
||||
item = new T();
|
||||
}
|
||||
|
||||
return item;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 将一个对象归还到池中
|
||||
/// </summary>
|
||||
/// <param name="item">需要归还的对象</param>
|
||||
public void Return(T item)
|
||||
{
|
||||
// 将对象归还到池中
|
||||
_pool.Enqueue(item);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取当前池中的对象数
|
||||
/// </summary>
|
||||
public int CurrentSize => _pool.Count;
|
||||
|
||||
/// <summary>
|
||||
/// 清空池中的所有对象
|
||||
/// </summary>
|
||||
public void Clear()
|
||||
{
|
||||
while (_pool.TryDequeue(out _)) { } // 清空队列
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 使用 ObjectPool 来复用 TriggerResult 对象
|
||||
/// </summary>
|
||||
public class TriggerResultPool<TResult>
|
||||
{
|
||||
private readonly ConcurrentExpandingObjectPool<TriggerResult<TResult>> _objectPool;
|
||||
|
||||
public TriggerResultPool(int defaultCapacity = 30)
|
||||
{
|
||||
_objectPool = new ConcurrentExpandingObjectPool<TriggerResult<TResult>>(defaultCapacity);
|
||||
}
|
||||
|
||||
public TriggerResult<TResult> Get() => _objectPool.Get();
|
||||
|
||||
public void Return(TriggerResult<TResult> result) => _objectPool.Return(result);
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 触发类型
|
||||
/// </summary>
|
||||
public enum TriggerDescription
|
||||
{
|
||||
/// <summary>
|
||||
/// 外部触发
|
||||
/// </summary>
|
||||
External,
|
||||
/// <summary>
|
||||
/// 超时触发
|
||||
/// </summary>
|
||||
Overtime,
|
||||
/// <summary>
|
||||
/// 触发了,但类型不一致
|
||||
/// </summary>
|
||||
TypeInconsistency
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ namespace Serein.Library.Utils
|
||||
return await Task.WhenAll(source.Select(async s => await method(s)));
|
||||
}
|
||||
|
||||
|
||||
public static async Task<IEnumerable<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source,
|
||||
Func<TSource, Task<TResult>> method,
|
||||
int concurrency = int.MaxValue)
|
||||
|
||||
@@ -28,7 +28,7 @@ namespace Serein.Library.Utils
|
||||
// 时间戳
|
||||
long timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
|
||||
|
||||
// 机器标识(可以替换成更加独特的标识,如机器的MAC地址等)
|
||||
// 机器标识
|
||||
string machineId = GetMachineId();
|
||||
|
||||
// 进程ID
|
||||
@@ -53,7 +53,6 @@ namespace Serein.Library.Utils
|
||||
private static string GetMachineId()
|
||||
{
|
||||
// 这里使用 GUID 模拟机器标识
|
||||
// 可以替换为更具体的机器信息
|
||||
return Guid.NewGuid().ToString("N");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ namespace Serein.Library.Utils
|
||||
/// <param name="parent">父类对象</param>
|
||||
/// <param name="childType">子类类型</param>
|
||||
/// <returns></returns>
|
||||
public static object ConvertParentToChild(object parent,Type childType)
|
||||
public static object ConvertParentToChild(object parent, Type childType)
|
||||
{
|
||||
var child = Activator.CreateInstance(childType);
|
||||
var parentType = parent.GetType();
|
||||
|
||||
@@ -33,7 +33,7 @@ namespace Serein.Library
|
||||
SereinEnv.EnvGlobalData.AddOrUpdate(name, data, (k, o) => data);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/*/// <summary>
|
||||
/// 更改某个数据的名称
|
||||
/// </summary>
|
||||
/// <param name="oldName">旧名称</param>
|
||||
@@ -56,7 +56,7 @@ namespace Serein.Library
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}*/
|
||||
|
||||
/// <summary>
|
||||
/// 获取全局数据
|
||||
|
||||
@@ -1,136 +0,0 @@
|
||||
using Newtonsoft.Json.Linq;
|
||||
using Serein.Library.Api;
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Reactive.Subjects;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Channels;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Serein.Library.Utils
|
||||
{
|
||||
/// <summary>
|
||||
/// 同步的单体消息触发器
|
||||
/// </summary>
|
||||
/// <typeparam name="TSingle"></typeparam>
|
||||
public class SingleSyncFlowTrigger<TSingle> : IFlowTrigger<TSingle>
|
||||
{
|
||||
private readonly ConcurrentDictionary<TSingle, Queue<TaskCompletionSource<TriggerResult<object>>>> _syncChannel
|
||||
= new ConcurrentDictionary<TSingle, Queue<TaskCompletionSource<TriggerResult<object>>>>();
|
||||
|
||||
public void CancelAllTrigger()
|
||||
{
|
||||
foreach (var triggers in _syncChannel.Values)
|
||||
{
|
||||
foreach (var trigger in triggers)
|
||||
{
|
||||
trigger.SetCanceled();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public Task<bool> InvokeTriggerAsync<TResult>(TSingle signal, TResult value)
|
||||
{
|
||||
if(_syncChannel.TryGetValue(signal, out var tcss))
|
||||
{
|
||||
var tcs = tcss.Dequeue();
|
||||
var result = new TriggerResult<object>
|
||||
{
|
||||
Type = TriggerDescription.External,
|
||||
Value = value,
|
||||
};
|
||||
tcs.SetResult(result);
|
||||
return Task.FromResult(true);
|
||||
}
|
||||
return Task.FromResult(false);
|
||||
}
|
||||
|
||||
public async Task<TriggerResult<TResult>> WaitTriggerAsync<TResult>(TSingle signal)
|
||||
{
|
||||
if (!_syncChannel.TryGetValue(signal,out var tcss))
|
||||
{
|
||||
tcss = new Queue<TaskCompletionSource<TriggerResult<object>>>();
|
||||
_syncChannel.TryAdd(signal, tcss);
|
||||
}
|
||||
var taskCompletionSource = new TaskCompletionSource<TriggerResult<object>>();
|
||||
tcss.Enqueue(taskCompletionSource);
|
||||
var result = await taskCompletionSource.Task;
|
||||
if (result.Value is TResult result2)
|
||||
{
|
||||
return new TriggerResult<TResult>
|
||||
{
|
||||
Type = TriggerDescription.External,
|
||||
Value = result2,
|
||||
};
|
||||
}
|
||||
else
|
||||
{
|
||||
return new TriggerResult<TResult>
|
||||
{
|
||||
Type = TriggerDescription.TypeInconsistency,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<TriggerResult<TResult>> WaitTriggerWithTimeoutAsync<TResult>(TSingle signal, TimeSpan outTime)
|
||||
{
|
||||
if (!_syncChannel.TryGetValue(signal, out var tcss))
|
||||
{
|
||||
tcss = new Queue<TaskCompletionSource<TriggerResult<object>>>();
|
||||
_syncChannel.TryAdd(signal, tcss);
|
||||
}
|
||||
|
||||
|
||||
var taskCompletionSource = new TaskCompletionSource<TriggerResult<object>>();
|
||||
tcss.Enqueue(taskCompletionSource);
|
||||
|
||||
var cts = new CancellationTokenSource();
|
||||
|
||||
// 异步任务:超时后自动触发信号
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.Delay(outTime, cts.Token);
|
||||
if (!cts.IsCancellationRequested) // 如果还没有被取消
|
||||
{
|
||||
var outResult = new TriggerResult<object>()
|
||||
{
|
||||
Type = TriggerDescription.Overtime
|
||||
};
|
||||
taskCompletionSource.SetResult(outResult); // 超时触发
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// 超时任务被取消
|
||||
}
|
||||
finally
|
||||
{
|
||||
cts?.Dispose(); // 确保 cts 被释放
|
||||
}
|
||||
}, cts.Token);
|
||||
var result = await taskCompletionSource.Task;
|
||||
cts?.Cancel();
|
||||
if (result.Value is TResult result2)
|
||||
{
|
||||
return new TriggerResult<TResult>
|
||||
{
|
||||
Type = result.Type,
|
||||
Value = result2,
|
||||
};
|
||||
}
|
||||
else
|
||||
{
|
||||
return new TriggerResult<TResult>
|
||||
{
|
||||
Type = result.Type,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user