Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Data.Common/ADO.NET/FileTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,18 @@ public override void Commit()
rwLock.EnterWriteLock();
try
{
// Stop file watching to prevent mid-write file watcher events from
// triggering stale re-reads during the commit.
connection.FileReader?.StopWatching();

Writers.ForEach(writer =>
{
writer.Execute();
});
}
finally
{
connection.FileReader?.StartWatching();
rwLock.ExitWriteLock();
}

Expand Down
42 changes: 39 additions & 3 deletions src/Data.Common/FileIO/Write/FileInsertWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ public override int Execute()
else
{
var transactionScopedRow = TransactionScopedRow.Value;
var table = fileReader.DataSet.Tables[transactionScopedRow.TableName];

// Force re-read from disk to get latest data (e.g., from other committed
// transactions). Without this, the in-memory DataSet may be stale and Save()
// would overwrite changes from other connections.
// shouldLock: false because the write lock is already held by FileTransaction.Commit().
fileReader.MarkTableToUpdate(transactionScopedRow.TableName);
var table = fileReader.ReadFile(fileInsertStatement, null, shouldLock: false);

table.AppendRow(transactionScopedRow.Row);
}
Expand Down Expand Up @@ -165,8 +171,9 @@ private void AddMissingValues(IDictionary<string, object> newRow, VirtualDataTab
// Look for missing identity values
if (!columnValueSet && ColumnNameIndicatesIdentity(columnName))
{
// This could be an expensive operation depending on number of rows here.
var lastRow = virtualDataTable.Rows.Cast<DataRow>().LastOrDefault();
// Find the row with the maximum identity column value (not just the last row)
// to prevent identity collisions after row deletions.
var lastRow = FindRowWithMaxIdentity(virtualDataTable.Rows.Cast<DataRow>(), columnName, dataColumn.DataType);

//Since we don't have a datatype for values in a CSV, we need to determine if the last
//row 'looks' like a datatype that can be an identity (i.e. Guid or integer).
Expand Down Expand Up @@ -223,6 +230,35 @@ private void AddMissingValues(IDictionary<string, object> newRow, VirtualDataTab
}


/// <summary>
/// Finds the row with the maximum value for the specified identity column.
/// Uses MAX instead of last-row to avoid identity collisions after row deletions.
/// </summary>
private static DataRow FindRowWithMaxIdentity(IEnumerable<DataRow> rows, string columnName, Type dataType)
{
if (dataType == typeof(float) || dataType == typeof(double) || dataType == typeof(decimal))
{
DataRow maxRow = null;
decimal maxVal = decimal.MinValue;
foreach (var row in rows)
{
var val = row[columnName];
if (val == DBNull.Value) continue;

var decVal = Convert.ToDecimal(val);
if (maxRow == null || decVal > maxVal)
{
maxVal = decVal;
maxRow = row;
}
}
return maxRow;
}

// For non-numeric types (GUID, etc.), any row works for type detection
return rows.LastOrDefault();
}

protected static bool ColumnNameIndicatesIdentity(string columnName) =>
string.Compare(columnName, "Id", true) == 0 || columnName.EndsWith("Id", StringComparison.InvariantCultureIgnoreCase);

Expand Down
45 changes: 9 additions & 36 deletions tests/Data.Tests.Common/ConcurrencyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,17 @@ public static void SelectDuringMutations_ShouldNotDeadlock<TFileParameter>(
}

/// <summary>
/// Multiple concurrent transactions against the same table should not deadlock.
/// Each transaction inserts, commits, and completes without hanging. IO errors
/// from concurrent file access are tolerated (known limitation of file-based
/// storage), but deadlocks (timeout) are not.
/// Multiple concurrent transactions against the same table should not deadlock
/// or corrupt data. Each transaction inserts, commits, and completes without
/// hanging. All transactions should succeed and their data should be preserved.
/// </summary>
public static void ConcurrentTransactions_ShouldNotDeadlock<TFileParameter>(
FileConnectionString connectionString,
Func<FileConnectionString, FileConnection<TFileParameter>> createConnection)
where TFileParameter : FileParameter<TFileParameter>, new()
{
const int concurrency = 5;
var ioErrors = new List<Exception>();
var nonIoErrors = new List<Exception>();
var exceptions = new List<Exception>();
var barrier = new Barrier(concurrency);
var completedCount = 0;

Expand All @@ -148,45 +146,20 @@ public static void ConcurrentTransactions_ShouldNotDeadlock<TFileParameter>(
transaction.Commit();
Interlocked.Increment(ref completedCount);
}
catch (Exception ex) when (IsFileAccessError(ex))
{
// IO errors from concurrent file access are a known limitation,
// not a deadlock. Track but don't fail the test for these.
lock (ioErrors) { ioErrors.Add(ex); }
}
catch (Exception ex)
{
lock (nonIoErrors) { nonIoErrors.Add(ex); }
lock (exceptions) { exceptions.Add(ex); }
}
})).ToArray();

var completed = Task.WaitAll(tasks, TimeSpan.FromSeconds(60));
Assert.True(completed, "Concurrent transactions should complete without deadlock within 60s");

if (nonIoErrors.Count > 0)
throw new AggregateException("Concurrent transaction failures (non-IO)", nonIoErrors);

// At least one transaction should succeed
Assert.True(completedCount > 0,
$"At least one transaction should commit successfully. IO errors: {ioErrors.Count}");
}

private static bool IsFileAccessError(Exception ex)
{
if (ex is IOException || ex is System.Xml.XmlException)
return true;

// Check inner exceptions for IO/XML errors (often wrapped)
var inner = ex.InnerException;
while (inner != null)
{
if (inner is IOException || inner is System.Xml.XmlException)
return true;
inner = inner.InnerException;
}
if (exceptions.Count > 0)
throw new AggregateException("Concurrent transaction failures", exceptions);

// Check for TableNotFoundException wrapping an IO error
return ex.GetType().Name == "TableNotFoundException" && ex.InnerException != null && IsFileAccessError(ex.InnerException);
// All transactions should succeed
Assert.Equal(concurrency, completedCount);
}

/// <summary>
Expand Down
46 changes: 46 additions & 0 deletions tests/Data.Tests.Common/InsertTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,50 @@ public static void Insert_IndentityColumn_LastRow_Guid<TFileParameter>(Func<File
//TODO
}

/// <summary>
/// After deleting the last row, inserting a new row should use MAX(id)+1,
/// not lastRow+1, to avoid identity collisions.
/// </summary>
public static void Insert_IdentityColumn_AfterDelete_UsesMaxValue<TFileParameter>(Func<FileConnection<TFileParameter>> createFileConnection)
where TFileParameter : FileParameter<TFileParameter>, new()
{
using (var connection = createFileConnection())
{
connection.Open();

// Setup: locations table has rows with ids 1, 2.
// Delete the row with the highest id (2).
var command = connection.CreateCommand("DELETE FROM locations WHERE id = 2");
var rowsAffected = command.ExecuteNonQuery();
Assert.Equal(1, rowsAffected);

// Act: Insert a new row. The new id should be 3 (MAX(1)+1=2 would collide
// with the just-deleted row in concurrent scenarios; correct is MAX(1)+1=2
// but more importantly, after inserting id=2 and deleting it, the next should
// still be > any existing id).
command = connection.CreateCommand("INSERT INTO locations (city, state, zip) VALUES ('Portland', 'Oregon', 97201)");
rowsAffected = command.ExecuteNonQuery();
Assert.Equal(1, rowsAffected);

// Verify the new row got an id that doesn't collide with row id=1
command = connection.CreateCommand("SELECT * FROM locations ORDER BY id");
using (var reader = command.ExecuteReader())
{
// First row: id=1 (original)
Assert.True(reader.Read());
Assert.Equal(connection.GetProperlyTypedValue(1), reader["id"]);

// Second row: the new insert should have id=2 (MAX(1)+1)
Assert.True(reader.Read());
var newId = reader["id"];
Assert.Equal(connection.GetProperlyTypedValue(2), newId);
Assert.Equal("Portland", reader["city"]);

Assert.False(reader.Read());
}

connection.Close();
}
}

}
Loading