This document provides a deep-dive into Milvaion's technical architecture, design decisions, and component interactions.
- Overview
- System Architecture
- Component Details
- Data Flow
- Database Schema
- Message Queue Design
- Caching Strategy
- Security Architecture
- Scalability Patterns
Milvaion is built on the principle of separation of concerns, splitting job scheduling from job execution. This distributed architecture enables:
- Independent scaling of scheduler and workers
- Fault isolation - worker failures don't affect scheduling
- Heterogeneous workers - different hardware for different job types
- Language agnostic workers - any language that can consume RabbitMQ
Milvaion follows Onion Architecture (also known as Clean Architecture) for the API layer.
- Domain has no dependencies on other layers
- Application depends only on Domain
- Infrastructure depends on Application and Domain
- Presentation depends on all layers
Milvaion.Api
└── Milvaion.Infrastructure
└── Milvaion.Application
└── Milvaion.Domain
Core business entities and logic:
| Component | Description |
|---|---|
ScheduledJob |
Job definition entity |
JobOccurrence |
Single execution instance |
FailedOccurrence |
DLQ entry for failed jobs |
Worker |
Registered worker entity |
MilvaionUser |
User entity with roles |
OccurrenceStatus |
Enum: Queued, Running, Completed, Failed, etc. |
Use cases and business logic orchestration:
| Component | Description |
|---|---|
| Commands | Create, Update, Delete operations |
| Queries | Read operations with filtering/pagination |
| DTOs | Data transfer objects |
| Validators | FluentValidation validators |
| Interfaces | Repository and service abstractions |
| Behaviors | MediatR pipeline behaviors (logging, validation) |
// Command
public record CreateJobCommand(CreateJobDto Dto) : IRequest<Response<JobDto>>;
// Query
public record GetJobsQuery(ListRequest Request) : IRequest<Response<ListResponse<JobDto>>>;
// Handler
public class CreateJobCommandHandler : IRequestHandler<CreateJobCommand, Response<JobDto>>
{
public async Task<Response<JobDto>> Handle(CreateJobCommand request, CancellationToken ct)
{
// Business logic here
}
}External concerns and implementations:
| Component | Description |
|---|---|
| Persistence | EF Core DbContext, Repositories |
| Messaging | RabbitMQ publishers and consumers |
| Caching | Redis cache implementations |
| Scheduling | Redis ZSET-based scheduling |
| Services | External service integrations |
HTTP API and background services:
| Component | Description |
|---|---|
| Controllers | REST API endpoints |
| Middlewares | Request/response processing |
| Background Services | Dispatcher, HealthMonitor, etc. |
| SignalR Hubs | Real-time dashboard updates |
| Startup | DI configuration, middleware pipeline |
1. Client sends POST /api/v1/jobs/job
↓
2. Controller receives request
↓
3. MediatR dispatches CreateJobCommand
↓
4. Validator validates input
↓
5. Handler creates ScheduledJob entity
↓
6. Repository saves to PostgreSQL
↓
7. Scheduler adds to Redis ZSET (next execution time as score)
↓
8. Response returned to client
1. Dispatcher polls Redis ZSET for due jobs (score <= now)
↓
2. For each due job:
a. Create JobOccurrence (status: Queued)
b. Save to PostgreSQL
c. Publish message to RabbitMQ
d. Update job's next execution time in Redis
↓
3. Worker consumes message from RabbitMQ
↓
4. Worker publishes status update (Running)
↓
5. StatusTracker updates occurrence in PostgreSQL
↓
6. Worker executes IJob.ExecuteAsync()
↓
7. Worker publishes status update (Completed/Failed)
+ Logs via LogCollector
↓
8. If failed after max retries → DLQ
↓
9. Dashboard receives SignalR update
-- Jobs table
CREATE TABLE "ScheduledJobs" (
"Id" UUID PRIMARY KEY,
"DisplayName" VARCHAR(200) NOT NULL,
"Description" TEXT,
"Tags" VARCHAR(500),
"WorkerId" VARCHAR(100) NOT NULL,
"JobType" VARCHAR(200) NOT NULL,
"JobData" JSONB,
"CronExpression" VARCHAR(100),
"ExecuteAt" TIMESTAMPTZ,
"IsActive" BOOLEAN DEFAULT TRUE,
"ConcurrentExecutionPolicy" INTEGER DEFAULT 0,
"TimeoutMinutes" INTEGER,
"Version" INTEGER DEFAULT 1,
"CreationDate" TIMESTAMPTZ NOT NULL,
"CreatorUserName" VARCHAR(100),
"LastModificationDate" TIMESTAMPTZ,
"LastModifierUserName" VARCHAR(100)
);
-- Occurrences table
CREATE TABLE "JobOccurrences" (
"Id" UUID PRIMARY KEY,
"JobId" UUID NOT NULL REFERENCES "ScheduledJobs"("Id"),
"CorrelationId" UUID NOT NULL,
"WorkerId" VARCHAR(100),
"Status" INTEGER NOT NULL,
"StartTime" TIMESTAMPTZ,
"EndTime" TIMESTAMPTZ,
"DurationMs" BIGINT,
"Result" TEXT,
"Exception" TEXT,
"Logs" JSONB,
"StatusChangeLogs" JSONB,
"RetryCount" INTEGER DEFAULT 0,
"LastHeartbeat" TIMESTAMPTZ,
"JobVersion" INTEGER,
"CreatedAt" TIMESTAMPTZ NOT NULL
);
-- Indexes for performance
CREATE INDEX "IX_JobOccurrences_JobId" ON "JobOccurrences"("JobId");
CREATE INDEX "IX_JobOccurrences_Status" ON "JobOccurrences"("Status");
CREATE INDEX "IX_JobOccurrences_CreatedAt" ON "JobOccurrences"("CreatedAt");
CREATE INDEX "IX_ScheduledJobs_WorkerId" ON "ScheduledJobs"("WorkerId");
CREATE INDEX "IX_ScheduledJobs_IsActive" ON "ScheduledJobs"("IsActive");┌─────────────────┐ ┌─────────────────┐
│ ScheduledJobs │──1:N──│ JobOccurrences │
└─────────────────┘ └─────────────────┘
│ │
│ │
└─────────┬───────────────┘
│
▼
┌──────────────────┐
│ FailedOccurrences│
└──────────────────┘
Exchange: milvaion.job (topic)
??? Queue: milvaion.job.email-worker (routing: emailworker.#)
??? Queue: milvaion.job.http-worker (routing: httpworker.#)
??? Queue: milvaion.job.sql-worker (routing: sqlworker.#)
??? Queue: milvaion.job.custom-worker (routing: customworker.#)
Exchange: milvaion.status (direct)
??? Queue: milvaion.status (routing: status)
Exchange: milvaion.logs (direct)
??? Queue: milvaion.logs (routing: logs)
Exchange: milvaion.dlx (fanout)
??? Queue: milvaion.dlq (dead letter queue)
Job Message:
{
"occurrenceId": "uuid",
"jobId": "uuid",
"jobType": "SendEmailJob",
"jobData": "{ ... }",
"correlationId": "uuid",
"timestamp": "2024-01-15T10:30:00Z",
"retryCount": 0,
"maxRetries": 3
}Status Update Message:
{
"occurrenceId": "uuid",
"status": 2,
"timestamp": "2024-01-15T10:30:05Z",
"result": "Success",
"durationMs": 5000
}Milvaion:JobScheduler:
├─ schedule # ZSET: job schedule (score = next run timestamp)
├─ job:{jobId} # HASH: job cache
├─ locks:dispatcher # STRING: distributed lock for dispatcher
├─ locks:job:{jobId} # STRING: per-job execution lock
├─ workers # HASH: registered workers
├─ heartbeat:{workerId} # STRING: worker heartbeat timestamp
├─ running:{occurrenceId} # STRING: running job heartbeat
└─ cancellation_channel # PUBSUB: job cancellation signals
-
Schedule Cache (Redis ZSET)
- All active jobs with next execution time as score
- Dispatcher polls with
ZRANGEBYSCORE
-
Job Cache (Redis HASH)
- Frequently accessed job details
- TTL-based expiration
-
In-Memory Cache (optional)
- Configuration data
- Static lookups
1. Client sends credentials to POST /api/v1/auth/login
↓
2. Validate credentials against database
↓
3. Generate JWT access token + refresh token
↓
4. Return tokens to client
↓
5. Client includes JWT in Authorization header
↓
6. JWT middleware validates token on each request
↓
7. Claims extracted and available in controllers
- Role-based access control (RBAC)
- Permission-based fine-grained control
- Roles: Admin, Operator, Viewer
API Servers:
- Stateless design
- Load balancer distribution
- Sticky sessions for SignalR (or Redis backplane)
- Leader election for dispatcher (only one active)
Workers:
- Unlimited horizontal scaling
- RabbitMQ handles distribution
- Each worker has unique instance ID
// Pseudo-code for dispatcher leader election
async Task RunDispatcher()
{
while (true)
{
var lockAcquired = await redis.AcquireLock(
"dispatcher-lock",
instanceId,
TimeSpan.FromMinutes(10)
);
if (lockAcquired)
{
// This instance is the leader
await DispatchDueJobs();
await redis.ExtendLock("dispatcher-lock", instanceId);
}
await Task.Delay(1000);
}
}- RabbitMQ prefetch count limits concurrent processing
- MaxParallelJobs configuration per worker
- Queue depth monitoring and alerting
Application
↓
Serilog
↓
├──> Console (Development)
├──> Seq (Production)
└──> PostgreSQL (Audit logs)
Application
↓
OpenTelemetry
↓
Prometheus/OTLP Collector
↓
Grafana
- Distributed tracing with correlation IDs
- OpenTelemetry integration
- Trace propagation across API → RabbitMQ → Worker
| Decision | Choice | Rationale |
|---|---|---|
| Database | PostgreSQL | JSONB support, reliability, performance |
| Cache | Redis | Speed, pub/sub, distributed locks, sorted sets |
| Message Queue | RabbitMQ | Reliability, routing, DLQ support, management UI |
| ORM | EF Core | Productivity, migrations, LINQ support |
| API Framework | ASP.NET Core | Performance, ecosystem, cross-platform |
| Real-time | SignalR | Native .NET integration, fallback transports |
| Serialization | System.Text.Json | Performance, native support |
| Logging | Serilog | Structured logging, extensible sinks |
| Validation | FluentValidation | Expressive, testable rules |
| CQRS | MediatR | Clean separation, pipeline behaviors |
