femto-backend/Femto.Common/Infrastructure/Outbox/OutboxProcessor.cs

84 lines
2.8 KiB
C#

using System.Text.Json;
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Quartz;
namespace Femto.Common.Infrastructure.Outbox;
[DisallowConcurrentExecution]
public class OutboxProcessor<TContext>(
TContext context,
ILogger<OutboxProcessor<TContext>> logger,
IOutboxMessageMapping mapping,
IOutboxMessageHandler handler
) : IJob
where TContext : DbContext, IOutboxContext
{
public async Task Execute(IJobExecutionContext executionContext)
{
try
{
var now = DateTime.UtcNow;
var messages = await context
.Outbox.Where(message => message.Status == OutboxEntryStatus.Pending)
.Where(message => message.NextRetryAt == null || message.NextRetryAt <= now)
.OrderBy(message => message.CreatedAt)
.ToListAsync(executionContext.CancellationToken);
logger.LogTrace("loaded {Count} outbox messages to process", messages.Count);
foreach (var message in messages)
{
try
{
var notificationType = mapping.GetTypeOfEvent(message.EventType);
if (notificationType is null)
{
logger.LogWarning(
"unmapped event type {Type}. skipping.",
message.EventType
);
continue;
}
var notification =
JsonSerializer.Deserialize(message.Payload, notificationType)
as INotification;
if (notification is null)
throw new Exception("notification is null");
logger.LogTrace(
"publishing outbox message {EventType}. Id: {Id}, AggregateId: {AggregateId}",
message.EventType,
message.Id,
message.AggregateId
);
await handler.Publish(notification, executionContext.CancellationToken);
message.Succeed();
}
catch (Exception e)
{
logger.LogError(
e,
"Error processing event {EventId} for aggregate {AggregateId}",
message.Id,
message.AggregateId
);
message.Fail(e.ToString());
}
await context.SaveChangesAsync(executionContext.CancellationToken);
}
}
catch (Exception e)
{
logger.LogError(e, "Error while processing outbox");
throw;
}
}
}