Skip to content
Open
24 changes: 24 additions & 0 deletions src/SingleStoreConnector/ColumnReaders/ColumnReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,30 @@ internal abstract class ColumnReader
{
public static ColumnReader Create(bool isBinary, ColumnDefinitionPayload columnDefinition, SingleStoreConnection connection)
{
switch (columnDefinition.ExtendedTypeCode)
{
case SingleStoreExtendedTypeCode.Bson:
return BytesColumnReader.Instance;

case SingleStoreExtendedTypeCode.Vector:
return columnDefinition.VectorElementType switch
{
SingleStoreVectorElementType.F32 => VectorFloat32ColumnReader.Instance,
SingleStoreVectorElementType.F64 => VectorFloat64ColumnReader.Instance,
SingleStoreVectorElementType.I8 => VectorInt8ColumnReader.Instance,
SingleStoreVectorElementType.I16 => VectorInt16ColumnReader.Instance,
SingleStoreVectorElementType.I32 => VectorInt32ColumnReader.Instance,
SingleStoreVectorElementType.I64 => VectorInt64ColumnReader.Instance,
null => throw new FormatException("VECTOR column is missing VectorElementType metadata."),
_ => throw new NotSupportedException(
$"Unsupported VECTOR element type: {columnDefinition.VectorElementType}."),
};

case SingleStoreExtendedTypeCode.None:
default:
break;
}

var isUnsigned = (columnDefinition.ColumnFlags & ColumnFlags.Unsigned) != 0;
switch (columnDefinition.ColumnType)
{
Expand Down
153 changes: 153 additions & 0 deletions src/SingleStoreConnector/ColumnReaders/VectorColumnReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
using System.Buffers.Binary;
using System.Runtime.InteropServices;
using SingleStoreConnector.Protocol.Payloads;

namespace SingleStoreConnector.ColumnReaders;

internal abstract class VectorColumnReaderBase : ColumnReader
{
protected static void ValidateLength(ColumnDefinitionPayload columnDefinition, int dataLength, int elementSize, string elementTypeName)
{
if (dataLength % elementSize != 0)
{
throw new FormatException(
$"Expected VECTOR({elementTypeName}) payload length to be a multiple of {elementSize}, but got {dataLength}.");
}

if (columnDefinition.VectorDimensions is { } dimensions)
{
var expectedLength = checked((ulong) dimensions * (ulong) elementSize);
if ((ulong) dataLength != expectedLength)
{
throw new FormatException(
$"Expected VECTOR({dimensions}, {elementTypeName}) payload length to be {expectedLength} bytes, but got {dataLength}.");
}
}
}
}

internal sealed class VectorInt8ColumnReader : VectorColumnReaderBase
{
public static VectorInt8ColumnReader Instance { get; } = new();

public override object ReadValue(ReadOnlySpan<byte> data, ColumnDefinitionPayload columnDefinition)
{
ValidateLength(columnDefinition, data.Length, sizeof(sbyte), "I8");
return new ReadOnlyMemory<sbyte>(MemoryMarshal.Cast<byte, sbyte>(data).ToArray());
}
}

internal sealed class VectorInt16ColumnReader : VectorColumnReaderBase
{
public static VectorInt16ColumnReader Instance { get; } = new();

public override object ReadValue(ReadOnlySpan<byte> data, ColumnDefinitionPayload columnDefinition)
{
ValidateLength(columnDefinition, data.Length, sizeof(short), "I16");

if (BitConverter.IsLittleEndian)
return new ReadOnlyMemory<short>(MemoryMarshal.Cast<byte, short>(data).ToArray());

var values = new short[data.Length / sizeof(short)];
for (var i = 0; i < values.Length; i++)
values[i] = BinaryPrimitives.ReadInt16LittleEndian(data.Slice(i * sizeof(short), sizeof(short)));

return new ReadOnlyMemory<short>(values);
}
}

internal sealed class VectorInt32ColumnReader : VectorColumnReaderBase
{
public static VectorInt32ColumnReader Instance { get; } = new();

public override object ReadValue(ReadOnlySpan<byte> data, ColumnDefinitionPayload columnDefinition)
{
ValidateLength(columnDefinition, data.Length, sizeof(int), "I32");

if (BitConverter.IsLittleEndian)
return new ReadOnlyMemory<int>(MemoryMarshal.Cast<byte, int>(data).ToArray());

var values = new int[data.Length / sizeof(int)];
for (var i = 0; i < values.Length; i++)
values[i] = BinaryPrimitives.ReadInt32LittleEndian(data.Slice(i * sizeof(int), sizeof(int)));

return new ReadOnlyMemory<int>(values);
}
}

internal sealed class VectorInt64ColumnReader : VectorColumnReaderBase
{
public static VectorInt64ColumnReader Instance { get; } = new();

public override object ReadValue(ReadOnlySpan<byte> data, ColumnDefinitionPayload columnDefinition)
{
ValidateLength(columnDefinition, data.Length, sizeof(long), "I64");

if (BitConverter.IsLittleEndian)
return new ReadOnlyMemory<long>(MemoryMarshal.Cast<byte, long>(data).ToArray());

var values = new long[data.Length / sizeof(long)];
for (var i = 0; i < values.Length; i++)
values[i] = BinaryPrimitives.ReadInt64LittleEndian(data.Slice(i * sizeof(long), sizeof(long)));

return new ReadOnlyMemory<long>(values);
}
}

internal sealed class VectorFloat32ColumnReader : VectorColumnReaderBase
{
public static VectorFloat32ColumnReader Instance { get; } = new();

public override object ReadValue(ReadOnlySpan<byte> data, ColumnDefinitionPayload columnDefinition)
{
ValidateLength(columnDefinition, data.Length, sizeof(float), "F32");

if (BitConverter.IsLittleEndian)
return new ReadOnlyMemory<float>(MemoryMarshal.Cast<byte, float>(data).ToArray());

var values = new float[data.Length / sizeof(float)];

#if NET5_0_OR_GREATER
for (var i = 0; i < values.Length; i++)
values[i] = BinaryPrimitives.ReadSingleLittleEndian(data.Slice(i * sizeof(float), sizeof(float)));
#else
var bytes = data.ToArray();
for (var i = 0; i < values.Length; i++)
{
Array.Reverse(bytes, i * sizeof(float), sizeof(float));
values[i] = BitConverter.ToSingle(bytes, i * sizeof(float));
}
#endif

return new ReadOnlyMemory<float>(values);
}
}

internal sealed class VectorFloat64ColumnReader : VectorColumnReaderBase
{
public static VectorFloat64ColumnReader Instance { get; } = new();

public override object ReadValue(ReadOnlySpan<byte> data, ColumnDefinitionPayload columnDefinition)
{
ValidateLength(columnDefinition, data.Length, sizeof(double), "F64");

if (BitConverter.IsLittleEndian)
return new ReadOnlyMemory<double>(MemoryMarshal.Cast<byte, double>(data).ToArray());

var values = new double[data.Length / sizeof(double)];

#if NET5_0_OR_GREATER
for (var i = 0; i < values.Length; i++)
values[i] = BinaryPrimitives.ReadDoubleLittleEndian(data.Slice(i * sizeof(double), sizeof(double)));
#else
var bytes = data.ToArray();
for (var i = 0; i < values.Length; i++)
{
Array.Reverse(bytes, i * sizeof(double), sizeof(double));
values[i] = BitConverter.ToDouble(bytes, i * sizeof(double));
}
#endif

return new ReadOnlyMemory<double>(values);
}
}
6 changes: 6 additions & 0 deletions src/SingleStoreConnector/Core/ConnectionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ public ConnectionSettings(SingleStoreConnectionStringBuilder csb)
UseAffectedRows = csb.UseAffectedRows;
UseCompression = csb.UseCompression;
UseXaTransactions = false;
EnableExtendedDataTypes = csb.EnableExtendedDataTypes;
EnableExtendedDataTypesWasExplicitlySet = csb.ContainsKey("Enable Extended Data Types");

static int ToSigned(uint value) => value >= int.MaxValue ? int.MaxValue : (int) value;
}
Expand Down Expand Up @@ -248,6 +250,8 @@ private static SingleStoreGuidFormat GetEffectiveGuidFormat(SingleStoreGuidForma
public bool UseAffectedRows { get; }
public bool UseCompression { get; }
public bool UseXaTransactions { get; }
public bool EnableExtendedDataTypes { get; }
internal bool EnableExtendedDataTypesWasExplicitlySet { get; }

public string ConnAttrsExtra { get; set; }
public byte[]? ConnectionAttributes { get; set; }
Expand Down Expand Up @@ -341,6 +345,8 @@ private ConnectionSettings(ConnectionSettings other, string host, int port, stri
UseAffectedRows = other.UseAffectedRows;
UseCompression = other.UseCompression;
UseXaTransactions = other.UseXaTransactions;
EnableExtendedDataTypes = other.EnableExtendedDataTypes;
EnableExtendedDataTypesWasExplicitlySet = other.EnableExtendedDataTypesWasExplicitlySet;
}

private static readonly string[] s_localhostPipeServer = ["."];
Expand Down
11 changes: 11 additions & 0 deletions src/SingleStoreConnector/Core/Row.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Text;
using SingleStoreConnector.ColumnReaders;
using SingleStoreConnector.Protocol;
using SingleStoreConnector.Protocol.Payloads;
using SingleStoreConnector.Protocol.Serialization;
#if !NETCOREAPP2_1_OR_GREATER && !NETSTANDARD2_1_OR_GREATER
using SingleStoreConnector.Utilities;
Expand Down Expand Up @@ -449,6 +450,16 @@ private void CheckBinaryColumn(int ordinal)
throw new InvalidCastException("Column is NULL.");

var column = ResultSet.ColumnDefinitions![ordinal];

switch (column.ExtendedTypeCode)
{
case SingleStoreExtendedTypeCode.Bson:
return;

case SingleStoreExtendedTypeCode.Vector:
throw new InvalidCastException("Can't convert VECTOR to bytes.");
}

var columnType = column.ColumnType;
if ((column.ColumnFlags & ColumnFlags.Binary) == 0 ||
(columnType != ColumnType.String && columnType != ColumnType.VarString && columnType != ColumnType.TinyBlob &&
Expand Down
8 changes: 8 additions & 0 deletions src/SingleStoreConnector/Core/ServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,14 @@ public async Task<bool> TryResetConnectionAsync(ConnectionSettings cs, SingleSto
payload = await ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
OkPayload.Verify(payload.Span, this);

// re-enable extended types metadata if needed
if (cs.EnableExtendedDataTypes && S2ServerVersion.Version.CompareTo(S2Versions.SupportsExtendedDataTypes) >= 0)
{
await SendAsync(QueryPayload.Create(SupportsQueryAttributes, Encoding.ASCII.GetBytes("SET SESSION enable_extended_types_metadata = TRUE;")), ioBehavior, cancellationToken).ConfigureAwait(false);
payload = await ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
OkPayload.Verify(payload.Span, this);
}

return true;
}
catch (IOException ex)
Expand Down
1 change: 1 addition & 0 deletions src/SingleStoreConnector/Core/ServerVersions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ internal static class S2Versions
public static readonly Version SupportsUtf8Mb4 = new(7, 5, 0);
public static readonly Version SupportsResetConnection = new(7, 5, 0);
public static readonly Version HasDataConversionCompatibilityLevelParameter = new(8, 0, 0);
public static readonly Version SupportsExtendedDataTypes = new(8, 5, 28);
}
Loading
Loading