Skip to content

Commit dd3b346

Browse files
committed
fix: properly track delayedDbTasks
What happened: If two different UniqueData instances wanted to update the same column (in different rows) at the same time, and they both were doing so with a delay (for example if they used @UpdateInterval), then one of them would replace the other in the queue. The last enqueued update (within the delay window) would be the only one executed on the PG DB, both would be executed on the H2 DB.
1 parent 3dee358 commit dd3b346

7 files changed

Lines changed: 34 additions & 11 deletions

File tree

core/src/main/java/net/staticstudios/data/DataManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1548,7 +1548,7 @@ public void set(String schema, String table, String column, ColumnValuePairs idC
15481548
values.add(columnValuePair.value());
15491549
}
15501550
try {
1551-
dataAccessor.executeUpdate(SQLTransaction.Statement.of(h2Sql, pgSql), values, delay);
1551+
dataAccessor.executeUpdate(idColumns, SQLTransaction.Statement.of(h2Sql, pgSql), values, delay);
15521552
} catch (SQLException e) {
15531553
throw new RuntimeException(e);
15541554
}

core/src/main/java/net/staticstudios/data/UniqueData.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public synchronized void delete() {
6464
@Language("SQL") String sql = stringBuilder.toString();
6565

6666
try {
67-
dataManager.getDataAccessor().executeUpdate(SQLTransaction.Statement.of(sql, sql), values, 0);
67+
dataManager.getDataAccessor().executeUpdate(idColumns, SQLTransaction.Statement.of(sql, sql), values, 0);
6868
} catch (SQLException e) {
6969
throw new RuntimeException(e);
7070
}

core/src/main/java/net/staticstudios/data/impl/DataAccessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ public interface DataAccessor {
1717

1818
ResultSet executeQuery(@Language("SQL") String sql, List<Object> values) throws SQLException;
1919

20-
default void executeUpdate(SQLTransaction.Statement statement, List<Object> values, int delay) throws SQLException {
21-
executeTransaction(new SQLTransaction().update(statement, values), delay);
20+
default void executeUpdate(@Nullable ColumnValuePairs idColumns, SQLTransaction.Statement statement, List<Object> values, int delay) throws SQLException {
21+
executeTransaction(new SQLTransaction(idColumns).update(statement, values), delay);
2222
}
2323

2424
void executeTransaction(SQLTransaction transaction, int delay) throws SQLException;

core/src/main/java/net/staticstudios/data/impl/data/ReferenceImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ private void setLocal(@Nullable T value) {
232232
@Language("SQL") String sql = sqlBuilder.toString();
233233

234234
try {
235-
holder.getDataManager().getDataAccessor().executeUpdate(SQLTransaction.Statement.of(sql, sql), values, 0);
235+
holder.getDataManager().getDataAccessor().executeUpdate(holder.getIdColumns(), SQLTransaction.Statement.of(sql, sql), values, 0);
236236
} catch (SQLException e) {
237237
throw new RuntimeException(e);
238238
}
@@ -315,7 +315,7 @@ private void setReferenced(@Nullable T value) {
315315
@Language("SQL") String updateSql = updateSqlBuilder.toString();
316316

317317
List<Object> existingIdValues = new ArrayList<>();
318-
SQLTransaction transaction = new SQLTransaction();
318+
SQLTransaction transaction = new SQLTransaction(holder.getIdColumns());
319319

320320
transaction.query(SQLTransaction.Statement.of(selectExistingSql, selectExistingSql), () -> holderLinkValues, rs -> {
321321
try {

core/src/main/java/net/staticstudios/data/impl/h2/H2DataAccessor.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public class H2DataAccessor implements DataAccessor {
5757
private final Set<SchemaTable> knownTables = new HashSet<>();
5858
private final DataManager dataManager;
5959
private final PostgresListener postgresListener;
60-
private final Map<List<Pair<String, String>>, Runnable> delayedTasks = new ConcurrentHashMap<>();
60+
private final Map<EnqueuedDatabaseTaskKey, Runnable> delayedTasks = new ConcurrentHashMap<>();
6161
private final Map<String, Runnable> delayedRedisTasks = new ConcurrentHashMap<>();
6262
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(thread -> {
6363
Thread t = new Thread(thread);
@@ -798,10 +798,10 @@ private List<String> getColumnsInTable(String schema, String table) throws SQLEx
798798
}
799799

800800
private void runDatabaseTask(SQLTransaction transaction, int delay) {
801-
List<Pair<String, String>> key = new ArrayList<>(transaction.getOperations().size());
802-
for (SQLTransaction.Operation operation : transaction.getOperations()) {
803-
SQLTransaction.Statement sqlStatement = operation.getStatement();
804-
key.add(Pair.of(sqlStatement.getH2Sql(), sqlStatement.getPgSql()));
801+
if (transaction.getHolderIds() == null) {
802+
// has to run now, since we don't know who we are waiting on. Future updates need to replace only their holder's tasks, so id cols are a must.
803+
logger.warn("Transaction {} does not have holder IDs, running immediately. Requested delay was {}ms", transaction, delay);
804+
delay = -1;
805805
}
806806

807807
Runnable runnable = () -> taskQueue.submitTask(connection -> {
@@ -839,6 +839,14 @@ private void runDatabaseTask(SQLTransaction transaction, int delay) {
839839
runnable.run();
840840
return;
841841
}
842+
843+
SQLTransaction.Statement[] statements = new SQLTransaction.Statement[transaction.getOperations().size()];
844+
for (int i = 0; i < transaction.getOperations().size(); i++) {
845+
statements[i] = transaction.getOperations().get(i).getStatement();
846+
}
847+
848+
EnqueuedDatabaseTaskKey key = new EnqueuedDatabaseTaskKey(transaction.getHolderIds(), statements);
849+
842850
if (delayedTasks.put(key, runnable) == null) {
843851
scheduledExecutorService.schedule(() -> {
844852
Runnable removed = delayedTasks.remove(key);
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package net.staticstudios.data.util;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
5+
public record EnqueuedDatabaseTaskKey(@NotNull ColumnValuePairs holderPairs, SQLTransaction.Statement[] statements) {
6+
}

core/src/main/java/net/staticstudios/data/util/SQLTransaction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,24 @@
1212
import java.util.function.Supplier;
1313

1414
public class SQLTransaction {
15+
private @Nullable ColumnValuePairs holderIds;
1516
private final List<Operation> operations = new ArrayList<>();
1617

1718
public SQLTransaction() {
1819
}
1920

21+
public SQLTransaction(@Nullable ColumnValuePairs holderIds) {
22+
this.holderIds = holderIds;
23+
}
24+
2025
public List<Operation> getOperations() {
2126
return operations;
2227
}
2328

29+
public @Nullable ColumnValuePairs getHolderIds() {
30+
return holderIds;
31+
}
32+
2433
public SQLTransaction query(Statement statement, Supplier<List<Object>> valuesSupplier, @NotNull Consumer<ResultSet> resultHandler) {
2534
Preconditions.checkNotNull(resultHandler, "Use update() method for statements without result handlers");
2635
operations.add(new Operation(statement, valuesSupplier, resultHandler, () -> true));

0 commit comments

Comments
 (0)