Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 183 additions & 73 deletions src/main/java/net/jodah/expiringmap/ExpiringMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SortedSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -75,9 +76,15 @@
* @param <V> Value type
*/
public class ExpiringMap<K, V> implements ConcurrentMap<K, V> {
static volatile ScheduledExecutorService EXPIRER;
static volatile ThreadPoolExecutor LISTENER_SERVICE;
static ThreadFactory THREAD_FACTORY;
static volatile ScheduledExecutorService EXPIRER;
static volatile ThreadPoolExecutor LISTENER_SERVICE;
static ThreadFactory THREAD_FACTORY;
private static final ThreadLocal<Random> RNG = new ThreadLocal<Random>() {
@Override
protected Random initialValue() {
return new Random();
}
};

List<ExpirationListener<K, V>> expirationListeners;
List<ExpirationListener<K, V>> asyncExpirationListeners;
Expand All @@ -88,10 +95,14 @@ public class ExpiringMap<K, V> implements ConcurrentMap<K, V> {
private final ExpiringEntryLoader<? super K, ? extends V> expiringEntryLoader;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
/** Guarded by "readWriteLock" */
private final EntryMap<K, V> entries;
private final boolean variableExpiration;
private final Lock writeLock = readWriteLock.writeLock();
/** Guarded by "readWriteLock" */
private final EntryMap<K, V> entries;
private final boolean variableExpiration;
private final double jitterPercent;
private final double probabilisticProbability;
private final double probabilisticWindowPercent;
private final boolean probabilisticExpirationEnabled;

/**
* Sets the {@link ThreadFactory} that is used to create expiration and listener callback threads for all ExpiringMap
Expand Down Expand Up @@ -126,14 +137,18 @@ private ExpiringMap(final Builder<K, V> builder) {
entries = variableExpiration ? new EntryTreeHashMap<K, V>() : new EntryLinkedHashMap<K, V>();
if (builder.expirationListeners != null)
expirationListeners = new CopyOnWriteArrayList<ExpirationListener<K, V>>(builder.expirationListeners);
if (builder.asyncExpirationListeners != null)
asyncExpirationListeners = new CopyOnWriteArrayList<ExpirationListener<K, V>>(builder.asyncExpirationListeners);
expirationPolicy = new AtomicReference<ExpirationPolicy>(builder.expirationPolicy);
expirationNanos = new AtomicLong(TimeUnit.NANOSECONDS.convert(builder.duration, builder.timeUnit));
maxSize = builder.maxSize;
entryLoader = builder.entryLoader;
expiringEntryLoader = builder.expiringEntryLoader;
}
if (builder.asyncExpirationListeners != null)
asyncExpirationListeners = new CopyOnWriteArrayList<ExpirationListener<K, V>>(builder.asyncExpirationListeners);
expirationPolicy = new AtomicReference<ExpirationPolicy>(builder.expirationPolicy);
expirationNanos = new AtomicLong(TimeUnit.NANOSECONDS.convert(builder.duration, builder.timeUnit));
jitterPercent = builder.jitterPercent;
probabilisticProbability = builder.probabilisticProbability;
probabilisticWindowPercent = builder.probabilisticWindowPercent;
probabilisticExpirationEnabled = probabilisticProbability > 0.0 && probabilisticWindowPercent > 0.0;
maxSize = builder.maxSize;
entryLoader = builder.entryLoader;
expiringEntryLoader = builder.expiringEntryLoader;
}

/**
* Builds ExpiringMap instances. Defaults to ExpirationPolicy.CREATED, expiration of 60 TimeUnit.SECONDS and
Expand All @@ -143,12 +158,15 @@ public static final class Builder<K, V> {
private ExpirationPolicy expirationPolicy = ExpirationPolicy.CREATED;
private List<ExpirationListener<K, V>> expirationListeners;
private List<ExpirationListener<K, V>> asyncExpirationListeners;
private TimeUnit timeUnit = TimeUnit.SECONDS;
private boolean variableExpiration;
private long duration = 60;
private int maxSize = Integer.MAX_VALUE;
private EntryLoader<K, V> entryLoader;
private ExpiringEntryLoader<K, V> expiringEntryLoader;
private TimeUnit timeUnit = TimeUnit.SECONDS;
private boolean variableExpiration;
private long duration = 60;
private int maxSize = Integer.MAX_VALUE;
private EntryLoader<K, V> entryLoader;
private ExpiringEntryLoader<K, V> expiringEntryLoader;
private double jitterPercent;
private double probabilisticProbability;
private double probabilisticWindowPercent;

/**
* Creates a new Builder object.
Expand All @@ -174,12 +192,42 @@ public <K1 extends K, V1 extends V> ExpiringMap<K1, V1> build() {
* @param timeUnit the unit that {@code duration} is expressed in
* @throws NullPointerException if {@code timeUnit} is null
*/
public Builder<K, V> expiration(long duration, TimeUnit timeUnit) {
this.duration = duration;
this.timeUnit = Assert.notNull(timeUnit, "timeUnit");
return this;
}

public Builder<K, V> expiration(long duration, TimeUnit timeUnit) {
this.duration = duration;
this.timeUnit = Assert.notNull(timeUnit, "timeUnit");
return this;
}

/**
* Adds random jitter to the base expiration duration. A jitter percent of
* 0.2 applies +/-20% to the configured duration.
*
* @param jitterPercent percentage of the base duration to jitter, in [0, 1)
*/
public Builder<K, V> expirationJitter(double jitterPercent) {
if (Double.isNaN(jitterPercent) || jitterPercent < 0.0 || jitterPercent >= 1.0)
throw new IllegalArgumentException("jitterPercent must be >= 0 and < 1");
this.jitterPercent = jitterPercent;
return this;
}

/**
* Enables probabilistic early expiration when an entry is within the given
* window of its TTL.
*
* @param probability chance to expire early on access, in [0, 1]
* @param windowPercent portion of TTL defining the early-expiration window, in [0, 1]
*/
public Builder<K, V> probabilisticExpiration(double probability, double windowPercent) {
if (Double.isNaN(probability) || probability < 0.0 || probability > 1.0)
throw new IllegalArgumentException("probability must be between 0 and 1");
if (Double.isNaN(windowPercent) || windowPercent < 0.0 || windowPercent > 1.0)
throw new IllegalArgumentException("windowPercent must be between 0 and 1");
this.probabilisticProbability = probability;
this.probabilisticWindowPercent = windowPercent;
return this;
}

/**
* Sets the maximum size of the map. Once this size has been reached, adding an additional entry will expire the
* first entry in line for expiration based on the expiration policy.
Expand Down Expand Up @@ -506,10 +554,11 @@ public final Map.Entry<K, V> next() {
}

/** Expiring map entry implementation. */
static class ExpiringEntry<K, V> implements Comparable<ExpiringEntry<K, V>> {
final AtomicLong expirationNanos;
/** Epoch time at which the entry is expected to expire */
final AtomicLong expectedExpiration;
static class ExpiringEntry<K, V> implements Comparable<ExpiringEntry<K, V>> {
final AtomicLong expirationNanos;
final double jitterPercent;
/** Epoch time at which the entry is expected to expire */
final AtomicLong expectedExpiration;
final AtomicReference<ExpirationPolicy> expirationPolicy;
final K key;
/** Guarded by "this" */
Expand All @@ -527,14 +576,16 @@ static class ExpiringEntry<K, V> implements Comparable<ExpiringEntry<K, V>> {
* @param expirationPolicy for the entry
* @param expirationNanos for the entry
*/
ExpiringEntry(K key, V value, AtomicReference<ExpirationPolicy> expirationPolicy, AtomicLong expirationNanos) {
this.key = key;
this.value = value;
this.expirationPolicy = expirationPolicy;
this.expirationNanos = expirationNanos;
this.expectedExpiration = new AtomicLong();
resetExpiration();
}
ExpiringEntry(K key, V value, AtomicReference<ExpirationPolicy> expirationPolicy, AtomicLong expirationNanos,
double jitterPercent) {
this.key = key;
this.value = value;
this.expirationPolicy = expirationPolicy;
this.expirationNanos = expirationNanos;
this.jitterPercent = jitterPercent;
this.expectedExpiration = new AtomicLong();
resetExpiration();
}

@Override
public int compareTo(ExpiringEntry<K, V> other) {
Expand Down Expand Up @@ -597,9 +648,11 @@ synchronized V getValue() {
}

/** Resets the entry's expected expiration. */
void resetExpiration() {
expectedExpiration.set(expirationNanos.get() + System.nanoTime());
}
void resetExpiration() {
long duration = expirationNanos.get();
long jittered = applyJitter(duration, jitterPercent);
expectedExpiration.set(jittered + System.nanoTime());
}

/** Marks the entry as scheduled. */
synchronized void schedule(Future<?> entryFuture) {
Expand All @@ -608,16 +661,28 @@ synchronized void schedule(Future<?> entryFuture) {
}

/** Sets the entry value. */
synchronized void setValue(V value) {
this.value = value;
}
}

/**
* Creates an ExpiringMap builder.
*
* @return New ExpiringMap builder
*/
synchronized void setValue(V value) {
this.value = value;
}
}

private static long applyJitter(long duration, double jitterPercent) {
if (jitterPercent <= 0.0 || duration <= 0)
return duration;
double delta = (RNG.get().nextDouble() * 2.0 - 1.0) * (duration * jitterPercent);
double result = duration + delta;
if (result >= Long.MAX_VALUE)
return Long.MAX_VALUE;
if (result <= 0.0)
return 0;
return (long) result;
}

/**
* Creates an ExpiringMap builder.
*
* @return New ExpiringMap builder
*/
public static Builder<Object, Object> builder() {
return new Builder<Object, Object>();
}
Expand Down Expand Up @@ -749,20 +814,65 @@ public boolean equals(Object obj) {
}
}

@Override
@SuppressWarnings("unchecked")
public V get(Object key) {
ExpiringEntry<K, V> entry = getEntry(key);

if (entry == null) {
return load((K) key);
} else if (ExpirationPolicy.ACCESSED.equals(entry.expirationPolicy.get()))
resetEntry(entry, false);

return entry.getValue();
}

private V load(K key) {
@Override
@SuppressWarnings("unchecked")
public V get(Object key) {
ExpiringEntry<K, V> entry = getEntry(key);

if (entry == null) {
return load((K) key);
} else if (shouldExpireEarly(entry)) {
if (expireEntryEarly(entry))
return load((K) key);
}

if (ExpirationPolicy.ACCESSED.equals(entry.expirationPolicy.get()))
resetEntry(entry, false);

return entry.getValue();
}

private boolean shouldExpireEarly(ExpiringEntry<K, V> entry) {
if (!probabilisticExpirationEnabled)
return false;

long duration = entry.expirationNanos.get();
if (duration <= 0)
return false;

long window = (long) (duration * probabilisticWindowPercent);
if (window <= 0)
return false;

long remaining = entry.expectedExpiration.get() - System.nanoTime();
if (remaining <= 0 || remaining > window)
return false;

return RNG.get().nextDouble() < probabilisticProbability;
}

private boolean expireEntryEarly(ExpiringEntry<K, V> entry) {
boolean expired = false;
writeLock.lock();
try {
ExpiringEntry<K, V> current = entries.get(entry.key);
if (current == null || current != entry)
return false;

entries.remove(entry.key);
if (entry.cancel())
scheduleEntry(entries.first());
expired = true;
} finally {
writeLock.unlock();
}

if (expired)
notifyListeners(entry);
return expired;
}

private V load(K key) {
if (entryLoader == null && expiringEntryLoader == null)
return null;

Expand Down Expand Up @@ -1292,9 +1402,9 @@ V putInternal(K key, V value, ExpirationPolicy expirationPolicy, long expiration
V oldValue = null;

if (entry == null) {
entry = new ExpiringEntry<K, V>(key, value,
variableExpiration ? new AtomicReference<ExpirationPolicy>(expirationPolicy) : this.expirationPolicy,
variableExpiration ? new AtomicLong(expirationNanos) : this.expirationNanos);
entry = new ExpiringEntry<K, V>(key, value,
variableExpiration ? new AtomicReference<ExpirationPolicy>(expirationPolicy) : this.expirationPolicy,
variableExpiration ? new AtomicLong(expirationNanos) : this.expirationNanos, jitterPercent);
if (entries.size() >= maxSize) {
ExpiringEntry<K, V> expiredEntry = entries.first();
entries.remove(expiredEntry.key);
Expand Down
39 changes: 39 additions & 0 deletions src/test/java/net/jodah/expiringmap/issues/Issue91.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package net.jodah.expiringmap.issues;

import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import java.util.concurrent.TimeUnit;

import org.testng.annotations.Test;

import net.jodah.expiringmap.ExpiringMap;

@Test
public class Issue91 {
public void testProbabilisticEarlyExpiration() throws Exception {
ExpiringMap<String, String> map = ExpiringMap.builder()
.expiration(200, TimeUnit.MILLISECONDS)
.probabilisticExpiration(1.0, 1.0)
.build();

map.put("k", "v");
Thread.sleep(10);

assertNull(map.get("k"));
}

public void testExpirationJitterWithinBounds() {
long baseMillis = 1000;
ExpiringMap<String, String> map = ExpiringMap.builder()
.expiration(baseMillis, TimeUnit.MILLISECONDS)
.expirationJitter(0.2)
.build();

map.put("k", "v");
long expected = map.getExpectedExpiration("k");

assertTrue(expected <= 1200, "expected expiration should not exceed base + jitter");
assertTrue(expected >= 700, "expected expiration should not be negative or too small");
}
}