Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ShardEqualizer/BsonSplitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ public static IList<BsonValue> Split(BsonValue min, BsonValue max, int zonesCoun
if (IsUuidLegacy(min) && IsUuidLegacy(max))
{
return Split(min.AsByteArray, max.AsByteArray, zonesCount)
.Select(_ => (BsonValue)new BsonBinaryData(_, BsonBinarySubType.UuidLegacy, GuidRepresentation.CSharpLegacy)).ToList();
.Select(_ => (BsonValue)new BsonBinaryData(_, BsonBinarySubType.UuidLegacy)).ToList();
}

if (IsUuidStandard(min) && IsUuidStandard(max))
{
return Split(min.AsByteArray, max.AsByteArray, zonesCount)
.Select(_ => (BsonValue)new BsonBinaryData(_, BsonBinarySubType.UuidStandard, GuidRepresentation.Standard)).ToList();
.Select(_ => (BsonValue)new BsonBinaryData(_, BsonBinarySubType.UuidStandard)).ToList();
}

if (min.IsObjectId && max.IsObjectId)
Expand Down
7 changes: 4 additions & 3 deletions ShardEqualizer/ConfigRepositories/ChunkRepository.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -20,9 +21,9 @@ public Task<Chunk> Find(string id)
return _coll.Find(_ => _.Id == id).SingleOrDefaultAsync();
}

public Filtered ByNamespace(CollectionNamespace ns)
public Filtered ByUuid(Guid namespaceId)
{
return new Filtered(_coll, Builders<Chunk>.Filter.Eq(_ => _.Namespace, ns));
return new Filtered(_coll, Builders<Chunk>.Filter.Eq(_ => _.Uuid, namespaceId));
}

public class Filtered
Expand All @@ -41,7 +42,7 @@ public async Task<IAsyncCursor<Chunk>> Find(CancellationToken token)
return await _coll.FindAsync(_filter, new FindOptions<Chunk>()
{
Sort = Builders<Chunk>.Sort
.Ascending(_ => _.Namespace)
.Ascending(_ => _.Uuid)
.Ascending(_ => _.Min)
}, token);
}
Expand Down
15 changes: 10 additions & 5 deletions ShardEqualizer/ConfigServices/ChunkService.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand All @@ -17,20 +18,22 @@ public class ChunkService
private readonly ChunkRepository _repo;
private readonly ProgressRenderer _progressRenderer;
private readonly INsLocalStore<Container> _store;
private readonly ShardedCollectionService _shardedCollectionService;

public ChunkService(
ChunkRepository repo,
ProgressRenderer progressRenderer,
LocalStoreProvider storeProvider)
LocalStoreProvider storeProvider, ShardedCollectionService shardedCollectionService)
{
_repo = repo;
_progressRenderer = progressRenderer;
_shardedCollectionService = shardedCollectionService;
_store = storeProvider.Get("chunks", uploadChunks);
}

public async Task<IReadOnlyDictionary<CollectionNamespace, ChunksCache>> Get(IEnumerable<CollectionNamespace> nss, CancellationToken token)
public async Task<IReadOnlyDictionary<CollectionNamespace, ChunksCache>> Get(IEnumerable<CollectionNamespace> ns, CancellationToken token)
{
var nsList = nss.ToList();
var nsList = ns.ToList();

await using var reporter = _progressRenderer.Start($"Load chunks", nsList.Count);
{
Expand All @@ -49,9 +52,11 @@ public async Task<IReadOnlyDictionary<CollectionNamespace, ChunksCache>> Get(IEn

private async Task<Container> uploadChunks(CollectionNamespace ns, CancellationToken t)
{
var expectedCount = await _repo.ByNamespace(ns).Count(t);
var collection = await _shardedCollectionService.Get(ns, t);

var expectedCount = await _repo.ByUuid(collection.Uuid).Count(t);
var chunks = new List<ChunkInfo>((int)expectedCount);
using var cursor = await _repo.ByNamespace(ns).Find(t);
using var cursor = await _repo.ByUuid(collection.Uuid).Find(t);
while (await cursor.MoveNextAsync(t))
chunks.AddRange(cursor.Current.Select(_ => new ChunkInfo(_)));

Expand Down
10 changes: 9 additions & 1 deletion ShardEqualizer/ConfigServices/ShardedCollectionService.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -34,12 +35,19 @@ public async Task<IReadOnlyDictionary<CollectionNamespace, ShardedCollectionInfo
return container.ShardedCollection;
}

public async Task<ShardedCollectionInfo> Get(CollectionNamespace ns, CancellationToken token)
{
var container = await _store.Get(token);
return container.ShardedCollection.GetValueOrDefault(ns)
?? throw new InvalidOperationException($"Cannot find collection by namespace '{ns}'");
}

private async Task<Container> uploadData(CancellationToken token)
{
await using var reporter = _progressRenderer.Start("Load sharded collections");
var result = await _repo.FindAll(false, token);

var message = $"found {result.Count(x => !x.Dropped)} collections";
var message = $"found {result.Count} collections";
reporter.SetCompleteMessage(message);

return new Container()
Expand Down
9 changes: 6 additions & 3 deletions ShardEqualizer/JsonSerialization/ShellJsonWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ public override void WriteBinaryData(BsonBinaryData binaryData)
}
else
{
guidRepresentation = subType == BsonBinarySubType.UuidStandard
? GuidRepresentation.Standard
: GuidRepresentation.Unspecified;
guidRepresentation = subType switch
{
BsonBinarySubType.UuidLegacy => GuidRepresentation.CSharpLegacy,
BsonBinarySubType.UuidStandard => GuidRepresentation.Standard,
_ => GuidRepresentation.Unspecified
};
}
#pragma warning restore 618

Expand Down
14 changes: 9 additions & 5 deletions ShardEqualizer/LocalStoring/BaseLocalStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public abstract class BaseLocalStore<T>
{
private readonly bool _read;
private readonly bool _write;
private readonly object _lock = new object() ;

protected BaseLocalStore(bool read, bool write)
{
Expand All @@ -30,11 +31,14 @@ protected async Task<T> Get(string fileName, Func<CancellationToken, Task<T>> up

if (_write)
{
await using var stream = File.Open(fileName, FileMode.Create);
using var bsonWriter = new BsonBinaryWriter(stream);
BsonSerializer.Serialize(bsonWriter, data);
bsonWriter.Flush();
await stream.FlushAsync(token);
lock (_lock)
{
using var stream = File.Open(fileName, FileMode.Create);
using var bsonWriter = new BsonBinaryWriter(stream);
BsonSerializer.Serialize(bsonWriter, data);
bsonWriter.Flush();
stream.Flush(true);
}
}

return data;
Expand Down
26 changes: 4 additions & 22 deletions ShardEqualizer/Models/Chunk.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
using System.Collections.Generic;
using System;
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver;

namespace ShardEqualizer.Models
{
[BsonIgnoreExtraElements]
public class Chunk
{
[BsonId]
public BsonValue Id { get; set; }

[BsonElement("lastmodEpoch"), BsonRequired]
public ObjectId LastmodEpoch { get; set; }

[BsonElement("lastmod"), BsonRequired]
public BsonTimestamp Lastmod { get; set; }

[BsonElement("ns"), BsonRequired]
public CollectionNamespace Namespace { get; set; }
[BsonElement("uuid"), BsonRequired, BsonGuidRepresentation(GuidRepresentation.Standard)]
public Guid Uuid { get; set; }

[BsonElement("min"), BsonRequired]
public BsonBound Min { get; set; }
Expand All @@ -30,17 +24,5 @@ public class Chunk

[BsonElement("jumbo"), BsonIgnoreIfDefault]
public bool Jumbo { get; set; }

[BsonElement("history"), BsonIgnoreIfNull] //BsonIgnoreIfNull - required for backward compatibility with MongoDB v3.6
public IReadOnlyList<HistoryEntry> History { get; set; }

public class HistoryEntry
{
[BsonElement("shard"), BsonRequired]
public ShardIdentity Shard { get; set; }

[BsonElement("validAfter"), BsonRequired]
public BsonTimestamp ValidAfter { get; set; }
}
}
}
8 changes: 3 additions & 5 deletions ShardEqualizer/Models/ShardedCollectionInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

namespace ShardEqualizer.Models
{
[BsonIgnoreExtraElements]
public class ShardedCollectionInfo
{
[BsonId]
Expand All @@ -16,9 +17,6 @@ public class ShardedCollectionInfo
[BsonElement("lastmod"), BsonRequired]
public DateTime Lastmod { get; private set; }

[BsonElement("dropped"), BsonRequired]
public bool Dropped { get; private set; }

[BsonElement("key"), BsonIgnoreIfNull]
public BsonDocument Key { get; private set; }

Expand All @@ -28,8 +26,8 @@ public class ShardedCollectionInfo
[BsonElement("noBalance"), BsonIgnoreIfNull]
public bool? NoBalance { get; private set; }

[BsonElement("uuid"), BsonIgnoreIfNull]
public Guid? UUID { get; private set; }
[BsonElement("uuid"), BsonRequired, BsonGuidRepresentation(GuidRepresentation.Standard)]
public Guid Uuid { get; private set; }

[BsonElement("distributionMode"), BsonIgnoreIfNull]
private string DistributionMode { get; set; }
Expand Down
3 changes: 3 additions & 0 deletions ShardEqualizer/MongoClientBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Linq;
using MongoDB.Bson;
using MongoDB.Driver;
using NLog;
using ShardEqualizer.Config;
Expand All @@ -20,6 +21,8 @@ public MongoClientBuilder(ConnectionConfig connectionConfig, ProgressRenderer pr

public IMongoClient Build()
{
BsonDefaults.GuidRepresentationMode = GuidRepresentationMode.V3;

_progressRenderer.WriteLine($"Connecting to {_connectionConfig.Servers}");
_log.Info("Connecting to {0}", _connectionConfig.Servers);

Expand Down
4 changes: 1 addition & 3 deletions ShardEqualizer/MongoCommands/CollStats.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

namespace ShardEqualizer.MongoCommands
{
[BsonIgnoreExtraElements]
public class CollStats: CommandResult
{
[BsonElement("ns"), BsonRequired]
Expand Down Expand Up @@ -73,8 +74,5 @@ public class CollStats: CommandResult

[BsonElement("scaleFactor")]
public double? ScaleFactor;

//[BsonExtraElements]
//public BsonDocument ExtraElements;
}
}
4 changes: 1 addition & 3 deletions ShardEqualizer/MongoCommands/CollStatsResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace ShardEqualizer.MongoCommands
{
[BsonIgnoreExtraElements]
public class CollStatsResult: CommandResult
{
[BsonElement("primary"), BsonIgnoreIfNull]
Expand Down Expand Up @@ -77,8 +78,5 @@ public class CollStatsResult: CommandResult

[BsonElement("totalSize")]
public long? TotalSize;

//[BsonExtraElements]
//public BsonDocument ExtraElements;
}
}
6 changes: 5 additions & 1 deletion ShardEqualizer/Operations/BalancerStateOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@ public class BalancerStateOperation: IOperation
{
private readonly ShardListService _shardListService;
private readonly TagRangeService _tagRangeService;
private readonly ShardedCollectionService _shardedCollectionService;
private readonly ChunkRepository _chunkRepo;
private readonly IReadOnlyList<Interval> _intervals;
private readonly ProgressRenderer _progressRenderer;

public BalancerStateOperation(
ShardListService shardListService,
TagRangeService tagRangeService,
ShardedCollectionService shardedCollectionService,
ChunkRepository chunkRepo,
IReadOnlyList<Interval> intervals,
ProgressRenderer progressRenderer)
{
_shardListService = shardListService;
_tagRangeService = tagRangeService;
_shardedCollectionService = shardedCollectionService;
_chunkRepo = chunkRepo;
_intervals = intervals;
_progressRenderer = progressRenderer;
Expand All @@ -51,7 +54,8 @@ private async Task<IList<UnMovedChunk>> scanInterval(Interval interval, IReadOnl

//TODO here supports multiple shards to scan all collections in the future

var unMovedChunks = await (await _chunkRepo.ByNamespace(interval.Namespace)
var collectionInfo = await _shardedCollectionService.Get(interval.Namespace, token);
var unMovedChunks = await (await _chunkRepo.ByUuid(collectionInfo.Uuid)
.From(tagRange.Min).To(tagRange.Max).NoJumbo().ExcludeShards(validShards).Find(token))
.ToListAsync(token);

Expand Down
2 changes: 1 addition & 1 deletion ShardEqualizer/Operations/ConfigInitOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public async Task Run(CancellationToken token)
{
Servers = _connectionConfig.Servers,
DefaultZones = string.Join(",", zones.Select(_ => _.zoneName)),
ShardedCollections = shardedCollections.Values.Where(_ => !_.Dropped).Select(_ => _.Id.ToString()).ToList(), //TODO exclude hashed keys
ShardedCollections = shardedCollections.Values.Select(_ => _.Id.ToString()).ToList(), //TODO exclude hashed keys
SecretFileName = secretFileName
};

Expand Down
11 changes: 1 addition & 10 deletions ShardEqualizer/Operations/ConfigUpdateOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,15 @@ private void analyseIntervals()
{
foreach (var ns in _intervals.Select(_ => _.Namespace))
{
if (_shardedCollections.TryGetValue(ns, out var shardedCollection))
if (_shardedCollections.ContainsKey(ns))
{
if(shardedCollection.Dropped)
Console.WriteLine("\tcollection '{0}' dropped", ns);

_shardedCollections.Remove(ns);
}
else
{
Console.WriteLine("\tcollection '{0}' not sharded", ns);
}
}

foreach (var ns in _shardedCollections.Keys.ToList())
{
if(_shardedCollections[ns].Dropped)
_shardedCollections.Remove(ns);
}
}

public async Task Run(CancellationToken token)
Expand Down
13 changes: 12 additions & 1 deletion ShardEqualizer/Operations/MergeChunksOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@ public class MergeChunksOperation : IOperation
private readonly ProgressRenderer _progressRenderer;
private readonly ShardListService _shardListService;
private readonly TagRangeService _tagRangeService;
private readonly ShardedCollectionService _shardedCollectionService;
private readonly ChunkRepository _chunkRepo;
private readonly CommandPlanWriter _commandPlanWriter;

public MergeChunksOperation(
ShardListService shardListService,
TagRangeService tagRangeService,
ShardedCollectionService shardedCollectionService,
ChunkRepository chunkRepo, //UNDONE use ChunkService
IReadOnlyList<Interval> intervals,
ProgressRenderer progressRenderer,
CommandPlanWriter commandPlanWriter)
{
_shardListService = shardListService;
_tagRangeService = tagRangeService;
_shardedCollectionService = shardedCollectionService;
_chunkRepo = chunkRepo;
_commandPlanWriter = commandPlanWriter;

Expand All @@ -46,9 +49,17 @@ private async Task<Tuple<List<MergeCommand>, int>> mergeInterval(IDictionary<Tag
{
var mergeCommands = new List<MergeCommand>();
var mergedChunks = 0;

if (!shardByTag.TryGetValue(zone.TagRange.Tag, out var shard))
{
_progressRenderer.WriteLine($"Shard for tag '{zone.TagRange.Tag}' not found");
return new Tuple<List<MergeCommand>, int>(new List<MergeCommand>(), 0);
}

var validShardId = shardByTag[zone.TagRange.Tag].Id;

var mergeCandidates = await (await _chunkRepo.ByNamespace(zone.Interval.Namespace)
var collectionInfo = await _shardedCollectionService.Get(zone.Interval.Namespace, token);
var mergeCandidates = await (await _chunkRepo.ByUuid(collectionInfo.Uuid)
.From(zone.TagRange.Min).To(zone.TagRange.Max).NoJumbo().ByShards(new [] { validShardId }).Find(token))
.ToListAsync(token);

Expand Down
2 changes: 1 addition & 1 deletion ShardEqualizer/Operations/PresplitDataOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private void presplitInterval(Interval interval, TagRangeCommandBuffer buffer)
{
var collInfo = _shardedCollectionByNs[interval.Namespace];

if (collInfo == null || collInfo.Dropped)
if (collInfo == null)
throw new InvalidOperationException($"collection {interval.Namespace.FullName} not sharded");

if (!interval.Adjustable)
Expand Down
Loading