-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMongoDBWorkflowInstanceStore.cs
More file actions
100 lines (86 loc) · 4.14 KB
/
MongoDBWorkflowInstanceStore.cs
File metadata and controls
100 lines (86 loc) · 4.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Birko.Data.MongoDB.Stores;
using Birko.Data.Stores;
using Birko.Configuration;
using Birko.Workflow.Core;
using Birko.Workflow.Execution;
using Birko.Workflow.MongoDB.Models;
namespace Birko.Workflow.MongoDB
{
public class MongoDBWorkflowInstanceStore<TData> : IWorkflowInstanceStore<TData>
where TData : class
{
private readonly AsyncMongoDBStore<MongoWorkflowInstanceModel> _store;
public MongoDBWorkflowInstanceStore(Birko.Data.MongoDB.Stores.Settings settings)
{
_store = new AsyncMongoDBStore<MongoWorkflowInstanceModel>();
_store.SetSettings(settings);
}
public MongoDBWorkflowInstanceStore(AsyncMongoDBStore<MongoWorkflowInstanceModel> store)
{
_store = store ?? throw new ArgumentNullException(nameof(store));
}
public AsyncMongoDBStore<MongoWorkflowInstanceModel> Store => _store;
public async Task<Guid> SaveAsync(string workflowName, WorkflowInstance<TData> instance, CancellationToken cancellationToken = default)
{
var existing = await _store.ReadAsync(m => m.Guid == instance.InstanceId, cancellationToken).ConfigureAwait(false);
if (existing != null)
{
existing.UpdateFromInstance(instance);
existing.WorkflowName = workflowName;
await _store.UpdateAsync(existing, ct: cancellationToken).ConfigureAwait(false);
return instance.InstanceId;
}
var model = MongoWorkflowInstanceModel.FromInstance(workflowName, instance);
return await _store.CreateAsync(model, ct: cancellationToken).ConfigureAwait(false);
}
public async Task<WorkflowInstance<TData>?> LoadAsync(Guid instanceId, CancellationToken cancellationToken = default)
{
var model = await _store.ReadAsync(m => m.Guid == instanceId, cancellationToken).ConfigureAwait(false);
return model?.ToInstance<TData>();
}
public async Task DeleteAsync(Guid instanceId, CancellationToken cancellationToken = default)
{
var model = await _store.ReadAsync(m => m.Guid == instanceId, cancellationToken).ConfigureAwait(false);
if (model != null)
{
await _store.DeleteAsync(model, cancellationToken).ConfigureAwait(false);
}
}
public async Task<IEnumerable<WorkflowInstance<TData>>> FindByStateAsync(string state, int limit = 100, CancellationToken cancellationToken = default)
{
var models = await _store.ReadAsync(
filter: m => m.CurrentState == state,
orderBy: OrderBy<MongoWorkflowInstanceModel>.ByDescending(m => m.UpdatedAt),
limit: limit,
ct: cancellationToken
).ConfigureAwait(false);
return models.Select(m => m.ToInstance<TData>());
}
public async Task<IEnumerable<WorkflowInstance<TData>>> FindByStatusAsync(WorkflowStatus status, int limit = 100, CancellationToken cancellationToken = default)
{
var statusInt = (int)status;
var models = await _store.ReadAsync(
filter: m => m.Status == statusInt,
orderBy: OrderBy<MongoWorkflowInstanceModel>.ByDescending(m => m.UpdatedAt),
limit: limit,
ct: cancellationToken
).ConfigureAwait(false);
return models.Select(m => m.ToInstance<TData>());
}
public async Task<IEnumerable<WorkflowInstance<TData>>> FindByWorkflowNameAsync(string workflowName, int limit = 100, CancellationToken cancellationToken = default)
{
var models = await _store.ReadAsync(
filter: m => m.WorkflowName == workflowName,
orderBy: OrderBy<MongoWorkflowInstanceModel>.ByDescending(m => m.UpdatedAt),
limit: limit,
ct: cancellationToken
).ConfigureAwait(false);
return models.Select(m => m.ToInstance<TData>());
}
}
}