From bf987f4ef0fc049b631ba67074c49476a2292b81 Mon Sep 17 00:00:00 2001 From: fengjiayi <12821976+ning_xi@user.noreply.gitee.com> Date: Mon, 4 Aug 2025 14:59:48 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B3=A8=E9=87=8A=E4=BA=86EventBus=E7=9A=84?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E4=BB=A3=E7=A0=81=EF=BC=8C=E7=9B=AE=E5=89=8D?= =?UTF-8?q?=E5=B9=B6=E4=B8=8D=E6=89=93=E7=AE=97=E5=AE=9E=E7=8E=B0=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E6=80=BB=E7=BA=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 - Workbench/Serein.WorkBench.csproj | 1 - Workbench/Tool/IEventBus.cs | 271 ++++++++++++++++++++++++++++++ 3 files changed, 271 insertions(+), 2 deletions(-) create mode 100644 Workbench/Tool/IEventBus.cs diff --git a/.gitignore b/.gitignore index 072da08..5c3a4ae 100644 --- a/.gitignore +++ b/.gitignore @@ -26,4 +26,3 @@ WorkBench.Remote Serein.FlowStartTool Serein.CloudWorkbench Serein.CollaborationSync -/Workbench/Tool/IEventBus.cs diff --git a/Workbench/Serein.WorkBench.csproj b/Workbench/Serein.WorkBench.csproj index a7ecd34..21e22fc 100644 --- a/Workbench/Serein.WorkBench.csproj +++ b/Workbench/Serein.WorkBench.csproj @@ -49,7 +49,6 @@ - diff --git a/Workbench/Tool/IEventBus.cs b/Workbench/Tool/IEventBus.cs new file mode 100644 index 0000000..81d4f01 --- /dev/null +++ b/Workbench/Tool/IEventBus.cs @@ -0,0 +1,271 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace Serein.Library.Api +{ +#if false + + #region 事件总线接口 + public interface IEvent + { + + } + + public interface IAsyncEventHandler where TEvent : IEvent + { + Task HandleAsync(TEvent @event); + + void HandleException(TEvent @event, Exception ex); + } + + public interface IEventBus + { + void Publish(TEvent @event) where TEvent : IEvent; + Task PublishAsync(TEvent @event) where TEvent : IEvent; + + void OnSubscribe() where TEvent : IEvent; + } + public interface ILocalEventBusManager where TEvent : IEvent + { + void Publish(TEvent @event); + Task PublishAsync(TEvent @event); + + void AutoHandle(); + } + + #endregion + #region 事件总线实现类 + public class LocalEventBusManager(IServiceProvider serviceProvider) : ILocalEventBusManager + where TEvent : IEvent + { + readonly IServiceProvider _servicesProvider = serviceProvider; + + private readonly Channel _eventChannel = Channel.CreateUnbounded(); + + public void Publish(TEvent @event) + { + Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null"); + _eventChannel.Writer.WriteAsync(@event); + } + + private CancellationTokenSource Cts { get; } = new(); + + public void Cancel() + { + Cts.Cancel(); + } + + public async Task PublishAsync(TEvent @event) + { + await _eventChannel.Writer.WriteAsync(@event); + } + + public void AutoHandle() + { + // 确保只启动一次 + if (!Cts.IsCancellationRequested) return; + + Task.Run(async () => + { + while (!Cts.IsCancellationRequested) + { + var reader = await _eventChannel.Reader.ReadAsync(); + await HandleAsync(reader); + } + }, Cts.Token); + } + + async Task HandleAsync(TEvent @event) + { + var handler = _servicesProvider.GetService>(); + + if (handler is null) + { + throw new NullReferenceException($"No handler for event {@event.GetType().Name}"); + } + try + { + await handler.HandleAsync(@event); + } + catch (Exception ex) + { + handler.HandleException(@event, ex); + } + } + } + + public sealed class LocalEventBusPool(IServiceProvider serviceProvider) + { + private readonly IServiceProvider _serviceProvider = serviceProvider; + + private class ChannelKey + { + public required string Key { get; init; } + public int Subscribers { get; set; } + + public override bool Equals(object? obj) + { + if (obj is ChannelKey key) + { + return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase); + } + + return false; + } + + public override int GetHashCode() + { + return 0; + } + } + + private Channel Rent(string channel) + { + _channels.TryGetValue(new ChannelKey() { Key = channel }, out var value); + + if (value != null) return value; + value = Channel.CreateUnbounded(); + _channels.TryAdd(new ChannelKey() { Key = channel }, value); + return value; + } + + private Channel Rent(ChannelKey channelKey) + { + _channels.TryGetValue(channelKey, out var value); + if (value != null) return value; + value = Channel.CreateUnbounded(); + _channels.TryAdd(channelKey, value); + return value; + } + + private readonly ConcurrentDictionary> _channels = new(); + + private CancellationTokenSource Cts { get; } = new(); + + public void Cancel() + { + Cts.Cancel(); + _channels.Clear(); + Cts.TryReset(); + } + + public async Task PublishAsync(TEvent @event) where TEvent : IEvent + { + await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event); + } + + public void Publish(TEvent @event) where TEvent : IEvent + { + Rent(typeof(TEvent).Name).Writer.TryWrite(@event); + } + + public void OnSubscribe() where TEvent : IEvent + { + var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ?? + new ChannelKey() { Key = typeof(TEvent).Name }; + channelKey.Subscribers++; + + Task.Run(async () => + { + try + { + while (!Cts.IsCancellationRequested) + { + var @event = await ReadAsync(channelKey); + + var handler = _serviceProvider.GetService>(); + if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}"); + try + { + await handler.HandleAsync((TEvent)@event); + } + catch (Exception ex) + { + handler.HandleException((TEvent)@event, ex); + } + } + } + catch (Exception e) + { + throw new InvalidOperationException("Error on onSubscribe handler", e); + } + }, Cts.Token); + } + + private async Task ReadAsync(string channel) + { + return await Rent(channel).Reader.ReadAsync(Cts.Token); + } + + private async Task ReadAsync(ChannelKey channel) + { + return await Rent(channel).Reader.ReadAsync(Cts.Token); + } + } + + + + #endregion + #region 测试方法 + public class EventBusTest + { + private readonly LocalEventBusPool _localEventBusPool; + private readonly LocalEventBusManager _localEventBusManager; + public EventBusTest(IServiceProvider serviceProvider) + { + /*collection.AddSingleton, TestHandler>(); + EventBusTest eventBusTest = new EventBusTest(services); + _ = eventBusTest.Run();*/ + + _localEventBusPool = new LocalEventBusPool(serviceProvider); + _localEventBusManager = new Api.LocalEventBusManager(serviceProvider); + + } + + public async Task Run() + { + var @event = new TestEvent { Name = "Test Event Async" }; + + _localEventBusPool.OnSubscribe(); + _localEventBusPool.Publish(@event); + await _localEventBusPool.PublishAsync(@event); + + //_localEventBusManager.AutoHandle(); + _localEventBusManager.Publish(@event); + await _localEventBusManager.PublishAsync(@event); + } + } + + public class TestEvent : IEvent + { + public string Name { get; set; } = "Test Event"; + } + + + + public class TestHandler : IAsyncEventHandler + { + public Task HandleAsync(TestEvent @event) + { + + Debug.WriteLine($"Handling event: {@event.Name}"); + return Task.CompletedTask; + } + + public void HandleException(TestEvent @event, Exception ex) + { + Debug.WriteLine($"Exception occurred while handling event {@event.GetType().Name}: {ex.Message}"); + } + } + + #endregion + +#endif +}