some changes

This commit is contained in:
john 2025-05-17 23:47:19 +02:00
parent 4ec9720541
commit b47bac67ca
37 changed files with 397 additions and 190 deletions

View file

@ -0,0 +1,35 @@
using Femto.Common.Domain;
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
namespace Femto.Common.Infrastructure;
public static class DbContextDomainExtensions
{
public static async Task EmitDomainEvents(this DbContext context, ILogger logger, IPublisher publisher, CancellationToken cancellationToken)
{
var domainEvents = context
.ChangeTracker.Entries<Entity>()
.SelectMany(e =>
{
var events = e.Entity.DomainEvents;
e.Entity.ClearDomainEvents();
return events;
})
.ToList();
logger.LogTrace("loaded {Count} domain events", domainEvents.Count);
foreach (var domainEvent in domainEvents)
{
logger.LogTrace(
"publishing {Type} domain event {Id}",
domainEvent.GetType().Name,
domainEvent.EventId
);
await publisher.Publish(domainEvent, cancellationToken);
}
}
}

View file

@ -4,5 +4,5 @@ namespace Femto.Common.Infrastructure.Outbox;
public interface IOutboxMessageHandler
{
Task Publish<TNotification>(TNotification notification, CancellationToken executionContextCancellationToken);
Task HandleMessage<TNotification>(TNotification notification, CancellationToken cancellationToken = default);
}

View file

@ -13,7 +13,7 @@ public class Outbox<TContext>(TContext context, IOutboxMessageMapping mapping) w
TMessage message,
CancellationToken cancellationToken
)
where TMessage : IIntegrationEvent
where TMessage : IEvent
{
var eventName = mapping.GetEventName(typeof(TMessage));
if (eventName is null)

View file

@ -3,7 +3,7 @@ namespace Femto.Common.Infrastructure.Outbox;
public class OutboxEntry
{
private const int MaxRetries = 5;
public Guid Id { get; private set; }
public string EventType { get; private set; } = null!;
@ -18,7 +18,7 @@ public class OutboxEntry
public int RetryCount { get; private set; } = 0;
public string? LastError { get; private set; }
public OutboxEntryStatus Status { get; private set; }
private OutboxEntry() { }
public OutboxEntry(Guid eventId, Guid aggregateId, string eventType, string payload)
@ -35,7 +35,7 @@ public class OutboxEntry
this.ProcessedAt = DateTime.UtcNow;
this.Status = OutboxEntryStatus.Completed;
}
public void Fail(string error)
{
if (this.RetryCount >= MaxRetries)
@ -53,7 +53,7 @@ public class OutboxEntry
public enum OutboxEntryStatus
{
Pending,
Completed,
Failed
}
Pending = 0,
Completed = 1,
Failed = 2,
}

View file

@ -56,7 +56,7 @@ public class OutboxProcessor<TContext>(
message.AggregateId
);
await handler.Publish(notification, executionContext.CancellationToken);
await handler.HandleMessage(notification, executionContext.CancellationToken);
message.Succeed();
}

View file

@ -9,21 +9,23 @@ namespace Femto.Common.Infrastructure.Outbox;
public static class OutboxServiceExtension
{
public static void AddOutbox<TContext>(
public static void AddOutbox<TContext, TMessageHandler>(
this IServiceCollection services,
Func<IServiceProvider, TContext>? contextFactory = null
)
where TContext : DbContext, IOutboxContext
where TMessageHandler : class, IOutboxMessageHandler
{
services.AddSingleton<IOutboxMessageMapping, ClrTypenameMessageMapping>();
services.AddScoped<IOutboxContext>(c =>
contextFactory?.Invoke(c) ?? c.GetRequiredService<TContext>()
);
services.AddScoped<Outbox<TContext>>();
services.AddScoped<IOutboxMessageHandler, TMessageHandler>();
services.AddQuartz(q =>
{
var jobKey = JobKey.Create(nameof(OutboxProcessor<TContext>));

View file

@ -19,39 +19,12 @@ public class SaveChangesPipelineBehaviour<TRequest, TResponse>(
)
{
var response = await next(cancellationToken);
if (context.ChangeTracker.HasChanges())
{
await this.EmitDomainEvents(cancellationToken);
await context.EmitDomainEvents(logger, publisher, cancellationToken);
logger.LogDebug("saving changes");
await context.SaveChangesAsync(cancellationToken);
}
return response;
}
private async Task EmitDomainEvents(CancellationToken cancellationToken)
{
var domainEvents = context
.ChangeTracker.Entries<Entity>()
.SelectMany(e =>
{
var events = e.Entity.DomainEvents;
e.Entity.ClearDomainEvents();
return events;
})
.ToList();
logger.LogTrace("loaded {Count} domain events", domainEvents.Count);
foreach (var domainEvent in domainEvents)
{
logger.LogTrace(
"publishing {Type} domain event {Id}",
domainEvent.GetType().Name,
domainEvent.EventId
);
await publisher.Publish(domainEvent, cancellationToken);
}
}
}

View file

@ -0,0 +1,6 @@
namespace Femto.Common.Integration;
public abstract record Event : IEvent
{
public Guid EventId { get; } = Guid.CreateVersion7();
}

View file

@ -2,7 +2,7 @@ using MediatR;
namespace Femto.Common.Integration;
public interface IIntegrationEvent : INotification
public interface IEvent : INotification
{
public Guid EventId { get; }
}

View file

@ -0,0 +1,13 @@
namespace Femto.Common.Integration;
public interface IEventBus : IEventPublisher
{
public delegate Task Subscriber(IEvent evt, CancellationToken cancellationToken);
void Subscribe(Subscriber subscriber);
}
public interface IEventPublisher
{
Task Publish<T>(T evt) where T : IEvent;
}

View file

@ -0,0 +1,24 @@
namespace Femto.Common.Integration;
public interface IEventHandler
{
Task Handle(IEvent evt, CancellationToken cancellationToken = default);
}
public abstract class EventHandler<T> : IEventHandler
where T : IEvent
{
protected abstract Task Handle(T evt, CancellationToken cancellationToken);
public async Task Handle(IEvent evt, CancellationToken cancellationToken = default)
{
if (evt is not T typedEvt)
{
throw new InvalidOperationException(
$"Event {evt.GetType()} is not of type {typeof(T)}"
);
}
await Handle(typedEvt, cancellationToken);
}
}

View file

@ -1,6 +0,0 @@
namespace Femto.Common.Integration;
public interface IIntegrationEventBus
{
void Subscribe<T>() where T : IIntegrationEvent;
}