Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ object HadoopConf {
CommonVars("wds.linkis.hadoop.hdfs.cache.max.time", new TimeType("12h")).getValue.toLong

/**
* Temporary directory for keytab files when LINKIS_KEYTAB_SWITCH is enabled
* 默认使用系统临时目录下的 keytab 子目录
* Temporary directory for keytab files when LINKIS_KEYTAB_SWITCH is enabled 默认使用系统临时目录下的 keytab
* 子目录
*/
val KEYTAB_TEMP_DIR = CommonVars("linkis.keytab.temp.dir", "/tmp/keytab")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.apache.linkis.hadoop.common.conf.HadoopConf
import org.apache.linkis.hadoop.common.conf.HadoopConf._
import org.apache.linkis.hadoop.common.entity.HDFSFileSystemContainer

import com.google.common.cache.{CacheBuilder, LoadingCache, RemovalCause, RemovalListener, RemovalNotification}
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
Expand All @@ -40,6 +39,14 @@ import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._

import com.google.common.cache.{
CacheBuilder,
LoadingCache,
RemovalCause,
RemovalListener,
RemovalNotification
}

object HDFSUtils extends Logging {

private val fileSystemCache: java.util.Map[String, HDFSFileSystemContainer] =
Expand All @@ -52,9 +59,9 @@ object HDFSUtils extends Logging {
val key = notification.getKey
val path = notification.getValue
val cause = notification.getCause

logger.info(s"Keytab cache entry removed: $key, cause: $cause")

// 当缓存项被移除时,清理对应的临时文件
if (path != null) {
val file = new File(path)
Expand All @@ -69,7 +76,8 @@ object HDFSUtils extends Logging {
}
}

CacheBuilder.newBuilder()
CacheBuilder
.newBuilder()
.maximumSize(1000) // 最大缓存项数量
.expireAfterAccess(24, TimeUnit.HOURS) // 24小时未访问过期
.removalListener(removalListener)
Expand Down Expand Up @@ -525,7 +533,10 @@ object HDFSUtils extends Logging {
// 确保keytab临时目录存在
if (!Files.exists(keytabTempDir)) {
Files.createDirectories(keytabTempDir)
Files.setPosixFilePermissions(keytabTempDir, PosixFilePermissions.fromString("rwxr-xr-x"))
Files.setPosixFilePermissions(
keytabTempDir,
PosixFilePermissions.fromString("rwxr-xr-x")
)
}

val cachedPath = keytabTempFileCache.getIfPresent(cacheKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@

package org.apache.linkis.hadoop.common.utils

import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, DisplayName, Test}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}

import java.io.File
import java.nio.file.{Files, Paths, StandardOpenOption}
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

import scala.collection.JavaConverters._

import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, DisplayName, Test}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}

/**
* Unit tests for keytab file cache optimization in HDFSUtils.
* This test validates that the caching mechanism reduces Full GC by avoiding
* repeated creation of temporary keytab files.
* Unit tests for keytab file cache optimization in HDFSUtils. This test validates that the caching
* mechanism reduces Full GC by avoiding repeated creation of temporary keytab files.
*/
@DisplayName("HDFSUtils Keytab Cache Test")
class HDFSUtilsKeytabCacheTest {
Expand All @@ -40,7 +40,10 @@ class HDFSUtilsKeytabCacheTest {
@BeforeAll
def setupClass(): Unit = {
// Create test directory for keytab files
testKeytabDir = new File(System.getProperty("java.io.tmpdir"), "test_keytab_cache_" + System.currentTimeMillis())
testKeytabDir = new File(
System.getProperty("java.io.tmpdir"),
"test_keytab_cache_" + System.currentTimeMillis()
)
testKeytabDir.mkdirs()

// Create a dummy encrypted keytab file for testing
Expand Down Expand Up @@ -77,7 +80,8 @@ class HDFSUtilsKeytabCacheTest {
try {
val cacheMethod = HDFSUtils.getClass.getDeclaredMethod("keytabFileCache")
cacheMethod.setAccessible(true)
val cache = cacheMethod.invoke(HDFSUtils).asInstanceOf[ConcurrentHashMap[String, java.nio.file.Path]]
val cache =
cacheMethod.invoke(HDFSUtils).asInstanceOf[ConcurrentHashMap[String, java.nio.file.Path]]
cache.asScala.foreach { case (_, path) =>
try {
Files.deleteIfExists(path)
Expand Down Expand Up @@ -111,7 +115,8 @@ class HDFSUtilsKeytabCacheTest {
val label = null

// Verify cache key generation is consistent
val keyMethod = HDFSUtils.getClass.getDeclaredMethod("createKeytabCacheKey", classOf[String], classOf[String])
val keyMethod =
HDFSUtils.getClass.getDeclaredMethod("createKeytabCacheKey", classOf[String], classOf[String])
keyMethod.setAccessible(true)
val key1 = keyMethod.invoke(HDFSUtils, userName, label).asInstanceOf[String]
val key2 = keyMethod.invoke(HDFSUtils, userName, label).asInstanceOf[String]
Expand All @@ -126,7 +131,8 @@ class HDFSUtilsKeytabCacheTest {
val user2 = "testuser2"
val label = null

val keyMethod = HDFSUtils.getClass.getDeclaredMethod("createKeytabCacheKey", classOf[String], classOf[String])
val keyMethod =
HDFSUtils.getClass.getDeclaredMethod("createKeytabCacheKey", classOf[String], classOf[String])
keyMethod.setAccessible(true)
val key1 = keyMethod.invoke(HDFSUtils, user1, label).asInstanceOf[String]
val key2 = keyMethod.invoke(HDFSUtils, user2, label).asInstanceOf[String]
Expand All @@ -143,7 +149,8 @@ class HDFSUtilsKeytabCacheTest {
val label1 = "cluster1"
val label2 = "cluster2"

val keyMethod = HDFSUtils.getClass.getDeclaredMethod("createKeytabCacheKey", classOf[String], classOf[String])
val keyMethod =
HDFSUtils.getClass.getDeclaredMethod("createKeytabCacheKey", classOf[String], classOf[String])
keyMethod.setAccessible(true)
val key1 = keyMethod.invoke(HDFSUtils, userName, label1).asInstanceOf[String]
val key2 = keyMethod.invoke(HDFSUtils, userName, label2).asInstanceOf[String]
Expand All @@ -160,7 +167,8 @@ class HDFSUtilsKeytabCacheTest {
val label = null
val threadCount = 10

val keyMethod = HDFSUtils.getClass.getDeclaredMethod("createKeytabCacheKey", classOf[String], classOf[String])
val keyMethod =
HDFSUtils.getClass.getDeclaredMethod("createKeytabCacheKey", classOf[String], classOf[String])
keyMethod.setAccessible(true)

val executor = Executors.newFixedThreadPool(threadCount)
Expand Down Expand Up @@ -195,7 +203,8 @@ class HDFSUtilsKeytabCacheTest {
val label1 = null
val label2 = "default"

val keyMethod = HDFSUtils.getClass.getDeclaredMethod("createKeytabCacheKey", classOf[String], classOf[String])
val keyMethod =
HDFSUtils.getClass.getDeclaredMethod("createKeytabCacheKey", classOf[String], classOf[String])
keyMethod.setAccessible(true)
val key1 = keyMethod.invoke(HDFSUtils, userName, label1).asInstanceOf[String]
val key2 = keyMethod.invoke(HDFSUtils, userName, label2).asInstanceOf[String]
Expand Down Expand Up @@ -224,4 +233,5 @@ class HDFSUtilsKeytabCacheTest {
case _: Exception => // Field may not be accessible
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.entrance.EntranceServer;
import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext;
import org.apache.linkis.entrance.conf.EntranceConfiguration;
import org.apache.linkis.entrance.protocol.EntranceGroupCacheClearBroadcast;
import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext;
import org.apache.linkis.instance.label.client.InstanceLabelClient;
import org.apache.linkis.manager.label.constant.LabelKeyConstant;
import org.apache.linkis.manager.label.constant.LabelValueConstant;
Expand Down Expand Up @@ -149,8 +149,8 @@ public Message backOnline(HttpServletRequest req) {
try {
// 构造广播消息
EntranceGroupCacheClearBroadcast broadcast =
new EntranceGroupCacheClearBroadcast(
Sender.getThisInstance(), System.currentTimeMillis());
new EntranceGroupCacheClearBroadcast(
Sender.getThisInstance(), System.currentTimeMillis());
// 获取entrance服务的Sender并发送广播
Sender.getSender(Sender.getThisServiceInstance()).send(broadcast);
logger.info("Successfully sent cache clear broadcast for entrance offline");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,8 @@ object EntranceConfiguration {
val HIVE_LOCATION_CONTROL_WHITELIST_CREATORS: CommonVars[String] =
CommonVars("wds.linkis.hive.location.control.whitelist.creators", "")

/** Entrance Group缓存清理功能总开关
/**
* Entrance Group缓存清理功能总开关
*
* 控制以下功能是否启用:
* 1. Entrance offline时发送Group缓存清理广播 2. 接收并处理Group缓存清理广播 3. 手动清理Group缓存API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.interceptor.EntranceInterceptor
import org.apache.linkis.entrance.interceptor.exception.CodeCheckException
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.manager.label.entity.engine.EngineType
import org.apache.linkis.manager.label.utils.LabelUtil

import org.apache.commons.lang3.StringUtils
Expand Down Expand Up @@ -50,7 +51,7 @@ class SQLCodeCheckInterceptor extends EntranceInterceptor with Logging {
// Only check if: 1. Hive engine 2. Feature enabled 3. Creator NOT in whitelist
val engineType = LabelUtil.getEngineTypeLabel(jobRequest.getLabels).getEngineType
if (
"hive".equalsIgnoreCase(engineType) &&
EngineType.HIVE.toString.equalsIgnoreCase(engineType) &&
EntranceConfiguration.HIVE_LOCATION_CONTROL_ENABLE.getValue &&
!isCreatorWhitelisted(LabelUtil.getUserCreatorLabel(jobRequest.getLabels).getCreator)
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,7 @@ class EntranceGroupFactory extends GroupFactory with Logging {
* 清除所有Group缓存
*
* 调用时机:
* 1. 接收到EntranceGroupCacheClearBroadcast广播时(需功能开关启用)
* 2. 手动清除缓存(如管理API)
* 1. 接收到EntranceGroupCacheClearBroadcast广播时(需功能开关启用) 2. 手动清除缓存(如管理API)
*
* 线程安全:Guava Cache的invalidateAll()是原子操作,支持并发调用
*
Expand All @@ -162,7 +161,7 @@ class EntranceGroupFactory extends GroupFactory with Logging {
} catch {
case e: Exception =>
logger.error("Failed to clear Group cache", e)
// 不抛出异常,避免影响调用方
// 不抛出异常,避免影响调用方
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.linkis.entrance.interceptor.impl;

import org.apache.linkis.common.conf.BDPConfiguration;

import org.apache.linkis.governance.common.entity.job.JobRequest;

import org.junit.jupiter.api.Assertions;
Expand Down
Loading
Loading