diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala index b27cab796b7..bb4bb665777 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala @@ -57,6 +57,9 @@ object Configuration extends Logging { val CLOUD_CONSOLE_VARIABLE_SPRING_APPLICATION_NAME = CommonVars("wds.linkis.console.variable.application.name", "linkis-ps-publicservice") + val MANAGER_SPRING_APPLICATION_NAME = + CommonVars("wds.linkis.engineconn.manager.name", "linkis-cg-linkismanager") + val JOBHISTORY_SPRING_APPLICATION_NAME = CommonVars("wds.linkis.jobhistory.application.name", "linkis-ps-jobhistory") @@ -104,6 +107,34 @@ object Configuration extends Logging { val METRICS_INCREMENTAL_UPDATE_ENABLE = CommonVars[Boolean]("linkis.jobhistory.metrics.incremental.update.enable", false) + /** + * Whether to enable secondary queue feature Default: true Description: true enables smart queue + * selection, false disables the feature + */ + val SECONDARY_QUEUE_ENABLED: CommonVars[Boolean] = + CommonVars.apply("wds.linkis.rm.secondary.yarnqueue.enable", false) + + /** + * Secondary queue resource usage threshold Default: 0.9 (90%) Description: Use secondary queue + * when usage <= this value, use primary queue when usage > this value + */ + val SECONDARY_QUEUE_THRESHOLD: CommonVars[Double] = + CommonVars.apply("wds.linkis.rm.secondary.yarnqueue.threshold", 0.9) + + /** + * Supported engine type list (comma-separated) Default: spark Description: Only engines in this + * list will execute smart queue selection Case-insensitive + */ + val SECONDARY_QUEUE_ENGINES: CommonVars[String] = + CommonVars.apply("wds.linkis.rm.secondary.yarnqueue.engines", "spark") + + /** + * Supported Creator list (comma-separated) Default: IDE Description: Only Creators in this list + * will execute smart queue selection Case-insensitive + */ + val SECONDARY_QUEUE_CREATORS: CommonVars[String] = + CommonVars.apply("wds.linkis.rm.secondary.yarnqueue.creators", "IDE") + val EXECUTE_ERROR_CODE_INDEX = CommonVars("execute.error.code.index", "-1") diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala index f2fbb7e5040..88a1357cf04 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala @@ -25,8 +25,6 @@ import org.apache.linkis.common.variable.DateTypeUtils.{ getCurHour, getMonthDay, getToday, - getWeekBegin, - getWeekEnd, getYesterday } @@ -250,16 +248,11 @@ object VariableUtils extends Logging { // Initialize week variables (with feature switch and exception handling) if (WEEK_VARIABLE_ENABLED.getValue) { Utils.tryAndWarn { - val weekBegin = getWeekBegin(std = false, run_date.getDate) - val weekBeginStd = getWeekBegin(std = true, run_date.getDate) - val weekEnd = getWeekEnd(std = false, run_date.getDate) - val weekEndStd = getWeekEnd(std = true, run_date.getDate) - - // Use WeekType for week-based arithmetic (unit = weeks, not days) - nameAndType("run_week_begin") = WeekType(new CustomDateType(weekBegin, false)) - nameAndType("run_week_begin_std") = WeekType(new CustomDateType(weekBeginStd, true)) - nameAndType("run_week_end") = WeekType(new CustomDateType(weekEnd, false)) - nameAndType("run_week_end_std") = WeekType(new CustomDateType(weekEndStd, true)) + // Use CustomWeekType following the same pattern as CustomMonthType + nameAndType("run_week_begin") = WeekType(new CustomWeekType(run_date_str, false, false)) + nameAndType("run_week_begin_std") = WeekType(new CustomWeekType(run_date_str, true, false)) + nameAndType("run_week_end") = WeekType(new CustomWeekType(run_date_str, false, true)) + nameAndType("run_week_end_std") = WeekType(new CustomWeekType(run_date_str, true, true)) logger.info("Week variables initialized successfully") } } else { diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/CustomDateType.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/CustomDateType.scala index 0c528a4a9b6..680ef7b78c8 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/CustomDateType.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/CustomDateType.scala @@ -255,3 +255,32 @@ class CustomHourType(dateH: String, std: Boolean = true) { } } + +/** + * CustomWeekType: Week-based date type following the same pattern as CustomMonType + * + * @param date + * Base date string + * @param std + * Whether to use standard format (true: yyyy-MM-dd, false: yyyyMMdd) + * @param isEnd + * Whether to calculate week end (Sunday) instead of week begin (Monday) + */ +class CustomWeekType(date: String, std: Boolean = true, isEnd: Boolean = false) { + + def -(weeks: Int): String = { + val dateFormat = DateTypeUtils.dateFormatLocal.get() + DateTypeUtils.getWeek(std, isEnd, DateUtils.addWeeks(dateFormat.parse(date), -weeks)) + } + + def +(weeks: Int): String = { + val dateFormat = DateTypeUtils.dateFormatLocal.get() + DateTypeUtils.getWeek(std, isEnd, DateUtils.addWeeks(dateFormat.parse(date), weeks)) + } + + override def toString: String = { + val dateFormat = DateTypeUtils.dateFormatLocal.get() + DateTypeUtils.getWeek(std, isEnd, dateFormat.parse(date)) + } + +} diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/DateTypeUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/DateTypeUtils.scala index 989c893a178..46b8f5ba220 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/DateTypeUtils.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/DateTypeUtils.scala @@ -253,16 +253,18 @@ object DateTypeUtils { } /** - * Get the start date of the week (Monday) + * Get week date (Monday or Sunday) * * @param std * Whether to use standard format (true: yyyy-MM-dd, false: yyyyMMdd) + * @param isEnd + * Whether to get week end (Sunday) instead of week begin (Monday) * @param date * Base date * @return - * Monday date string + * Monday or Sunday date string depending on isEnd */ - def getWeekBegin(std: Boolean = true, date: Date): String = { + def getWeek(std: Boolean = true, isEnd: Boolean = false, date: Date): String = { try { val dateFormat = dateFormatLocal.get() val dateFormat_std = dateFormatStdLocal.get() @@ -272,75 +274,21 @@ object DateTypeUtils { // Get current day of week (Calendar.SUNDAY=1, Calendar.MONDAY=2, ..., Calendar.SATURDAY=7) val dayOfWeek = cal.get(Calendar.DAY_OF_WEEK) - // Calculate days to Monday - // Sunday(1) needs to go back 6 days to this week's Monday - // Monday(2) doesn't need adjustment - // Tuesday(3) needs to go back 1 day - // ... - // Saturday(7) needs to go back 5 days - val daysToMonday = if (dayOfWeek == Calendar.SUNDAY) { - -6 // Sunday goes back 6 days to this week's Monday - } else { - Calendar.MONDAY - dayOfWeek // Other dates go back to this week's Monday - } - - cal.add(Calendar.DAY_OF_MONTH, daysToMonday) - - if (std) { - dateFormat_std.format(cal.getTime) - } else { - dateFormat.format(cal.getTime) - } - } catch { - case e: Exception => - // Return current date as fallback on error - val fallbackFormat = if (std) dateFormatStdLocal.get() else dateFormatLocal.get() - fallbackFormat.format(date) - } - } - - /** - * Get the end date of the week (Sunday) - * - * @param std - * Whether to use standard format (true: yyyy-MM-dd, false: yyyyMMdd) - * @param date - * Base date - * @return - * Sunday date string - */ - def getWeekEnd(std: Boolean = true, date: Date): String = { - try { - val dateFormat = dateFormatLocal.get() - val dateFormat_std = dateFormatStdLocal.get() - val cal: Calendar = Calendar.getInstance() - cal.setTime(date) - - // Get current day of week - val dayOfWeek = cal.get(Calendar.DAY_OF_WEEK) - - // Calculate days to Sunday - // Sunday(1) doesn't need adjustment - // Monday(2) needs to go forward 6 days - // Tuesday(3) needs to go forward 5 days - // ... - // Saturday(7) needs to go forward 1 day - val daysToSunday = if (dayOfWeek == Calendar.SUNDAY) { - 0 // Sunday doesn't need adjustment + // Calculate days to target day (Monday or Sunday) + val daysToTarget = if (isEnd) { + // Calculate days to Sunday + if (dayOfWeek == Calendar.SUNDAY) 0 else Calendar.SUNDAY - dayOfWeek + 7 } else { - Calendar.SUNDAY - dayOfWeek + 7 // Other dates go forward to this week's Sunday + // Calculate days to Monday + if (dayOfWeek == Calendar.SUNDAY) -6 else Calendar.MONDAY - dayOfWeek } - cal.add(Calendar.DAY_OF_MONTH, daysToSunday) + cal.add(Calendar.DAY_OF_MONTH, daysToTarget) - if (std) { - dateFormat_std.format(cal.getTime) - } else { - dateFormat.format(cal.getTime) - } + if (std) dateFormat_std.format(cal.getTime) + else dateFormat.format(cal.getTime) } catch { case e: Exception => - // Return current date as fallback on error val fallbackFormat = if (std) dateFormatStdLocal.get() else dateFormatLocal.get() fallbackFormat.format(date) } diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala index f7af40cef15..af6de1af2b0 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala @@ -41,19 +41,21 @@ case class DateType(value: CustomDateType) extends VariableType { } /** - * WeekType: A date type that operates in weeks instead of days The arithmetic operations (+/-) are - * performed in units of weeks (1 week = 7 days) Example: run_week_begin - 1 means subtract 1 week - * (7 days), not 1 day + * WeekType: A week-based date type following the same pattern as MonthType + * + * Arithmetic operations (+/-) are performed in units of weeks: + * - run_week_begin - 1 means subtract 1 week (previous Monday) + * - run_week_end + 2 means add 2 weeks (Sunday after 2 weeks) + * + * Uses CustomWeekType internally for week-specific calculations */ -case class WeekType(value: CustomDateType) extends VariableType { +case class WeekType(value: CustomWeekType) extends VariableType { override def getValue: String = value.toString def calculator(signal: String, bValue: String): String = { - // Convert weeks to days for the underlying date calculation - val days = bValue.toInt * 7 signal match { - case "+" => value + days - case "-" => value - days + case "+" => value + bValue.toInt + case "-" => value - bValue.toInt case _ => throw new LinkisCommonErrorException(20046, s"WeekType is not supported to use:$signal") } diff --git a/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/variable/DateTypeUtilsTest.scala b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/variable/DateTypeUtilsTest.scala index 8bf3fbeaf67..b076da0cf6c 100644 --- a/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/variable/DateTypeUtilsTest.scala +++ b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/variable/DateTypeUtilsTest.scala @@ -41,7 +41,7 @@ class DateTypeUtilsTest { // TC001: getWeekBegin - 周四返回本周一 val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20260409") // 2026-04-09 is Thursday - val result = DateTypeUtils.getWeekBegin(std = false, date) + val result = DateTypeUtils.getWeek(std = false, isEnd = false, date) assertEquals("20260406", result) // Monday is 2026-04-06 } @@ -49,7 +49,7 @@ class DateTypeUtilsTest { // TC002: getWeekBegin - 周一返回自身 val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20260406") // 2026-04-06 is Monday - val result = DateTypeUtils.getWeekBegin(std = false, date) + val result = DateTypeUtils.getWeek(std = false, isEnd = false, date) assertEquals("20260406", result) // Should return itself } @@ -57,7 +57,7 @@ class DateTypeUtilsTest { // TC003: getWeekBegin - 周日返回本周一 val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20260412") // 2026-04-12 is Sunday - val result = DateTypeUtils.getWeekBegin(std = false, date) + val result = DateTypeUtils.getWeek(std = false, isEnd = false, date) assertEquals("20260406", result) // Monday is 2026-04-06 } @@ -65,7 +65,7 @@ class DateTypeUtilsTest { // TC004: getWeekBegin - 标准格式 val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20260409") // 2026-04-09 is Thursday - val result = DateTypeUtils.getWeekBegin(std = true, date) + val result = DateTypeUtils.getWeek(std = true, isEnd = false, date) assertEquals("2026-04-06", result) // Standard format yyyy-MM-dd } @@ -73,7 +73,7 @@ class DateTypeUtilsTest { // TC005: getWeekEnd - 周四返回本周日 val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20260409") // 2026-04-09 is Thursday - val result = DateTypeUtils.getWeekEnd(std = false, date) + val result = DateTypeUtils.getWeek(std = false, isEnd = true, date) assertEquals("20260412", result) // Sunday is 2026-04-12 } @@ -81,7 +81,7 @@ class DateTypeUtilsTest { // TC006: getWeekEnd - 周日返回自身 val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20260412") // 2026-04-12 is Sunday - val result = DateTypeUtils.getWeekEnd(std = false, date) + val result = DateTypeUtils.getWeek(std = false, isEnd = true, date) assertEquals("20260412", result) // Should return itself } @@ -89,7 +89,7 @@ class DateTypeUtilsTest { // TC007: getWeekEnd - 周一返回本周日 val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20260406") // 2026-04-06 is Monday - val result = DateTypeUtils.getWeekEnd(std = false, date) + val result = DateTypeUtils.getWeek(std = false, isEnd = true, date) assertEquals("20260412", result) // Sunday is 2026-04-12 } @@ -97,8 +97,8 @@ class DateTypeUtilsTest { // TC008: 跨年周 - 年末(2025-12-31 周三) val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20251231") // 2025-12-31 is Wednesday - val begin = DateTypeUtils.getWeekBegin(std = false, date) - val end = DateTypeUtils.getWeekEnd(std = false, date) + val begin = DateTypeUtils.getWeek(std = false, isEnd = false, date) + val end = DateTypeUtils.getWeek(std = false, isEnd = true, date) assertEquals("20251229", begin) // Monday is 2025-12-29 assertEquals("20260104", end) // Sunday is 2026-01-04 (cross year) } @@ -107,8 +107,8 @@ class DateTypeUtilsTest { // TC009: 跨年周 - 年初(2026-01-01 周四) val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20260101") // 2026-01-01 is Thursday - val begin = DateTypeUtils.getWeekBegin(std = false, date) - val end = DateTypeUtils.getWeekEnd(std = false, date) + val begin = DateTypeUtils.getWeek(std = false, isEnd = false, date) + val end = DateTypeUtils.getWeek(std = false, isEnd = true, date) assertEquals("20251229", begin) // Monday is 2025-12-29 (cross year) assertEquals("20260104", end) // Sunday is 2026-01-04 } @@ -117,8 +117,8 @@ class DateTypeUtilsTest { // TC010: 闰年 - 2024-02-29(闰日, 周四) val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20240229") // 2024-02-29 is leap day, Thursday - val begin = DateTypeUtils.getWeekBegin(std = false, date) - val end = DateTypeUtils.getWeekEnd(std = false, date) + val begin = DateTypeUtils.getWeek(std = false, isEnd = false, date) + val end = DateTypeUtils.getWeek(std = false, isEnd = true, date) assertEquals("20240226", begin) // Monday is 2024-02-26 assertEquals("20240303", end) // Sunday is 2024-03-03 } @@ -127,8 +127,8 @@ class DateTypeUtilsTest { // TC011: 闰年 - 2020-02-29(闰日, 周六) val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20200229") // 2020-02-29 is leap day, Saturday - val begin = DateTypeUtils.getWeekBegin(std = false, date) - val end = DateTypeUtils.getWeekEnd(std = false, date) + val begin = DateTypeUtils.getWeek(std = false, isEnd = false, date) + val end = DateTypeUtils.getWeek(std = false, isEnd = true, date) assertEquals("20200224", begin) // Monday is 2020-02-24 assertEquals("20200301", end) // Sunday is 2020-03-01 } @@ -137,8 +137,8 @@ class DateTypeUtilsTest { // TC012: 非闰年 - 2023-02-28(周二) val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20230228") // 2023-02-28 is Tuesday - val begin = DateTypeUtils.getWeekBegin(std = false, date) - val end = DateTypeUtils.getWeekEnd(std = false, date) + val begin = DateTypeUtils.getWeek(std = false, isEnd = false, date) + val end = DateTypeUtils.getWeek(std = false, isEnd = true, date) assertEquals("20230227", begin) // Monday is 2023-02-27 assertEquals("20230305", end) // Sunday is 2023-03-05 } @@ -149,38 +149,38 @@ class DateTypeUtilsTest { // Monday val monday = dateFormat.parse("20260406") - assertEquals("20260406", DateTypeUtils.getWeekBegin(std = false, monday)) - assertEquals("20260412", DateTypeUtils.getWeekEnd(std = false, monday)) + assertEquals("20260406", DateTypeUtils.getWeek(std = false, isEnd = false, monday)) + assertEquals("20260412", DateTypeUtils.getWeek(std = false, isEnd = true, monday)) // Tuesday val tuesday = dateFormat.parse("20260407") - assertEquals("20260406", DateTypeUtils.getWeekBegin(std = false, tuesday)) - assertEquals("20260412", DateTypeUtils.getWeekEnd(std = false, tuesday)) + assertEquals("20260406", DateTypeUtils.getWeek(std = false, isEnd = false, tuesday)) + assertEquals("20260412", DateTypeUtils.getWeek(std = false, isEnd = true, tuesday)) // Wednesday val wednesday = dateFormat.parse("20260408") - assertEquals("20260406", DateTypeUtils.getWeekBegin(std = false, wednesday)) - assertEquals("20260412", DateTypeUtils.getWeekEnd(std = false, wednesday)) + assertEquals("20260406", DateTypeUtils.getWeek(std = false, isEnd = false, wednesday)) + assertEquals("20260412", DateTypeUtils.getWeek(std = false, isEnd = true, wednesday)) // Thursday val thursday = dateFormat.parse("20260409") - assertEquals("20260406", DateTypeUtils.getWeekBegin(std = false, thursday)) - assertEquals("20260412", DateTypeUtils.getWeekEnd(std = false, thursday)) + assertEquals("20260406", DateTypeUtils.getWeek(std = false, isEnd = false, thursday)) + assertEquals("20260412", DateTypeUtils.getWeek(std = false, isEnd = true, thursday)) // Friday val friday = dateFormat.parse("20260410") - assertEquals("20260406", DateTypeUtils.getWeekBegin(std = false, friday)) - assertEquals("20260412", DateTypeUtils.getWeekEnd(std = false, friday)) + assertEquals("20260406", DateTypeUtils.getWeek(std = false, isEnd = false, friday)) + assertEquals("20260412", DateTypeUtils.getWeek(std = false, isEnd = true, friday)) // Saturday val saturday = dateFormat.parse("20260411") - assertEquals("20260406", DateTypeUtils.getWeekBegin(std = false, saturday)) - assertEquals("20260412", DateTypeUtils.getWeekEnd(std = false, saturday)) + assertEquals("20260406", DateTypeUtils.getWeek(std = false, isEnd = false, saturday)) + assertEquals("20260412", DateTypeUtils.getWeek(std = false, isEnd = true, saturday)) // Sunday val sunday = dateFormat.parse("20260412") - assertEquals("20260406", DateTypeUtils.getWeekBegin(std = false, sunday)) - assertEquals("20260412", DateTypeUtils.getWeekEnd(std = false, sunday)) + assertEquals("20260406", DateTypeUtils.getWeek(std = false, isEnd = false, sunday)) + assertEquals("20260412", DateTypeUtils.getWeek(std = false, isEnd = true, sunday)) } // ========== WeekType Arithmetic Tests ========== @@ -190,7 +190,7 @@ class DateTypeUtilsTest { val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20260409") // 2026-04-09 is Thursday val weekType = WeekType( - new CustomDateType("20260406", false) + new CustomWeekType("20260409", false, false) ) // run_week_begin = 20260406 (Monday) val result = weekType.calculator("-", "1") // Subtract 1 week @@ -205,7 +205,7 @@ class DateTypeUtilsTest { val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20260409") // 2026-04-09 is Thursday val weekType = WeekType( - new CustomDateType("20260406", false) + new CustomWeekType("20260409", false, false) ) // run_week_begin = 20260406 (Monday) val result = weekType.calculator("-", "7") // Subtract 7 weeks @@ -220,7 +220,7 @@ class DateTypeUtilsTest { val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20260409") // 2026-04-09 is Thursday val weekType = WeekType( - new CustomDateType("20260406", false) + new CustomWeekType("20260409", false, false) ) // run_week_begin = 20260406 (Monday) val result = weekType.calculator("+", "1") // Add 1 week @@ -235,7 +235,7 @@ class DateTypeUtilsTest { val dateFormat = DateTypeUtils.dateFormatLocal.get() val date = dateFormat.parse("20260409") // 2026-04-09 is Thursday val weekType = WeekType( - new CustomDateType("20260406", false) + new CustomWeekType("20260409", false, false) ) // run_week_begin = 20260406 (Monday) val result = weekType.calculator("+", "2") // Add 2 weeks @@ -250,7 +250,7 @@ class DateTypeUtilsTest { val dateFormat = DateTypeUtils.dateFormatLocal.get() val baseDate = "20260406" // Monday - val weekType = WeekType(new CustomDateType(baseDate, false)) + val weekType = WeekType(new CustomWeekType(baseDate, false, false)) val dateType = DateType(new CustomDateType(baseDate, false)) // WeekType - 1 = Previous Monday (7 days before) @@ -270,4 +270,115 @@ class DateTypeUtilsTest { assertNotEquals(weekResult, dateResult) } + // ========== CustomWeekType Tests ========== + + @Test def testCustomWeekType_ToString_WeekBegin(): Unit = { + // TC025: CustomWeekType.toString returns Monday for week begin + val dateFormat = DateTypeUtils.dateFormatLocal.get() + val customWeekType = new CustomWeekType("20260409", false, false) // Thursday, isEnd=false + + val result = customWeekType.toString + val resultDate = dateFormat.parse(result) + val expected = dateFormat.parse("20260406") // Monday + + assertEquals(expected, resultDate) + } + + @Test def testCustomWeekType_ToString_WeekEnd(): Unit = { + // TC026: CustomWeekType.toString returns Sunday for week end + val dateFormat = DateTypeUtils.dateFormatLocal.get() + val customWeekType = new CustomWeekType("20260409", false, true) // Thursday, isEnd=true + + val result = customWeekType.toString + val resultDate = dateFormat.parse(result) + val expected = dateFormat.parse("20260412") // Sunday + + assertEquals(expected, resultDate) + } + + @Test def testCustomWeekType_Subtract_Week(): Unit = { + // TC027: CustomWeekType - 1 week + val dateFormat = DateTypeUtils.dateFormatLocal.get() + val customWeekType = new CustomWeekType("20260409", false, false) // Thursday, week begin + + val result = customWeekType.-(1) // Subtract 1 week + val resultDate = dateFormat.parse(result) + val expected = dateFormat.parse("20260330") // Previous Monday + + assertEquals(expected, resultDate) + } + + @Test def testCustomWeekType_Subtract_Weeks_WeekEnd(): Unit = { + // TC028: CustomWeekType - 2 weeks (week end) + val dateFormat = DateTypeUtils.dateFormatLocal.get() + val customWeekType = new CustomWeekType("20260409", false, true) // Thursday, week end + + val result = customWeekType.-(2) // Subtract 2 weeks + val resultDate = dateFormat.parse(result) + val expected = dateFormat.parse("20260329") // 2 weeks before Sunday + + assertEquals(expected, resultDate) + } + + @Test def testCustomWeekType_Add_Week(): Unit = { + // TC029: CustomWeekType + 1 week + val dateFormat = DateTypeUtils.dateFormatLocal.get() + val customWeekType = new CustomWeekType("20260409", false, false) // Thursday, week begin + + val result = customWeekType.+(1) // Add 1 week + val resultDate = dateFormat.parse(result) + val expected = dateFormat.parse("20260413") // Next Monday + + assertEquals(expected, resultDate) + } + + @Test def testCustomWeekType_Add_Weeks_WeekEnd(): Unit = { + // TC030: CustomWeekType + 3 weeks (week end) + val dateFormat = DateTypeUtils.dateFormatLocal.get() + val customWeekType = new CustomWeekType("20260409", false, true) // Thursday, week end + + val result = customWeekType.+(3) // Add 3 weeks + val resultDate = dateFormat.parse(result) + val expected = dateFormat.parse("20260503") // 3 weeks after Sunday + + assertEquals(expected, resultDate) + } + + @Test def testCustomWeekType_StandardFormat(): Unit = { + // TC031: CustomWeekType with standard format + val customWeekType = new CustomWeekType("20260409", true, false) // std=true + + val result = customWeekType.toString + assertEquals("2026-04-06", result) // Standard format yyyy-MM-dd + } + + @Test def testCustomWeekType_CrossYear(): Unit = { + // TC032: CustomWeekType with cross-year week + val dateFormat = DateTypeUtils.dateFormatLocal.get() + val customWeekType = new CustomWeekType("20251231", false, false) // End of year + + val result = customWeekType.toString + val resultDate = dateFormat.parse(result) + val expected = dateFormat.parse("20251229") // Monday in previous year + + assertEquals(expected, resultDate) + } + + @Test def testCustomWeekType_IsEnd_Preservation(): Unit = { + // TC033: CustomWeekType preserves isEnd during arithmetic operations + val dateFormat = DateTypeUtils.dateFormatLocal.get() + + // Week begin - 1 should return Monday + val weekBegin = new CustomWeekType("20260409", false, false) + val beginResult = weekBegin.-(1) + val beginResultDate = dateFormat.parse(beginResult) + assertEquals("20260330", beginResult) // Previous Monday + + // Week end - 1 should return Sunday + val weekEnd = new CustomWeekType("20260409", false, true) + val endResult = weekEnd.-(1) + val endResultDate = dateFormat.parse(endResult) + assertEquals("20260329", endResult) // Previous Sunday + } + } diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/SecondaryYarnConf.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/SecondaryYarnConf.scala new file mode 100644 index 00000000000..b6599fe825f --- /dev/null +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/SecondaryYarnConf.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.governance.common.protocol.conf + +import org.apache.linkis.manager.label.entity.Label +import org.apache.linkis.protocol.message.RequestProtocol + +import java.util + +trait SecondaryYarnConf extends RequestProtocol + +case class SecondaryYarnRequest( + taskId: String, + params: util.Map[String, AnyRef], + labels: java.util.List[Label[_]] +) extends SecondaryYarnConf + +case class SecondaryYarnResponse(selectQueue: String, primaryQueue: String, secondaryQueue: String) diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java index e91a7d0ca7d..595812b0af9 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java @@ -138,7 +138,8 @@ public EntranceInterceptor[] entranceInterceptors() { new SQLLimitEntranceInterceptor(), new CommentInterceptor(), new UserCreatorIPCheckInterceptor(), - new TaskRetryInterceptor() + new TaskRetryInterceptor(), + new QueueSelectionInterceptor() }; } diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala index cd420a695df..16ba701f85a 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala @@ -420,7 +420,6 @@ object EntranceConfiguration { var SPARK_DYNAMIC_ALLOCATION_ENABLED = CommonVars.apply("spark.dynamic.allocation.enabled", false).getValue - var SPARK_DYNAMIC_ALLOCATION_ADDITIONAL_CONFS = CommonVars.apply("spark.dynamic.allocation.additional.confs", "").getValue diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/QueueSelectionInterceptor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/QueueSelectionInterceptor.scala new file mode 100644 index 00000000000..3aed86f6216 --- /dev/null +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/QueueSelectionInterceptor.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.entrance.interceptor.impl + +import org.apache.linkis.common.conf.Configuration +import org.apache.linkis.common.log.LogUtils +import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.entrance.interceptor.EntranceInterceptor +import org.apache.linkis.governance.common.constant.ec.ECConstants +import org.apache.linkis.governance.common.entity.job.JobRequest +import org.apache.linkis.governance.common.protocol.conf.{ + SecondaryYarnRequest, + SecondaryYarnResponse +} +import org.apache.linkis.manager.label.utils.LabelUtil +import org.apache.linkis.protocol.utils.TaskUtils +import org.apache.linkis.rpc.Sender + +import org.apache.commons.lang3.StringUtils + +import java.{lang, util} + +import scala.concurrent.duration.Duration + +/** + * QueueSelectionInterceptor: Intelligent queue selection interceptor at Entrance layer. + * + * This interceptor performs the following functions: + * 1. Check if the queue selection feature is enabled 2. Check if the engine type is in the + * supported list 3. Check if the creator is in the supported list 4. Call LinkisManager RPC to + * get the optimal queue 5. Replace the queue configuration in runtimeParams + */ +class QueueSelectionInterceptor extends EntranceInterceptor with Logging { + + private val RPC_TIMEOUT = Duration(5, "seconds") // RPC call timeout: 5 seconds + + /** + * Apply queue selection logic to the job request. + * + * @param jobRequest + * The job request to process + * @param logAppender + * Used to cache necessary reminder logs and pass them to the upper layer + * @return + * The processed job request + */ + override def apply(jobRequest: JobRequest, logAppender: lang.StringBuilder): JobRequest = { + val taskId = jobRequest.getId.toString + Utils.tryAndWarn { + // 1. Check if queue selection is enabled + if (!Configuration.SECONDARY_QUEUE_ENABLED.getValue) { + logger.info(s"Queue selection is not enabled at Entrance layer") + return jobRequest + } + + // 2. Get labels from job request + val labels = jobRequest.getLabels + if (labels == null || labels.isEmpty) { + logger.info(s"No labels found in job request, skip queue selection") + return jobRequest + } + + // 3. Extract engine type and creator (with null safety) + val userCreatorLabel = LabelUtil.getUserCreatorLabel(labels) + val creator = if (userCreatorLabel != null) userCreatorLabel.getCreator else null + + val engineTypeLabel = LabelUtil.getEngineTypeLabel(labels) + val engineType = if (engineTypeLabel != null) engineTypeLabel.getEngineType else null + + val supportedEngines = Configuration.SECONDARY_QUEUE_ENGINES.getValue + .split(",") + .map(_.trim.toLowerCase()) + .toSet + val supportedCreators = Configuration.SECONDARY_QUEUE_CREATORS.getValue + .split(",") + .map(_.trim.toUpperCase()) + .toSet + + val engineMatched = + engineType == null || supportedEngines.contains(engineType.toLowerCase()) + val creatorMatched = creator == null || supportedCreators.contains(creator.toUpperCase()) + + if (!engineMatched || !creatorMatched) { + logger.info( + s"Engine type or creator not in supported list - engineType: $engineType (matched: $engineMatched), creator: $creator (matched: $creatorMatched)" + ) + return jobRequest + } + val startMap: util.Map[String, AnyRef] = TaskUtils.getStartupMap(jobRequest.getParams) + // 4. Call LinkisManager RPC with timeout and error handling + val sender = Sender.getSender(Configuration.MANAGER_SPRING_APPLICATION_NAME.getValue) + val response = sender.ask(SecondaryYarnRequest(taskId, startMap, labels)) match { + case r: SecondaryYarnResponse => + logger.info(s"Received RPC response: ${r.selectQueue}") + r + case other => + logger.warn(s"Unexpected response type: ${other.getClass.getName}, using primary queue") + null + } + + if (response == null) { + logger.info(s"RPC returned null or timed out, using primary queue") + logAppender.append( + LogUtils.generateInfo( + "Queue selection failed: unable to get response from LinkisManager, using primary queue\n" + ) + ) + return jobRequest + } + + val yarnQueue = response.selectQueue + val primaryQueue = response.primaryQueue + val secondaryQueue = response.secondaryQueue + + // 5. Update queue configuration in startupMap + if (StringUtils.isNotBlank(yarnQueue)) { + startMap.put(ECConstants.YARN_QUEUE_NAME_CONFIG_KEY, yarnQueue) + logger.info( + s"Queue selection completed - primary: $primaryQueue, secondary: $secondaryQueue, selected: $yarnQueue" + ) + logAppender.append( + LogUtils.generateInfo( + s"Queue selection completed - primary: $primaryQueue, secondary: $secondaryQueue, selected: $yarnQueue\n" + ) + ) + TaskUtils.addStartupMap(jobRequest.getParams, startMap) + } else { + logger.info(s"Selected queue is blank, using primary queue") + logAppender.append( + LogUtils + .generateInfo(s"Queue selection error - selected queue is blank, using primary queue\n") + ) + } + } + jobRequest + } + +} diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala index 5188be07e75..2fbc3033897 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala @@ -24,8 +24,10 @@ import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils} import org.apache.linkis.engineplugin.server.service.EngineConnResourceFactoryService import org.apache.linkis.governance.common.conf.GovernanceCommonConf import org.apache.linkis.governance.common.conf.GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME -import org.apache.linkis.governance.common.entity.job.JobRequest -import org.apache.linkis.governance.common.protocol.job.{JobReqQuery, JobReqUpdate} +import org.apache.linkis.governance.common.protocol.conf.{ + SecondaryYarnRequest, + SecondaryYarnResponse +} import org.apache.linkis.governance.common.utils.JobUtils import org.apache.linkis.manager.am.conf.{AMConfiguration, EngineConnConfigurationService} import org.apache.linkis.manager.am.exception.AMErrorException @@ -33,7 +35,6 @@ import org.apache.linkis.manager.am.label.{EngineReuseLabelChooser, LabelChecker import org.apache.linkis.manager.am.selector.{ECAvailableRule, NodeSelector} import org.apache.linkis.manager.am.utils.AMUtils import org.apache.linkis.manager.am.vo.CanCreateECRes -import org.apache.linkis.manager.common.conf.RMConfiguration import org.apache.linkis.manager.common.constant.AMConstant import org.apache.linkis.manager.common.entity.enumeration.NodeStatus import org.apache.linkis.manager.common.entity.node.{EMNode, EngineNode} @@ -47,10 +48,8 @@ import org.apache.linkis.manager.engineplugin.common.launch.entity.{ } import org.apache.linkis.manager.engineplugin.common.resource.TimeoutEngineResourceRequest import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext -import org.apache.linkis.manager.label.conf.LabelCommonConfig import org.apache.linkis.manager.label.entity.{EngineNodeLabel, Label} import org.apache.linkis.manager.label.entity.engine.{EngineType, EngineTypeLabel} -import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel import org.apache.linkis.manager.label.entity.node.AliasServiceInstanceLabel import org.apache.linkis.manager.label.service.{NodeLabelService, UserLabelService} import org.apache.linkis.manager.label.utils.{LabelUtil, LabelUtils} @@ -204,16 +203,14 @@ class DefaultEngineCreateService engineCreateRequest.setProperties(new util.HashMap[String, String]()) } - // 4. generate Resource val resource = generateResource( engineCreateRequest.getProperties, engineCreateRequest.getUser, labelFilter.choseEngineLabel(labelList), - timeout, - isCreateEngine = true + timeout ) - // 5. request resource + // 4. request resource val resourceTicketId = resourceManager.requestResource( LabelUtils.distinctLabel(labelList, emNode.getLabels), resource, @@ -227,7 +224,7 @@ class DefaultEngineCreateService throw new LinkisRetryException(AMConstant.EM_ERROR_CODE, s"not enough resource: : $reason") } - // 6. build engineConn request + // 5. build engineConn request val engineBuildRequest = EngineConnBuildRequestImpl( resourceTicketId, labelFilter.choseEngineLabel(labelList), @@ -239,7 +236,7 @@ class DefaultEngineCreateService ) ) - // 7. Call ECM to send engine start request + // 6. Call ECM to send engine start request // AM will update the serviceInstance table // It is necessary to replace the ticketID and update the Label of EngineConn // It is necessary to modify the id in EngineInstanceLabel to Instance information @@ -392,15 +389,13 @@ class DefaultEngineCreateService * @param engineCreateRequest * @param labelList * @param timeout - * @param isCreateEngine * @return */ def generateResource( props: util.Map[String, String], user: String, labelList: util.List[Label[_]], - timeout: Long, - isCreateEngine: Boolean = false + timeout: Long ): NodeResource = { val configProp = engineConnConfigurationService.getConsoleConfiguration(labelList) if (null != configProp && configProp.asScala.nonEmpty) { @@ -411,9 +406,6 @@ class DefaultEngineCreateService }) } - // Smart queue selection (executed before creating YarnResource) - performSmartQueueSelection(props, labelList, isCreateEngine) - val crossQueue = props.get(AMConfiguration.CROSS_QUEUE) if (StringUtils.isNotBlank(crossQueue)) { val queueName = props.getOrDefault(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY, "default") @@ -432,86 +424,58 @@ class DefaultEngineCreateService * configuration Decides whether to use primary queue or secondary queue by checking secondary * queue resource usage * - * @param properties - * Task parameters - * @param labelList - * Label list + * @param secondaryYarnRequest + * Request containing labels and params + * @param sender + * RPC sender + * @return + * Response with selected queue */ - private def performSmartQueueSelection( - properties: util.Map[String, String], - labelList: util.List[Label[_]], - isCreateEngine: Boolean = false - ): Unit = { - try { - if (isCreateEngine) { - // 1. Get queue configuration + @Receiver + override def performSmartQueueSelection( + secondaryYarnRequest: SecondaryYarnRequest, + sender: Sender + ): SecondaryYarnResponse = { + val taskId = secondaryYarnRequest.taskId + logger.info(s"[$taskId]Received queue judgment request") + var secondaryYarnResponse = SecondaryYarnResponse("", "", "") + Utils.tryAndWarn { + if (Configuration.SECONDARY_QUEUE_ENABLED.getValue) { + val labelList = secondaryYarnRequest.labels + val props = secondaryYarnRequest.params + val configProp = engineConnConfigurationService.getConsoleConfiguration(labelList) + if (null != configProp && configProp.asScala.nonEmpty) { + configProp.asScala.foreach(keyValue => { + if (!props.containsKey(keyValue._1)) { + props.put(keyValue._1, keyValue._2) + } + }) + } + // 1. Get queue configuration with priority: user params > console config > default val primaryQueue = - properties.getOrDefault(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY, "").trim + props.getOrDefault(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY, "").toString.trim val secondaryQueue = - properties.getOrDefault(AMConfiguration.SECONDARY_YARN_QUEUE_NAME_CONFIG_KEY, "").trim - + props + .getOrDefault(AMConfiguration.SECONDARY_YARN_QUEUE_NAME_CONFIG_KEY, "") + .toString + .trim // 2. Get system configuration - val enabled = RMConfiguration.SECONDARY_QUEUE_ENABLED.getValue - val threshold = RMConfiguration.SECONDARY_QUEUE_THRESHOLD.getValue - val supportedEngines = RMConfiguration.SECONDARY_QUEUE_ENGINES.getValue - .split(",") - .map(_.trim) - .map(_.toLowerCase()) - .toSet - val supportedCreators = RMConfiguration.SECONDARY_QUEUE_CREATORS.getValue - .split(",") - .map(_.trim) - .map(_.toUpperCase()) - .toSet + val threshold = Configuration.SECONDARY_QUEUE_THRESHOLD.getValue logger.info( - s"Smart queue config - primary queue: $primaryQueue, secondary queue: $secondaryQueue" + s"[$taskId]Smart queue config - primary: $primaryQueue, secondary: $secondaryQueue, threshold: ${threshold * 100}%" ) // 3. Check if secondary queue feature is enabled - if (!enabled || StringUtils.isBlank(secondaryQueue) || StringUtils.isBlank(primaryQueue)) { + if (StringUtils.isBlank(secondaryQueue) || StringUtils.isBlank(primaryQueue)) { logger.info( - "Smart queue selection is not enabled or secondary queue is empty, using primary queue" + s"[$taskId]Smart queue selection is disabled - primary or secondary queue is empty, using primary queue" ) - return - } - - // 4. Get engine type and Creator - var engineType: String = null - var creator: String = null - - try { - if (labelList != null && !labelList.isEmpty) { - engineType = LabelUtil.getEngineType(labelList) - val userCreatorLabel = labelList.asScala - .find(_.isInstanceOf[UserCreatorLabel]) - .map(_.asInstanceOf[UserCreatorLabel]) - .orNull - if (userCreatorLabel != null) { - creator = userCreatorLabel.getCreator - } - } - } catch { - case e: Exception => - logger.error("Failed to parse labels for queue selection", e) - } - - // 5. Check if engine type and Creator are in supported list - val engineMatched = - engineType == null || supportedEngines.contains(engineType.toLowerCase()) - val creatorMatched = creator == null || supportedCreators.contains(creator.toUpperCase()) - - if (!engineMatched || !creatorMatched) { - logger.info( - s"Engine type or Creator not in supported list - engineType: $engineType (matched: $engineMatched), creator: $creator (matched: $creatorMatched)" - ) - return - } + secondaryYarnResponse + } else { - // 6. Query secondary queue resource usage - try { + // 4. Query secondary queue resource usage val labelContainer = labelResourceService.enrichLabels(labelList) - val yarnResourceIdentifier = new YarnResourceIdentifier(secondaryQueue) val queueInfo = externalResourceService.getResource( ResourceType.Yarn, @@ -523,70 +487,81 @@ class DefaultEngineCreateService val usedResource = queueInfo.getUsedResource.asInstanceOf[YarnResource] val maxResource = queueInfo.getMaxResource.asInstanceOf[YarnResource] - // 7. Three-dimensional independent judgment - val useSecondaryQueue = if (maxResource != null && maxResource.getQueueMemory > 0) { - val memoryUsage = - usedResource.getQueueMemory.toDouble / maxResource.getQueueMemory.toDouble - val cpuUsage = if (maxResource.getQueueCores > 0) { - usedResource.getQueueCores.toDouble / maxResource.getQueueCores.toDouble - } else { - 0.0 - } - val instanceUsage = if (maxResource.getQueueInstances > 0) { - usedResource.getQueueInstances.toDouble / maxResource.getQueueInstances.toDouble - } else { - 0.0 - } - // Do not use secondary queue if any dimension exceeds threshold - val memoryOverThreshold = memoryUsage > threshold - val cpuOverThreshold = cpuUsage > threshold - val instanceOverThreshold = instanceUsage > threshold - - if (memoryOverThreshold || cpuOverThreshold || instanceOverThreshold) { + // 5. Three-dimensional independent judgment with null safety + val useSecondaryQueue = + if (maxResource != null && maxResource.getQueueMemory > 0 && usedResource != null) { + val memoryUsage = + usedResource.getQueueMemory.toDouble / maxResource.getQueueMemory.toDouble + val cpuUsage = if (maxResource.getQueueCores > 0) { + usedResource.getQueueCores.toDouble / maxResource.getQueueCores.toDouble + } else { + 0.0 + } + val instanceUsage = if (maxResource.getQueueInstances > 0) { + usedResource.getQueueInstances.toDouble / maxResource.getQueueInstances.toDouble + } else { + 0.0 + } + + // Log detailed resource usage logger.info( - s"Secondary queue resource usage- memory: $memoryOverThreshold, cpu: $cpuOverThreshold, instance: $instanceOverThreshold, using primary queue" + s"[$taskId]Secondary queue :${secondaryQueue} resource usage - memory: ${formatPercent(memoryUsage)} (threshold: ${formatPercent(threshold)}), " + + s"cpu: ${formatPercent(cpuUsage)} (threshold: ${formatPercent(threshold)}), " + + s"instance: ${formatPercent(instanceUsage)} (threshold: ${formatPercent(threshold)})" ) - false + + // Do not use secondary queue if any dimension exceeds threshold + val memoryOverThreshold = memoryUsage > threshold + val cpuOverThreshold = cpuUsage > threshold + val instanceOverThreshold = instanceUsage > threshold + + if (memoryOverThreshold || cpuOverThreshold || instanceOverThreshold) { + logger.info( + s"[$taskId]Secondary queue exceeds threshold - memory over: $memoryOverThreshold, cpu over: $cpuOverThreshold, instance over: $instanceOverThreshold, using primary queue" + ) + false + } else { + logger.info( + s"[$taskId]Secondary queue has sufficient resources, using secondary queue" + ) + true + } } else { - logger.info("Secondary queue has sufficient resources, using secondary queue") - true + logger.warn( + s"[$taskId]Secondary queue resource info is incomplete (maxResource or usedResource is null), using primary queue" + ) + false } - } else { - logger.warn("Secondary queue max resource is empty, using primary queue") - false - } - - // 8. Determine which queue to use and update wds.linkis.rm.yarnqueue + // 6. Determine which queue to use and update response val selectedQueue = if (useSecondaryQueue) secondaryQueue else primaryQueue - val oldQueue = properties.get(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY) - properties.put(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY, selectedQueue) - logger.info( - s"Smart queue selection completed - original queue: $oldQueue, selected queue: $selectedQueue" + s"[$taskId]Smart queue selection completed - primary: $primaryQueue, secondary: $secondaryQueue, selected: $selectedQueue" ) - + secondaryYarnResponse = + SecondaryYarnResponse(selectedQueue, primaryQueue, secondaryQueue) } else { logger.warn( - s"Unable to get secondary queue $secondaryQueue information, using primary queue: $primaryQueue" + s"[$taskId]Unable to get secondary queue $secondaryQueue information from Yarn, using primary queue: $primaryQueue" ) + secondaryYarnResponse = + SecondaryYarnResponse(primaryQueue, primaryQueue, secondaryQueue) } - - } catch { - case e: Exception => - logger.error( - s"Exception in smart queue selection, using primary queue: $primaryQueue", - e - ) } } - - } catch { - case e: Exception => - logger.error( - "Exception occurred during smart queue selection, using original queue configuration", - e - ) } + secondaryYarnResponse + } + + /** + * Format decimal as percentage string. + * + * @param value + * decimal value (e.g., 0.85) + * @return + * formatted percentage string (e.g., "85.00%") + */ + private def formatPercent(value: Double): String = { + f"${value * 100}%.2f%%" } private def fromEMGetEngineLabels(emLabels: util.List[Label[_]]): util.List[Label[_]] = { diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/EngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/EngineCreateService.scala index c1efd0f964b..00cd0069fff 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/EngineCreateService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/EngineCreateService.scala @@ -18,6 +18,10 @@ package org.apache.linkis.manager.am.service.engine import org.apache.linkis.common.exception.LinkisRetryException +import org.apache.linkis.governance.common.protocol.conf.{ + SecondaryYarnRequest, + SecondaryYarnResponse +} import org.apache.linkis.manager.common.entity.node.EngineNode import org.apache.linkis.manager.common.protocol.engine.EngineCreateRequest import org.apache.linkis.rpc.Sender @@ -27,4 +31,9 @@ trait EngineCreateService { @throws[LinkisRetryException] def createEngine(engineCreateRequest: EngineCreateRequest, sender: Sender): EngineNode + def performSmartQueueSelection( + secondaryYarnRequest: SecondaryYarnRequest, + sender: Sender + ): SecondaryYarnResponse + } diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/conf/RMConfiguration.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/conf/RMConfiguration.java index f4f487e8f3d..78065d7b4b9 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/conf/RMConfiguration.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/conf/RMConfiguration.java @@ -96,20 +96,4 @@ public class RMConfiguration { CommonVars.apply( "wds.linkis.rm.yarn.apps.filter.parms", "&deSelects=resourceRequests,timeouts,appNodeLabelExpression,amNodeLabelExpression,resourceInfo"); - - /** Whether to enable secondary queue feature Default: true Description: true enables smart queue selection, false disables the feature */ - public static final CommonVars SECONDARY_QUEUE_ENABLED = - CommonVars.apply("wds.linkis.rm.secondary.yarnqueue.enable", false); - - /** Secondary queue resource usage threshold Default: 0.9 (90%) Description: Use secondary queue when usage <= this value, use primary queue when usage > this value */ - public static final CommonVars SECONDARY_QUEUE_THRESHOLD = - CommonVars.apply("wds.linkis.rm.secondary.yarnqueue.threshold", 0.9); - - /** Supported engine type list (comma-separated) Default: spark Description: Only engines in this list will execute smart queue selection Case-insensitive */ - public static final CommonVars SECONDARY_QUEUE_ENGINES = - CommonVars.apply("wds.linkis.rm.secondary.yarnqueue.engines", "spark"); - - /** Supported Creator list (comma-separated) Default: IDE Description: Only Creators in this list will execute smart queue selection Case-insensitive */ - public static final CommonVars SECONDARY_QUEUE_CREATORS = - CommonVars.apply("wds.linkis.rm.secondary.yarnqueue.creators", "IDE"); }