From da11584e0da13564ced0ec48838cd26dc9898bfb Mon Sep 17 00:00:00 2001 From: taiyang-li Date: Fri, 26 Jun 2026 12:00:37 +0800 Subject: [PATCH] [GLUTEN][CORE] Pass Spark task attempt id and pool name from Java to native runtime/memory-manager This patch extends RuntimeJniWrapper.createRuntime and the NativeMemoryManagerJniWrapper create/hold/release JNI methods with extra parameters that propagate the Spark task attempt id (and a memory-pool name) from the JVM down to the native side. The native Runtime and NativeMemoryManager just store the values and expose simple getters; no behavior change for existing Velox/CH backends. This makes per-task diagnostics and per-pool logging possible, and unblocks future backends (e.g. Bolt) that need task-level identification at the native layer. Generated-by: Claude claude-sonnet-4-6 Co-Authored-By: Aime Change-Id: Iff1124ec0e8946f46dbbed096f9197ff45c9f433 --- cpp/core/compute/Runtime.h | 9 +++++++++ cpp/core/jni/JniWrapper.cc | 16 ++++++++++++---- cpp/core/memory/MemoryManager.h | 9 +++++++++ .../memory/NativeMemoryManagerJniWrapper.java | 6 +++--- .../apache/gluten/runtime/RuntimeJniWrapper.java | 2 +- .../gluten/memory/NativeMemoryManager.scala | 9 ++++++--- .../org/apache/gluten/runtime/Runtime.scala | 4 +++- 7 files changed, 43 insertions(+), 12 deletions(-) diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index 07120684d33..ea712ed7b57 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -194,6 +194,14 @@ class Runtime : public std::enable_shared_from_this { return objStore_->save(obj); } + int64_t taskAttemptId() const { + return taskAttemptId_; + } + + void setTaskAttemptId(int64_t id) { + taskAttemptId_ = id; + } + protected: std::string kind_; MemoryManager* memoryManager_; @@ -206,5 +214,6 @@ class Runtime : public std::enable_shared_from_this { std::optional taskInfo_{std::nullopt}; std::shared_ptr dumper_{nullptr}; + int64_t taskAttemptId_{-1}; }; } // namespace gluten diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index efdfd36cba4..572fed33e66 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -345,7 +345,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_runtime_RuntimeJniWrapper_createR jstring jBackendType, jlong nmmHandle, jlong ntmHandle, - jbyteArray sessionConf) { + jbyteArray sessionConf, + jlong taskAttemptId) { JNI_METHOD_START MemoryManager* memoryManager = jniCastOrThrow(nmmHandle); ThreadManager* threadManager = jniCastOrThrow(ntmHandle); @@ -354,6 +355,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_runtime_RuntimeJniWrapper_createR auto backendType = jStringToCString(env, jBackendType); auto runtime = Runtime::create(backendType, memoryManager, threadManager, sparkConf); + runtime->setTaskAttemptId(static_cast(taskAttemptId)); return reinterpret_cast(runtime); JNI_METHOD_END(kInvalidObjectHandle) } @@ -378,13 +380,15 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrap jclass, jstring jBackendType, jobject jListener, - jbyteArray sessionConf) { + jbyteArray sessionConf, + jstring jName) { JNI_METHOD_START JavaVM* vm; if (env->GetJavaVM(&vm) != JNI_OK) { throw GlutenException("Unable to get JavaVM instance"); } auto backendType = jStringToCString(env, jBackendType); + auto name = jStringToCString(env, jName); auto safeArray = getByteArrayElementsSafe(env, sessionConf); auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length()); std::unique_ptr listener = std::make_unique(vm, jListener); @@ -393,6 +397,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrap listener = std::make_unique(std::move(listener)); } MemoryManager* mm = MemoryManager::create(backendType, std::move(listener)); + mm->setName(name); return reinterpret_cast(mm); JNI_METHOD_END(-1L) } @@ -457,7 +462,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrap JNIEXPORT void JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapper_hold( // NOLINT JNIEnv* env, jclass, - jlong nmmHandle) { + jlong nmmHandle, + jstring jName, + jlong taskAttemptId) { JNI_METHOD_START auto* memoryManager = jniCastOrThrow(nmmHandle); memoryManager->hold(); @@ -467,7 +474,8 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapp JNIEXPORT void JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapper_release( // NOLINT JNIEnv* env, jclass, - jlong nmmHandle) { + jlong nmmHandle, + jlong taskAttemptId) { JNI_METHOD_START auto* memoryManager = jniCastOrThrow(nmmHandle); MemoryManager::release(memoryManager); diff --git a/cpp/core/memory/MemoryManager.h b/cpp/core/memory/MemoryManager.h index 93d875ad1e6..ee9da7ed516 100644 --- a/cpp/core/memory/MemoryManager.h +++ b/cpp/core/memory/MemoryManager.h @@ -39,6 +39,14 @@ class MemoryManager { return kind_; } + std::string name() const { + return name_; + } + + void setName(const std::string& name) { + name_ = name; + } + // Get the default Arrow memory pool for this memory manager. This memory pool is held by the memory manager. virtual arrow::MemoryPool* defaultArrowMemoryPool() = 0; @@ -58,6 +66,7 @@ class MemoryManager { private: std::string kind_; + std::string name_; }; } // namespace gluten diff --git a/gluten-arrow/src/main/java/org/apache/gluten/memory/NativeMemoryManagerJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/memory/NativeMemoryManagerJniWrapper.java index c23b9704eca..15dc8f3152c 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/memory/NativeMemoryManagerJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/memory/NativeMemoryManagerJniWrapper.java @@ -22,13 +22,13 @@ public class NativeMemoryManagerJniWrapper { private NativeMemoryManagerJniWrapper() {} public static native long create( - String backendType, ReservationListener listener, byte[] sessionConf); + String backendType, ReservationListener listener, byte[] sessionConf, String name); public static native byte[] collectUsage(long handle); public static native long shrink(long handle, long size); - public static native void hold(long handle); + public static native void hold(long handle, String name, long taskAttemptId); - public static native void release(long handle); + public static native void release(long handle, long taskAttemptId); } diff --git a/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java index ed227596722..cb22250e4e8 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java @@ -21,7 +21,7 @@ public class RuntimeJniWrapper { private RuntimeJniWrapper() {} public static native long createRuntime( - String backendType, long nmm, long ntm, byte[] sessionConf); + String backendType, long nmm, long ntm, byte[] sessionConf, long taskAttemptId); public static native void releaseRuntime(long handle); } diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala index 159e1bba5ec..ed3fba8ee37 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala @@ -23,6 +23,7 @@ import org.apache.gluten.memory.memtarget.{KnownNameAndStats, MemoryTarget, Spil import org.apache.gluten.proto.MemoryUsageStats import org.apache.gluten.utils.ConfigUtil +import org.apache.spark.TaskContext import org.apache.spark.memory.SparkMemoryUtil import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf} import org.apache.spark.task.{TaskResource, TaskResources} @@ -54,7 +55,8 @@ object NativeMemoryManager { ConfigUtil.serialize( GlutenConfig .getNativeSessionConf(backendName, GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)) - .asJava) + .asJava), + name ) spillers.append(new Spiller() { override def spill(self: MemoryTarget, phase: Spiller.Phase, size: Long): Long = phase match { @@ -76,7 +78,8 @@ object NativeMemoryManager { private val released: AtomicBoolean = new AtomicBoolean(false) override def addSpiller(spiller: Spiller): Unit = spillers.append(spiller) - override def hold(): Unit = NativeMemoryManagerJniWrapper.hold(handle) + override def hold(): Unit = + NativeMemoryManagerJniWrapper.hold(handle, name, TaskContext.get().taskAttemptId()) override def getHandle(): Long = handle override def release(): Unit = { if (!released.compareAndSet(false, true)) { @@ -97,7 +100,7 @@ object NativeMemoryManager { LOGGER.debug("About to release memory manager, " + dump()) } - NativeMemoryManagerJniWrapper.release(handle) + NativeMemoryManagerJniWrapper.release(handle, TaskContext.get().taskAttemptId()) if (rl.getUsedBytes != 0) { LOGGER.warn( diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala index 2e3c4681400..18b7589de25 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala @@ -22,6 +22,7 @@ import org.apache.gluten.memory.NativeMemoryManager import org.apache.gluten.threads.{NativeThreadManager, TaskChildThreadInitializer} import org.apache.gluten.utils.ConfigUtil +import org.apache.spark.TaskContext import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf} import org.apache.spark.task.{TaskResource, TaskResources} @@ -64,7 +65,8 @@ object Runtime { (GlutenConfig .getNativeSessionConf( backendName, - GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)) ++ extraConf.asScala).asJava) + GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)) ++ extraConf.asScala).asJava), + TaskContext.get().taskAttemptId() ) private val released: AtomicBoolean = new AtomicBoolean(false)