Files
Yi.Admin/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs

658 lines
25 KiB
C#
Raw Normal View History

2025-07-05 15:11:56 +08:00
using System.Collections.Concurrent;
2025-08-03 23:23:32 +08:00
using System.Diagnostics;
2025-06-25 17:12:09 +08:00
using System.Runtime.CompilerServices;
2025-07-05 15:11:56 +08:00
using System.Text;
using System.Text.Json;
2025-07-05 15:11:56 +08:00
using Microsoft.AspNetCore.Http;
2025-06-21 01:08:14 +08:00
using Microsoft.Extensions.DependencyInjection;
2025-07-05 15:11:56 +08:00
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
2025-06-21 01:08:14 +08:00
using Volo.Abp.Domain.Services;
2025-07-17 23:10:26 +08:00
using Yi.Framework.AiHub.Domain.AiGateWay;
2025-08-03 23:23:32 +08:00
using Yi.Framework.AiHub.Domain.AiGateWay.Exceptions;
2025-06-27 22:13:26 +08:00
using Yi.Framework.AiHub.Domain.Entities.Model;
2025-06-25 17:12:09 +08:00
using Yi.Framework.AiHub.Domain.Shared.Dtos;
2025-10-11 15:25:43 +08:00
using Yi.Framework.AiHub.Domain.Shared.Dtos.Anthropic;
2025-08-11 15:31:11 +08:00
using Yi.Framework.AiHub.Domain.Shared.Dtos.OpenAi;
using Yi.Framework.AiHub.Domain.Shared.Dtos.OpenAi.Embeddings;
using Yi.Framework.AiHub.Domain.Shared.Dtos.OpenAi.Images;
2025-08-03 23:23:32 +08:00
using Yi.Framework.Core.Extensions;
2025-06-25 17:12:09 +08:00
using Yi.Framework.SqlSugarCore.Abstractions;
2025-10-11 15:25:43 +08:00
using JsonSerializer = System.Text.Json.JsonSerializer;
using ThorJsonSerializer = Yi.Framework.AiHub.Domain.AiGateWay.ThorJsonSerializer;
2025-06-21 01:08:14 +08:00
namespace Yi.Framework.AiHub.Domain.Managers;
/// <summary>
/// Gateway domain service: resolves the configured handler for a requested model, forwards OpenAI-style chat,
/// Anthropic chat, image and embedding requests to it, writes the result to the HTTP response and records
/// conversation messages and token usage.
/// </summary>
public class AiGateWayManager : DomainService
{
    private readonly ISqlSugarRepository<AiAppAggregateRoot> _aiAppRepository;
    private readonly ILogger<AiGateWayManager> _logger;
    private readonly AiMessageManager _aiMessageManager;
    private readonly UsageStatisticsManager _usageStatisticsManager;
    private readonly ISpecialCompatible _specialCompatible;

    public AiGateWayManager(ISqlSugarRepository<AiAppAggregateRoot> aiAppRepository, ILogger<AiGateWayManager> logger,
        AiMessageManager aiMessageManager, UsageStatisticsManager usageStatisticsManager,
        ISpecialCompatible specialCompatible)
    {
        _aiAppRepository = aiAppRepository;
        _logger = logger;
        _aiMessageManager = aiMessageManager;
        _usageStatisticsManager = usageStatisticsManager;
        _specialCompatible = specialCompatible;
    }

    /// <summary>
    /// Resolves the model description (owning app's endpoint/key plus the model's metadata) for a model id.
    /// </summary>
    /// <param name="modelId">Model identifier sent by the client.</param>
    /// <returns>The description built from the first app that exposes a model with this id.</returns>
    /// <exception cref="UserFriendlyException">No configured app exposes the model.</exception>
    private async Task<AiModelDescribe> GetModelAsync(string modelId)
    {
        // NOTE(review): loads every app with all of its models on each call; consider filtering in the
        // query or caching if the number of apps grows.
        var allApp = await _aiAppRepository._DbQueryable.Includes(x => x.AiModels).ToListAsync();
        foreach (var app in allApp)
        {
            var model = app.AiModels.FirstOrDefault(x => x.ModelId == modelId);
            if (model is not null)
            {
                return new AiModelDescribe
                {
                    AppId = app.Id,
                    AppName = app.Name,
                    Endpoint = app.Endpoint,
                    ApiKey = app.ApiKey,
                    OrderNum = model.OrderNum,
                    HandlerName = model.HandlerName,
                    ModelId = model.ModelId,
                    ModelName = model.Name,
                    Description = model.Description,
                    AppExtraUrl = app.ExtraUrl,
                    ModelExtraInfo = model.ExtraInfo
                };
            }
        }

        throw new UserFriendlyException($"{modelId}模型当前版本不支持");
    }

    /// <summary>
    /// Chat completion, streaming: applies request compatibility fixes, then relays every chunk produced by
    /// the keyed <see cref="IChatCompletionService"/> registered for the model's handler name.
    /// </summary>
    /// <param name="request">OpenAI-style chat request.</param>
    /// <param name="cancellationToken">Cancels the upstream stream.</param>
    /// <returns>The upstream chunks, unchanged.</returns>
    public async IAsyncEnumerable<ThorChatCompletionsResponse> CompleteChatStreamAsync(
        ThorChatCompletionsRequest request,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        _specialCompatible.Compatible(request);
        var modelDescribe = await GetModelAsync(request.Model);
        var chatService =
            LazyServiceProvider.GetRequiredKeyedService<IChatCompletionService>(modelDescribe.HandlerName);

        await foreach (var result in chatService.CompleteChatStreamAsync(modelDescribe, request, cancellationToken))
        {
            yield return result;
        }
    }

    /// <summary>
    /// Chat completion, non-streaming: calls the model, records the user/assistant messages and usage when a
    /// user is known, then writes the completion to the response as JSON.
    /// </summary>
    /// <param name="httpContext">Current HTTP context; the completion is written to its response.</param>
    /// <param name="request">OpenAI-style chat request.</param>
    /// <param name="userId">User to attribute messages/usage to; when null nothing is recorded.</param>
    /// <param name="sessionId">Optional chat session id.</param>
    /// <param name="cancellationToken">Cancels the upstream call and the response write.</param>
    public async Task CompleteChatForStatisticsAsync(HttpContext httpContext,
        ThorChatCompletionsRequest request,
        Guid? userId = null,
        Guid? sessionId = null,
        CancellationToken cancellationToken = default)
    {
        _specialCompatible.Compatible(request);
        var response = httpContext.Response;
        // The content type is left to WriteAsJsonAsync below.
        var modelDescribe = await GetModelAsync(request.Model);
        var chatService =
            LazyServiceProvider.GetRequiredKeyedService<IChatCompletionService>(modelDescribe.HandlerName);
        var data = await chatService.CompleteChatAsync(modelDescribe, request, cancellationToken);
        if (userId is not null)
        {
            await _aiMessageManager.CreateUserMessageAsync(userId.Value, sessionId,
                new MessageInputDto
                {
                    // `?.` after LastOrDefault(): an empty message list must not throw.
                    Content = request.Messages?.LastOrDefault()?.Content ?? string.Empty,
                    ModelId = request.Model,
                    TokenUsage = data.Usage,
                });
            await _aiMessageManager.CreateSystemMessageAsync(userId.Value, sessionId,
                new MessageInputDto
                {
                    Content = data.Choices?.FirstOrDefault()?.Delta?.Content,
                    ModelId = request.Model,
                    TokenUsage = data.Usage
                });
            await _usageStatisticsManager.SetUsageAsync(userId.Value, request.Model, data.Usage);
        }

        await response.WriteAsJsonAsync(data, cancellationToken);
    }

    /// <summary>
    /// Chat completion, streaming with paced output: relays upstream chunks to the client as SSE frames through
    /// a queue drained at a fixed interval, then records the user/assistant messages and usage.
    /// </summary>
    /// <param name="httpContext">Current HTTP context; SSE frames are written to its response.</param>
    /// <param name="request">OpenAI-style chat request.</param>
    /// <param name="userId">User to attribute messages/usage to.</param>
    /// <param name="sessionId">Optional chat session id.</param>
    /// <param name="cancellationToken">Cancels the upstream stream and the output writer.</param>
    public async Task CompleteChatStreamForStatisticsAsync(
        HttpContext httpContext,
        ThorChatCompletionsRequest request,
        Guid? userId = null,
        Guid? sessionId = null,
        CancellationToken cancellationToken = default)
    {
        var response = httpContext.Response;
        // Declare a server-sent events stream.
        response.ContentType = "text/event-stream;charset=utf-8;";
        response.Headers.TryAdd("Cache-Control", "no-cache");
        response.Headers.TryAdd("Connection", "keep-alive");

        // Resolved from the container instead of calling this instance directly; presumably so container
        // interceptors/proxies apply. Confirm before simplifying.
        var gateWay = LazyServiceProvider.GetRequiredService<AiGateWayManager>();
        var completeChatResponse = gateWay.CompleteChatStreamAsync(request, cancellationToken);
        var tokenUsage = new ThorUsageResponse();

        // Producer/consumer pacing: the loop below enqueues complete SSE frames, and a background task
        // writes them to the response at a fixed rate.
        var messageQueue = new ConcurrentQueue<string>();
        var backupSystemContent = new StringBuilder();
        // Delay between writes while the upstream is still producing.
        var outputInterval = TimeSpan.FromMilliseconds(75);
        // Set once every frame (including [DONE]) has been enqueued. It is read by the consumer on another
        // thread, so it is accessed through Volatile.
        var isComplete = false;
        var outputTask = Task.Run(async () =>
        {
            while (!(Volatile.Read(ref isComplete) && messageQueue.IsEmpty))
            {
                if (messageQueue.TryDequeue(out var message))
                {
                    await response.WriteAsync(message, Encoding.UTF8, cancellationToken).ConfigureAwait(false);
                    await response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
                }

                if (!Volatile.Read(ref isComplete))
                {
                    // Upstream still producing: keep the paced rate.
                    await Task.Delay(outputInterval, cancellationToken).ConfigureAwait(false);
                }
                else
                {
                    // Upstream finished: drain the remaining frames faster.
                    await Task.Delay(10, cancellationToken).ConfigureAwait(false);
                }
            }
        }, cancellationToken);

        // Exceptions thrown by an IAsyncEnumerable can only be caught around the whole enumeration.
        try
        {
            await foreach (var data in completeChatResponse)
            {
                if (data.Usage is not null)
                {
                    tokenUsage = data.Usage;
                }

                var message = System.Text.Json.JsonSerializer.Serialize(data, ThorJsonSerializer.DefaultOptions);
                // Choices/Delta may be absent on some chunks (e.g. a usage-only final chunk).
                backupSystemContent.Append(data.Choices?.FirstOrDefault()?.Delta?.Content);
                messageQueue.Enqueue($"data: {message}\n\n");
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Ai对话异常");
            // NOTE(review): this text (including the stack trace) is sent to the client and stored as the
            // assistant message; consider whether exposing the stack trace is intended.
            var errorContent = $"对话Ai异常异常信息\n当前Ai模型{request.Model}\n异常信息{e.Message}\n异常堆栈:{e}";
            var model = new ThorChatCompletionsResponse
            {
                Choices = new List<ThorChatChoiceResponse>
                {
                    new ThorChatChoiceResponse
                    {
                        Delta = new ThorChatMessage
                        {
                            Content = errorContent
                        }
                    }
                }
            };
            // Serialize exactly like regular chunks so the error frame has the same wire shape.
            var message = System.Text.Json.JsonSerializer.Serialize(model, ThorJsonSerializer.DefaultOptions);
            backupSystemContent.Append(errorContent);
            messageQueue.Enqueue($"data: {message}\n\n");
        }

        // Terminate the SSE stream, then signal completion so the consumer drains the queue and exits.
        messageQueue.Enqueue("data: [DONE]\n\n");
        Volatile.Write(ref isComplete, true);
        await outputTask;

        await _aiMessageManager.CreateUserMessageAsync(userId, sessionId,
            new MessageInputDto
            {
                Content = request.Messages?.LastOrDefault()?.Content ?? string.Empty,
                ModelId = request.Model,
                TokenUsage = tokenUsage,
            });

        await _aiMessageManager.CreateSystemMessageAsync(userId, sessionId,
            new MessageInputDto
            {
                Content = backupSystemContent.ToString(),
                ModelId = request.Model,
                TokenUsage = tokenUsage
            });
        await _usageStatisticsManager.SetUsageAsync(userId, request.Model, tokenUsage);
    }

    /// <summary>
    /// Image generation: calls the keyed <see cref="IImageService"/> for the model (default "dall-e-2"),
    /// writes the result as JSON, then records the prompt, the first image URL and usage.
    /// </summary>
    /// <param name="context">Current HTTP context; the result is written to its response.</param>
    /// <param name="userId">User to attribute messages/usage to.</param>
    /// <param name="sessionId">Optional chat session id.</param>
    /// <param name="request">Image generation request.</param>
    /// <exception cref="UserFriendlyException">
    /// Any failure, including an upstream error or an empty result, is rethrown wrapped with the model name
    /// and exception details.
    /// </exception>
    public async Task CreateImageForStatisticsAsync(HttpContext context, Guid? userId, Guid? sessionId,
        ImageCreateRequest request)
    {
        try
        {
            var model = request.Model;
            if (string.IsNullOrEmpty(model)) model = "dall-e-2";
            var modelDescribe = await GetModelAsync(model);

            // Resolve the image service registered for this model's handler.
            var imageService =
                LazyServiceProvider.GetRequiredKeyedService<IImageService>(modelDescribe.HandlerName);
            var response = await imageService.CreateImage(request, modelDescribe);
            // A null Results collection is treated like an empty one instead of throwing a NullReferenceException.
            if (response.Error != null || response.Results is null || response.Results.Count == 0)
            {
                throw new BusinessException(response.Error?.Message ?? "图片生成失败", response.Error?.Code?.ToString());
            }

            await context.Response.WriteAsJsonAsync(response);

            await _aiMessageManager.CreateUserMessageAsync(userId, sessionId,
                new MessageInputDto
                {
                    Content = request.Prompt,
                    ModelId = model,
                    TokenUsage = response.Usage,
                });
            await _aiMessageManager.CreateSystemMessageAsync(userId, sessionId,
                new MessageInputDto
                {
                    Content = response.Results?.FirstOrDefault()?.Url,
                    ModelId = model,
                    TokenUsage = response.Usage
                });
            await _usageStatisticsManager.SetUsageAsync(userId, model, response.Usage);
        }
        catch (Exception e)
        {
            var errorContent = $"图片生成Ai异常异常信息\n当前Ai模型{request?.Model}\n异常信息{e.Message}\n异常堆栈:{e}";
            throw new UserFriendlyException(errorContent);
        }
    }

    /// <summary>
    /// Embedding generation: normalizes the loosely-typed input (string or array), calls the keyed
    /// <see cref="ITextEmbeddingService"/> for the model, writes the result as JSON and records usage.
    /// </summary>
    /// <param name="context">Current HTTP context; the result is written to its response.</param>
    /// <param name="userId">User to attribute usage to.</param>
    /// <param name="sessionId">Optional session id (currently unused because no messages are recorded).</param>
    /// <param name="input">Embedding request.</param>
    /// <exception cref="UserFriendlyException">
    /// Any failure other than rate limiting (HTTP 429) or authorization (HTTP 401) is rethrown wrapped.
    /// </exception>
    public async Task EmbeddingForStatisticsAsync(HttpContext context, Guid? userId, Guid? sessionId,
        ThorEmbeddingInput input)
    {
        try
        {
            if (input == null) throw new Exception("模型校验异常");
            using var embedding =
                Activity.Current?.Source.StartActivity("向量模型调用");
            var modelDescribe = await GetModelAsync(input.Model);

            // Resolve the embedding service registered for this model's handler.
            var embeddingService =
                LazyServiceProvider.GetRequiredKeyedService<ITextEmbeddingService>(modelDescribe.HandlerName);
            var embeddingCreateRequest = new EmbeddingCreateRequest
            {
                Model = input.Model,
                EncodingFormat = input.EncodingFormat
            };

            // Accept the input as a JSON string, a JSON array, or an already-bound CLR string.
            if (input.Input is JsonElement element)
            {
                if (element.ValueKind == JsonValueKind.String)
                {
                    embeddingCreateRequest.Input = element.ToString();
                }
                else if (element.ValueKind == JsonValueKind.Array)
                {
                    embeddingCreateRequest.InputAsList = element.EnumerateArray().Select(x => x.ToString()).ToList();
                }
                else
                {
                    throw new Exception("Input输入格式错误非string或Array类型");
                }
            }
            else if (input.Input is string strInput)
            {
                embeddingCreateRequest.Input = strInput;
            }
            else
            {
                throw new Exception("Input输入格式错误未找到类型");
            }

            var stream =
                await embeddingService.EmbeddingAsync(embeddingCreateRequest, modelDescribe, context.RequestAborted);
            var usage = new ThorUsageResponse()
            {
                PromptTokens = stream.Usage?.PromptTokens ?? 0,
                InputTokens = stream.Usage?.InputTokens ?? 0,
                CompletionTokens = 0,
                // NOTE(review): the total is taken from InputTokens only; a provider that reports only
                // prompt_tokens/total_tokens would be recorded as 0. Confirm against provider responses.
                TotalTokens = stream.Usage?.InputTokens ?? 0
            };
            await context.Response.WriteAsJsonAsync(new
            {
                input.Model,
                stream.Data,
                stream.Error,
                Object = stream.ObjectTypeName,
                Usage = usage
            });

            // Knowledge-base embeddings are not recorded as chat messages for now; only usage is updated.
            await _usageStatisticsManager.SetUsageAsync(userId, input.Model, usage);
        }
        catch (ThorRateLimitException)
        {
            context.Response.StatusCode = 429;
        }
        catch (UnauthorizedAccessException)
        {
            context.Response.StatusCode = 401;
        }
        catch (Exception e)
        {
            // `input` may be null here: the null check above throws into this handler.
            var errorContent = $"嵌入Ai异常异常信息\n当前Ai模型{input?.Model}\n异常信息{e.Message}\n异常堆栈:{e}";
            throw new UserFriendlyException(errorContent);
        }
    }

    /// <summary>
    /// Anthropic chat completion, streaming: applies Anthropic compatibility fixes, then relays every
    /// (event name, payload) pair from the keyed <see cref="IAnthropicChatCompletionService"/>.
    /// </summary>
    /// <param name="request">Anthropic-style chat request.</param>
    /// <param name="cancellationToken">Cancels the upstream stream.</param>
    /// <returns>The upstream (event name, payload) pairs, unchanged.</returns>
    public async IAsyncEnumerable<(string, AnthropicStreamDto?)> AnthropicCompleteChatStreamAsync(
        AnthropicInput request,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        _specialCompatible.AnthropicCompatible(request);
        var modelDescribe = await GetModelAsync(request.Model);
        var chatService =
            LazyServiceProvider.GetRequiredKeyedService<IAnthropicChatCompletionService>(modelDescribe.HandlerName);
        await foreach (var result in chatService.StreamChatCompletionsAsync(modelDescribe, request, cancellationToken))
        {
            yield return result;
        }
    }

    /// <summary>
    /// Anthropic chat completion, non-streaming: calls the model, records the user/assistant messages and usage
    /// when a user is known, then writes the completion to the response as JSON.
    /// </summary>
    /// <param name="httpContext">Current HTTP context; the completion is written to its response.</param>
    /// <param name="request">Anthropic-style chat request.</param>
    /// <param name="userId">User to attribute messages/usage to; when null nothing is recorded.</param>
    /// <param name="sessionId">Optional chat session id.</param>
    /// <param name="cancellationToken">Cancels the upstream call and the response write.</param>
    public async Task AnthropicCompleteChatForStatisticsAsync(HttpContext httpContext,
        AnthropicInput request,
        Guid? userId = null,
        Guid? sessionId = null,
        CancellationToken cancellationToken = default)
    {
        _specialCompatible.AnthropicCompatible(request);
        var response = httpContext.Response;
        // The content type is left to WriteAsJsonAsync below.
        var modelDescribe = await GetModelAsync(request.Model);
        var chatService =
            LazyServiceProvider.GetRequiredKeyedService<IAnthropicChatCompletionService>(modelDescribe.HandlerName);
        var data = await chatService.ChatCompletionsAsync(modelDescribe, request, cancellationToken);
        if (userId is not null)
        {
            await _aiMessageManager.CreateUserMessageAsync(userId.Value, sessionId,
                new MessageInputDto
                {
                    // Record the latest message, consistent with the other chat paths.
                    Content = request.Messages?.LastOrDefault()?.Content ?? string.Empty,
                    ModelId = request.Model,
                    TokenUsage = data.TokenUsage,
                });
            await _aiMessageManager.CreateSystemMessageAsync(userId.Value, sessionId,
                new MessageInputDto
                {
                    Content = data.content?.FirstOrDefault()?.text,
                    ModelId = request.Model,
                    TokenUsage = data.TokenUsage
                });
            await _usageStatisticsManager.SetUsageAsync(userId.Value, request.Model, data.TokenUsage);
        }

        await response.WriteAsJsonAsync(data, cancellationToken);
    }

    /// <summary>
    /// Anthropic chat completion, streaming: writes each upstream event to the client as an SSE
    /// <c>event:</c>/<c>data:</c> pair, then records the user/assistant messages and usage.
    /// </summary>
    /// <param name="httpContext">Current HTTP context; SSE events are written to its response.</param>
    /// <param name="request">Anthropic-style chat request.</param>
    /// <param name="userId">User to attribute messages/usage to.</param>
    /// <param name="sessionId">Optional chat session id.</param>
    /// <param name="cancellationToken">Cancels the upstream stream and the writes.</param>
    /// <exception cref="UserFriendlyException">The upstream stream failed.</exception>
    public async Task AnthropicCompleteChatStreamForStatisticsAsync(
        HttpContext httpContext,
        AnthropicInput request,
        Guid? userId = null,
        Guid? sessionId = null,
        CancellationToken cancellationToken = default)
    {
        var response = httpContext.Response;
        // Declare a server-sent events stream.
        response.ContentType = "text/event-stream;charset=utf-8;";
        response.Headers.TryAdd("Cache-Control", "no-cache");
        response.Headers.TryAdd("Connection", "keep-alive");

        // Resolved from the container instead of calling this instance directly; presumably so container
        // interceptors/proxies apply. Confirm before simplifying.
        var gateWay = LazyServiceProvider.GetRequiredService<AiGateWayManager>();
        var completeChatResponse = gateWay.AnthropicCompleteChatStreamAsync(request, cancellationToken);
        ThorUsageResponse? tokenUsage = null;
        var backupSystemContent = new StringBuilder();
        try
        {
            await foreach (var (eventType, payload) in completeChatResponse)
            {
                // message_start provides a fallback usage; message_delta provides the final one. Only
                // overwrite when the event actually carries usage, so a usage-less message_delta does not
                // discard the message_start fallback.
                if (eventType.Contains("message_delta", StringComparison.Ordinal) ||
                    eventType.Contains("message_start", StringComparison.Ordinal))
                {
                    tokenUsage = payload?.TokenUsage ?? tokenUsage;
                }

                backupSystemContent.Append(payload?.Delta?.Text);
                await WriteAsEventStreamDataAsync(httpContext, eventType, payload, cancellationToken);
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Ai对话异常");
            var errorContent = $"对话Ai异常异常信息\n当前Ai模型{request.Model}\n异常信息{e.Message}\n异常堆栈:{e}";
            // NOTE(review): the SSE response has usually started by now, so this exception cannot become a
            // normal error response, and no messages/usage are recorded. Consider emitting an Anthropic
            // `error` SSE event and recording partial usage instead.
            throw new UserFriendlyException(errorContent);
        }

        await _aiMessageManager.CreateUserMessageAsync(userId, sessionId,
            new MessageInputDto
            {
                Content = request.Messages?.LastOrDefault()?.Content ?? string.Empty,
                ModelId = request.Model,
                TokenUsage = tokenUsage,
            });
        await _aiMessageManager.CreateSystemMessageAsync(userId, sessionId,
            new MessageInputDto
            {
                Content = backupSystemContent.ToString(),
                ModelId = request.Model,
                TokenUsage = tokenUsage
            });
        await _usageStatisticsManager.SetUsageAsync(userId, request.Model, tokenUsage);
    }

    #region Anthropic-format HTTP response

    private static readonly byte[] EventPrefix = "event: "u8.ToArray();
    private static readonly byte[] DataPrefix = "data: "u8.ToArray();
    private static readonly byte[] NewLine = "\n"u8.ToArray();
    private static readonly byte[] DoubleNewLine = "\n\n"u8.ToArray();

    /// <summary>
    /// Writes one SSE event (<c>event: name</c> line plus <c>data: json</c> line) to the response body,
    /// serializing <paramref name="value"/> directly into the stream with
    /// <see cref="JsonSerializer.SerializeAsync{TValue}(Stream, TValue, JsonSerializerOptions?, CancellationToken)"/>,
    /// and flushes it.
    /// </summary>
    private static async ValueTask WriteAsEventStreamDataAsync<T>(
        HttpContext context,
        string @event,
        T value,
        CancellationToken cancellationToken = default)
        where T : class
    {
        var response = context.Response;
        var bodyStream = response.Body;

        // Start the response so the headers set by the caller (e.g. Content-Type: text/event-stream) are sent.
        await response.StartAsync(cancellationToken).ConfigureAwait(false);

        // Event name line.
        await bodyStream.WriteAsync(EventPrefix, cancellationToken).ConfigureAwait(false);
        await WriteUtf8StringAsync(bodyStream, @event.Trim(), cancellationToken).ConfigureAwait(false);
        await bodyStream.WriteAsync(NewLine, cancellationToken).ConfigureAwait(false);

        // Data line: "data: " followed by the JSON payload.
        await bodyStream.WriteAsync(DataPrefix, cancellationToken).ConfigureAwait(false);
        await JsonSerializer.SerializeAsync(
            bodyStream,
            value,
            ThorJsonSerializer.DefaultOptions,
            cancellationToken
        ).ConfigureAwait(false);

        // A blank line terminates the event.
        await bodyStream.WriteAsync(DoubleNewLine, cancellationToken).ConfigureAwait(false);

        // Push the event to the client immediately.
        await bodyStream.FlushAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Writes <paramref name="value"/> to <paramref name="stream"/> as UTF-8; does nothing for null/empty.
    /// </summary>
    private static async ValueTask WriteUtf8StringAsync(Stream stream, string value, CancellationToken token)
    {
        if (string.IsNullOrEmpty(value))
            return;

        var buffer = Encoding.UTF8.GetBytes(value);
        await stream.WriteAsync(buffer, token).ConfigureAwait(false);
    }

    #endregion
}