using Serein.Library;
using Serein.Library.Api;
using Serein.Library.Utils;
using Serein.Proto.WebSocket.Handle;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Serein.Proto.WebSocket
{
// NOTE(review): the delegate declarations in this class read "Func>" and HandleAuthorized is
// declared as plain "Task" although it returns a bool, so generic type arguments appear to have
// been lost from this copy of the file. Those tokens are left untouched here - restore them from
// the original source rather than guessing.
/// <summary>
/// Authorization helper for WebSocket JSON messages: reads a token field from an incoming
/// message and hands it to a caller-supplied validation delegate.
/// </summary>
public class WebSocketAuthorizedHelper
{
    /// <summary>
    /// Creates the authorization helper for one client.
    /// </summary>
    /// <param name="addresPort">Client address, exposed through <see cref="AddresPort"/>.</param>
    /// <param name="token">Name of the JSON field that carries the token.</param>
    /// <param name="inspectionAuthorizedFunc">Delegate that decides whether the extracted token is accepted.</param>
    public WebSocketAuthorizedHelper(string addresPort,string token, Func> inspectionAuthorizedFunc)
    {
        AddresPort = addresPort;
        TokenKey = token;
        InspectionAuthorizedFunc = inspectionAuthorizedFunc;
    }

    /// <summary>
    /// Client address.
    /// </summary>
    public string AddresPort { get; }

    /// <summary>
    /// Name of the JSON field that holds the token.
    /// </summary>
    private readonly string TokenKey;

    /// <summary>
    /// Delegate invoked with the extracted token to decide authorization.
    /// </summary>
    private readonly Func> InspectionAuthorizedFunc;

    /// <summary>
    /// Single-permit gate so that only one authorization check runs at a time on this helper.
    /// </summary>
    private readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);

    /// <summary>
    /// Checks whether a message is authorized.
    /// </summary>
    /// <param name="message">Raw JSON text of the message; it is parsed with JsonHelper.Parse.</param>
    /// <returns>
    /// The delegate's verdict when the token field is present; <c>false</c> when no delegate
    /// is configured or the message has no token field.
    /// </returns>
    public async Task HandleAuthorized(string message)
    {
        if (InspectionAuthorizedFunc is null)
        {
            return false; // No validation delegate: reject by default.
        }

        // Wait until the permit is really available and always hand it back. The previous
        // WaitAsync(1) gave up after 1 ms, ignored the outcome and never released, so the
        // gate stopped protecting anything after the first call.
        await semaphoreSlim.WaitAsync();
        try
        {
            IJsonToken json = JsonHelper.Parse(message);
            if (!json.TryGetValue(TokenKey, out var token))
            {
                return false; // Token field missing from the message.
            }

            // Delegate the decision to the validation method supplied at construction.
            return await InspectionAuthorizedFunc.Invoke(token);
        }
        finally
        {
            semaphoreSlim.Release();
        }
    }
}
// NOTE(review): "Func>", "ConcurrentDictionary" and "new ArraySegment(buffer)" appear in this
// class without generic type arguments, which suggests they were lost from this copy of the
// file. Those tokens are left untouched here - restore them from the original source rather
// than guessing.
/// <summary>
/// WebSocket server built on <see cref="HttpListener"/>. Received text is queued per
/// connection and dispatched to <see cref="MsgHandleHelper"/>; optionally every message is
/// checked against a token-validation delegate first.
/// </summary>
[AutoRegister]
public class WebSocketServer
{
    /// <summary>
    /// Message handling.
    /// </summary>
    public WebSocketMsgHandleHelper MsgHandleHelper { get; } = new WebSocketMsgHandleHelper();

    private HttpListener? listener;

    /// <summary>
    /// Creates a WebSocket server that does not require authorization.
    /// </summary>
    public WebSocketServer()
    {
        AuthorizedClients = new ConcurrentDictionary();
        InspectionAuthorizedFunc = (tokenObj) => Task.FromResult(true);
        IsCheckToken = false;
    }

    /// <summary>
    /// Creates a WebSocket server that requires authorization.
    /// </summary>
    /// <param name="tokenKey">Name of the JSON field that carries the token.</param>
    /// <param name="inspectionAuthorizedFunc">Method used to validate the token.</param>
    public WebSocketServer(string tokenKey, Func> inspectionAuthorizedFunc)
    {
        TokenKey = tokenKey;
        AuthorizedClients = new ConcurrentDictionary();
        InspectionAuthorizedFunc = inspectionAuthorizedFunc;
        IsCheckToken = true;
    }

    /// <summary>
    /// Authorization helpers of the connected clients, keyed by the client's remote endpoint string.
    /// </summary>
    public ConcurrentDictionary AuthorizedClients;

    private readonly string TokenKey = string.Empty;
    private readonly Func> InspectionAuthorizedFunc;
    private bool IsCheckToken = false;

    /// <summary>
    /// Starts listening on the given prefix and accepts WebSocket connections until the
    /// listener fails or is stopped.
    /// </summary>
    /// <param name="url">HttpListener prefix to listen on.</param>
    /// <returns>A task that completes when the accept loop ends.</returns>
    public async Task StartAsync(string url)
    {
        listener = new HttpListener();
        listener.Prefixes.Add(url);
        try
        {
            listener.Start();
        }
        catch (Exception ex)
        {
            await Console.Out.WriteLineAsync(ex.Message);
            return;
        }

        // Announce the service only once the listener has really started.
        await Console.Out.WriteLineAsync($"WebSocket消息处理已启动[{url}]");

        while (true)
        {
            try
            {
                var context = await listener.GetContextAsync();
                string clientPoint = context.Request.RemoteEndPoint.ToString();
                await Console.Out.WriteLineAsync($"新的连接加入:{clientPoint}");

                if (!context.Request.IsWebSocketRequest)
                {
                    // Plain HTTP requests must still be answered, otherwise the connection hangs.
                    context.Response.StatusCode = (int)HttpStatusCode.BadRequest;
                    context.Response.Close();
                    continue;
                }

#error "需要重写 WebSocket 服务"
                WebSocketAuthorizedHelper? authorizedHelper = null;
                if (IsCheckToken)
                {
                    // Register one helper per endpoint; when the endpoint is already registered
                    // the helper stays null and the connection is closed by HandleWebSocketAsync.
                    var candidate = new WebSocketAuthorizedHelper(clientPoint, TokenKey, InspectionAuthorizedFunc);
                    if (AuthorizedClients.TryAdd(clientPoint, candidate))
                    {
                        authorizedHelper = candidate;
                    }
                }

                var webSocketContext = await context.AcceptWebSocketAsync(null); // new connection
                _ = HandleWebSocketAsync(webSocketContext.WebSocket, authorizedHelper!); // process its messages
            }
            catch (Exception ex)
            {
                // Any failure (including the listener being stopped) ends the accept loop.
                await Console.Out.WriteLineAsync(ex.Message);
                break;
            }
        }
    }

    /// <summary>
    /// Stops the listener.
    /// </summary>
    public void Stop()
    {
        listener?.Stop();
    }

    /// <summary>
    /// Receive loop for one connection: reassembles fragmented messages and pushes each
    /// complete message into a per-connection queue consumed by <see cref="HandleMsgAsync"/>.
    /// </summary>
    /// <param name="webSocket">The accepted socket.</param>
    /// <param name="authorizedHelper">Authorization helper of this client; may be null when none was registered.</param>
    private async Task HandleWebSocketAsync(System.Net.WebSockets.WebSocket webSocket, WebSocketAuthorizedHelper authorizedHelper)
    {
        // Authorization is required but no helper could be registered: refuse the connection.
        if (IsCheckToken && authorizedHelper is null)
        {
            await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
            return;
        }

        var msgQueueUtil = new WebSocketMessageTransmissionTool();
        // NOTE(review): HandleMsgAsync only returns on authorization failure; nothing here tells
        // it that the socket has closed. Consider adding a completion signal to the queue.
        _ = Task.Run(async () =>
        {
            await HandleMsgAsync(webSocket, msgQueueUtil, authorizedHelper);
        });

        var buffer = new byte[1024];
        // A stateful decoder keeps the bytes of a multi-byte UTF-8 character that is split
        // across two receive calls; decoding each fragment on its own would corrupt it.
        var decoder = Encoding.UTF8.GetDecoder();
        var charBuffer = new char[Encoding.UTF8.GetMaxCharCount(buffer.Length)];
        var receivedMessage = new StringBuilder(); // accumulates the fragments of one message

        try
        {
            while (webSocket.State == WebSocketState.Open)
            {
                try
                {
                    WebSocketReceiveResult result;
                    do
                    {
                        result = await webSocket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None);
                        if (result.MessageType == WebSocketMessageType.Close)
                        {
                            // Complete the close handshake and stop; a close frame carries no
                            // message, so nothing is queued for it.
                            await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
                            return;
                        }

                        // Decode this fragment and append it; flush only on the last fragment.
                        int charCount = decoder.GetChars(buffer, 0, result.Count, charBuffer, 0, result.EndOfMessage);
                        receivedMessage.Append(charBuffer, 0, charCount);
                    } while (!result.EndOfMessage); // loop until the whole message has arrived

                    var message = receivedMessage.ToString();
                    receivedMessage.Clear(); // ready for the next message
                    await msgQueueUtil.WriteMsgAsync(message); // hand over for processing
                }
                catch (Exception ex)
                {
                    Debug.WriteLine($"Error: {ex.ToString()}");
                    // Drop any half-received message so it cannot leak into the next one.
                    receivedMessage.Clear();
                    decoder.Reset();
                }
            }
        }
        finally
        {
            // Unregister the client however the loop ended (close frame, abort or error).
            if (IsCheckToken)
            {
                AuthorizedClients.TryRemove(authorizedHelper.AddresPort, out var _);
            }
        }
    }

    /// <summary>
    /// Consumer loop for one connection: waits for queued messages, optionally checks
    /// authorization, and dispatches each message to <see cref="MsgHandleHelper"/>.
    /// </summary>
    /// <param name="webSocket">Socket used to reply to the client and to close on authorization failure.</param>
    /// <param name="msgQueueUtil">Queue the receive loop writes complete messages into.</param>
    /// <param name="authorizedHelper">Authorization helper of this client; only used when token checking is enabled.</param>
    /// <returns>A task that completes when a message fails authorization.</returns>
    public async Task HandleMsgAsync(System.Net.WebSockets.WebSocket webSocket,
                                     WebSocketMessageTransmissionTool msgQueueUtil,
                                     WebSocketAuthorizedHelper authorizedHelper)
    {
        // Reply callback given to the message context so handlers can send text back to the client.
        async Task ReplyAsync(string text)
        {
            await SocketExtension.SendAsync(webSocket, text);
        }

        while (true)
        {
            var message = await msgQueueUtil.WaitMsgAsync(); // resumes when a message is available
            if (IsCheckToken)
            {
                var authorizedResult = await authorizedHelper.HandleAuthorized(message);
                if (!authorizedResult)
                {
                    // Authorization failed: close the connection and unregister the client.
                    await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
                    AuthorizedClients.TryRemove(authorizedHelper.AddresPort, out var _);
                    return;
                }
            }

            var context = new WebSocketMsgContext(ReplyAsync);
            context.MsgRequest = JsonHelper.Parse(message);
            // NOTE(review): the result of HandleAsync is not awaited, as in the original -
            // confirm that fire-and-forget dispatch is intended.
            MsgHandleHelper.HandleAsync(context);
        }
    }
}
}