Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Frends.MicrosoftSQL.BulkInsert/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## [3.3.0] - 2026-06-18

### Changed

- In successful execution, Result.Count will show number of all rows.
- In case of failure, Result.Count will show estimated number of rows copied before the failure.

## [3.2.0] - 2026-06-18

### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public async Task TestBulkInsert_NotifyAfterZero()

var result = await MicrosoftSQL.BulkInsert(_input, options, default);
Assert.IsTrue(result.Success);
Assert.AreEqual(0, result.Count);
Assert.AreEqual(3, result.Count);
Assert.AreEqual(3, GetRowCount());

await MicrosoftSQL.BulkInsert(_input, options, default);
Expand Down Expand Up @@ -371,7 +371,7 @@ public async Task TestBulkInsert_NotifyAfterTooMuch()

var result = await MicrosoftSQL.BulkInsert(_input, options, default);
Assert.IsTrue(result.Success);
Assert.AreEqual(0, result.Count);
Assert.AreEqual(3, result.Count);
Assert.AreEqual(3, GetRowCount());

await MicrosoftSQL.BulkInsert(_input, options, default);
Expand Down Expand Up @@ -489,4 +489,4 @@ private static int GetRowCount()
connection.Dispose();
return count;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,20 @@ public class MicrosoftSQL
/// <param name="options">Optional parameters</param>
/// <param name="cancellationToken">Token generated by Frends to stop this Task.</param>
/// <returns>Object { bool Success, long Count, string ErrorMessage }</returns>
public static async Task<Result> BulkInsert([PropertyTab] Input input, [PropertyTab] Options options, CancellationToken cancellationToken)
public static async Task<Result> BulkInsert([PropertyTab] Input input, [PropertyTab] Options options,
CancellationToken cancellationToken)
{
var inputJson = @"{""data"": {""Table"": " + input.InputData + @"
}
}";
long rowsCopied;

try
{
DataSet dataSet = JObject.Parse(inputJson)["data"].ToObject<DataSet>();
_ = dataSet.Tables["Table"];


using var connection = new SqlConnection(input.ConnectionString);
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
Expand All @@ -47,15 +50,23 @@ public static async Task<Result> BulkInsert([PropertyTab] Input input, [Property
{
try
{
var result = await ExecuteHandler(options, input, dataSet, new SqlBulkCopy(connection, GetSqlBulkCopyOptions(options), null), cancellationToken).ConfigureAwait(false);
var result = await ExecuteHandler(options, input, dataSet,
new SqlBulkCopy(connection, GetSqlBulkCopyOptions(options), null), cancellationToken)
.ConfigureAwait(false);

return new Result(true, result, null);
}
catch (Exception ex)
{
if (options.ThrowErrorOnFailure)
throw new Exception("BulkInsert exception: 'Options.SqlTransactionIsolationLevel = None', so there was no transaction rollback.", ex);
else
return new Result(false, 0, $"ExecuteHandler exception: 'Options.SqlTransactionIsolationLevel = None', so there was no transaction rollback. {ex}");
throw new Exception(
"BulkInsert exception: 'Options.SqlTransactionIsolationLevel = None', so there was no transaction rollback.",
ex);

rowsCopied = ex.Data["RowsCopied"] != null ? (long)ex.Data["RowsCopied"] : 0;

return new Result(false, rowsCopied,
$"ExecuteHandler exception: 'Options.SqlTransactionIsolationLevel = None', so there was no transaction rollback. {ex}");
}
}

Expand All @@ -64,8 +75,11 @@ public static async Task<Result> BulkInsert([PropertyTab] Input input, [Property

try
{
var result = await ExecuteHandler(options, input, dataSet, new SqlBulkCopy(connection, GetSqlBulkCopyOptions(options), transaction), cancellationToken).ConfigureAwait(false);
var result = await ExecuteHandler(options, input, dataSet,
new SqlBulkCopy(connection, GetSqlBulkCopyOptions(options), transaction), cancellationToken)
.ConfigureAwait(false);
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);

return new Result(true, result, null);
}
catch (Exception ex)
Expand All @@ -77,33 +91,47 @@ public static async Task<Result> BulkInsert([PropertyTab] Input input, [Property
catch (Exception rollbackEx)
{
if (options.ThrowErrorOnFailure)
throw new Exception("BulkInsert exception: An exception occurred on transaction rollback.", rollbackEx);
else
return new Result(false, 0, $"BulkInsert exception: An exception occurred on transaction rollback. Rollback exception: {rollbackEx}. || Exception leading to rollback: {ex}");
throw new Exception("BulkInsert exception: An exception occurred on transaction rollback.",
rollbackEx);

rowsCopied = ex.Data["RowsCopied"] != null ? (long)ex.Data["RowsCopied"] : 0;

return new Result(false, rowsCopied,
$"BulkInsert exception: An exception occurred on transaction rollback. Rollback exception: {rollbackEx}. || Exception leading to rollback: {ex}");
}

if (options.ThrowErrorOnFailure)
throw new Exception("BulkInsert exception: (If required) transaction rollback completed without exception.", ex);
else
return new Result(false, 0, $"BulkInsert exception: (If required) transaction rollback completed without exception. {ex}.");
throw new Exception(
"BulkInsert exception: (If required) transaction rollback completed without exception.", ex);

rowsCopied = ex.Data["RowsCopied"] != null ? (long)ex.Data["RowsCopied"] : 0;

return new Result(false, rowsCopied,
$"BulkInsert exception: (If required) transaction rollback completed without exception. {ex}.");
}
}
catch (Exception e)
{
if (options.ThrowErrorOnFailure)
throw new Exception("BulkInsert exception: ", e);
else
return new Result(false, 0, $"BulkInsert exception: {e}");
{
rowsCopied = e.Data["RowsCopied"] != null ? (long)e.Data["RowsCopied"] : 0;

return new Result(false, rowsCopied, $"BulkInsert exception: {e}");
}
}
finally
{
SqlConnection.ClearAllPools();
}
}

private static async Task<long> ExecuteHandler(Options options, Input input, DataSet dataSet, SqlBulkCopy sqlBulkCopy, CancellationToken cancellationToken)
private static async Task<long> ExecuteHandler(Options options, Input input, DataSet dataSet,
SqlBulkCopy sqlBulkCopy, CancellationToken cancellationToken)
{
var rowsCopied = 0L;
var totalRows = dataSet.Tables[0].Rows.Count;

// JsonPropertyOrder is handled implicitly (default behavior) by not adding any column mappings,
// which means the columns will be mapped based on their order in the input JSON.
Expand All @@ -126,39 +154,49 @@ private static async Task<long> ExecuteHandler(Options options, Input input, Dat
sqlBulkCopy.DestinationTableName = input.TableName;
sqlBulkCopy.SqlRowsCopied += (s, e) => rowsCopied = e.RowsCopied;

if (options.NotifyAfter == 0)
sqlBulkCopy.NotifyAfter = options.NotifyAfter switch
{
// Calculate the number of rows and set value for NotifyAfter
var rowCount = dataSet.Tables[0].Rows.Count;
sqlBulkCopy.NotifyAfter = rowCount > 0 ? Math.Max(1, rowCount / 10) : 1;
}
else if (options.NotifyAfter > 0)
sqlBulkCopy.NotifyAfter = options.NotifyAfter;
else
sqlBulkCopy.NotifyAfter = 0;
0 => totalRows > 0 ? Math.Max(1, totalRows / 10) : 1,
> 0 => options.NotifyAfter,
_ => 0,
};

await sqlBulkCopy.WriteToServerAsync(dataSet.Tables[0], cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex)
{
var notifyRange = rowsCopied + (sqlBulkCopy.NotifyAfter - 1);
throw new Exception($"ExecuteHandler exception, processed row count between: {rowsCopied} and {notifyRange} (see NotifyAfter). {ex}");
var notifyAfter = sqlBulkCopy.NotifyAfter;
var notifyRange = notifyAfter > 0 ? rowsCopied + notifyAfter - 1 : rowsCopied;

throw new Exception(
$"ExecuteHandler exception, processed row count between: {rowsCopied} and {notifyRange} (see NotifyAfter). {ex}",
ex)
{
Data =
{
["RowsCopied"] = rowsCopied,
},
};
}

return rowsCopied;
//If code goes up to here, it means all rows were inserted.
return totalRows;
}

private static void SetEmptyDataRowsToNull(DataSet dataSet)
{
foreach (var table in dataSet.Tables.Cast<DataTable>())
{
foreach (var row in table.Rows.Cast<DataRow>())
foreach (var column in row.ItemArray)
if (column.ToString() == string.Empty)
{
var index = Array.IndexOf(row.ItemArray, column);
row[index] = null;
}
{
for (var i = 0; i < row.ItemArray.Length; i++)
{
if (row[i] is string value && value.Length == 0)
row[i] = DBNull.Value;
}
}
Comment thread
MatteoDelOmbra marked this conversation as resolved.
}
}

private static SqlBulkCopyOptions GetSqlBulkCopyOptions(Options options)
Expand Down Expand Up @@ -193,4 +231,4 @@ private static IsolationLevel GetIsolationLevel(Options options)
_ => IsolationLevel.ReadCommitted,
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ public class Options
public int CommandTimeoutSeconds { get; set; }

/// <summary>
/// Defines the number of rows to be processed before generating a notification event.
/// Defines the number of rows to be processed before generating a notification event.
/// The default value of 0 will set NotifyAfter dynamically to 10% of the total row count, with a minimum value of 1.
/// A value of -1 means there won't be any notifications until the task is completed, and Result.Count will be 0.
/// Setting a value greater than the total number of rows can cause Result.Count to be 0.
/// A value of -1 means there won't be any notifications until the task is completed.
/// Setting a value greater than the total number of rows can cause notification response to be 0.
/// Notification events can be used for error handling to see approximately which row the error occurred at.
/// </summary>
/// <example>0</example>
Expand Down Expand Up @@ -46,7 +46,7 @@ public class Options
public bool TableLock { get; set; }

/// <summary>
/// Preserve null values in the destination table regardless of the settings for default values.
/// Preserve null values in the destination table regardless of the settings for default values.
/// When not specified, null values are replaced by default values where applicable.
/// </summary>
/// <example>false</example>
Expand Down Expand Up @@ -82,4 +82,4 @@ public class Options
/// <example>SqlTransactionIsolationLevel.ReadCommitted</example>
[DefaultValue(SqlTransactionIsolationLevel.ReadCommitted)]
public SqlTransactionIsolationLevel SqlTransactionIsolationLevel { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ public class Result
public bool Success { get; private set; }

/// <summary>
/// Number of processed rows reported by SqlBulkCopy notifications.
/// The value is approximate and can be rounded down to the nearest NotifyAfter interval (or 0 if no notification is raised).
/// Number of processed rows.
/// In case of failure it shows notified number of processed rows. Approximation logic is defined by Options.NotifyAfter
/// </summary>
/// <example>100</example>
public long Count { get; private set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Version>3.2.0</Version>
<Version>3.3.0</Version>
<Authors>Frends</Authors>
<Copyright>Frends</Copyright>
<Company>Frends</Company>
Expand Down
Loading