using System.Text.Json;
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Quartz;

namespace Femto.Common.Infrastructure.Outbox;

/// <summary>
/// Quartz job that reads pending entries from the outbox of <typeparamref name="TContext"/>,
/// deserializes each payload into an <see cref="INotification"/> and hands it to the
/// configured <see cref="IOutboxMessageHandler"/>.
/// </summary>
/// <remarks>
/// <para>
/// Entries are processed one at a time in ascending <c>CreatedAt</c> order, and the outcome of
/// each entry is saved before the next one is started, so a failure on one entry does not
/// discard the recorded outcome of earlier ones.
/// </para>
/// <para>
/// <see cref="DisallowConcurrentExecutionAttribute"/> asks Quartz not to run two executions of
/// this job definition at the same time.
/// </para>
/// </remarks>
/// <typeparam name="TContext">EF Core context that exposes the outbox set.</typeparam>
/// <param name="context">Context used to query the outbox and persist status changes.</param>
/// <param name="logger">Logger for trace, warning and error output.</param>
/// <param name="mapping">Resolves a stored event type name to a CLR type.</param>
/// <param name="handler">Publishes the deserialized notification.</param>
[DisallowConcurrentExecution]
public class OutboxProcessor<TContext>(
    TContext context,
    ILogger<OutboxProcessor<TContext>> logger,
    IOutboxMessageMapping mapping,
    IOutboxMessageHandler handler
) : IJob
    where TContext : DbContext, IOutboxContext
{
    /// <summary>
    /// Loads every outbox entry that is pending and due, then publishes them in order.
    /// </summary>
    /// <param name="executionContext">
    /// Quartz execution context; its cancellation token is passed to every awaited call.
    /// </param>
    /// <returns>A task that completes when all loaded entries have been attempted.</returns>
    /// <remarks>
    /// Any exception that escapes the per-message handling (query failure, save failure,
    /// cancellation of the job) is logged and rethrown to the scheduler.
    /// </remarks>
    public async Task Execute(IJobExecutionContext executionContext)
    {
        var cancellationToken = executionContext.CancellationToken;

        try
        {
            // Captured once so the whole batch is selected against a single point in time.
            var now = DateTime.UtcNow;

            // NOTE(review): NextRetryAt is presumably advanced by message.Fail(...) to implement
            // back-off; that logic is not visible here, confirm against the entry type.
            var messages = await context
                .Outbox.Where(message => message.Status == OutboxEntryStatus.Pending)
                .Where(message => message.NextRetryAt == null || message.NextRetryAt <= now)
                .OrderBy(message => message.CreatedAt)
                .ToListAsync(cancellationToken);

            logger.LogTrace("loaded {Count} outbox messages to process", messages.Count);

            foreach (var message in messages)
            {
                try
                {
                    var notificationType = mapping.GetTypeOfEvent(message.EventType);

                    if (notificationType is null)
                    {
                        // The entry is left unmodified (neither succeeded nor failed) and
                        // nothing is saved for it, so it is picked up again on the next run.
                        logger.LogWarning(
                            "unmapped event type {Type}. skipping.",
                            message.EventType
                        );
                        continue;
                    }

                    // The cast yields null both for a JSON "null" payload and for a mapped
                    // type that does not implement INotification; both are treated as failures.
                    var notification =
                        JsonSerializer.Deserialize(message.Payload, notificationType)
                        as INotification;

                    if (notification is null)
                    {
                        throw new InvalidOperationException("notification is null");
                    }

                    logger.LogTrace(
                        "publishing outbox message {EventType}. Id: {Id}, AggregateId: {AggregateId}",
                        message.EventType,
                        message.Id,
                        message.AggregateId
                    );

                    await handler.Publish(notification, cancellationToken);

                    message.Succeed();
                }
                // Cancellation of the job itself is not a processing failure: let it propagate
                // to the outer handler instead of recording a failed attempt on a message that
                // was merely interrupted. An OperationCanceledException raised while the job
                // token is NOT cancelled (e.g. a timeout inside the handler) is still a failure.
                catch (Exception e)
                    when (e is not OperationCanceledException
                        || !cancellationToken.IsCancellationRequested
                    )
                {
                    logger.LogError(
                        e,
                        "Error processing event {EventId} for aggregate {AggregateId}",
                        message.Id,
                        message.AggregateId
                    );
                    message.Fail(e.ToString());
                }

                // Persist the outcome of this message before moving on to the next one.
                await context.SaveChangesAsync(cancellationToken);
            }
        }
        catch (Exception e)
        {
            logger.LogError(e, "Error while processing outbox");
            throw;
        }
    }
}