This commit is contained in:
john 2025-05-03 15:38:57 +02:00
commit ab2e20f7e1
72 changed files with 2000 additions and 0 deletions

View file

@ -0,0 +1,16 @@
using Femto.Modules.Media.Infrastructure.Integration;
using Microsoft.EntityFrameworkCore;
namespace Femto.Modules.Media.Data;
internal class MediaContext(DbContextOptions<MediaContext> options) : DbContext(options)
{
public virtual DbSet<OutboxEntry> Outbox { get; set; }
protected override void OnModelCreating(ModelBuilder builder)
{
base.OnModelCreating(builder);
builder.HasDefaultSchema("blog");
builder.ApplyConfigurationsFromAssembly(typeof(MediaContext).Assembly);
}
}

View file

@ -0,0 +1,33 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<Folder Include="Data\Configurations\" />
</ItemGroup>
<ItemGroup>
<Reference Include="MediatR">
<HintPath>..\..\..\..\.nuget\packages\mediatr\12.5.0\lib\net6.0\MediatR.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Extensions.Hosting.Abstractions">
<HintPath>..\..\..\..\.nuget\packages\microsoft.aspnetcore.app.ref\9.0.4\ref\net9.0\Microsoft.Extensions.Hosting.Abstractions.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Femto.Common\Femto.Common.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="EFCore.NamingConventions" Version="9.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="9.0.4" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="9.0.4" />
</ItemGroup>
</Project>

View file

@ -0,0 +1,78 @@
using System.Text.Json;
using Femto.Modules.Media.Data;
using MediatR;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Femto.Modules.Media.Infrastructure.Integration;
internal class Mailman(Outbox outbox, MediaContext context, ILogger<Mailman> logger, IMediator mediator)
: BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
var timeToWait = TimeSpan.FromSeconds(1);
while (!cancellationToken.IsCancellationRequested)
{
try
{
await this.DeliverMail(cancellationToken);
}
catch (Exception e)
{
logger.LogError(e, "Error while processing outbox");
}
try
{
await Task.Delay(timeToWait, cancellationToken);
}
catch (TaskCanceledException)
{
break;
}
}
}
private async Task DeliverMail(CancellationToken cancellationToken)
{
var messages = await outbox.GetPendingMessages(cancellationToken);
foreach (var message in messages)
{
try
{
var notificationType = OutboxMessageTypeRegistry.GetType(message.EventType);
if (notificationType is null)
{
logger.LogWarning("unmapped event type {Type}. skipping.", message.EventType);
continue;
}
var notification =
JsonSerializer.Deserialize(message.Payload, notificationType) as INotification;
if (notification is null)
throw new Exception("notification is null");
await mediator.Publish(notification, cancellationToken);
message.Succeed();
}
catch (Exception e)
{
logger.LogError(
e,
"Error processing event {EventId} for aggregate {AggregateId}",
message.Id,
message.AggregateId
);
message.Fail(e.ToString());
}
await context.SaveChangesAsync(cancellationToken);
}
}
}

View file

@ -0,0 +1,34 @@
using System.Text.Json;
using Femto.Common.Integration;
using Femto.Modules.Media.Data;
using Microsoft.EntityFrameworkCore;
namespace Femto.Modules.Media.Infrastructure.Integration;
internal class Outbox(MediaContext context)
{
public async Task AddMessage<TMessage>(Guid aggregateId, TMessage message, CancellationToken cancellationToken)
where TMessage : IIntegrationEvent
{
await context.Outbox.AddAsync(
new(
message.EventId,
aggregateId,
typeof(TMessage).Name,
JsonSerializer.Serialize(message)
),
cancellationToken
);
}
public async Task<IEnumerable<OutboxEntry>> GetPendingMessages(CancellationToken cancellationToken)
{
var now = DateTime.UtcNow;
return await context
.Outbox.Where(message => message.Status == OutboxEntryStatus.Pending)
.Where(message => message.NextRetryAt == null || message.NextRetryAt <= now)
.OrderBy(message => message.CreatedAt)
.ToListAsync(cancellationToken);
}
}

View file

@ -0,0 +1,59 @@
namespace Femto.Modules.Media.Infrastructure.Integration;
internal class OutboxEntry
{
private const int MaxRetries = 5;
public Guid Id { get; private set; }
public string EventType { get; private set; } = null!;
public Guid AggregateId { get; private set; }
public string Payload { get; private set; } = null!;
public DateTime CreatedAt { get; private set; }
public DateTime? ProcessedAt { get; private set; }
public DateTime? NextRetryAt { get; private set; }
public int RetryCount { get; private set; } = 0;
public string? LastError { get; private set; }
public OutboxEntryStatus Status { get; private set; }
private OutboxEntry() { }
public OutboxEntry(Guid eventId, Guid aggregateId, string eventType, string payload)
{
this.Id = eventId;
this.EventType = eventType;
this.AggregateId = aggregateId;
this.Payload = payload;
this.CreatedAt = DateTime.UtcNow;
}
public void Succeed()
{
this.ProcessedAt = DateTime.UtcNow;
this.Status = OutboxEntryStatus.Completed;
}
public void Fail(string error)
{
if (this.RetryCount >= MaxRetries)
{
this.Status = OutboxEntryStatus.Failed;
}
else
{
this.LastError = error;
this.NextRetryAt = DateTime.UtcNow.AddSeconds(Math.Pow(2, this.RetryCount));
this.RetryCount++;
}
}
}
public enum OutboxEntryStatus
{
Pending,
Completed,
Failed
}

View file

@ -0,0 +1,35 @@
using System.Collections.Concurrent;
using System.Reflection;
using Femto.Common.Attributes;
using MediatR;
namespace Femto.Modules.Media.Infrastructure.Integration;
internal static class OutboxMessageTypeRegistry
{
private static readonly ConcurrentDictionary<string, Type> Mapping = new();
public static void RegisterOutboxMessageTypesInAssembly(Assembly assembly)
{
var types = assembly.GetTypes();
foreach (var type in types)
{
if (!typeof(INotification).IsAssignableFrom(type) || type.IsAbstract || type.IsInterface)
continue;
var attribute = type.GetCustomAttribute<EventTypeAttribute>();
if (attribute == null)
continue;
var eventName = attribute.Name;
if (!string.IsNullOrWhiteSpace(eventName))
{
Mapping.TryAdd(eventName, type);
}
}
}
public static Type? GetType(string eventName) => Mapping.GetValueOrDefault(eventName);
}

View file

@ -0,0 +1,36 @@
using Femto.Common.Domain;
using Femto.Modules.Media.Data;
using MediatR;
namespace Femto.Modules.Media.Infrastructure.PipelineBehaviours;
internal class DomainEventsPipelineBehaviour<TRequest, TResponse>(
MediaContext context,
IPublisher publisher) : IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull
{
public async Task<TResponse> Handle(
TRequest request,
RequestHandlerDelegate<TResponse> next,
CancellationToken cancellationToken)
{
var response = await next(cancellationToken);
var domainEvents = context.ChangeTracker
.Entries<Entity>()
.SelectMany(e =>
{
var events = e.Entity.DomainEvents;
e.Entity.ClearDomainEvents();
return events;
})
.ToList();
foreach (var domainEvent in domainEvents)
{
await publisher.Publish(domainEvent, cancellationToken);
}
return response;
}
}

View file

@ -0,0 +1,23 @@
using Femto.Modules.Media.Data;
using MediatR;
namespace Femto.Modules.Media.Infrastructure.PipelineBehaviours;
/// <summary>
/// automatically call unit of work after all requuests
/// </summary>
internal class SaveChangesPipelineBehaviour<TRequest, TResponse>(MediaContext context)
: IPipelineBehavior<TRequest, TResponse> where TRequest : notnull
{
public async Task<TResponse> Handle(
TRequest request,
RequestHandlerDelegate<TResponse> next,
CancellationToken cancellationToken
)
{
var response = await next(cancellationToken);
if (context.ChangeTracker.HasChanges())
await context.SaveChangesAsync(cancellationToken);
return response;
}
}

View file

@ -0,0 +1,66 @@
using Femto.Modules.Media.Data;
using Femto.Modules.Media.Infrastructure.Integration;
using Femto.Modules.Media.Infrastructure.PipelineBehaviours;
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Femto.Modules.Media;
public static class MediaModule
{
public static void UseBlogModule(this IServiceCollection services, string connectionString)
{
OutboxMessageTypeRegistry.RegisterOutboxMessageTypesInAssembly(typeof(MediaModule).Assembly);
services.AddDbContext<MediaContext>(builder =>
{
builder.UseNpgsql(
connectionString,
o =>
{
o.MapEnum<OutboxEntryStatus>("outbox_status");
}
);
;
builder.UseSnakeCaseNamingConvention();
var loggerFactory = LoggerFactory.Create(b =>
{
b.AddConsole();
// .AddFilter(
// (category, level) =>
// category == DbLoggerCategory.Database.Command.Name
// && level == LogLevel.Debug
// );
});
builder.UseLoggerFactory(loggerFactory);
builder.EnableSensitiveDataLogging();
});
services.AddMediatR(c =>
{
c.RegisterServicesFromAssembly(typeof(MediaModule).Assembly);
});
services.SetupMediatrPipeline();
services.AddTransient<Outbox, Outbox>();
services.AddHostedService<Mailman>();
}
private static void SetupMediatrPipeline(this IServiceCollection services)
{
services.AddTransient(
typeof(IPipelineBehavior<,>),
typeof(DomainEventsPipelineBehaviour<,>)
);
services.AddTransient(
typeof(IPipelineBehavior<,>),
typeof(SaveChangesPipelineBehaviour<,>)
);
}
}