using Serein.Library; using Serein.Library.Utils; using Serein.Proto.WebSocket.Attributes; using Serein.Proto.WebSocket.Handle; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Data; using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Net.Sockets; using System.Net.WebSockets; using System.Reactive; using System.Reflection; using System.Text; using System.Text.Json; using System.Threading.Tasks; using NetWebSocket = System.Net.WebSockets.WebSocket; namespace Serein.Proto.WebSocket { /// /// WebSocket 服务类,负责管理所有 WebSocket 连接和处理模块 /// public partial class SereinWebSocketService : ISereinWebSocketService { /// /// (Theme Name ,Data Name) - HandleModule /// private readonly ConcurrentDictionary<(string, string), WebSocketHandleModule> _socketModules; /// /// 追踪未处理的异常 /// private Action? _onExceptionTracking; private Action? _onReply; private Func _onReplyMakeData; /// /// 维护所有 WebSocket 连接 /// private readonly List _sockets; /// /// 用于增加、移除 WebSocket 连接时,保证线程安全操作 /// private readonly object _lock = new object(); public int ConcetionCount => _sockets.Count; /// /// SereinWebSocketService 构造函数,初始化 WebSocket 模块字典和连接列表 /// public SereinWebSocketService() { _socketModules = new ConcurrentDictionary<(string, string), WebSocketHandleModule>(); _sockets = new List(); _lock = new object(); } #region 添加处理模块 /// /// 添加处理模块,使用指定的实例工厂和异常追踪回调 /// /// /// /// public ISereinWebSocketService AddHandleModule() where T : ISocketHandleModule, new() { var type = typeof(T); Func instanceFactory = () => (T)Activator.CreateInstance(type); return AddHandleModule(type, instanceFactory); } /// /// 添加处理模块,使用指定的实例工厂和异常追踪回调 /// /// /// /// public ISereinWebSocketService AddHandleModule(Func instanceFactory) where T : ISocketHandleModule { var type = typeof(T); return AddHandleModule(type, instanceFactory); } /// /// 添加处理模块,使用指定的类型、实例工厂和异常追踪回调 /// /// /// /// private ISereinWebSocketService AddHandleModule(Type type, Func instanceFactory) { if(!CheckAttribute(type, out var attribute)) { throw new Exception($"类型 {type} 需要标记 WebSocketModuleAttribute 特性"); } var modbuleConfig = GetConfig(attribute); var module = GetOrAddModule(modbuleConfig); var methodInfos = type.GetMethods(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic).ToArray(); var methodConfigs = CreateMethodConfig(methodInfos, instanceFactory); SereinEnv.WriteLine(InfoType.INFO, $"add websocket handle model :"); SereinEnv.WriteLine(InfoType.INFO, $"theme key, data key : {modbuleConfig.ThemeJsonKey}, {modbuleConfig.DataJsonKey}"); foreach (var methodConfig in methodConfigs) { SereinEnv.WriteLine(InfoType.INFO, $"theme value : {methodConfig!.ThemeValue} "); var result = module.AddHandleConfigs(methodConfig); } return this; } /// /// 检查特性 /// /// /// /// /// private bool CheckAttribute(Type type, out TAttribute attribute) where TAttribute : Attribute { attribute = type.GetCustomAttribute(); if (attribute is null) { return false; } return true; } /// /// 获取模块配置 /// /// /// private WebSocketModuleConfig GetConfig(WebSocketModuleAttribute moduleAttribute) { var themeKey = moduleAttribute.ThemeKey; var dataKey = moduleAttribute.DataKey; var msgIdKey = moduleAttribute.MsgIdKey; var isResponseUseReturn = moduleAttribute.IsResponseUseReturn; var moduleConfig = new WebSocketModuleConfig() { ThemeJsonKey = themeKey, DataJsonKey = dataKey, MsgIdJsonKey = msgIdKey, IsResponseUseReturn = isResponseUseReturn, }; return moduleConfig; } /// /// 获取或添加消息处理与异常处理 /// /// 模块配置 /// private WebSocketHandleModule GetOrAddModule(WebSocketModuleConfig moduleConfig) { var key = (moduleConfig.ThemeJsonKey, moduleConfig.DataJsonKey); if (_socketModules.TryGetValue(key, out var wsHandleModule)) { return wsHandleModule; } wsHandleModule = new WebSocketHandleModule(moduleConfig); _socketModules[key] = wsHandleModule; return wsHandleModule; } /// /// 创建方法配置 /// /// /// /// /// private List CreateMethodConfig(MethodInfo[] methodInfos, Func instanceFactory) { List configs = []; foreach (var methodInfo in methodInfos) { var wsMethodAttribute = methodInfo.GetCustomAttribute(); if (wsMethodAttribute is null) { continue; } var parameterInfos = methodInfo.GetParameters(); var temp_array = parameterInfos.Select(p => { var isSend = CheckSendType(p.ParameterType, out var sendType); return new { IsNeedSend = isSend, Type = sendType }; }).ToArray(); var config = new WebSocketMethodConfig { ThemeValue = string.IsNullOrEmpty(wsMethodAttribute.ThemeValue) ? methodInfo.Name : wsMethodAttribute.ThemeValue, IsReturnValue = wsMethodAttribute.IsReturnValue, DelegateDetails = new DelegateDetails(methodInfo), // 对应theme的emit构造委托调用工具类 InstanceFactory = instanceFactory, // 调用emit委托时的实例 //OnExceptionTracking = onExceptionTracking, // 异常追踪 ParameterType = parameterInfos.Select(t => t.ParameterType).ToArray(), // 入参参数类型 ParameterName = parameterInfos.Select(t => $"{t.Name}").ToArray(), // 入参参数名称 UseRequest = parameterInfos.Select(p => p.GetCustomAttribute() is not null).ToArray(),// 是否使用整体data数据 UseData = parameterInfos.Select(p => p.GetCustomAttribute() is not null).ToArray(), // 是否使用整体data数据 UseMsgId = parameterInfos.Select(p => p.GetCustomAttribute() is not null).ToArray(), // 是否使用消息ID UseContent = parameterInfos.Select(p => p.ParameterType.IsAssignableFrom(typeof(WebSocketHandleContext))).ToArray(), // 是否使用上下文 IsNeedSendDelegate = temp_array.Select(p => p.IsNeedSend).ToArray(), // 是否需要发送消息的委托 SendDelegateType = temp_array.Select(p => p.Type).ToArray(), // 发送消息的委托类型 CachedSendDelegates = new Delegate[temp_array.Length], // 提前缓存发送委托数组 }; configs.Add(config); } return configs; } private bool CheckSendType(Type type , out SendType sendType) { if (type.IsAssignableFrom(typeof(Func))) { sendType = SendType.ObjectAsync; } else if (type.IsAssignableFrom(typeof(Func))) { sendType = SendType.StringAsync; } else if (type.IsAssignableFrom(typeof(Action))) { sendType = SendType.StringAsync; } else if (type.IsAssignableFrom(typeof(Action))) { sendType = SendType.StringAsync; } else { sendType = SendType.None; return false; } return true; } #endregion /// /// 跟踪未处理的异常 /// /// public ISereinWebSocketService TrackUnhandledExceptions(Action onExceptionTracking) { _onExceptionTracking = onExceptionTracking; return this; } /// /// 传入新的 WebSocket 连接,开始进行处理 /// /// /// /// 处理新的 WebSocket 连接 /// /// /// public async Task AddWebSocketHandleAsync(NetWebSocket socket) { lock (_lock) { _sockets.Add(socket); } var buffer = new byte[4096]; var msgHandleUtil = new WebSocketMessageTransmissionTool(); // 消息队列 _ = Task.Run(async () => { await HandleMsgAsync(socket, msgHandleUtil); }); try { while (socket.State == WebSocketState.Open) { var result = await socket.ReceiveAsync(buffer: new ArraySegment(buffer), CancellationToken.None); if (result.MessageType == WebSocketMessageType.Close) { break; } else if (result.MessageType == WebSocketMessageType.Text) { var message = Encoding.UTF8.GetString(buffer, 0, result.Count); Console.WriteLine($"收到客户端消息: {message}"); var echo = Encoding.UTF8.GetBytes(message); await msgHandleUtil.WriteMsgAsync(message); // 异步传递消息 } } } catch (Exception ex) { Console.WriteLine($"WebSocket 异常: {ex.Message}"); } finally { lock (_lock) { _sockets.Remove(socket); } await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "关闭连接", CancellationToken.None); socket.Dispose(); } } /// /// 处理消息 /// /// /// /// private async Task HandleMsgAsync(NetWebSocket webSocket,WebSocketMessageTransmissionTool tranTool) { //var AuthorizedClients = new ConcurrentDictionary(); async Task sendasync(string text) { await SocketExtension.SendAsync(webSocket, text); // 回复客户端,处理方法中入参如果需要发送消息委托,则将该回调方法作为委托参数传入 } ObjectPool contextPool = new ObjectPool(() => { var context = new WebSocketHandleContext(sendasync); context.OnExceptionTracking = _onExceptionTracking; context.OnReplyMakeData = _onReplyMakeData; context.OnReply = _onReply; return context; }, context => { context.Reset(); }); while (webSocket.State == WebSocketState.Open) { var message = await tranTool.WaitMsgAsync(); // 有消息时通知 if (!JsonHelper.TryParse(message, out var jsonReques)) { Console.WriteLine($"WebSocket 消息解析失败: {message}"); continue; } var context = contextPool.Allocate(); context.MsgRequest = jsonReques; await HandleAsync(context); // 处理消息 contextPool.Free(context); } } /// /// 异步处理消息 /// /// 此次请求的上下文 /// private async Task HandleAsync(WebSocketHandleContext context) { foreach (var module in _socketModules.Values) { if (context.Handle) { return; } await module.HandleAsync(context); } } /// /// 给所有客户端广播最新数据 /// /// /// public async Task PushDataAsync(object latestData) { var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; var json = JsonSerializer.Serialize(latestData, options); var buffer = Encoding.UTF8.GetBytes(json); var segment = new ArraySegment(buffer); List socketsCopy; lock (_lock) { socketsCopy = _sockets.ToList(); } foreach (var socket in socketsCopy) { if (socket.State == WebSocketState.Open) { try { await socket.SendAsync(segment, WebSocketMessageType.Text, true, CancellationToken.None); } catch { // 忽略异常或移除失效连接 } } } } /// /// 设置回调函数,用于处理外部请求时的回复消息 /// /// public void OnReplyMakeData(Func func) { _onReplyMakeData = func; } /// /// 设置回调函数,回复外部请求时,记录消息内容 /// /// public void OnReply(Action onReply) { _onReply = onReply; } } }