78 lines
2.3 KiB
C#
78 lines
2.3 KiB
C#
using System.Text.Json;
|
|
using Femto.Modules.Media.Data;
|
|
using MediatR;
|
|
using Microsoft.Extensions.Hosting;
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
namespace Femto.Modules.Media.Infrastructure.Integration;
|
|
|
|
/// <summary>
/// Hosted background service that repeatedly reads pending messages from the
/// <see cref="Outbox"/>, deserializes each payload into a MediatR
/// <see cref="INotification"/> and publishes it through <see cref="IMediator"/>.
/// </summary>
/// <remarks>
/// Each message is marked via <c>Succeed()</c> or <c>Fail(...)</c> and
/// <see cref="MediaContext"/> is saved after every handled message. Cancellation
/// caused by the host's stopping token is treated as a normal shutdown: it is
/// neither logged as an error nor recorded as a failure on the message.
/// </remarks>
internal class Mailman(Outbox outbox, MediaContext context, ILogger<Mailman> logger, IMediator mediator)
    : BackgroundService
{
    /// <summary>Pause between two consecutive passes over the outbox.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(1);

    /// <summary>
    /// Runs the polling loop until <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="cancellationToken">Stopping token supplied by the host.</param>
    /// <returns>A task that completes once the loop has exited.</returns>
    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                await this.DeliverMail(cancellationToken);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // The host is stopping; this is not an outbox error, so leave quietly.
                break;
            }
            catch (Exception e)
            {
                // Keep the loop alive: a failed pass is retried after the usual pause.
                logger.LogError(e, "Error while processing outbox");
            }

            try
            {
                await Task.Delay(PollInterval, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                // Task.Delay signals cancellation with TaskCanceledException, which
                // derives from OperationCanceledException.
                break;
            }
        }
    }

    /// <summary>
    /// Performs one pass over the pending outbox messages, publishing each one and
    /// saving its outcome before moving on to the next.
    /// </summary>
    /// <param name="cancellationToken">Token that aborts the pass when the host stops.</param>
    /// <returns>A task that completes when every pending message has been handled.</returns>
    private async Task DeliverMail(CancellationToken cancellationToken)
    {
        var messages = await outbox.GetPendingMessages(cancellationToken);

        foreach (var message in messages)
        {
            try
            {
                var notificationType = OutboxMessageTypeRegistry.GetType(message.EventType);
                if (notificationType is null)
                {
                    // Unknown event types are left untouched (no Succeed/Fail, no save).
                    logger.LogWarning("unmapped event type {Type}. skipping.", message.EventType);
                    continue;
                }

                if (JsonSerializer.Deserialize(message.Payload, notificationType) is not INotification notification)
                {
                    // Either the payload was the JSON literal null or the mapped type
                    // does not implement INotification.
                    throw new InvalidOperationException(
                        $"Payload for event type '{message.EventType}' did not deserialize to an INotification.");
                }

                await mediator.Publish(notification, cancellationToken);

                message.Succeed();
            }
            catch (Exception e) when (e is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
            {
                // Shutdown cancellation is filtered out above so that an interrupted
                // message is not recorded as failed; it propagates to ExecuteAsync.
                logger.LogError(
                    e,
                    "Error processing event {EventId} for aggregate {AggregateId}",
                    message.Id,
                    message.AggregateId
                );

                message.Fail(e.ToString());
            }

            // Save after every message so one message's outcome does not depend on
            // the rest of the batch completing.
            await context.SaveChangesAsync(cancellationToken);
        }
    }
}
|