media upload
This commit is contained in:
parent
befaa207d7
commit
0d7da2ea85
33 changed files with 257 additions and 353 deletions
|
@ -1,3 +1,3 @@
|
||||||
namespace Femto.Api.Controllers.Authors;
|
namespace Femto.Api.Controllers.Authors.Dto;
|
||||||
|
|
||||||
public record GetAuthorPostsSearchParams(Guid? From, int? Amount);
|
public record GetAuthorPostsSearchParams(Guid? From, int? Amount);
|
3
Femto.Api/Controllers/Media/Dto/UploadMediaResponse.cs
Normal file
3
Femto.Api/Controllers/Media/Dto/UploadMediaResponse.cs
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
namespace Femto.Api.Controllers.Media.Dto;
|
||||||
|
|
||||||
|
public record UploadMediaResponse(Guid MediaId, string Url);
|
39
Femto.Api/Controllers/Media/MediaController.cs
Normal file
39
Femto.Api/Controllers/Media/MediaController.cs
Normal file
|
@ -0,0 +1,39 @@
|
||||||
|
using Femto.Api.Controllers.Media.Dto;
|
||||||
|
using Femto.Modules.Media.Contracts;
|
||||||
|
using Femto.Modules.Media.Contracts.LoadFile;
|
||||||
|
using Femto.Modules.Media.Contracts.SaveFile;
|
||||||
|
using MediatR;
|
||||||
|
using Microsoft.AspNetCore.Mvc;
|
||||||
|
|
||||||
|
namespace Femto.Api.Controllers.Media;
|
||||||
|
|
||||||
|
[ApiController]
|
||||||
|
[Route("media")]
|
||||||
|
public class MediaController(IMediator mediator) : ControllerBase
|
||||||
|
{
|
||||||
|
[HttpPost]
|
||||||
|
public async Task<ActionResult<UploadMediaResponse>> UploadMedia(
|
||||||
|
IFormFile file,
|
||||||
|
CancellationToken cancellationToken
|
||||||
|
)
|
||||||
|
{
|
||||||
|
await using var data = file.OpenReadStream();
|
||||||
|
var id = await mediator.Send(
|
||||||
|
new SaveFileCommand(data, file.ContentType, file.Length),
|
||||||
|
cancellationToken
|
||||||
|
);
|
||||||
|
var fileGetUrl =
|
||||||
|
$"{this.HttpContext.Request.Scheme}://{this.HttpContext.Request.Host}/media/{id}";
|
||||||
|
return new UploadMediaResponse(id, fileGetUrl);
|
||||||
|
}
|
||||||
|
|
||||||
|
[HttpGet("{id}")]
|
||||||
|
public async Task GetMedia(Guid id, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var res = await mediator.Send(new LoadFileQuery(id), cancellationToken);
|
||||||
|
|
||||||
|
HttpContext.Response.ContentType = res.Type;
|
||||||
|
HttpContext.Response.ContentLength = res.Size;
|
||||||
|
await res.Data.CopyToAsync(HttpContext.Response.Body, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\Femto.Modules.Blog\Femto.Modules.Blog.csproj" />
|
<ProjectReference Include="..\Femto.Modules.Blog\Femto.Modules.Blog.csproj" />
|
||||||
|
<ProjectReference Include="..\Femto.Modules.Media\Femto.Modules.Media.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
using System.Text.Json.Serialization;
|
using System.Text.Json.Serialization;
|
||||||
using Femto.Modules.Blog;
|
using Femto.Modules.Blog;
|
||||||
|
using Femto.Modules.Media;
|
||||||
|
using Quartz;
|
||||||
|
|
||||||
var builder = WebApplication.CreateBuilder(args);
|
var builder = WebApplication.CreateBuilder(args);
|
||||||
|
|
||||||
|
@ -10,7 +12,17 @@ var databaseConnectionString = builder.Configuration.GetConnectionString("Databa
|
||||||
if (databaseConnectionString is null)
|
if (databaseConnectionString is null)
|
||||||
throw new Exception("no database connection string found");
|
throw new Exception("no database connection string found");
|
||||||
|
|
||||||
|
var blobStorageRoot = builder.Configuration.GetValue<string>("BlobStorageRoot");
|
||||||
|
if (blobStorageRoot is null)
|
||||||
|
throw new Exception("no blob storage root found");
|
||||||
|
|
||||||
|
builder.Services.AddQuartzHostedService(options =>
|
||||||
|
{
|
||||||
|
options.WaitForJobsToComplete = true;
|
||||||
|
});
|
||||||
|
|
||||||
builder.Services.UseBlogModule(databaseConnectionString);
|
builder.Services.UseBlogModule(databaseConnectionString);
|
||||||
|
builder.Services.UseMediaModule(databaseConnectionString, blobStorageRoot);
|
||||||
|
|
||||||
builder.Services.AddControllers();
|
builder.Services.AddControllers();
|
||||||
|
|
||||||
|
|
|
@ -40,5 +40,14 @@ CREATE TABLE blog.outbox
|
||||||
retry_count int DEFAULT 0 NOT NULL,
|
retry_count int DEFAULT 0 NOT NULL,
|
||||||
last_error text,
|
last_error text,
|
||||||
status outbox_status DEFAULT 'pending' NOT NULL
|
status outbox_status DEFAULT 'pending' NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
)
|
CREATE SCHEMA media;
|
||||||
|
|
||||||
|
CREATE TABLE media.saved_blob
|
||||||
|
(
|
||||||
|
id uuid PRIMARY KEY,
|
||||||
|
created_on timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
|
||||||
|
type varchar(64) NOT NULL,
|
||||||
|
size int
|
||||||
|
);
|
|
@ -1,5 +1,6 @@
|
||||||
using Femto.Modules.Blog.Domain.Posts;
|
using Femto.Modules.Blog.Domain.Posts;
|
||||||
using Femto.Modules.Blog.Infrastructure.Integration;
|
using Femto.Modules.Blog.Infrastructure.Integration;
|
||||||
|
using Femto.Modules.Blog.Infrastructure.Integration.Outbox;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
namespace Femto.Modules.Blog.Data;
|
namespace Femto.Modules.Blog.Data;
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
using Femto.Modules.Blog.Infrastructure.Integration;
|
using Femto.Modules.Blog.Infrastructure.Integration;
|
||||||
|
using Femto.Modules.Blog.Infrastructure.Integration.Outbox;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using Microsoft.EntityFrameworkCore.Metadata.Builders;
|
using Microsoft.EntityFrameworkCore.Metadata.Builders;
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
using Femto.Modules.Blog.Contracts.Events;
|
using Femto.Modules.Blog.Contracts.Events;
|
||||||
using Femto.Modules.Blog.Domain.Posts.Events;
|
using Femto.Modules.Blog.Domain.Posts.Events;
|
||||||
using Femto.Modules.Blog.Infrastructure.Integration;
|
using Femto.Modules.Blog.Infrastructure.Integration;
|
||||||
|
using Femto.Modules.Blog.Infrastructure.Integration.Outbox;
|
||||||
using MediatR;
|
using MediatR;
|
||||||
|
|
||||||
namespace Femto.Modules.Blog.Domain.Posts.Handlers;
|
namespace Femto.Modules.Blog.Domain.Posts.Handlers;
|
||||||
|
|
|
@ -1,11 +1,10 @@
|
||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
using Femto.Modules.Blog.Data;
|
using Femto.Modules.Blog.Data;
|
||||||
using Femto.Modules.Blog.Infrastructure.Integration;
|
|
||||||
using MediatR;
|
using MediatR;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Quartz;
|
using Quartz;
|
||||||
|
|
||||||
namespace Femto.Modules.Blog;
|
namespace Femto.Modules.Blog.Infrastructure.Integration.Outbox;
|
||||||
|
|
||||||
[DisallowConcurrentExecution]
|
[DisallowConcurrentExecution]
|
||||||
internal class MailmanJob(
|
internal class MailmanJob(
|
||||||
|
|
|
@ -5,7 +5,7 @@ using Femto.Common.Integration;
|
||||||
using Femto.Modules.Blog.Data;
|
using Femto.Modules.Blog.Data;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
namespace Femto.Modules.Blog.Infrastructure.Integration;
|
namespace Femto.Modules.Blog.Infrastructure.Integration.Outbox;
|
||||||
|
|
||||||
internal class Outbox(BlogContext context)
|
internal class Outbox(BlogContext context)
|
||||||
{
|
{
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
namespace Femto.Modules.Blog.Infrastructure.Integration;
|
namespace Femto.Modules.Blog.Infrastructure.Integration.Outbox;
|
||||||
|
|
||||||
internal class OutboxEntry
|
internal class OutboxEntry
|
||||||
{
|
{
|
||||||
|
|
|
@ -1,10 +1,7 @@
|
||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using System.Collections.Immutable;
|
using System.Collections.Immutable;
|
||||||
using System.Reflection;
|
|
||||||
using Femto.Common.Attributes;
|
|
||||||
using MediatR;
|
|
||||||
|
|
||||||
namespace Femto.Modules.Blog.Infrastructure.Integration;
|
namespace Femto.Modules.Blog.Infrastructure.Integration.Outbox;
|
||||||
|
|
||||||
internal static class OutboxMessageTypeRegistry
|
internal static class OutboxMessageTypeRegistry
|
||||||
{
|
{
|
||||||
|
|
|
@ -3,6 +3,7 @@ using Femto.Modules.Blog.Data;
|
||||||
using Femto.Modules.Blog.Infrastructure;
|
using Femto.Modules.Blog.Infrastructure;
|
||||||
using Femto.Modules.Blog.Infrastructure.DbConnection;
|
using Femto.Modules.Blog.Infrastructure.DbConnection;
|
||||||
using Femto.Modules.Blog.Infrastructure.Integration;
|
using Femto.Modules.Blog.Infrastructure.Integration;
|
||||||
|
using Femto.Modules.Blog.Infrastructure.Integration.Outbox;
|
||||||
using Femto.Modules.Blog.Infrastructure.PipelineBehaviours;
|
using Femto.Modules.Blog.Infrastructure.PipelineBehaviours;
|
||||||
using MediatR;
|
using MediatR;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
namespace Femto.Modules.Media.Contracts.LoadFile.Dto;
|
||||||
|
|
||||||
|
public record LoadFileQueryResult(Stream Data, string Type, long? Size);
|
6
Femto.Modules.Media/Contracts/LoadFile/LoadFileQuery.cs
Normal file
6
Femto.Modules.Media/Contracts/LoadFile/LoadFileQuery.cs
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
using Femto.Modules.Media.Contracts.LoadFile.Dto;
|
||||||
|
using MediatR;
|
||||||
|
|
||||||
|
namespace Femto.Modules.Media.Contracts.LoadFile;
|
||||||
|
|
||||||
|
public record LoadFileQuery(Guid FileId) : IRequest<LoadFileQueryResult>;
|
|
@ -0,0 +1,25 @@
|
||||||
|
using Femto.Modules.Media.Contracts.LoadFile.Dto;
|
||||||
|
using Femto.Modules.Media.Data;
|
||||||
|
using Femto.Modules.Media.Infrastructure;
|
||||||
|
using MediatR;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
|
namespace Femto.Modules.Media.Contracts.LoadFile;
|
||||||
|
|
||||||
|
internal class LoadFileQueryHandler(IStorageProvider storage, MediaContext context)
|
||||||
|
: IRequestHandler<LoadFileQuery, LoadFileQueryResult>
|
||||||
|
{
|
||||||
|
public async Task<LoadFileQueryResult> Handle(
|
||||||
|
LoadFileQuery query,
|
||||||
|
CancellationToken cancellationToken
|
||||||
|
)
|
||||||
|
{
|
||||||
|
var blob = await context
|
||||||
|
.SavedBlobs.Where(b => b.Id == query.FileId)
|
||||||
|
.SingleAsync(cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
var data = await storage.LoadBlob(query.FileId.ToString());
|
||||||
|
|
||||||
|
return new(data, blob.Type, blob.Size);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,5 @@
|
||||||
|
using MediatR;
|
||||||
|
|
||||||
|
namespace Femto.Modules.Media.Contracts.SaveFile;
|
||||||
|
|
||||||
|
public record SaveFileCommand(Stream Data, string ContentType, long? ContentLength) : IRequest<Guid>;
|
|
@ -0,0 +1,18 @@
|
||||||
|
using Femto.Modules.Media.Data;
|
||||||
|
using Femto.Modules.Media.Infrastructure;
|
||||||
|
using MediatR;
|
||||||
|
|
||||||
|
namespace Femto.Modules.Media.Contracts.SaveFile;
|
||||||
|
|
||||||
|
internal class SaveFileCommandHandler(IStorageProvider storage, MediaContext context)
|
||||||
|
: IRequestHandler<SaveFileCommand, Guid>
|
||||||
|
{
|
||||||
|
public async Task<Guid> Handle(SaveFileCommand command, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var id = Guid.CreateVersion7();
|
||||||
|
await storage.SaveBlob(id.ToString(), command.Data);
|
||||||
|
await context.AddAsync(new SavedBlob(id, command.ContentType, command.ContentLength), cancellationToken);
|
||||||
|
await context.SaveChangesAsync(cancellationToken);
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,16 +1,15 @@
|
||||||
using Femto.Modules.Media.Infrastructure.Integration;
|
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
namespace Femto.Modules.Media.Data;
|
namespace Femto.Modules.Media.Data;
|
||||||
|
|
||||||
internal class MediaContext(DbContextOptions<MediaContext> options) : DbContext(options)
|
internal class MediaContext(DbContextOptions<MediaContext> options) : DbContext(options)
|
||||||
{
|
{
|
||||||
public virtual DbSet<OutboxEntry> Outbox { get; set; }
|
public virtual DbSet<SavedBlob> SavedBlobs { get; set; }
|
||||||
|
|
||||||
protected override void OnModelCreating(ModelBuilder builder)
|
protected override void OnModelCreating(ModelBuilder builder)
|
||||||
{
|
{
|
||||||
base.OnModelCreating(builder);
|
base.OnModelCreating(builder);
|
||||||
builder.HasDefaultSchema("blog");
|
builder.HasDefaultSchema("media");
|
||||||
builder.ApplyConfigurationsFromAssembly(typeof(MediaContext).Assembly);
|
builder.ApplyConfigurationsFromAssembly(typeof(MediaContext).Assembly);
|
||||||
}
|
}
|
||||||
}
|
}
|
22
Femto.Modules.Media/Data/SavedBlob.cs
Normal file
22
Femto.Modules.Media/Data/SavedBlob.cs
Normal file
|
@ -0,0 +1,22 @@
|
||||||
|
using System.ComponentModel.DataAnnotations.Schema;
|
||||||
|
|
||||||
|
namespace Femto.Modules.Media.Data;
|
||||||
|
|
||||||
|
[Table("saved_blob")]
|
||||||
|
internal class SavedBlob
|
||||||
|
{
|
||||||
|
public Guid Id { get; private set; }
|
||||||
|
public DateTime CreatedOn { get; private set; }
|
||||||
|
public string Type { get; private set; }
|
||||||
|
public long? Size { get; private set; }
|
||||||
|
|
||||||
|
public SavedBlob(Guid id, string type, long? size)
|
||||||
|
{
|
||||||
|
Id = id;
|
||||||
|
CreatedOn = DateTime.UtcNow;
|
||||||
|
Type = type;
|
||||||
|
Size = size;
|
||||||
|
}
|
||||||
|
|
||||||
|
private SavedBlob() { }
|
||||||
|
}
|
|
@ -6,10 +6,6 @@
|
||||||
<Nullable>enable</Nullable>
|
<Nullable>enable</Nullable>
|
||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
|
||||||
<Folder Include="Data\Configurations\" />
|
|
||||||
</ItemGroup>
|
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<Reference Include="MediatR">
|
<Reference Include="MediatR">
|
||||||
<HintPath>..\..\..\..\.nuget\packages\mediatr\12.5.0\lib\net6.0\MediatR.dll</HintPath>
|
<HintPath>..\..\..\..\.nuget\packages\mediatr\12.5.0\lib\net6.0\MediatR.dll</HintPath>
|
||||||
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
using Femto.Modules.Media.Data;
|
||||||
|
|
||||||
|
namespace Femto.Modules.Media.Infrastructure;
|
||||||
|
|
||||||
|
internal class FilesystemStorageProvider : IStorageProvider
|
||||||
|
{
|
||||||
|
private readonly string _storageRoot;
|
||||||
|
|
||||||
|
public FilesystemStorageProvider(string storageRoot)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(storageRoot))
|
||||||
|
throw new ArgumentException(
|
||||||
|
"Storage root cannot be null or empty.",
|
||||||
|
nameof(storageRoot)
|
||||||
|
);
|
||||||
|
|
||||||
|
this._storageRoot = Path.GetFullPath(storageRoot);
|
||||||
|
|
||||||
|
if (!Directory.Exists(this._storageRoot))
|
||||||
|
{
|
||||||
|
Directory.CreateDirectory(this._storageRoot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task SaveBlob(string id, Stream data)
|
||||||
|
{
|
||||||
|
string filePath = this.GetSafeFilePath(id);
|
||||||
|
|
||||||
|
// Ensure the directory exists
|
||||||
|
Directory.CreateDirectory(Path.GetDirectoryName(filePath)!);
|
||||||
|
|
||||||
|
await using var fileStream = new FileStream(filePath, FileMode.Create, FileAccess.Write);
|
||||||
|
await data.CopyToAsync(fileStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<Stream> LoadBlob(string id)
|
||||||
|
{
|
||||||
|
string filePath = this.GetSafeFilePath(id);
|
||||||
|
|
||||||
|
if (!File.Exists(filePath))
|
||||||
|
throw new FileNotFoundException("The blob with the specified ID was not found.", id);
|
||||||
|
|
||||||
|
Stream fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read);
|
||||||
|
|
||||||
|
return fileStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
private string GetSafeFilePath(string id)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(id))
|
||||||
|
throw new ArgumentException("Blob ID cannot be null or empty.", nameof(id));
|
||||||
|
|
||||||
|
// Sanitize and validate the path
|
||||||
|
string combinedPath = Path.Combine(this._storageRoot, id);
|
||||||
|
string fullPath = Path.GetFullPath(combinedPath);
|
||||||
|
|
||||||
|
if (!fullPath.StartsWith(this._storageRoot, StringComparison.Ordinal))
|
||||||
|
throw new UnauthorizedAccessException("Access to the path is denied.");
|
||||||
|
|
||||||
|
return fullPath;
|
||||||
|
}
|
||||||
|
}
|
7
Femto.Modules.Media/Infrastructure/IStorageProvider.cs
Normal file
7
Femto.Modules.Media/Infrastructure/IStorageProvider.cs
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
namespace Femto.Modules.Media.Infrastructure;
|
||||||
|
|
||||||
|
internal interface IStorageProvider
|
||||||
|
{
|
||||||
|
Task SaveBlob(string id, Stream data);
|
||||||
|
Task<Stream> LoadBlob(string id);
|
||||||
|
}
|
|
@ -1,78 +0,0 @@
|
||||||
using System.Text.Json;
|
|
||||||
using Femto.Modules.Media.Data;
|
|
||||||
using MediatR;
|
|
||||||
using Microsoft.Extensions.Hosting;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
|
|
||||||
namespace Femto.Modules.Media.Infrastructure.Integration;
|
|
||||||
|
|
||||||
internal class Mailman(Outbox outbox, MediaContext context, ILogger<Mailman> logger, IMediator mediator)
|
|
||||||
: BackgroundService
|
|
||||||
{
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
var timeToWait = TimeSpan.FromSeconds(1);
|
|
||||||
|
|
||||||
while (!cancellationToken.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await this.DeliverMail(cancellationToken);
|
|
||||||
}
|
|
||||||
catch (Exception e)
|
|
||||||
{
|
|
||||||
logger.LogError(e, "Error while processing outbox");
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await Task.Delay(timeToWait, cancellationToken);
|
|
||||||
}
|
|
||||||
catch (TaskCanceledException)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task DeliverMail(CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
var messages = await outbox.GetPendingMessages(cancellationToken);
|
|
||||||
|
|
||||||
foreach (var message in messages)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var notificationType = OutboxMessageTypeRegistry.GetType(message.EventType);
|
|
||||||
if (notificationType is null)
|
|
||||||
{
|
|
||||||
logger.LogWarning("unmapped event type {Type}. skipping.", message.EventType);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
var notification =
|
|
||||||
JsonSerializer.Deserialize(message.Payload, notificationType) as INotification;
|
|
||||||
|
|
||||||
if (notification is null)
|
|
||||||
throw new Exception("notification is null");
|
|
||||||
|
|
||||||
await mediator.Publish(notification, cancellationToken);
|
|
||||||
|
|
||||||
message.Succeed();
|
|
||||||
}
|
|
||||||
catch (Exception e)
|
|
||||||
{
|
|
||||||
logger.LogError(
|
|
||||||
e,
|
|
||||||
"Error processing event {EventId} for aggregate {AggregateId}",
|
|
||||||
message.Id,
|
|
||||||
message.AggregateId
|
|
||||||
);
|
|
||||||
|
|
||||||
message.Fail(e.ToString());
|
|
||||||
}
|
|
||||||
|
|
||||||
await context.SaveChangesAsync(cancellationToken);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,34 +0,0 @@
|
||||||
using System.Text.Json;
|
|
||||||
using Femto.Common.Integration;
|
|
||||||
using Femto.Modules.Media.Data;
|
|
||||||
using Microsoft.EntityFrameworkCore;
|
|
||||||
|
|
||||||
namespace Femto.Modules.Media.Infrastructure.Integration;
|
|
||||||
|
|
||||||
internal class Outbox(MediaContext context)
|
|
||||||
{
|
|
||||||
public async Task AddMessage<TMessage>(Guid aggregateId, TMessage message, CancellationToken cancellationToken)
|
|
||||||
where TMessage : IIntegrationEvent
|
|
||||||
{
|
|
||||||
await context.Outbox.AddAsync(
|
|
||||||
new(
|
|
||||||
message.EventId,
|
|
||||||
aggregateId,
|
|
||||||
typeof(TMessage).Name,
|
|
||||||
JsonSerializer.Serialize(message)
|
|
||||||
),
|
|
||||||
cancellationToken
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task<IEnumerable<OutboxEntry>> GetPendingMessages(CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
var now = DateTime.UtcNow;
|
|
||||||
|
|
||||||
return await context
|
|
||||||
.Outbox.Where(message => message.Status == OutboxEntryStatus.Pending)
|
|
||||||
.Where(message => message.NextRetryAt == null || message.NextRetryAt <= now)
|
|
||||||
.OrderBy(message => message.CreatedAt)
|
|
||||||
.ToListAsync(cancellationToken);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,59 +0,0 @@
|
||||||
namespace Femto.Modules.Media.Infrastructure.Integration;
|
|
||||||
|
|
||||||
internal class OutboxEntry
|
|
||||||
{
|
|
||||||
private const int MaxRetries = 5;
|
|
||||||
|
|
||||||
public Guid Id { get; private set; }
|
|
||||||
|
|
||||||
public string EventType { get; private set; } = null!;
|
|
||||||
public Guid AggregateId { get; private set; }
|
|
||||||
|
|
||||||
public string Payload { get; private set; } = null!;
|
|
||||||
|
|
||||||
public DateTime CreatedAt { get; private set; }
|
|
||||||
|
|
||||||
public DateTime? ProcessedAt { get; private set; }
|
|
||||||
public DateTime? NextRetryAt { get; private set; }
|
|
||||||
public int RetryCount { get; private set; } = 0;
|
|
||||||
public string? LastError { get; private set; }
|
|
||||||
public OutboxEntryStatus Status { get; private set; }
|
|
||||||
|
|
||||||
private OutboxEntry() { }
|
|
||||||
|
|
||||||
public OutboxEntry(Guid eventId, Guid aggregateId, string eventType, string payload)
|
|
||||||
{
|
|
||||||
this.Id = eventId;
|
|
||||||
this.EventType = eventType;
|
|
||||||
this.AggregateId = aggregateId;
|
|
||||||
this.Payload = payload;
|
|
||||||
this.CreatedAt = DateTime.UtcNow;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Succeed()
|
|
||||||
{
|
|
||||||
this.ProcessedAt = DateTime.UtcNow;
|
|
||||||
this.Status = OutboxEntryStatus.Completed;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Fail(string error)
|
|
||||||
{
|
|
||||||
if (this.RetryCount >= MaxRetries)
|
|
||||||
{
|
|
||||||
this.Status = OutboxEntryStatus.Failed;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
this.LastError = error;
|
|
||||||
this.NextRetryAt = DateTime.UtcNow.AddSeconds(Math.Pow(2, this.RetryCount));
|
|
||||||
this.RetryCount++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public enum OutboxEntryStatus
|
|
||||||
{
|
|
||||||
Pending,
|
|
||||||
Completed,
|
|
||||||
Failed
|
|
||||||
}
|
|
|
@ -1,35 +0,0 @@
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Reflection;
|
|
||||||
using Femto.Common.Attributes;
|
|
||||||
using MediatR;
|
|
||||||
|
|
||||||
namespace Femto.Modules.Media.Infrastructure.Integration;
|
|
||||||
|
|
||||||
internal static class OutboxMessageTypeRegistry
|
|
||||||
{
|
|
||||||
private static readonly ConcurrentDictionary<string, Type> Mapping = new();
|
|
||||||
|
|
||||||
public static void RegisterOutboxMessageTypesInAssembly(Assembly assembly)
|
|
||||||
{
|
|
||||||
var types = assembly.GetTypes();
|
|
||||||
|
|
||||||
foreach (var type in types)
|
|
||||||
{
|
|
||||||
if (!typeof(INotification).IsAssignableFrom(type) || type.IsAbstract || type.IsInterface)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
var attribute = type.GetCustomAttribute<EventTypeAttribute>();
|
|
||||||
if (attribute == null)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
var eventName = attribute.Name;
|
|
||||||
if (!string.IsNullOrWhiteSpace(eventName))
|
|
||||||
{
|
|
||||||
Mapping.TryAdd(eventName, type);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Type? GetType(string eventName) => Mapping.GetValueOrDefault(eventName);
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,36 +0,0 @@
|
||||||
using Femto.Common.Domain;
|
|
||||||
using Femto.Modules.Media.Data;
|
|
||||||
using MediatR;
|
|
||||||
|
|
||||||
namespace Femto.Modules.Media.Infrastructure.PipelineBehaviours;
|
|
||||||
|
|
||||||
internal class DomainEventsPipelineBehaviour<TRequest, TResponse>(
|
|
||||||
MediaContext context,
|
|
||||||
IPublisher publisher) : IPipelineBehavior<TRequest, TResponse>
|
|
||||||
where TRequest : notnull
|
|
||||||
{
|
|
||||||
public async Task<TResponse> Handle(
|
|
||||||
TRequest request,
|
|
||||||
RequestHandlerDelegate<TResponse> next,
|
|
||||||
CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
var response = await next(cancellationToken);
|
|
||||||
|
|
||||||
var domainEvents = context.ChangeTracker
|
|
||||||
.Entries<Entity>()
|
|
||||||
.SelectMany(e =>
|
|
||||||
{
|
|
||||||
var events = e.Entity.DomainEvents;
|
|
||||||
e.Entity.ClearDomainEvents();
|
|
||||||
return events;
|
|
||||||
})
|
|
||||||
.ToList();
|
|
||||||
|
|
||||||
foreach (var domainEvent in domainEvents)
|
|
||||||
{
|
|
||||||
await publisher.Publish(domainEvent, cancellationToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
return response;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,23 +0,0 @@
|
||||||
using Femto.Modules.Media.Data;
|
|
||||||
using MediatR;
|
|
||||||
|
|
||||||
namespace Femto.Modules.Media.Infrastructure.PipelineBehaviours;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// automatically call unit of work after all requuests
|
|
||||||
/// </summary>
|
|
||||||
internal class SaveChangesPipelineBehaviour<TRequest, TResponse>(MediaContext context)
|
|
||||||
: IPipelineBehavior<TRequest, TResponse> where TRequest : notnull
|
|
||||||
{
|
|
||||||
public async Task<TResponse> Handle(
|
|
||||||
TRequest request,
|
|
||||||
RequestHandlerDelegate<TResponse> next,
|
|
||||||
CancellationToken cancellationToken
|
|
||||||
)
|
|
||||||
{
|
|
||||||
var response = await next(cancellationToken);
|
|
||||||
if (context.ChangeTracker.HasChanges())
|
|
||||||
await context.SaveChangesAsync(cancellationToken);
|
|
||||||
return response;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,66 +0,0 @@
|
||||||
using Femto.Modules.Media.Data;
|
|
||||||
using Femto.Modules.Media.Infrastructure.Integration;
|
|
||||||
using Femto.Modules.Media.Infrastructure.PipelineBehaviours;
|
|
||||||
using MediatR;
|
|
||||||
using Microsoft.EntityFrameworkCore;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
|
|
||||||
namespace Femto.Modules.Media;
|
|
||||||
|
|
||||||
public static class MediaModule
|
|
||||||
{
|
|
||||||
public static void UseBlogModule(this IServiceCollection services, string connectionString)
|
|
||||||
{
|
|
||||||
OutboxMessageTypeRegistry.RegisterOutboxMessageTypesInAssembly(typeof(MediaModule).Assembly);
|
|
||||||
|
|
||||||
services.AddDbContext<MediaContext>(builder =>
|
|
||||||
{
|
|
||||||
builder.UseNpgsql(
|
|
||||||
connectionString,
|
|
||||||
o =>
|
|
||||||
{
|
|
||||||
o.MapEnum<OutboxEntryStatus>("outbox_status");
|
|
||||||
}
|
|
||||||
);
|
|
||||||
;
|
|
||||||
builder.UseSnakeCaseNamingConvention();
|
|
||||||
|
|
||||||
var loggerFactory = LoggerFactory.Create(b =>
|
|
||||||
{
|
|
||||||
b.AddConsole();
|
|
||||||
// .AddFilter(
|
|
||||||
// (category, level) =>
|
|
||||||
// category == DbLoggerCategory.Database.Command.Name
|
|
||||||
// && level == LogLevel.Debug
|
|
||||||
// );
|
|
||||||
});
|
|
||||||
|
|
||||||
builder.UseLoggerFactory(loggerFactory);
|
|
||||||
builder.EnableSensitiveDataLogging();
|
|
||||||
});
|
|
||||||
|
|
||||||
services.AddMediatR(c =>
|
|
||||||
{
|
|
||||||
c.RegisterServicesFromAssembly(typeof(MediaModule).Assembly);
|
|
||||||
});
|
|
||||||
|
|
||||||
services.SetupMediatrPipeline();
|
|
||||||
|
|
||||||
services.AddTransient<Outbox, Outbox>();
|
|
||||||
services.AddHostedService<Mailman>();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void SetupMediatrPipeline(this IServiceCollection services)
|
|
||||||
{
|
|
||||||
services.AddTransient(
|
|
||||||
typeof(IPipelineBehavior<,>),
|
|
||||||
typeof(DomainEventsPipelineBehaviour<,>)
|
|
||||||
);
|
|
||||||
|
|
||||||
services.AddTransient(
|
|
||||||
typeof(IPipelineBehavior<,>),
|
|
||||||
typeof(SaveChangesPipelineBehaviour<,>)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
22
Femto.Modules.Media/Module.cs
Normal file
22
Femto.Modules.Media/Module.cs
Normal file
|
@ -0,0 +1,22 @@
|
||||||
|
using Femto.Modules.Media.Data;
|
||||||
|
using Femto.Modules.Media.Infrastructure;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
|
namespace Femto.Modules.Media;
|
||||||
|
|
||||||
|
public static class Module
|
||||||
|
{
|
||||||
|
public static void UseMediaModule(this IServiceCollection services, string connectionString, string storageRoot)
|
||||||
|
{
|
||||||
|
services.AddDbContext<MediaContext>(builder =>
|
||||||
|
{
|
||||||
|
builder.UseNpgsql(connectionString);
|
||||||
|
builder.UseSnakeCaseNamingConvention();
|
||||||
|
});
|
||||||
|
|
||||||
|
services.AddTransient<IStorageProvider>(s => new FilesystemStorageProvider(storageRoot));
|
||||||
|
services.AddMediatR(c => c.RegisterServicesFromAssembly(typeof(Module).Assembly));
|
||||||
|
}
|
||||||
|
}
|
|
@ -12,6 +12,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Femto.Common", "Femto.Commo
|
||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Femto.Modules.Blog.Contracts", "Femto.Modules.Blog.Contracts\Femto.Modules.Blog.Contracts.csproj", "{35C42036-D53B-42EB-9A1C-B540E55F4FD0}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Femto.Modules.Blog.Contracts", "Femto.Modules.Blog.Contracts\Femto.Modules.Blog.Contracts.csproj", "{35C42036-D53B-42EB-9A1C-B540E55F4FD0}"
|
||||||
EndProject
|
EndProject
|
||||||
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Femto.Modules.Media", "Femto.Modules.Media\Femto.Modules.Media.csproj", "{AC9FBF11-FF29-4A80-B9EA-AFDF1E3DCA80}"
|
||||||
|
EndProject
|
||||||
Global
|
Global
|
||||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||||
Debug|Any CPU = Debug|Any CPU
|
Debug|Any CPU = Debug|Any CPU
|
||||||
|
@ -42,6 +44,10 @@ Global
|
||||||
{35C42036-D53B-42EB-9A1C-B540E55F4FD0}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
{35C42036-D53B-42EB-9A1C-B540E55F4FD0}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
{35C42036-D53B-42EB-9A1C-B540E55F4FD0}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
{35C42036-D53B-42EB-9A1C-B540E55F4FD0}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
{35C42036-D53B-42EB-9A1C-B540E55F4FD0}.Release|Any CPU.Build.0 = Release|Any CPU
|
{35C42036-D53B-42EB-9A1C-B540E55F4FD0}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{AC9FBF11-FF29-4A80-B9EA-AFDF1E3DCA80}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{AC9FBF11-FF29-4A80-B9EA-AFDF1E3DCA80}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{AC9FBF11-FF29-4A80-B9EA-AFDF1E3DCA80}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{AC9FBF11-FF29-4A80-B9EA-AFDF1E3DCA80}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(NestedProjects) = preSolution
|
GlobalSection(NestedProjects) = preSolution
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue