// File-viewer listing header: 191 lines, 8.1 KiB, C#
using Cowain.Base.Helpers;
|
||
using Microsoft.Extensions.DependencyInjection;
|
||
using Microsoft.Extensions.Logging;
|
||
using Newtonsoft.Json;
|
||
using Plugin.Cowain.Wcs.IServices;
|
||
using Plugin.Cowain.Wcs.Models.Enum;
|
||
using Plugin.Cowain.Wcs.ViewModels;
|
||
using System.Collections.Concurrent;
|
||
using System.Threading.Channels;
|
||
|
||
namespace Plugin.Cowain.Wcs.Services;
|
||
|
||
/// <summary>
/// Hosted service that funnels station updates through a single in-memory queue.
/// Producers call <see cref="Enqueue"/>; the background loop in <see cref="ExecuteAsync"/>
/// validates each update against the station's stored status and persists it through
/// <see cref="IStationService"/>.
/// </summary>
public class StationQueueHostedService : BackgroundHostedService
{
    // Station-level locks so that work on different stations does not block each other.
    // The dictionary is static, so it is shared by every instance of this service.
    private static readonly ConcurrentDictionary<int, SemaphoreSlim> _stationLocks = new();

    private readonly Channel<StationViewModel> _channel = Channel.CreateUnbounded<StationViewModel>();
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<StationQueueHostedService> _logger;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="scopeFactory">Used to create one DI scope per consumed item.</param>
    /// <param name="logger">Logger for queue diagnostics.</param>
    public StationQueueHostedService(IServiceScopeFactory scopeFactory, ILogger<StationQueueHostedService> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    /// <summary>
    /// Optional hook invoked after a successful update whose new status is
    /// RequestPick, RequestPlace, PlaceFinished or NgPick.
    /// </summary>
    public Action<StationViewModel>? FindTaskAction { get; set; }

    /// <summary>
    /// Returns the lock for the given station, creating it on first use.
    /// </summary>
    private SemaphoreSlim GetStationLock(int stationId)
    {
        return _stationLocks.GetOrAdd(stationId, _ => new SemaphoreSlim(1, 1));
    }

    /// <summary>
    /// Closes the queue for writing, stops the base service, then disposes the station locks.
    /// </summary>
    /// <param name="cancellationToken">Passed through to the base implementation.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("StationQueueHostedService Stop");

        // TryComplete is idempotent; Complete() would throw if the channel was already completed.
        _channel.Writer.TryComplete();

        // NOTE(review): presumably the base class stops the ExecuteAsync loop here,
        // like Microsoft's BackgroundService does — confirm.
        await base.StopAsync(cancellationToken);

        // Dispose the locks only after the base stop returned: the consumer loop calls
        // WaitAsync/Release on them and would throw ObjectDisposedException otherwise.
        foreach (var stationId in _stationLocks.Keys)
        {
            if (_stationLocks.TryRemove(stationId, out var stationLock))
            {
                stationLock.Dispose();
            }
        }

        _logger.LogInformation("StationQueueHostedService Stoped");
    }

    /// <summary>
    /// Stamps the item with its enqueue time and adds it to the queue.
    /// Failures are logged, not thrown.
    /// </summary>
    /// <param name="item">The station update to queue.</param>
    public void Enqueue(StationViewModel item)
    {
        try
        {
            item.EnqueueTime = DateTime.Now;

            // Serialize once, before the consumer can start mutating the item on another thread.
            var description = Describe(item);
            _logger.LogDebug($"入队列开始:{description}");

            // On an unbounded channel TryWrite only fails when the writer has been completed.
            if (_channel.Writer.TryWrite(item))
            {
                _logger.LogDebug($"入队列成功:{description}");
            }
            else
            {
                _logger.LogError($"入队列错误:{description}");
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"入队列异常:{Describe(item)}");
        }
    }

    /// <summary>
    /// Invokes the item's <c>UpdatedAction</c> callback, if one is set, with the update result.
    /// </summary>
    private void SetUpdateFunc(StationViewModel item, bool result)
    {
        item?.UpdatedAction?.Invoke(result);
    }

    /// <summary>
    /// Invokes <see cref="FindTaskAction"/>, if one is set.
    /// </summary>
    private void FindTaskFunc(StationViewModel item)
    {
        FindTaskAction?.Invoke(item);
    }

    /// <summary>
    /// Serializes a value for logging; never throws, so it is safe inside catch/finally blocks.
    /// </summary>
    private static string SafeJson(object? value)
    {
        try
        {
            return JsonConvert.SerializeObject(value);
        }
        catch (Exception ex)
        {
            return $"<serialization failed: {ex.Message}>";
        }
    }

    /// <summary>
    /// Builds the log suffix shared by every queue message.
    /// </summary>
    private static string Describe(StationViewModel item)
    {
        return $"StationId={item.Id},Source={item.UpdateSource},JsonData={SafeJson(item)}";
    }

    /// <summary>
    /// Consumer loop: reads queued items one at a time and processes each one while
    /// holding that station's lock.
    /// </summary>
    /// <param name="stoppingToken">Cancels both the queue read and the lock wait.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var item in _channel.Reader.ReadAllAsync(stoppingToken))
        {
            var stationLock = GetStationLock(item.Id);
            await stationLock.WaitAsync(stoppingToken);

            try
            {
                await ConsumeAsync(item);
            }
            finally
            {
                stationLock.Release();
            }
        }
    }

    /// <summary>
    /// Validates and persists a single queued update. All failures are logged here and
    /// nothing is rethrown, so one bad item cannot terminate the consumer loop.
    /// </summary>
    private async Task ConsumeAsync(StationViewModel item)
    {
        // Becomes true once UpdatedAction has been given a result, so the catch block
        // never reports a second, contradictory result for the same item.
        var callbackInvoked = false;

        try
        {
            _logger.LogInformation($"消费队列开始:{Describe(item)}");
            item.UpdateTime = DateTime.Now;

            using var scope = _scopeFactory.CreateScope();
            var stationService = scope.ServiceProvider.GetRequiredService<IStationService>();
            var stations = await stationService.GetAllAsync();
            var station = stations.FirstOrDefault(x => x.Id == item.Id);

            // NOTE(review): unknown or rejected updates never invoke UpdatedAction — this keeps
            // the original behavior; confirm callers do not wait on the callback.
            if (station == null)
            {
                _logger.LogInformation($"更新工站失败,Id不存在:{Describe(item)}");
                return;
            }

            var picking = StationStateEnum.Picking.ToString();
            var placing = StationStateEnum.Placing.ToString();
            var stationIsMoving = station.Status == picking || station.Status == placing;

            bool isValid;
            if (item.UpdateSource == StationUpdateSourceEnum.User)
            {
                // Manual update by a user: always accepted.
                isValid = true;
            }
            else if (item.UpdateSource == StationUpdateSourceEnum.PLC)
            {
                // The PLC may not change the status while the station is picking or placing.
                isValid = !stationIsMoving;
                if (!isValid)
                {
                    _logger.LogInformation($"更新工站失败,工站是搬运中,不可更新:{Describe(item)},stationData={SafeJson(station)}");
                }
            }
            else if (item.UpdateSource == StationUpdateSourceEnum.Task)
            {
                // Task updates are accepted when the stored status is UnKnown/Picking/Placing,
                // or when the requested status is Picking/Placing.
                isValid = station.Status == StationStateEnum.UnKnown.ToString()
                    || stationIsMoving
                    || item.Status == picking
                    || item.Status == placing;
            }
            else
            {
                isValid = false;
            }

            if (!isValid)
            {
                _logger.LogInformation($"消费队列取消,不需要消费:{Describe(item)}");
                return;
            }

            var updateResult = await stationService.UpdateAsync(item);
            if (!updateResult.IsSuccess)
            {
                callbackInvoked = true;
                SetUpdateFunc(item, false);
                _logger.LogError($"消费队列错误:{Describe(item)}");
                return;
            }

            callbackInvoked = true;
            SetUpdateFunc(item, true);
            _logger.LogInformation($"消费队列成功:{Describe(item)}");

            // Look for a task when the station can be picked from or placed into.
            // Changed 2025-09-09: a finished place (PlaceFinished) triggers the search too.
            if (item.Status == StationStateEnum.RequestPick.ToString()
                || item.Status == StationStateEnum.RequestPlace.ToString()
                || item.Status == StationStateEnum.PlaceFinished.ToString()
                || item.Status == StationStateEnum.NgPick.ToString())
            {
                FindTaskFunc(item);
            }
        }
        catch (Exception ex)
        {
            if (!callbackInvoked)
            {
                try
                {
                    SetUpdateFunc(item, false);
                }
                catch (Exception callbackEx)
                {
                    // A throwing callback must not escape and end the consumer loop.
                    _logger.LogError(callbackEx, $"消费队列回调异常:{Describe(item)}");
                }
            }

            _logger.LogError(ex, $"消费队列异常:{Describe(item)}");
        }
        finally
        {
            // Time from enqueue to end of processing, in milliseconds.
            item.ConsumeUseTime = (DateTime.Now - item.EnqueueTime).TotalMilliseconds;
            _logger.LogInformation($"消费队列用时:{Describe(item)}");
        }
    }
}
|