Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,20 @@ public class ConfigOptions {
"This configuration controls the directory where fluss will store its data. "
+ "The default value is /tmp/fluss-data");

public static final ConfigOption<List<String>> DATA_DIRS =
key("data.dirs")
.stringType()
.asList()
.noDefaultValue()
.withDescription(
"A comma-separated list of local directories used by TabletServer to store "
+ "local log, kv, checkpoints, and other node-local files. "
+ "If configured, this option takes precedence over `"
+ DATA_DIR.key()
+ "`. If not configured, `"
+ DATA_DIR.key()
+ "` is used as the only local data directory.");

public static final ConfigOption<Duration> WRITER_ID_EXPIRATION_TIME =
key("server.writer-id.expiration-time")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -71,7 +73,7 @@ public enum TabletType {
KV
}

protected final File dataDir;
protected final List<File> dataDirs;

protected final Configuration conf;

Expand All @@ -83,22 +85,31 @@ public enum TabletType {
private final String tabletDirPrefix;

public TabletManagerBase(
TabletType tabletType, File dataDir, Configuration conf, int recoveryThreads) {
TabletType tabletType, List<File> dataDirs, Configuration conf, int recoveryThreads) {
this.tabletType = tabletType;
this.tabletDirPrefix = getTabletDirPrefix(tabletType);
this.dataDir = dataDir;
this.dataDirs = new ArrayList<>(dataDirs);
this.conf = conf;
this.recoveryThreads = recoveryThreads;
}

/**
* Return the directories of the tablets to be loaded.
* Return the directories of the tablets to be loaded, grouped by configured data directory.
*
* <p>See more about the local directory contracts: {@link FlussPaths#logTabletDir(File,
* PhysicalTablePath, TableBucket)} and {@link FlussPaths#kvTabletDir(File, PhysicalTablePath,
* TableBucket)}.
*/
protected List<File> listTabletsToLoad() {
protected Map<File, List<File>> listTabletsToLoad() {
Map<File, List<File>> tabletsToLoadByDataDir = new LinkedHashMap<>();
for (File dataDir : dataDirs) {
tabletsToLoadByDataDir.put(dataDir, listTabletsToLoad(dataDir));
}
return tabletsToLoadByDataDir;
}

/** Returns the tablet directories to be loaded from a single configured data directory. */
protected List<File> listTabletsToLoad(File dataDir) {
List<File> tabletsToLoad = new ArrayList<>();
// Get all database directory.
File[] dbDirs = FileUtils.listDirectories(dataDir);
Expand Down Expand Up @@ -140,6 +151,11 @@ protected ExecutorService createThreadPool(String poolName) {
return Executors.newFixedThreadPool(recoveryThreads, new ExecutorThreadFactory(poolName));
}

protected ExecutorService createThreadPoolByDir(String poolName, File dataDir) {
return Executors.newSingleThreadExecutor(
new ExecutorThreadFactory(poolName + "-" + dataDir.getAbsolutePath()));
}

/** Running a series of jobs in a thread pool, and return the count of the successful job. */
protected int runInThreadPool(Runnable[] runnableJobs, String poolName) throws Throwable {
List<Future<?>> jobsForTabletDir = new ArrayList<>();
Expand All @@ -164,29 +180,34 @@ protected int runInThreadPool(Runnable[] runnableJobs, String poolName) throws T
}

/**
* Get the tablet directory with given directory name for the given table path and table bucket.
* Get the tablet directory with given directory name for the given data directory, table path
* and table bucket.
*
* <p>When the parent directory of the tablet directory is missing, it will create the
* directory.
*
* @param dataDir the local data directory chosen for this tablet
* @param tablePath the table path of the bucket
* @param tableBucket the table bucket
* @return the tablet directory
*/
protected File getOrCreateTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) {
File tabletDir = getTabletDir(tablePath, tableBucket);
protected File getOrCreateTabletDir(
File dataDir, PhysicalTablePath tablePath, TableBucket tableBucket) {
File tabletDir = getTabletDir(dataDir, tablePath, tableBucket);
if (tabletDir.exists()) {
return tabletDir;
}
createTabletDirectory(tabletDir);
return tabletDir;
}

public Path getTabletParentDir(PhysicalTablePath tablePath, TableBucket tableBucket) {
return getTabletDir(tablePath, tableBucket).toPath().getParent();
public Path getTabletParentDir(
File dataDir, PhysicalTablePath tablePath, TableBucket tableBucket) {
return getTabletDir(dataDir, tablePath, tableBucket).toPath().getParent();
}

protected File getTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) {
protected File getTabletDir(
File dataDir, PhysicalTablePath tablePath, TableBucket tableBucket) {
switch (tabletType) {
case LOG:
return FlussPaths.logTabletDir(dataDir, tablePath, tableBucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.fluss.server.log.LogManager;
import org.apache.fluss.server.log.LogTablet;
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.storage.LocalDiskManager;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
Expand Down Expand Up @@ -110,6 +111,7 @@ public static RateLimiter getDefaultRateLimiter() {
}

private final LogManager logManager;
private final LocalDiskManager localDiskManager;

private final TabletServerMetricGroup serverMetricGroup;

Expand Down Expand Up @@ -141,14 +143,15 @@ public static RateLimiter getDefaultRateLimiter() {
private volatile boolean isShutdown = false;

private KvManager(
File dataDir,
LocalDiskManager localDiskManager,
Configuration conf,
ZooKeeperClient zkClient,
int recoveryThreadsPerDataDir,
LogManager logManager,
TabletServerMetricGroup tabletServerMetricGroup)
throws IOException {
super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir);
super(TabletType.KV, localDiskManager.dataDirs(), conf, recoveryThreadsPerDataDir);
this.localDiskManager = localDiskManager;
this.logManager = logManager;
this.arrowBufferAllocator = new RootAllocator(Long.MAX_VALUE);
this.memorySegmentPool = LazyMemorySegmentPool.createServerBufferPool(conf);
Expand Down Expand Up @@ -183,12 +186,11 @@ public static KvManager create(
Configuration conf,
ZooKeeperClient zkClient,
LogManager logManager,
TabletServerMetricGroup tabletServerMetricGroup)
TabletServerMetricGroup tabletServerMetricGroup,
LocalDiskManager localDiskManager)
throws IOException {
String dataDirString = conf.getString(ConfigOptions.DATA_DIR);
File dataDir = new File(dataDirString).getAbsoluteFile();
return new KvManager(
dataDir,
localDiskManager,
conf,
zkClient,
conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
Expand Down Expand Up @@ -233,6 +235,7 @@ public void shutdown() {
* @param kvFormat the kv format
*/
public KvTablet getOrCreateKv(
File dataDir,
PhysicalTablePath tablePath,
TableBucket tableBucket,
LogTablet logTablet,
Expand All @@ -248,7 +251,7 @@ public KvTablet getOrCreateKv(
return currentKvs.get(tableBucket);
}

File tabletDir = getOrCreateTabletDir(tablePath, tableBucket);
File tabletDir = getOrCreateTabletDir(dataDir, tablePath, tableBucket);
RowMerger merger = RowMerger.create(tableConfig, kvFormat, schemaGetter);
AutoIncrementManager autoIncrementManager =
new AutoIncrementManager(
Expand Down Expand Up @@ -294,8 +297,9 @@ public KvTablet getOrCreateKv(
* @param tableBucket the table bucket
* @return the tablet directory
*/
public File createTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) {
File tabletDir = getTabletDir(tablePath, tableBucket);
public File createTabletDir(
File dataDir, PhysicalTablePath tablePath, TableBucket tableBucket) {
File tabletDir = getTabletDir(dataDir, tablePath, tableBucket);

// delete the tablet dir if exists
FileUtils.deleteDirectoryQuietly(tabletDir);
Expand Down
Loading
Loading