-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathElasticSearchViewManager.cs
More file actions
158 lines (133 loc) · 5.71 KB
/
ElasticSearchViewManager.cs
File metadata and controls
158 lines (133 loc) · 5.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
using System;
using System.Threading;
using System.Threading.Tasks;
using Birko.Data.Views;
using Nest;
namespace Birko.Data.ElasticSearch.Views;
/// <summary>
/// ElasticSearch implementation of <see cref="IViewManager"/>.
/// </summary>
/// <remarks>
/// ElasticSearch does not have native database views. Persistent views in ES would typically
/// require Transforms (X-Pack) or a reindexing strategy where data is pre-aggregated into a
/// destination index. This manager treats the persistent view name as an index name and
/// provides basic index lifecycle operations. View names are lower-cased (invariant culture)
/// before being used as index names.
///
/// For production use of persistent views, consider:
/// - Using ES Transforms (if available) to continuously materialize aggregated data
/// - Scheduling reindex jobs via Birko.BackgroundJobs to populate the destination index
/// - Using index aliases to swap between old and new materialized data
/// </remarks>
public class ElasticSearchViewManager : IViewManager
{
    /// <summary>HTTP status code reported when the target index does not exist.</summary>
    private const int HttpNotFound = 404;

    /// <summary>
    /// Server error type reported when an index creation request targets an index that already exists.
    /// </summary>
    private const string ResourceAlreadyExistsErrorType = "resource_already_exists_exception";

    private readonly ElasticClient _connector;

    /// <summary>
    /// Initializes a new instance of the <see cref="ElasticSearchViewManager"/> class.
    /// </summary>
    /// <param name="connector">The NEST ElasticClient instance.</param>
    /// <exception cref="ArgumentNullException"><paramref name="connector"/> is <c>null</c>.</exception>
    public ElasticSearchViewManager(ElasticClient connector)
    {
        _connector = connector ?? throw new ArgumentNullException(nameof(connector));
    }

    /// <inheritdoc />
    /// <remarks>
    /// For persistent views, ElasticSearch does not support native view creation.
    /// This method ensures the destination index exists. Data population must be handled
    /// separately (e.g., via Transforms, reindex pipelines, or background jobs).
    /// For OnTheFly views, this is a no-op.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="definition"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">
    /// No usable index name could be resolved, or the existence check or index creation failed.
    /// </exception>
    public async Task EnsureAsync(ViewDefinition definition, CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();

        if (definition == null)
        {
            throw new ArgumentNullException(nameof(definition));
        }

        if (definition.QueryMode == ViewQueryMode.OnTheFly)
        {
            return;
        }

        var indexName = ResolveIndexName(definition);

        // Whitespace-only names are rejected here, consistent with Drop/Exists/Refresh,
        // instead of being sent to the cluster.
        if (string.IsNullOrWhiteSpace(indexName))
        {
            throw new InvalidOperationException("View name is required for persistent views.");
        }

        if (await IndexExistsAsync(indexName, ct).ConfigureAwait(false))
        {
            return;
        }

        // Create the destination index with dynamic mapping.
        // The index will be populated by an external process (transform, reindex, or background job).
        var createResponse = await _connector.Indices.CreateAsync(indexName, c => c, ct).ConfigureAwait(false);
        if (!IsFailure(createResponse))
        {
            return;
        }

        // Another writer may have created the index between the existence check and the create
        // call. The post-condition ("index exists") still holds, so this is not an error.
        var errorType = createResponse.ServerError?.Error?.Type;
        if (string.Equals(errorType, ResourceAlreadyExistsErrorType, StringComparison.Ordinal))
        {
            return;
        }

        throw new InvalidOperationException(
            $"Failed to create view index '{indexName}'. DebugInfo: {createResponse.DebugInformation}",
            createResponse.OriginalException);
    }

    /// <inheritdoc />
    /// <remarks>
    /// Deletes the index backing the view. A "not found" (404) server error is ignored so that
    /// dropping a view that does not exist succeeds.
    /// </remarks>
    /// <exception cref="ArgumentException"><paramref name="viewName"/> is null, empty, or whitespace.</exception>
    /// <exception cref="InvalidOperationException">The delete request failed for a reason other than 404.</exception>
    public async Task DropAsync(string viewName, CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();
        var indexName = NormalizeViewName(viewName);

        var response = await _connector.Indices.DeleteAsync(indexName, null, ct).ConfigureAwait(false);
        if (!IsFailure(response))
        {
            return;
        }

        // Ignore 404 (index not found) — drop is idempotent
        if (response.ServerError?.Status == HttpNotFound)
        {
            return;
        }

        throw new InvalidOperationException(
            $"Failed to drop view index '{indexName}'. DebugInfo: {response.DebugInformation}",
            response.OriginalException);
    }

    /// <inheritdoc />
    /// <returns><c>true</c> when the index backing the view exists; otherwise <c>false</c>.</returns>
    /// <exception cref="ArgumentException"><paramref name="viewName"/> is null, empty, or whitespace.</exception>
    /// <exception cref="InvalidOperationException">The existence check itself failed.</exception>
    public async Task<bool> ExistsAsync(string viewName, CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();
        var indexName = NormalizeViewName(viewName);

        return await IndexExistsAsync(indexName, ct).ConfigureAwait(false);
    }

    /// <inheritdoc />
    /// <remarks>
    /// Refreshes the ElasticSearch index, making all recently indexed documents available for search.
    /// This is useful after populating a persistent view index via reindexing or transforms.
    /// </remarks>
    /// <exception cref="ArgumentException"><paramref name="viewName"/> is null, empty, or whitespace.</exception>
    /// <exception cref="InvalidOperationException">The refresh request failed.</exception>
    public async Task RefreshAsync(string viewName, CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();
        var indexName = NormalizeViewName(viewName);

        var response = await _connector.Indices.RefreshAsync(indexName, null, ct).ConfigureAwait(false);
        if (IsFailure(response))
        {
            throw new InvalidOperationException(
                $"Failed to refresh view index '{indexName}'. DebugInfo: {response.DebugInformation}",
                response.OriginalException);
        }
    }

    /// <summary>
    /// Checks whether an index exists, distinguishing "not found" from a failed request.
    /// </summary>
    /// <param name="indexName">The already normalized index name.</param>
    /// <param name="ct">Token used to cancel the request.</param>
    /// <returns><c>true</c> when the index exists; <c>false</c> when it does not.</returns>
    /// <exception cref="InvalidOperationException">
    /// The request failed for a reason other than the index being absent.
    /// </exception>
    private async Task<bool> IndexExistsAsync(string indexName, CancellationToken ct)
    {
        var response = await _connector.Indices.ExistsAsync(indexName, null, ct).ConfigureAwait(false);
        if (response.Exists)
        {
            return true;
        }

        // Exists is also false when the call itself failed (e.g. the cluster is unreachable).
        // Only a 404 or an otherwise valid response is reported as "does not exist".
        var notFound = response.ApiCall?.HttpStatusCode == HttpNotFound;
        if (!notFound && IsFailure(response))
        {
            throw new InvalidOperationException(
                $"Failed to check existence of view index '{indexName}'. DebugInfo: {response.DebugInformation}",
                response.OriginalException);
        }

        return false;
    }

    /// <summary>
    /// Determines whether a response represents a failed call.
    /// </summary>
    /// <param name="response">The response to inspect.</param>
    /// <returns><c>true</c> when the response is invalid or carries an exception.</returns>
    private static bool IsFailure(IResponse response)
    {
        return !response.IsValid || response.OriginalException != null;
    }

    /// <summary>
    /// Validates a view name and converts it to the lower-case index name used by this manager.
    /// </summary>
    /// <param name="viewName">The view name supplied by the caller.</param>
    /// <returns>The lower-cased (invariant culture) index name.</returns>
    /// <exception cref="ArgumentException"><paramref name="viewName"/> is null, empty, or whitespace.</exception>
    private static string NormalizeViewName(string viewName)
    {
        if (string.IsNullOrWhiteSpace(viewName))
        {
            throw new ArgumentException("View name cannot be null or empty.", nameof(viewName));
        }

        return viewName.ToLowerInvariant();
    }

    /// <summary>
    /// Resolves the index name for a view definition: the view's own name when it is set,
    /// otherwise the name of its primary source. The result is lower-cased (invariant culture).
    /// </summary>
    /// <param name="definition">The view definition to resolve the index name for.</param>
    /// <returns>The lower-cased index name.</returns>
    private static string ResolveIndexName(ViewDefinition definition)
    {
        if (!string.IsNullOrEmpty(definition.Name))
        {
            return definition.Name!.ToLowerInvariant();
        }

        return definition.PrimarySource.Name.ToLowerInvariant();
    }
}