femto-backend/Femto.Modules.Blog/Infrastructure/Integration/Outbox/Outbox.cs
2025-05-04 21:46:24 +02:00

40 lines
1.4 KiB
C#

using System.Reflection;
using System.Text.Json;
using Femto.Common.Attributes;
using Femto.Common.Integration;
using Femto.Modules.Blog.Data;
using Microsoft.EntityFrameworkCore;
namespace Femto.Modules.Blog.Infrastructure.Integration.Outbox;
internal class Outbox(BlogContext context)
{
public async Task AddMessage<TMessage>(Guid aggregateId, TMessage message, CancellationToken cancellationToken)
where TMessage : IIntegrationEvent
{
var eventType = typeof(TMessage).GetCustomAttribute<EventTypeAttribute>();
if (eventType is null)
throw new InvalidOperationException($"{typeof(TMessage).Name} does not have EventType attribute");
await context.Outbox.AddAsync(
new(
message.EventId,
aggregateId,
eventType.Name,
JsonSerializer.Serialize(message)
),
cancellationToken
);
}
public async Task<IList<OutboxEntry>> GetPendingMessages(CancellationToken cancellationToken)
{
var now = DateTime.UtcNow;
return await context
.Outbox.Where(message => message.Status == OutboxEntryStatus.Pending)
.Where(message => message.NextRetryAt == null || message.NextRetryAt <= now)
.OrderBy(message => message.CreatedAt)
.ToListAsync(cancellationToken);
}
}