Files
WCS/Plugins/Wcs/Plugin.Cowain.Wcs/Services/StationQueueHostedService.cs
2026-03-02 09:13:29 +08:00

191 lines
8.1 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Cowain.Base.Helpers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Plugin.Cowain.Wcs.IServices;
using Plugin.Cowain.Wcs.Models.Enum;
using Plugin.Cowain.Wcs.ViewModels;
using System.Collections.Concurrent;
using System.Threading.Channels;
namespace Plugin.Cowain.Wcs.Services;
/// <summary>
/// Hosted background service that queues station update requests in an unbounded
/// channel and applies them one at a time through <see cref="IStationService"/>.
/// </summary>
/// <remarks>
/// Producers call <see cref="Enqueue"/>; the single consumer loop in
/// <see cref="ExecuteAsync"/> validates each request against the station's current
/// status, persists it, and reports the outcome through the item's
/// <c>UpdatedAction</c> callback.
/// </remarks>
public class StationQueueHostedService : BackgroundHostedService
{
    /// <summary>
    /// Optional callback invoked after a successful update whose new status is
    /// RequestPick, RequestPlace, PlaceFinished or NgPick.
    /// </summary>
    public Action<StationViewModel>? FindTaskAction { get; set; }

    private readonly Channel<StationViewModel> _channel = Channel.CreateUnbounded<StationViewModel>();
    private readonly IServiceScopeFactory _scopeFactory;

    // Station-level locks, so that different stations do not block each other.
    // The dictionary is static, i.e. shared by every instance of this service.
    private static readonly ConcurrentDictionary<int, SemaphoreSlim> _stationLocks = new();
    private readonly ILogger<StationQueueHostedService> _logger;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="scopeFactory">Factory used to create one DI scope per consumed item.</param>
    /// <param name="logger">Logger for queue diagnostics.</param>
    public StationQueueHostedService(IServiceScopeFactory scopeFactory, ILogger<StationQueueHostedService> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    /// <summary>
    /// Gets (creating on first use) the lock that belongs to the given station.
    /// </summary>
    /// <param name="stationId">Station identifier used as the dictionary key.</param>
    /// <returns>A semaphore with a single permit.</returns>
    private SemaphoreSlim GetStationLock(int stationId)
    {
        return _stationLocks.GetOrAdd(stationId, _ => new SemaphoreSlim(1, 1));
    }

    /// <summary>
    /// Serializes a value for log output without ever throwing, so that a logging
    /// statement can never break the control flow around it.
    /// </summary>
    /// <param name="value">Object to serialize.</param>
    /// <returns>The JSON text, or a placeholder describing the serialization failure.</returns>
    private static string ToJson(object? value)
    {
        try
        {
            return JsonConvert.SerializeObject(value);
        }
        catch (Exception ex)
        {
            return $"<serialization failed: {ex.Message}>";
        }
    }

    /// <summary>
    /// Stops accepting new items, stops the consumer loop, then disposes the station locks.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the base implementation.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("StationQueueHostedService Stop");

        // TryComplete instead of Complete: a second StopAsync call must not throw.
        _channel.Writer.TryComplete();

        try
        {
            // Stop the consumer BEFORE the locks are disposed; disposing first would make an
            // in-flight item fail with ObjectDisposedException when it releases its lock.
            // NOTE(review): presumably base.StopAsync cancels and awaits ExecuteAsync - confirm.
            await base.StopAsync(cancellationToken);
        }
        finally
        {
            // Clean up all locks. Keys is a snapshot, so removing while iterating is safe.
            foreach (var stationId in _stationLocks.Keys)
            {
                if (_stationLocks.TryRemove(stationId, out var stationLock))
                {
                    stationLock.Dispose();
                }
            }
        }

        _logger.LogInformation("StationQueueHostedService Stoped");
    }

    /// <summary>
    /// Stamps the item with the enqueue time and writes it to the channel.
    /// Failures are logged, not thrown.
    /// </summary>
    /// <param name="item">Station update request to queue.</param>
    /// <exception cref="ArgumentNullException"><paramref name="item"/> is null.</exception>
    public void Enqueue(StationViewModel item)
    {
        if (item is null)
        {
            throw new ArgumentNullException(nameof(item));
        }

        try
        {
            item.EnqueueTime = DateTime.Now;

            // Serialize once, up front: the continuation below may run while the consumer
            // is already mutating the item, so it must not serialize the live object.
            var json = ToJson(item);
            var stationId = item.Id;
            var source = item.UpdateSource;

            _logger.LogDebug($"入队列开始StationId={stationId},Source={source},JsonData={json}");
            _ = _channel.Writer.WriteAsync(item).AsTask().ContinueWith(task =>
            {
                if (task.IsFaulted)
                {
                    // Include the exception (e.g. the channel was already completed).
                    _logger.LogError(task.Exception, $"入队列错误StationId={stationId},Source={source},JsonData={json}");
                }
                else if (task.IsCompletedSuccessfully)
                {
                    _logger.LogDebug($"入队列成功StationId={stationId},Source={source},JsonData={json}");
                }
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"入队列异常StationId={item.Id},Source={item.UpdateSource},JsonData={ToJson(item)}");
        }
    }

    /// <summary>
    /// Reports the update outcome through the item's <c>UpdatedAction</c>, if one is set.
    /// </summary>
    /// <param name="item">Request whose callback is invoked; may be null.</param>
    /// <param name="result">True when the update was persisted successfully.</param>
    private void SetUpdateFunc(StationViewModel item, bool result)
    {
        item?.UpdatedAction?.Invoke(result);
    }

    /// <summary>
    /// Invokes <see cref="FindTaskAction"/> for the item, if a callback is registered.
    /// </summary>
    /// <param name="item">Request that was just applied.</param>
    private void FindTaskFunc(StationViewModel item)
    {
        FindTaskAction?.Invoke(item);
    }

    /// <summary>
    /// Consumer loop: reads queued items until the channel completes or the token is
    /// cancelled, processing each one while holding that station's lock.
    /// </summary>
    /// <param name="stoppingToken">Token that ends the read loop and pending lock waits.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var item in _channel.Reader.ReadAllAsync(stoppingToken))
        {
            // Acquire the station-level lock.
            var stationLock = GetStationLock(item.Id);
            await stationLock.WaitAsync(stoppingToken);
            try
            {
                await ProcessItemAsync(item);
            }
            finally
            {
                // Release in its own finally so nothing that runs before it can skip it.
                try
                {
                    stationLock.Release();
                }
                catch (ObjectDisposedException)
                {
                    // StopAsync disposed the lock while this item was still in flight.
                }
            }
        }
    }

    /// <summary>
    /// Validates one queued request against the station's current status, persists it
    /// when allowed, and notifies the callbacks. Never throws; failures are logged.
    /// </summary>
    /// <param name="item">Request taken from the queue.</param>
    private async Task ProcessItemAsync(StationViewModel item)
    {
        // True once UpdatedAction has been told the outcome, so that a later failure
        // (e.g. inside FindTaskAction) does not report a second, contradictory result.
        var resultReported = false;
        try
        {
            _logger.LogInformation($"消费队列开始StationId={item.Id},Source={item.UpdateSource},JsonData={ToJson(item)}");
            item.UpdateTime = DateTime.Now;

            using var scope = _scopeFactory.CreateScope();
            var stationService = scope.ServiceProvider.GetRequiredService<IStationService>();
            var stations = await stationService.GetAllAsync();
            var station = stations.FirstOrDefault(x => x.Id == item.Id);
            if (station == null)
            {
                // NOTE(review): UpdatedAction is not invoked on this path - confirm callers do not wait on it.
                _logger.LogInformation($"更新工站失败Id不存在StationId={item.Id},Source={item.UpdateSource},JsonData={ToJson(item)}");
                return;
            }

            var picking = StationStateEnum.Picking.ToString();
            var placing = StationStateEnum.Placing.ToString();
            var stationInTransit = station.Status == picking || station.Status == placing;

            var isValid = false;
            if (item.UpdateSource == StationUpdateSourceEnum.User)
            {
                // Manual update by a user is always accepted.
                isValid = true;
            }
            else if (item.UpdateSource == StationUpdateSourceEnum.PLC)
            {
                if (!stationInTransit)
                {
                    // The PLC may not change the status while the station is picking or placing.
                    isValid = true;
                }
                else
                {
                    _logger.LogInformation($"更新工站失败工站是搬运中不可更新StationId={item.Id},Source={item.UpdateSource},JsonData={ToJson(item)}stationData={ToJson(station)}");
                }
            }
            else if (item.UpdateSource == StationUpdateSourceEnum.Task)
            {
                // Accepted when the station is UnKnown or in transit, or when the
                // request itself moves the station into Picking/Placing.
                isValid = station.Status == StationStateEnum.UnKnown.ToString()
                    || stationInTransit
                    || item.Status == picking
                    || item.Status == placing;
            }

            if (!isValid)
            {
                // NOTE(review): UpdatedAction is not invoked for rejected requests - confirm this is intended.
                _logger.LogInformation($"消费队列取消不需要消费StationId={item.Id},Source={item.UpdateSource},JsonData={ToJson(item)}");
                return;
            }

            var updateStation = await stationService.UpdateAsync(item);
            if (!updateStation.IsSuccess)
            {
                SetUpdateFunc(item, false);
                resultReported = true;
                _logger.LogError($"消费队列错误StationId={item.Id},Source={item.UpdateSource},JsonData={ToJson(item)}");
                return;
            }

            SetUpdateFunc(item, true);
            resultReported = true;
            _logger.LogInformation($"消费队列成功StationId={item.Id},Source={item.UpdateSource},JsonData={ToJson(item)}");

            if (item.Status == StationStateEnum.RequestPick.ToString()
                || item.Status == StationStateEnum.RequestPlace.ToString()
                || item.Status == StationStateEnum.PlaceFinished.ToString()
                || item.Status == StationStateEnum.NgPick.ToString())
            {
                // Changed 2025-09-09: a finished place also triggers the task search.
                FindTaskFunc(item);
            }
        }
        catch (Exception ex)
        {
            if (!resultReported)
            {
                SetUpdateFunc(item, false);
            }
            _logger.LogError(ex, $"消费队列异常StationId={item.Id},Source={item.UpdateSource},JsonData={ToJson(item)}");
        }
        finally
        {
            // Time from enqueue to end of consumption, in milliseconds.
            item.ConsumeUseTime = (DateTime.Now - item.EnqueueTime).TotalMilliseconds;
            _logger.LogInformation($"消费队列用时StationId={item.Id},Source={item.UpdateSource},JsonData={ToJson(item)}");
        }
    }
}