Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 62 additions & 138 deletions core/src/main/java/net/staticstudios/data/DataManager.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package net.staticstudios.data;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.google.common.base.Preconditions;
import com.google.common.collect.MapMaker;
import net.staticstudios.data.impl.DataAccessor;
Expand Down Expand Up @@ -38,8 +35,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@ApiStatus.Internal
Expand All @@ -65,12 +60,8 @@ public class DataManager {
private final Set<CachedValueMetadata> registeredUpdateHandlersForRedis = ConcurrentHashMap.newKeySet();
private final Set<PersistentCollectionMetadata> registeredChangeHandlersForCollection = ConcurrentHashMap.newKeySet();
private final Set<ReferenceMetadata> registeredUpdateHandlersForReference = ConcurrentHashMap.newKeySet();
private final Cache<SelectQuery, ReadCacheResult> relationCache;
private final Map<Cell, Set<SelectQuery>> dependencyToRelationCacheMapping = new ConcurrentHashMap<>();
private final AtomicLong relationCacheGeneration = new AtomicLong();
private final Cache<SelectQuery, ReadCacheResult> cellCache;
private final Map<Cell, Set<SelectQuery>> dependencyToCellCacheMapping = new ConcurrentHashMap<>();
private final AtomicLong cellCacheGeneration = new AtomicLong();
private final DependencyTrackingCache relationCache;
private final DependencyTrackingCache cellCache;

private final List<ValueSerializer<?, ?>> valueSerializers = new CopyOnWriteArrayList<>();
private final Consumer<Runnable> updateHandlerExecutor;
Expand Down Expand Up @@ -108,18 +99,8 @@ public DataManager(StaticDataConfig config, boolean setGlobal) {
sqlBuilder = new SQLBuilder(this);
dataAccessor = new H2DataAccessor(this, postgresListener, redisListener, taskQueue);

this.relationCache = Caffeine.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.removalListener((SelectQuery selectQuery, ReadCacheResult result, RemovalCause cause) -> cleanupRelationCacheEntry(selectQuery, result))
.executor(Runnable::run)
.build();
this.cellCache = Caffeine.newBuilder()
.maximumSize(20_000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.removalListener((SelectQuery selectQuery, ReadCacheResult result, RemovalCause cause) -> cleanupCellCacheEntry(selectQuery, result))
.executor(Runnable::run)
.build();
this.relationCache = new DependencyTrackingCache("relation", 10_000, 5);
this.cellCache = new DependencyTrackingCache("cell", 20_000, 5);

//todo: when we reconnect to postgres, refresh the internal cache from the source
}
Expand Down Expand Up @@ -868,8 +849,12 @@ public void registerReferenceUpdateHandlers(ReferenceMetadata metadata, Collecti

public List<ValueUpdateHandlerWrapper<?, ?>> getUpdateHandlers(String schema, String table, String column, Class<? extends UniqueData> holderClass) {
String key = schema + "." + table + "." + column;
if (persistentValueUpdateHandlers.containsKey(key) && persistentValueUpdateHandlers.get(key).containsKey(holderClass.getName())) {
return persistentValueUpdateHandlers.get(key).get(holderClass.getName());
Map<String, List<ValueUpdateHandlerWrapper<?, ?>>> handlersForColumn = persistentValueUpdateHandlers.get(key);
if (handlersForColumn != null) {
List<ValueUpdateHandlerWrapper<?, ?>> handlers = handlersForColumn.get(holderClass.getName());
if (handlers != null) {
return handlers;
}
}
return Collections.emptyList();
}
Expand Down Expand Up @@ -1017,15 +1002,21 @@ public void handleDelete(List<String> columnNames, String schema, String table,
Preconditions.checkArgument(found, "Not all ID columnsInReferringTable were provided for UniqueData class %s. Required: %s, Provided: %s", uniqueDataMetadata.clazz().getName(), uniqueDataMetadata.idColumns(), Arrays.toString(values));
}

UniqueData instance = uniqueDataInstanceCache.getOrDefault(uniqueDataMetadata.clazz().getName(), Collections.emptyMap()).get(new ColumnValuePairs(idColumns));
if (instance == null) {
Map<ColumnValuePairs, UniqueData> classCache = uniqueDataInstanceCache.get(uniqueDataMetadata.clazz().getName());
if (classCache == null) {
return;
}
instance.markDeleted();
synchronized (classCache) {
UniqueData instance = classCache.get(new ColumnValuePairs(idColumns));
if (instance == null) {
return;
}
instance.markDeleted();
}
});
}

public synchronized void updateIdColumns(List<String> columnNames, String schema, String table, String column, Object[] oldValues, Object[] newValues) {
public void updateIdColumns(List<String> columnNames, String schema, String table, String column, Object[] oldValues, Object[] newValues) {
uniqueDataMetadataMap.values().forEach(uniqueDataMetadata -> {
if (!uniqueDataMetadata.schema().equals(schema) || !uniqueDataMetadata.table().equals(table)) {
return;
Expand Down Expand Up @@ -1058,21 +1049,23 @@ public synchronized void updateIdColumns(List<String> columnNames, String schema
Preconditions.checkArgument(found, "Not all ID columnsInReferringTable were provided for UniqueData class %s. Required: %s, Provided: %s", uniqueDataMetadata.clazz().getName(), uniqueDataMetadata.idColumns(), Arrays.toString(oldValues));
}
if (Arrays.equals(oldIdColumns, newIdColumns)) {
return; // no change to id columnsInReferringTable here
return;
}

ColumnValuePairs oldIdCols = new ColumnValuePairs(oldIdColumns);
Map<ColumnValuePairs, UniqueData> classCache = uniqueDataInstanceCache.get(uniqueDataMetadata.clazz().getName());
if (classCache == null) {
return;
}
UniqueData instance = classCache.remove(oldIdCols);
if (instance == null) {
return;
synchronized (classCache) {
UniqueData instance = classCache.remove(oldIdCols);
if (instance == null) {
return;
}
ColumnValuePairs newIdCols = new ColumnValuePairs(newIdColumns);
instance.setIdColumns(newIdCols);
classCache.put(newIdCols, instance);
}
ColumnValuePairs newIdCols = new ColumnValuePairs(newIdColumns);
instance.setIdColumns(newIdCols);
classCache.put(newIdCols, instance);
});
}

Expand Down Expand Up @@ -1145,10 +1138,13 @@ public <T extends UniqueData> T getInstance(Class<T> clazz, @NotNull ColumnValue

T instance;
Map<ColumnValuePairs, UniqueData> classCache = uniqueDataInstanceCache.get(clazz.getName());
if (classCache != null && (instance = (T) classCache.get(idColumns)) != null) {
logger.trace("Cache hit for UniqueData class {} with ID columnsInReferringTable {}", clazz.getName(), idColumns);
if (!instance.isDeleted()) {
return instance;
if (classCache != null) {
synchronized (classCache) {
instance = (T) classCache.get(idColumns);
if (instance != null && !instance.isDeleted()) {
logger.trace("Cache hit for UniqueData class {} with ID columnsInReferringTable {}", clazz.getName(), idColumns);
return instance;
}
}
}

Expand Down Expand Up @@ -1215,8 +1211,14 @@ public <T extends UniqueData> T getInstance(Class<T> clazz, @NotNull ColumnValue
PersistentManyToManyCollectionImpl.delegate(instance);
PersistentOneToManyValueCollectionImpl.delegate(instance);

uniqueDataInstanceCache.computeIfAbsent(clazz.getName(), k -> new MapMaker().weakValues().makeMap())
.put(idColumns, instance);
Map<ColumnValuePairs, UniqueData> cache = uniqueDataInstanceCache.computeIfAbsent(clazz.getName(), k -> new MapMaker().weakValues().makeMap());
synchronized (cache) {
T existing = (T) cache.get(idColumns);
if (existing != null && !existing.isDeleted()) {
return existing;
}
cache.put(idColumns, instance);
}

logger.trace("Cache miss for UniqueData class {} with ID columnsInReferringTable {}. Created new instance.", clazz.getName(), idColumns);

Expand Down Expand Up @@ -1726,107 +1728,47 @@ public void flushTaskQueue() {
public StaticDataStatistics getStatistics() {
StaticDataStatistics stats = new StaticDataStatistics();
dataAccessor.populateStatistics(stats);
stats.setRelationCacheSize((int) relationCache.estimatedSize());
stats.setDependenciesToRelationsCacheMappingSize(dependencyToRelationCacheMapping.size());
stats.setCellCacheSize((int) cellCache.estimatedSize());
stats.setDependenciesToCellCacheMappingSize(dependencyToCellCacheMapping.size());
stats.setRelationCacheSize(relationCache.estimatedSize());
stats.setDependenciesToRelationsCacheMappingSize(relationCache.dependencyMappingSize());
stats.setCellCacheSize(cellCache.estimatedSize());
stats.setDependenciesToCellCacheMappingSize(cellCache.dependencyMappingSize());
return stats;
}

public @Nullable ReadCacheResult getRelationCacheResult(SelectQuery query) {
return relationCache.getIfPresent(query);
return relationCache.get(query);
}

public long getRelationCacheGeneration() {
return relationCacheGeneration.get();
return relationCache.getGeneration();
}

public void putRelationCacheResult(SelectQuery query, @NotNull ReadCacheResult result, long expectedGeneration) {
if (relationCacheGeneration.get() != expectedGeneration) {
return;
}
logger.trace("Putting result in relation cache for query {} with result {}", query, result);
relationCache.put(query, result);
for (Cell cell : result.getDependencies()) {
dependencyToRelationCacheMapping.computeIfAbsent(cell, k -> ConcurrentHashMap.newKeySet())
.add(query);
}
if (relationCacheGeneration.get() != expectedGeneration) {
relationCache.invalidate(query);
cleanupRelationCacheEntry(query, result);
}
relationCache.put(query, result, expectedGeneration);
}

public void invalidateRelationCache(List<String> columnNames, String schema, String table, List<String> changedColumns, Object[] values) {
relationCacheGeneration.incrementAndGet();
for (UniqueDataMetadata metadata : uniqueDataMetadataMap.values()) {
if (metadata.schema().equals(schema) && metadata.table().equals(table)) {
ColumnValuePair[] idColumns = new ColumnValuePair[metadata.idColumns().size()];
for (ColumnMetadata idColumn : metadata.idColumns()) {
boolean found = false;
for (int i = 0; i < columnNames.size(); i++) {
if (idColumn.name().equals(columnNames.get(i))) {
idColumns[metadata.idColumns().indexOf(idColumn)] = new ColumnValuePair(idColumn.name(), values[i]);
found = true;
break;
}
}
Preconditions.checkArgument(found, "Not all ID columnsInReferringTable were provided for UniqueData class %s. Required: %s, Provided: %s", metadata.clazz().getName(), metadata.idColumns(), Arrays.toString(values));
}

ColumnValuePairs idCols = new ColumnValuePairs(idColumns);
for (String changedColumn : changedColumns) {
Cell cell = new Cell(schema, table, changedColumn, idCols);
Set<SelectQuery> queries = dependencyToRelationCacheMapping.remove(cell);
if (queries != null) {
for (SelectQuery query : queries) {
relationCache.invalidate(query);
logger.trace("Invalidated relation cache for query {} due to change in cell {}", query, cell);
}
}
}
}
}
}

private void cleanupRelationCacheEntry(@NotNull SelectQuery query, @NotNull ReadCacheResult res) {
for (Cell dependency : res.getDependencies()) {
Set<SelectQuery> dependentQueries = dependencyToRelationCacheMapping.get(dependency);
if (dependentQueries != null) {
dependentQueries.remove(query);
if (dependentQueries.isEmpty()) {
dependencyToRelationCacheMapping.remove(dependency);
}
}
}
relationCache.invalidate(resolveCells(columnNames, schema, table, changedColumns, values));
}

public @Nullable ReadCacheResult getCellCacheResult(SelectQuery query) {
return cellCache.getIfPresent(query);
return cellCache.get(query);
}

public long getCellCacheGeneration() {
return cellCacheGeneration.get();
return cellCache.getGeneration();
}

public void putCellCacheResult(SelectQuery query, @NotNull ReadCacheResult result, long expectedGeneration) {
if (cellCacheGeneration.get() != expectedGeneration) {
return;
}
logger.trace("Putting result in cell cache for query {} with result {}", query, result);
cellCache.put(query, result);
for (Cell cell : result.getDependencies()) {
dependencyToCellCacheMapping.computeIfAbsent(cell, k -> ConcurrentHashMap.newKeySet())
.add(query);
}
if (cellCacheGeneration.get() != expectedGeneration) {
cellCache.invalidate(query);
cleanupCellCacheEntry(query, result);
}
cellCache.put(query, result, expectedGeneration);
}

public void invalidateCellCache(List<String> columnNames, String schema, String table, List<String> changedColumns, Object[] values) {
cellCacheGeneration.incrementAndGet();
cellCache.invalidate(resolveCells(columnNames, schema, table, changedColumns, values));
}

private Set<Cell> resolveCells(List<String> columnNames, String schema, String table, List<String> changedColumns, Object[] values) {
Set<Cell> cells = new HashSet<>();
for (UniqueDataMetadata metadata : uniqueDataMetadataMap.values()) {
if (metadata.schema().equals(schema) && metadata.table().equals(table)) {
ColumnValuePair[] idColumns = new ColumnValuePair[metadata.idColumns().size()];
Expand All @@ -1844,28 +1786,10 @@ public void invalidateCellCache(List<String> columnNames, String schema, String

ColumnValuePairs idCols = new ColumnValuePairs(idColumns);
for (String changedColumn : changedColumns) {
Cell cell = new Cell(schema, table, changedColumn, idCols);
Set<SelectQuery> queries = dependencyToCellCacheMapping.remove(cell);
if (queries != null) {
for (SelectQuery query : queries) {
cellCache.invalidate(query);
logger.trace("Invalidated cell cache for query {} due to change in cell {}", query, cell);
}
}
}
}
}
}

private void cleanupCellCacheEntry(@NotNull SelectQuery query, @NotNull ReadCacheResult res) {
for (Cell dependency : res.getDependencies()) {
Set<SelectQuery> dependentQueries = dependencyToCellCacheMapping.get(dependency);
if (dependentQueries != null) {
dependentQueries.remove(query);
if (dependentQueries.isEmpty()) {
dependencyToCellCacheMapping.remove(dependency);
cells.add(new Cell(schema, table, changedColumn, idCols));
}
}
}
return cells;
}
}
Loading
Loading