From 59275efca1743f7da38c61640c3c325f3bcd6f5f Mon Sep 17 00:00:00 2001 From: v-kkhuang <420895376@qq.com> Date: Thu, 23 Apr 2026 11:37:30 +0800 Subject: [PATCH] =?UTF-8?q?#AI=20commit#=20=E5=BC=80=E5=8F=91=E9=98=B6?= =?UTF-8?q?=E6=AE=B5=EF=BC=9A=20=E4=BC=98=E5=8C=96=E7=A6=81=E6=AD=A2?= =?UTF-8?q?=E5=BC=95=E6=93=8E=E6=89=93=E5=8D=B0code=E9=9A=90=E7=A7=81?= =?UTF-8?q?=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apache/linkis/common/utils/CodeUtils.java | 262 ++++++++++++++++++ .../executor/DorisEngineConnExecutor.java | 6 +- .../impl/ElasticSearchExecutorImpl.java | 8 +- .../LinkisYarnClusterClientFactory.java | 2 - .../executor/FlinkCodeOnceExecutor.scala | 10 +- .../flink/util/YarnUtil.scala | 9 +- .../executor/ImpalaEngineConnExecutor.scala | 7 +- .../executor/NebulaEngineConnExecutor.java | 6 +- .../executor/PrestoEngineConnExecutor.java | 6 +- .../repl/executor/ReplEngineConnExecutor.java | 6 +- .../executor/SparkDataCalcExecutor.scala | 8 +- .../executor/SparkEngineConnExecutor.scala | 25 +- .../spark/executor/SparkSqlExecutor.scala | 7 +- .../executor/TrinoEngineConnExecutor.scala | 7 +- 14 files changed, 344 insertions(+), 25 deletions(-) create mode 100644 linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/CodeUtils.java diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/CodeUtils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/CodeUtils.java new file mode 100644 index 00000000000..7d9a2648347 --- /dev/null +++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/CodeUtils.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.common.utils; + +import org.apache.commons.lang3.StringUtils; + +/** + * Code masking utility for security logging. Prevents sensitive user code from being logged in + * plain text. + */ +public class CodeUtils { + + /** Default maximum preview length */ + private static final int DEFAULT_MAX_PREVIEW_LENGTH = 50; + + /** Default preview length for code snippet (first N characters) */ + private static final int DEFAULT_CODE_SNIPPET_LENGTH = 6; + + /** Maximum lines to preview */ + private static final int MAX_PREVIEW_LINES = 3; + + /** + * Mask code for logging - only shows length and line count + * + * @param code the code to mask + * @return masked code information + */ + public static String maskCode(String code) { + if (StringUtils.isBlank(code)) { + return "[empty code]"; + } + int length = code.length(); + int lines = code.split("\n", -1).length; + return String.format("[code length: %d, lines: %d]", length, lines); + } + + /** + * Mask code for logging - shows type, length, line count, and code snippet + * + * @param code the code to mask + * @param codeType the type of code (e.g., "SQL", "Scala", "Python") + * @return masked code information with snippet + */ + public static String maskCode(String code, String codeType) { + if (StringUtils.isBlank(code)) { + return "[empty " + codeType + " code]"; + } + int length = code.length(); + int lines = code.split("\n", -1).length; + + // Get code snippet (first N characters for debugging) + String snippet = getCodeSnippet(code, DEFAULT_CODE_SNIPPET_LENGTH); + + return String.format( + "[%s code, length: %d, lines: %d, snippet: %s]", codeType, length, lines, snippet); + } + + /** + * Get code snippet (first N characters) for debugging + * + * @param code the code + * @param length number of characters to show + * @return code snippet (always truncated for security) + */ + public static String getCodeSnippet(String code, int length) { + if (StringUtils.isBlank(code)) { + return ""; + } + + String trimmed = code.trim(); + + // Always truncate for security, even if code is short + if (trimmed.length() <= length) { + return trimmed + "..."; + } + + return trimmed.substring(0, length) + "..."; + } + + /** + * Mask code for logging - shows type, length, line count, and preview + * + * @param code the code to mask + * @param codeType the type of code (e.g., "SQL", "Scala", "Python") + * @param maxPreviewLength maximum characters to preview + * @return masked code information + */ + public static String maskCode(String code, String codeType, int maxPreviewLength) { + if (StringUtils.isBlank(code)) { + return "[empty " + codeType + " code]"; + } + int length = code.length(); + int lines = code.split("\n", -1).length; + + if (maxPreviewLength <= 0) { + return String.format("[%s code, length: %d, lines: %d]", codeType, length, lines); + } + + String preview = getPreview(code, maxPreviewLength); + return String.format( + "[%s code, length: %d, lines: %d, preview: %s]", codeType, length, lines, preview); + } + + /** + * Get a safe preview of the code (truncated and cleaned) + * + * @param code the code to preview + * @param maxLength maximum length of preview + * @return safe preview string + */ + public static String getPreview(String code, int maxLength) { + if (StringUtils.isBlank(code)) { + return ""; + } + + // Remove sensitive patterns (passwords, tokens, etc.) + String cleaned = removeSensitiveInfo(code); + + // Truncate to max length + if (cleaned.length() <= maxLength) { + return cleaned; + } + + return cleaned.substring(0, maxLength) + "..."; + } + + /** + * Remove sensitive information from code preview + * + * @param code the code to clean + * @return cleaned code + */ + private static String removeSensitiveInfo(String code) { + // Remove common sensitive patterns + String cleaned = code; + + // Remove password values + cleaned = + cleaned.replaceAll( + "(?i)(password|passwd|pwd)\\s*['\"]?\\s*[:=]\\s*['\"][^'\"]*['\"]", "$1='***'"); + cleaned = cleaned.replaceAll("(?i)(password|passwd|pwd)\\s*[:=]\\s*[^\\s'\"]+", "$1=***"); + + // Remove token/key values + cleaned = + cleaned.replaceAll( + "(?i)(token|api[_-]?key|secret|access[_-]?key)\\s*['\"]?\\s*[:=]\\s*['\"][^'\"]*['\"]", + "$1='***'"); + cleaned = + cleaned.replaceAll( + "(?i)(token|api[_-]?key|secret|access[_-]?key)\\s*[:=]\\s*[^\\s'\"]+", "$1=***"); + + // Remove connection strings (jdbc:, mysql:, postgres:, etc.) + cleaned = + cleaned.replaceAll("(?i)(jdbc:[^\\s'\"]+|mysql://[^\\s'\"]+|postgres://[^\\s'\"]+)", "***"); + + return cleaned; + } + + /** + * Get only the first few lines of code (for preview) + * + * @param code the code to preview + * @param maxLines maximum lines to show + * @return preview string + */ + public static String getLinePreview(String code, int maxLines) { + if (StringUtils.isBlank(code)) { + return ""; + } + + String[] lines = code.split("\n", -1); + StringBuilder preview = new StringBuilder(); + + for (int i = 0; i < Math.min(maxLines, lines.length); i++) { + if (i > 0) { + preview.append("\n"); + } + preview.append(lines[i]); + } + + if (lines.length > maxLines) { + preview.append("\n... (").append(lines.length - maxLines).append(" more lines)"); + } + + return preview.toString(); + } + + /** + * Get code type from file extension or code content + * + * @param code the code + * @param fileType file extension (e.g., ".sql", ".scala", ".py") + * @return detected code type + */ + public static String detectCodeType(String code, String fileType) { + if (StringUtils.isNotBlank(fileType)) { + switch (fileType.toLowerCase()) { + case ".sql": + return "SQL"; + case ".scala": + return "Scala"; + case ".py": + return "Python"; + case ".java": + return "Java"; + case ".sh": + case ".bash": + return "Shell"; + case "hql": + return "HiveQL"; + default: + break; + } + } + + // Try to detect from content + if (StringUtils.isBlank(code)) { + return "Unknown"; + } + + String trimmed = code.trim().toUpperCase(); + + if (trimmed.startsWith("SELECT") + || trimmed.startsWith("INSERT") + || trimmed.startsWith("UPDATE") + || trimmed.startsWith("DELETE") + || trimmed.startsWith("CREATE") + || trimmed.startsWith("ALTER") + || trimmed.startsWith("DROP") + || trimmed.startsWith("WITH")) { + return "SQL"; + } + + if (trimmed.contains("DEF ") || trimmed.contains("CLASS ") || trimmed.startsWith("IMPORT ")) { + if (code.contains("println") || code.contains("Array(")) { + return "Scala"; + } + return "Java"; + } + + if (trimmed.startsWith("IMPORT ") || trimmed.contains("PRINT ") || trimmed.contains("DEF ")) { + return "Python"; + } + + return "Unknown"; + } +} diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java index 5a9ae3a05b0..b57f057168c 100644 --- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java @@ -21,6 +21,7 @@ import org.apache.linkis.common.io.Record; import org.apache.linkis.common.io.resultset.ResultSetWriter; import org.apache.linkis.common.log.LogUtils; +import org.apache.linkis.common.utils.CodeUtils; import org.apache.linkis.common.utils.JsonUtils; import org.apache.linkis.common.utils.OverloadUtils; import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask; @@ -40,6 +41,7 @@ import org.apache.linkis.manager.common.entity.resource.NodeResource; import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils; import org.apache.linkis.manager.label.entity.Label; +import org.apache.linkis.manager.label.entity.engine.EngineType; import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel; import org.apache.linkis.protocol.engine.JobProgressInfo; @@ -188,7 +190,9 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, } else { realCode = code.trim(); } - logger.info("Doris engine begins to run code:\n {}", realCode); + logger.info( + "Doris engine begins to run code: {}", + CodeUtils.maskCode(realCode, EngineType.DORIS().toString())); checkRequiredParameter(realCode); diff --git a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.java b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.java index bb0b35222ba..0c249519609 100644 --- a/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.java +++ b/linkis-engineconn-plugins/elasticsearch/src/main/java/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.java @@ -17,8 +17,10 @@ package org.apache.linkis.engineplugin.elasticsearch.executor.client.impl; +import org.apache.linkis.common.utils.CodeUtils; import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration; import org.apache.linkis.engineplugin.elasticsearch.executor.client.*; +import org.apache.linkis.manager.label.entity.engine.EngineType; import org.apache.linkis.protocol.constants.TaskConstant; import org.apache.linkis.storage.utils.StorageUtils; @@ -70,7 +72,11 @@ public void open() throws Exception { @Override public ElasticSearchResponse executeLine(String code) { String realCode = code.trim(); - logger.info("es client begins to run {} code:\n {}", runType, realCode.trim()); + logger.info( + "es client begins to run {} code: {}", + runType, + CodeUtils.maskCode( + realCode.trim(), EngineType.ELASTICSEARCH().toString() + "-" + runType.toUpperCase())); CountDownLatch countDown = new CountDownLatch(1); ElasticSearchResponse[] executeResponse = { new ElasticSearchErrorResponse("INCOMPLETE", null, null) diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/factory/LinkisYarnClusterClientFactory.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/factory/LinkisYarnClusterClientFactory.java index 6cb999dace4..dbe2d5df3f3 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/factory/LinkisYarnClusterClientFactory.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/factory/LinkisYarnClusterClientFactory.java @@ -18,7 +18,6 @@ package org.apache.linkis.engineconnplugin.flink.client.factory; import org.apache.linkis.engineconnplugin.flink.client.utils.YarnConfLoader; -import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; @@ -26,7 +25,6 @@ import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever; import org.apache.flink.yarn.YarnClusterClientFactory; import org.apache.flink.yarn.YarnClusterDescriptor; -import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.flink.yarn.configuration.YarnLogConfigUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.YarnClient; diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala index 4d2971750e1..33e86e490df 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala @@ -17,7 +17,7 @@ package org.apache.linkis.engineconnplugin.flink.executor -import org.apache.linkis.common.utils.{ByteTimeUtils, Utils, VariableUtils} +import org.apache.linkis.common.utils.{ByteTimeUtils, CodeUtils, Utils, VariableUtils} import org.apache.linkis.engineconn.once.executor.OnceExecutorExecutionContext import org.apache.linkis.engineconnplugin.flink.client.deployment.YarnPerJobClusterDescriptorAdapter import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary._ @@ -30,6 +30,7 @@ import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.Resu import org.apache.linkis.engineconnplugin.flink.client.sql.parser.{SqlCommand, SqlCommandParser} import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext import org.apache.linkis.governance.common.paser.{CodeParserFactory, CodeType} +import org.apache.linkis.manager.label.entity.engine.EngineType import org.apache.linkis.protocol.constants.TaskConstant import org.apache.linkis.scheduler.executer.ErrorExecuteResponse @@ -64,14 +65,17 @@ class FlinkCodeOnceExecutor( if (StringUtils.isBlank(codes)) { throw new FlinkInitFailedException(SQL_CODE_EMPTY.getErrorDesc) } - logger.info(s"Ready to submit flink application, sql is: $codes.") + logger.info(s"Ready to submit flink application, sql is: ${CodeUtils + .maskCode(codes, EngineType.FLINK.toString() + "-SQL")}.") val variableMap = if (onceExecutorExecutionContext.getOnceExecutorContent.getVariableMap != null) { onceExecutorExecutionContext.getOnceExecutorContent.getVariableMap .asInstanceOf[util.Map[String, Any]] } else new util.HashMap[String, Any] codes = VariableUtils.replace(codes, variableMap) - logger.info(s"After variable replace, sql is: $codes.") + logger.info( + s"After variable replace, sql is: ${CodeUtils.maskCode(codes, EngineType.FLINK.toString() + "-SQL")}." + ) case runType => // Now, only support sql code. throw new FlinkInitFailedException( diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/YarnUtil.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/YarnUtil.scala index 7f664a35274..846975319bf 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/YarnUtil.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/YarnUtil.scala @@ -28,6 +28,7 @@ import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration import org.apache.linkis.governance.common.conf.GovernanceCommonConf import org.apache.linkis.governance.common.constant.ec.ECConstants import org.apache.linkis.manager.common.entity.enumeration.NodeStatus + import org.apache.commons.lang3.StringUtils import org.apache.flink import org.apache.flink.client.program.rest.RestClusterClient @@ -35,12 +36,18 @@ import org.apache.flink.configuration.{HighAvailabilityOptions, JobManagerOption import org.apache.flink.runtime.client.JobStatusMessage import org.apache.flink.yarn.configuration.YarnConfigOptions import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, FinalApplicationStatus, YarnApplicationState} +import org.apache.hadoop.yarn.api.records.{ + ApplicationId, + ApplicationReport, + FinalApplicationStatus, + YarnApplicationState +} import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils import java.util + import scala.collection.JavaConverters.collectionAsScalaIterableConverter import scala.collection.mutable.ArrayBuffer diff --git a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala index c96fc77297c..3ee89a9d16b 100644 --- a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala @@ -18,7 +18,7 @@ package org.apache.linkis.engineplugin.impala.executor import org.apache.linkis.common.log.LogUtils -import org.apache.linkis.common.utils.{OverloadUtils, Utils} +import org.apache.linkis.common.utils.{CodeUtils, OverloadUtils, Utils} import org.apache.linkis.engineconn.computation.executor.execute.{ ConcurrentComputationExecutor, EngineExecutionContext @@ -54,6 +54,7 @@ import org.apache.linkis.manager.common.entity.resource.{ import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel} +import org.apache.linkis.manager.label.entity.engine.EngineType import org.apache.linkis.protocol.engine.JobProgressInfo import org.apache.linkis.rpc.Sender import org.apache.linkis.scheduler.executer.{ @@ -111,7 +112,9 @@ class ImpalaEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) code.trim } - logger.info(s"impala client begins to run code:\n $realCode") + logger.info( + s"impala client begins to run code: ${CodeUtils.maskCode(realCode, EngineType.IMPALA.toString())}" + ) val taskId = engineExecutionContext.getJobId.get val impalaClient = getOrCreateImpalaClient(engineExecutionContext) diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java index 3b1a931b469..bcb61f9e3d7 100644 --- a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java @@ -21,6 +21,7 @@ import org.apache.linkis.common.io.resultset.ResultSetWriter; import org.apache.linkis.common.log.LogUtils; import org.apache.linkis.common.utils.AESUtils; +import org.apache.linkis.common.utils.CodeUtils; import org.apache.linkis.common.utils.OverloadUtils; import org.apache.linkis.engineconn.common.conf.EngineConnConf; import org.apache.linkis.engineconn.common.conf.EngineConnConstant; @@ -39,6 +40,7 @@ import org.apache.linkis.manager.common.entity.resource.NodeResource; import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils; import org.apache.linkis.manager.label.entity.Label; +import org.apache.linkis.manager.label.entity.engine.EngineType; import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel; import org.apache.linkis.protocol.engine.JobProgressInfo; @@ -135,7 +137,9 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, } else { realCode = code.trim(); } - logger.info("Nebula client begins to run ngql code:\n {}", realCode); + logger.info( + "Nebula client begins to run ngql code: {}", + CodeUtils.maskCode(realCode, EngineType.NEBULA().toString())); String taskId = engineExecutorContext.getJobId().get(); NebulaPool nebulaPool = nebulaPoolCache.getIfPresent(taskId); diff --git a/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.java b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.java index 1bc16ee6018..edb0d8a5442 100644 --- a/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.java +++ b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.java @@ -20,6 +20,7 @@ import org.apache.linkis.common.exception.ErrorException; import org.apache.linkis.common.io.resultset.ResultSetWriter; import org.apache.linkis.common.log.LogUtils; +import org.apache.linkis.common.utils.CodeUtils; import org.apache.linkis.common.utils.OverloadUtils; import org.apache.linkis.engineconn.common.conf.EngineConnConf; import org.apache.linkis.engineconn.common.conf.EngineConnConstant; @@ -39,6 +40,7 @@ import org.apache.linkis.manager.common.entity.resource.NodeResource; import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils; import org.apache.linkis.manager.label.entity.Label; +import org.apache.linkis.manager.label.entity.engine.EngineType; import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel; import org.apache.linkis.protocol.engine.JobProgressInfo; @@ -145,7 +147,9 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, } else { realCode = code.trim(); } - logger.info("presto client begins to run psql code:\n {}", realCode); + logger.info( + "presto client begins to run psql code: {}", + CodeUtils.maskCode(realCode, EngineType.PRESTO().toString())); String taskId = engineExecutorContext.getJobId().get(); ClientSession clientSession = clientSessionCache.getIfPresent(taskId); diff --git a/linkis-engineconn-plugins/repl/src/main/java/org/apache/linkis/engineplugin/repl/executor/ReplEngineConnExecutor.java b/linkis-engineconn-plugins/repl/src/main/java/org/apache/linkis/engineplugin/repl/executor/ReplEngineConnExecutor.java index 53b7094f650..b13fc4f4478 100644 --- a/linkis-engineconn-plugins/repl/src/main/java/org/apache/linkis/engineplugin/repl/executor/ReplEngineConnExecutor.java +++ b/linkis-engineconn-plugins/repl/src/main/java/org/apache/linkis/engineplugin/repl/executor/ReplEngineConnExecutor.java @@ -21,6 +21,7 @@ import org.apache.linkis.common.io.Record; import org.apache.linkis.common.io.resultset.ResultSetWriter; import org.apache.linkis.common.log.LogUtils; +import org.apache.linkis.common.utils.CodeUtils; import org.apache.linkis.common.utils.OverloadUtils; import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask; import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor; @@ -35,6 +36,7 @@ import org.apache.linkis.manager.common.entity.resource.NodeResource; import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils; import org.apache.linkis.manager.label.entity.Label; +import org.apache.linkis.manager.label.entity.engine.EngineType; import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel; import org.apache.linkis.protocol.engine.JobProgressInfo; @@ -131,7 +133,9 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, } else { realCode = code.trim(); } - logger.info("Repl engine begins to run code:\n {}", realCode); + logger.info( + "Repl engine begins to run code: {}", + CodeUtils.maskCode(realCode, EngineType.REPL().toString())); String taskId = engineExecutorContext.getJobId().get(); diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala index c2757c34e0e..f72efe31568 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala @@ -18,7 +18,7 @@ package org.apache.linkis.engineplugin.spark.executor import org.apache.linkis.common.exception.FatalException -import org.apache.linkis.common.utils.Utils +import org.apache.linkis.common.utils.{CodeUtils, Utils} import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext import org.apache.linkis.engineconn.core.executor.ExecutorManager import org.apache.linkis.engineplugin.spark.common.{Kind, SparkDataCalc} @@ -27,6 +27,7 @@ import org.apache.linkis.engineplugin.spark.datacalc.model.{DataCalcArrayData, D import org.apache.linkis.engineplugin.spark.entity.SparkEngineSession import org.apache.linkis.engineplugin.spark.utils.EngineUtils import org.apache.linkis.governance.common.paser.EmptyCodeParser +import org.apache.linkis.manager.label.entity.engine.EngineType import org.apache.linkis.scheduler.executer.{ CompletedExecuteResponse, ErrorExecuteResponse, @@ -53,7 +54,10 @@ class SparkDataCalcExecutor(sparkEngineSession: SparkEngineSession, id: Long) context: EngineExecutionContext, jobGroup: String ): ExecuteResponse = { - logger.info("DataCalcExecutor run query: " + code) + logger.info( + "DataCalcExecutor run query: " + CodeUtils + .maskCode(code, EngineType.SPARK.toString() + "-DataCalc") + ) context.appendStdout(s"${EngineUtils.getName} >> $code") Utils.tryCatch { val execType = context.getProperties.getOrDefault("exec-type", "array").toString diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala index fd9d2385c7a..9e76719cba3 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala @@ -18,7 +18,13 @@ package org.apache.linkis.engineplugin.spark.executor import org.apache.linkis.common.log.LogUtils -import org.apache.linkis.common.utils.{ByteTimeUtils, CodeAndRunTypeUtils, Logging, Utils} +import org.apache.linkis.common.utils.{ + ByteTimeUtils, + CodeAndRunTypeUtils, + CodeUtils, + Logging, + Utils +} import org.apache.linkis.engineconn.common.conf.{EngineConnConf, EngineConnConstant} import org.apache.linkis.engineconn.common.creation.EngineCreationContext import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf @@ -58,6 +64,7 @@ import org.apache.linkis.manager.label.conf.LabelCommonConfig import org.apache.linkis.manager.label.constant.LabelKeyConstant import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, EngineType} +import org.apache.linkis.manager.label.entity.engine.EngineType import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.protocol.engine.JobProgressInfo import org.apache.linkis.scheduler.executer.ExecuteResponse @@ -222,13 +229,19 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) // with unit if set configuration with unit // if not set sc get will get the value of spark.yarn.executor.memoryOverhead such as 512(without unit) val memoryOverhead = sc.getConf.get("spark.executor.memoryOverhead", "1G") - val pythonVersion = SparkConfiguration.SPARK_PYTHON_VERSION.getValue( - EngineConnObject.getEngineCreationContext.getOptions - ) + val engineCreationOptions = EngineConnObject.getEngineCreationContext.getOptions + val pythonVersion = if (engineCreationOptions != null) { + SparkConfiguration.SPARK_PYTHON_VERSION.getValue(engineCreationOptions) + } else { + SparkConfiguration.SPARK_PYTHON_VERSION.getValue + } var engineType = "" val labels = engineExecutorContext.getLabels if (labels.length > 0) { - engineType = LabelUtil.getEngineTypeLabel(labels.toList.asJava).getStringValue + val engineTypeLabel = LabelUtil.getEngineTypeLabel(labels.toList.asJava) + if (engineTypeLabel != null) { + engineType = engineTypeLabel.getStringValue + } } val sb = new StringBuilder sb.append(s"spark.executor.instances=$executorNum\n") @@ -362,7 +375,7 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) completedLine: String ): ExecuteResponse = { val newcode = completedLine + code - logger.info("newcode is " + newcode) + logger.info("newcode is " + CodeUtils.maskCode(newcode, EngineType.SPARK.toString())) executeLine(engineExecutorContext, newcode) } diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala index baeaf6e0408..69e73e407a8 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala @@ -18,7 +18,7 @@ package org.apache.linkis.engineplugin.spark.executor import org.apache.linkis.common.io.FsPath -import org.apache.linkis.common.utils.Utils +import org.apache.linkis.common.utils.{CodeUtils, Utils} import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext import org.apache.linkis.engineplugin.spark.common.{Kind, SparkSQL} import org.apache.linkis.engineplugin.spark.config.SparkConfiguration @@ -28,6 +28,7 @@ import org.apache.linkis.engineplugin.spark.utils.{DirectPushCache, EngineUtils} import org.apache.linkis.governance.common.constant.job.JobRequestConstants import org.apache.linkis.governance.common.paser.SQLCodeParser import org.apache.linkis.governance.common.utils.JobUtils +import org.apache.linkis.manager.label.entity.engine.EngineType import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.scheduler.executer._ @@ -86,7 +87,9 @@ class SparkSqlExecutor( sparkEngineSession.sqlContext.sql(s"use $defaultDB") } - logger.info("SQLExecutor run query: " + code) + logger.info( + "SQLExecutor run query: " + CodeUtils.maskCode(code, EngineType.SPARK.toString() + "-SQL") + ) engineExecutionContext.appendStdout(s"${EngineUtils.getName} >> $code") val standInClassLoader = Thread.currentThread().getContextClassLoader try { diff --git a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala index 947155f2966..9773fbba4fa 100644 --- a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala @@ -18,7 +18,7 @@ package org.apache.linkis.engineplugin.trino.executor import org.apache.linkis.common.log.LogUtils -import org.apache.linkis.common.utils.{OverloadUtils, Utils} +import org.apache.linkis.common.utils.{CodeUtils, OverloadUtils, Utils} import org.apache.linkis.engineconn.acessible.executor.listener.event.TaskLogUpdateEvent import org.apache.linkis.engineconn.common.conf.{EngineConnConf, EngineConnConstant} import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf @@ -51,6 +51,7 @@ import org.apache.linkis.manager.common.entity.resource.{ import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel} +import org.apache.linkis.manager.label.entity.engine.EngineType import org.apache.linkis.protocol.engine.JobProgressInfo import org.apache.linkis.rpc.Sender import org.apache.linkis.scheduler.executer.{ @@ -161,7 +162,9 @@ class TrinoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) } TrinoCode.checkCode(realCode) - logger.info(s"trino client begins to run psql code:\n $realCode") + logger.info( + s"trino client begins to run psql code: ${CodeUtils.maskCode(realCode, EngineType.TRINO.toString())}" + ) val jobId = JobUtils.getJobIdFromMap(engineExecutorContext.getProperties) // Add task id in the first line, and trino will customize it after receiving it.(在第一行加taskid,trino接收后做定制化处理) realCode = s"--linkis_task_id=$jobId" + "\n" + realCode