Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Threading;
using System.Threading.Tasks;
using EfficientDynamoDb.Exceptions;
using EfficientDynamoDb.Internal.Extensions;
using EfficientDynamoDb.Internal.Operations.BatchWriteItem;
using EfficientDynamoDb.Operations.BatchWriteItem;
using EfficientDynamoDb.Operations.Query;
Expand All @@ -20,7 +21,7 @@ internal async Task BatchWriteItemAsync(BuilderNode node, CancellationToken canc
using var httpContent = new BatchWriteItemHighLevelHttpContent(this, node, Config.TableNamePrefix);

using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false);
var documentResult = await DynamoDbLowLevelContext.ReadDocumentAsync(response, BatchWriteItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
var documentResult = await response.ReadDocumentAsync(BatchWriteItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);

var attempt = 0;
while (documentResult != null)
Expand All @@ -36,7 +37,7 @@ internal async Task BatchWriteItemAsync(BuilderNode node, CancellationToken canc
using var unprocessedHttpContent = new BatchWriteItemHttpContent(new BatchWriteItemRequest{RequestItems = unprocessedItems}, null);

using var unprocessedResponse = await Api.SendAsync(unprocessedHttpContent, cancellationToken).ConfigureAwait(false);
documentResult = await DynamoDbLowLevelContext.ReadDocumentAsync(unprocessedResponse, BatchWriteItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
documentResult = await unprocessedResponse.ReadDocumentAsync(BatchWriteItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/EfficientDynamoDb/DynamoDbContext/DynamoDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using EfficientDynamoDb.Internal.Converters.Json;
using EfficientDynamoDb.Internal.Extensions;
using EfficientDynamoDb.Internal.Reader;
using static EfficientDynamoDb.DynamoDbLowLevelContext;

namespace EfficientDynamoDb
{
Expand Down Expand Up @@ -46,7 +45,7 @@ private async ValueTask<TResult> ReadAsync<TResult>(HttpResponseMessage response
{
await using var responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);

var expectedCrc = GetExpectedCrc(response);
var expectedCrc = response.GetExpectedCrc();
var classInfo = Config.Metadata.GetOrAddClassInfo(typeof(TResult), typeof(JsonObjectDdbConverter<TResult>));
var result = await EntityDdbJsonReader.ReadAsync<TResult>(responseStream, classInfo, Config.Metadata, expectedCrc.HasValue, cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down
46 changes: 10 additions & 36 deletions src/EfficientDynamoDb/DynamoDbContext/DynamoDbLowLevelContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Threading;
using System.Threading.Tasks;
using EfficientDynamoDb.DocumentModel;
using EfficientDynamoDb.Exceptions;
using EfficientDynamoDb.Internal;
using EfficientDynamoDb.Internal.Extensions;
using EfficientDynamoDb.Internal.Operations.BatchGetItem;
Expand All @@ -18,7 +17,6 @@
using EfficientDynamoDb.Internal.Operations.TransactGetItems;
using EfficientDynamoDb.Internal.Operations.TransactWriteItems;
using EfficientDynamoDb.Internal.Operations.UpdateItem;
using EfficientDynamoDb.Internal.Reader;
using EfficientDynamoDb.Operations.BatchGetItem;
using EfficientDynamoDb.Operations.BatchWriteItem;
using EfficientDynamoDb.Operations.DeleteItem;
Expand Down Expand Up @@ -57,7 +55,7 @@ public async Task<BatchGetItemResponse> BatchGetItemAsync(BatchGetItemRequest re
using var httpContent = new BatchGetItemHttpContent(request, Config.TableNamePrefix);

using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false);
var result = await ReadDocumentAsync(response, BatchGetItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
var result = await response.ReadDocumentAsync(BatchGetItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);

return BatchGetItemResponseParser.Parse(result!);
}
Expand All @@ -67,7 +65,7 @@ public async Task<BatchWriteItemResponse> BatchWriteItemAsync(BatchWriteItemRequ
using var httpContent = new BatchWriteItemHttpContent(request, Config.TableNamePrefix);

using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false);
var result = await ReadDocumentAsync(response, BatchWriteItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
var result = await response.ReadDocumentAsync(BatchWriteItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);

return BatchWriteItemResponseParser.Parse(result!);
}
Expand All @@ -77,7 +75,7 @@ public async Task<QueryResponse> QueryAsync(QueryRequest request, CancellationTo
using var httpContent = new QueryHttpContent(request, Config.TableNamePrefix);

using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false);
var result = await ReadDocumentAsync(response, QueryParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
var result = await response.ReadDocumentAsync(QueryParsingOptions.Instance, cancellationToken).ConfigureAwait(false);

return QueryResponseParser.Parse(result!);
}
Expand All @@ -87,7 +85,7 @@ public async Task<ScanResponse> ScanAsync(ScanRequest request, CancellationToken
using var httpContent = new ScanHttpContent(request, Config.TableNamePrefix);

using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false);
var result = await ReadDocumentAsync(response, QueryParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
var result = await response.ReadDocumentAsync(QueryParsingOptions.Instance, cancellationToken).ConfigureAwait(false);

return ScanResponseParser.Parse(result!);
}
Expand All @@ -97,7 +95,7 @@ public async Task<TransactGetItemsResponse> TransactGetItemsAsync(TransactGetIte
using var httpContent = new TransactGetItemsHttpContent(request, Config.TableNamePrefix);

using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false);
var result = await ReadDocumentAsync(response, TransactGetItemsParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
var result = await response.ReadDocumentAsync(TransactGetItemsParsingOptions.Instance, cancellationToken).ConfigureAwait(false);

return TransactGetItemsResponseParser.Parse(result!);
}
Expand All @@ -107,7 +105,7 @@ public async Task<PutItemResponse> PutItemAsync(PutItemRequest request, Cancella
using var httpContent = new PutItemHttpContent(request, Config.TableNamePrefix);

using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false);
var result = await ReadDocumentAsync(response, PutItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
var result = await response.ReadDocumentAsync(PutItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);

return PutItemResponseParser.Parse(result);
}
Expand All @@ -117,7 +115,7 @@ public async Task<UpdateItemResponse> UpdateItemAsync(UpdateItemRequest request,
using var httpContent = await BuildHttpContentAsync(request).ConfigureAwait(false);

using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false);
var result = await ReadDocumentAsync(response, UpdateItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
var result = await response.ReadDocumentAsync(UpdateItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);

return UpdateItemResponseParser.Parse(result);
}
Expand All @@ -131,7 +129,7 @@ public async Task<DeleteItemResponse> DeleteItemAsync(DeleteItemRequest request,
using var httpContent = new DeleteItemHttpContent(request, pkName, skName, Config.TableNamePrefix);

using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false);
var result = await ReadDocumentAsync(response, PutItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
var result = await response.ReadDocumentAsync(PutItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);

return DeleteItemResponseParser.Parse(result);
}
Expand All @@ -141,7 +139,7 @@ public async Task<TransactWriteItemsResponse> TransactWriteItemsAsync(TransactWr
using var httpContent = new TransactWriteItemsHttpContent(request, Config.TableNamePrefix);

using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false);
var result = await ReadDocumentAsync(response, TransactWriteItemsParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
var result = await response.ReadDocumentAsync(TransactWriteItemsParsingOptions.Instance, cancellationToken).ConfigureAwait(false);

return TransactWriteItemsResponseParser.Parse(result);
}
Expand All @@ -153,7 +151,7 @@ public async Task<TransactWriteItemsResponse> TransactWriteItemsAsync(TransactWr
private async ValueTask<GetItemResponse> GetItemInternalAsync(HttpContent httpContent, CancellationToken cancellationToken = default)
{
using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false);
var result = await ReadDocumentAsync(response, GetItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
var result = await response.ReadDocumentAsync(GetItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false);

// TODO: Consider removing root dictionary
return GetItemResponseParser.Parse(result!);
Expand Down Expand Up @@ -197,29 +195,5 @@ private async ValueTask<HttpContent> BuildHttpContentAsync(UpdateItemRequest req
keySchema.FirstOrDefault(x => x.KeyType == KeyType.Range)?.AttributeName);
}
}

internal static async ValueTask<Document?> ReadDocumentAsync(HttpResponseMessage response, IParsingOptions options, CancellationToken cancellationToken = default)
{
await using var responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);

var expectedCrc = GetExpectedCrc(response);
var result = await DdbJsonReader.ReadAsync(responseStream, options, expectedCrc.HasValue, cancellationToken).ConfigureAwait(false);

if (expectedCrc.HasValue && expectedCrc.Value != result.Crc)
throw new ChecksumMismatchException();

return result.Value;
}

internal static uint? GetExpectedCrc(HttpResponseMessage response)
{
if (!response.Content.Headers.ContentLength.HasValue)
return null;

if (response.Headers.TryGetValues("x-amz-crc32", out var crcHeaderValues) && uint.TryParse(crcHeaderValues.FirstOrDefault(), out var expectedCrc))
return expectedCrc;

return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using EfficientDynamoDb.Internal;
using EfficientDynamoDb.Internal.Constants;

namespace EfficientDynamoDb
{
public interface IDynamoDbStreamsContext
{
IDynamoDbStreamsLowLevelContext LowLevel { get; }
}

public class DynamoDbStreamsContext : IDynamoDbStreamsContext
{
private readonly DynamoDbStreamsLowLevelContext _lowLevel;

public DynamoDbStreamsContext(DynamoDbContextConfig config)
{
_lowLevel = new DynamoDbStreamsLowLevelContext(config, new HttpApi(config, ServiceNames.DynamoDbStreams));
}

public IDynamoDbStreamsLowLevelContext LowLevel => _lowLevel;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System.Threading;
using System.Threading.Tasks;
using EfficientDynamoDb.Internal;
using EfficientDynamoDb.Internal.Extensions;
using EfficientDynamoDb.Internal.Operations.Streams;
using EfficientDynamoDb.Operations;

namespace EfficientDynamoDb
{
public class DynamoDbStreamsLowLevelContext : IDynamoDbStreamsLowLevelContext
{
internal DynamoDbContextConfig Config { get; }

internal HttpApi Api { get; }

internal DynamoDbStreamsLowLevelContext(DynamoDbContextConfig config, HttpApi api)
{
Config = config;
Api = api;
}

public async Task<ListStreamsResponse> ListStreamsAsync(ListStreamsRequest request, CancellationToken cancellationToken = default)
{
using var httpContent = new ListStreamsHttpContent(request, Config.TableNamePrefix);

var response = await Api.SendAsync<ListStreamsResponse>(httpContent, cancellationToken).ConfigureAwait(false);
return response;
}

public async Task<GetShardIteratorResponse> GetShardIteratorAsync(GetShardIteratorRequest request, CancellationToken cancellationToken = default)
{
using var httpContext = new GetShardIteratorHttpContent(request);

var response = await Api.SendAsync<GetShardIteratorResponse>(httpContext, cancellationToken).ConfigureAwait(false);
return response;
}

public async Task<DescribeStreamResponse> DescribeStreamAsync(DescribeStreamRequest request, CancellationToken cancellationToken = default)
{
using var httpContext = new DescribeStreamHttpContent(request);

var response = await Api.SendAsync<DescribeStreamResponse>(httpContext, cancellationToken).ConfigureAwait(false);
return response;
}

public async Task<GetRecordsResponse> GetRecordsAsync(GetRecordsRequest request, CancellationToken cancellationToken = default)
{
using var httpContext = new GetRecordsHttpContent(request);

var response = await Api.SendAsync(httpContext, cancellationToken).ConfigureAwait(false);
var result = await response.ReadDocumentAsync(GetRecordsParsingOptions.Instance, cancellationToken).ConfigureAwait(false);

return GetRecordsResponseParser.Parse(result!);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System.Threading;
using System.Threading.Tasks;
using EfficientDynamoDb.Operations;

namespace EfficientDynamoDb
{
public interface IDynamoDbStreamsLowLevelContext
{
Task<ListStreamsResponse> ListStreamsAsync(ListStreamsRequest request, CancellationToken cancellationToken = default);

Task<GetShardIteratorResponse> GetShardIteratorAsync(GetShardIteratorRequest request, CancellationToken cancellationToken = default);

Task<DescribeStreamResponse> DescribeStreamAsync(DescribeStreamRequest request, CancellationToken cancellationToken = default);

Task<GetRecordsResponse> GetRecordsAsync(GetRecordsRequest request, CancellationToken cancellationToken = default);
}
}
17 changes: 16 additions & 1 deletion src/EfficientDynamoDb/EfficientDynamoDb.csproj.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,23 @@
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=context_005Cdynamodbcontext/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=documentmodel_005Cattributevalues/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=dynamodbcontext/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=dynamodbstreamcontext/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=dynamodbstreamscontext/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=internal_005Coperations_005Cdescribestream/@EntryIndexedValue">False</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=internal_005Coperations_005Cstreams_005Cdescribestream/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=internal_005Coperations_005Cstreams_005Cgetrecords/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=internal_005Coperations_005Cstreams_005Cgetsharditerator/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=internal_005Coperations_005Cstreams_005Cliststreams/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=internal_005Creader_005Cddbjsonreader/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=internal_005Creader_005Centityddbjsonreader/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=internal_005Csigning_005Cmodels/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=json_005Cinternal/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=operations_005Cshared_005Cenums/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=operations_005Cdescribestream/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=operations_005Cgetrecords/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=operations_005Cgetsharditerator/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=operations_005Cliststreams/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=operations_005Cshared_005Cenums/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=operations_005Cstreams_005Cdescribestream/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=operations_005Cstreams_005Cgetrecords/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=operations_005Cstreams_005Cgetsharditerator/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=operations_005Cstreams_005Cliststreams/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
Loading