Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.common.utils;

import org.apache.commons.lang3.StringUtils;

import java.util.Locale;
import java.util.regex.Pattern;

/**
 * Code masking utility for security logging. Prevents sensitive user code (and any credentials
 * embedded inside it) from being written to log files in plain text.
 *
 * <p>All methods are static, null/blank-safe, and free of side effects.
 */
public class CodeUtils {

  /** Default preview length for a code snippet (first N characters). */
  private static final int DEFAULT_CODE_SNIPPET_LENGTH = 6;

  /** Matches quoted password values, e.g. {@code password = 'secret'}. */
  private static final Pattern QUOTED_PASSWORD_PATTERN =
      Pattern.compile("(?i)(password|passwd|pwd)\\s*['\"]?\\s*[:=]\\s*['\"][^'\"]*['\"]");

  /** Matches unquoted password values, e.g. {@code pwd=secret}. */
  private static final Pattern BARE_PASSWORD_PATTERN =
      Pattern.compile("(?i)(password|passwd|pwd)\\s*[:=]\\s*[^\\s'\"]+");

  /** Matches quoted token / api-key / secret / access-key values. */
  private static final Pattern QUOTED_TOKEN_PATTERN =
      Pattern.compile(
          "(?i)(token|api[_-]?key|secret|access[_-]?key)\\s*['\"]?\\s*[:=]\\s*['\"][^'\"]*['\"]");

  /** Matches unquoted token / api-key / secret / access-key values. */
  private static final Pattern BARE_TOKEN_PATTERN =
      Pattern.compile("(?i)(token|api[_-]?key|secret|access[_-]?key)\\s*[:=]\\s*[^\\s'\"]+");

  /**
   * Matches connection strings ({@code jdbc:}, {@code mysql://}, {@code postgres://}). Also covers
   * {@code postgresql://}, which the original single {@code postgres://} alternative missed.
   */
  private static final Pattern CONNECTION_STRING_PATTERN =
      Pattern.compile("(?i)(jdbc:[^\\s'\"]+|mysql://[^\\s'\"]+|postgres(?:ql)?://[^\\s'\"]+)");

  /** Detects a Python-style function definition, e.g. {@code def foo(bar):}. */
  private static final Pattern PYTHON_DEF_PATTERN =
      Pattern.compile("(?m)^\\s*def\\s+\\w+\\s*\\(.*\\)\\s*:");

  /** Utility class; not meant to be instantiated. */
  private CodeUtils() {}

  /**
   * Mask code for logging - only shows length and line count.
   *
   * @param code the code to mask
   * @return masked code information, or {@code "[empty code]"} for null/blank input
   */
  public static String maskCode(String code) {
    if (isBlank(code)) {
      return "[empty code]";
    }
    return String.format("[code length: %d, lines: %d]", code.length(), countLines(code));
  }

  /**
   * Mask code for logging - shows type, length, line count, and a short code snippet.
   *
   * @param code the code to mask
   * @param codeType the type of code (e.g., "SQL", "Scala", "Python")
   * @return masked code information with snippet
   */
  public static String maskCode(String code, String codeType) {
    if (isBlank(code)) {
      return "[empty " + codeType + " code]";
    }
    // Only the first few characters are exposed, enough for debugging correlation
    String snippet = getCodeSnippet(code, DEFAULT_CODE_SNIPPET_LENGTH);
    return String.format(
        "[%s code, length: %d, lines: %d, snippet: %s]",
        codeType, code.length(), countLines(code), snippet);
  }

  /**
   * Get a code snippet (first N characters) for debugging.
   *
   * @param code the code
   * @param length number of characters to show
   * @return code snippet; always ends with {@code "..."} so readers know it is truncated, even
   *     when the code is shorter than {@code length} (deliberate, for security)
   */
  public static String getCodeSnippet(String code, int length) {
    if (isBlank(code)) {
      return "";
    }
    String trimmed = code.trim();
    // Always append the ellipsis, even for short code, so logs never look complete
    if (trimmed.length() <= length) {
      return trimmed + "...";
    }
    return trimmed.substring(0, length) + "...";
  }

  /**
   * Mask code for logging - shows type, length, line count, and a sanitized preview.
   *
   * @param code the code to mask
   * @param codeType the type of code (e.g., "SQL", "Scala", "Python")
   * @param maxPreviewLength maximum characters to preview; {@code <= 0} disables the preview
   * @return masked code information
   */
  public static String maskCode(String code, String codeType, int maxPreviewLength) {
    if (isBlank(code)) {
      return "[empty " + codeType + " code]";
    }
    int length = code.length();
    int lines = countLines(code);
    if (maxPreviewLength <= 0) {
      return String.format("[%s code, length: %d, lines: %d]", codeType, length, lines);
    }
    String preview = getPreview(code, maxPreviewLength);
    return String.format(
        "[%s code, length: %d, lines: %d, preview: %s]", codeType, length, lines, preview);
  }

  /**
   * Get a safe preview of the code (credentials masked, then truncated).
   *
   * @param code the code to preview
   * @param maxLength maximum length of the preview
   * @return safe preview string ({@code "..."} appended only when truncation happened)
   */
  public static String getPreview(String code, int maxLength) {
    if (isBlank(code)) {
      return "";
    }
    // Mask passwords / tokens / connection strings before anything is exposed
    String cleaned = removeSensitiveInfo(code);
    if (cleaned.length() <= maxLength) {
      return cleaned;
    }
    return cleaned.substring(0, maxLength) + "...";
  }

  /**
   * Remove sensitive information (passwords, tokens, connection strings) from a code preview.
   *
   * @param code the code to clean
   * @return cleaned code with sensitive values replaced by {@code ***}
   */
  private static String removeSensitiveInfo(String code) {
    // Order matters: quoted forms first so the bare-value patterns do not half-match them
    String cleaned = QUOTED_PASSWORD_PATTERN.matcher(code).replaceAll("$1='***'");
    cleaned = BARE_PASSWORD_PATTERN.matcher(cleaned).replaceAll("$1=***");
    cleaned = QUOTED_TOKEN_PATTERN.matcher(cleaned).replaceAll("$1='***'");
    cleaned = BARE_TOKEN_PATTERN.matcher(cleaned).replaceAll("$1=***");
    return CONNECTION_STRING_PATTERN.matcher(cleaned).replaceAll("***");
  }

  /**
   * Get only the first few lines of code (for preview).
   *
   * @param code the code to preview
   * @param maxLines maximum lines to show
   * @return preview string; when truncated, a trailing {@code "... (N more lines)"} marker is added
   */
  public static String getLinePreview(String code, int maxLines) {
    if (isBlank(code)) {
      return "";
    }
    // -1 keeps trailing empty lines so the reported count matches maskCode()
    String[] lines = code.split("\n", -1);
    StringBuilder preview = new StringBuilder();
    for (int i = 0; i < Math.min(maxLines, lines.length); i++) {
      if (i > 0) {
        preview.append("\n");
      }
      preview.append(lines[i]);
    }
    if (lines.length > maxLines) {
      preview.append("\n... (").append(lines.length - maxLines).append(" more lines)");
    }
    return preview.toString();
  }

  /**
   * Get code type from file extension or, failing that, from heuristics on the code content.
   *
   * <p>Content detection is best-effort: SQL keywords win first, then Scala/Java/Python are told
   * apart by characteristic tokens.
   *
   * @param code the code
   * @param fileType file extension (e.g., ".sql", ".scala", ".py")
   * @return detected code type, or {@code "Unknown"}
   */
  public static String detectCodeType(String code, String fileType) {
    if (!isBlank(fileType)) {
      // Locale.ROOT avoids locale-dependent lower-casing (e.g. Turkish dotless i)
      switch (fileType.toLowerCase(Locale.ROOT)) {
        case ".sql":
          return "SQL";
        case ".scala":
          return "Scala";
        case ".py":
          return "Python";
        case ".java":
          return "Java";
        case ".sh":
        case ".bash":
          return "Shell";
        case ".hql": // fixed: was "hql" (missing dot), so a real .hql extension never matched
        case "hql": // still accepted without the dot for backward compatibility
          return "HiveQL";
        default:
          break;
      }
    }

    // Fall back to content-based detection
    if (isBlank(code)) {
      return "Unknown";
    }

    String trimmed = code.trim().toUpperCase(Locale.ROOT);

    if (trimmed.startsWith("SELECT")
        || trimmed.startsWith("INSERT")
        || trimmed.startsWith("UPDATE")
        || trimmed.startsWith("DELETE")
        || trimmed.startsWith("CREATE")
        || trimmed.startsWith("ALTER")
        || trimmed.startsWith("DROP")
        || trimmed.startsWith("WITH")) {
      return "SQL";
    }

    if (trimmed.contains("DEF ") || trimmed.contains("CLASS ") || trimmed.startsWith("IMPORT ")) {
      // A colon-terminated "def name(...):" is Python, not Java/Scala
      if (PYTHON_DEF_PATTERN.matcher(code).find()) {
        return "Python";
      }
      if (code.contains("println") || code.contains("Array(")) {
        return "Scala";
      }
      return "Java";
    }

    // "IMPORT "/"DEF " are already consumed by the branch above; only PRINT remains distinctive
    if (trimmed.contains("PRINT ")) {
      return "Python";
    }

    return "Unknown";
  }

  /**
   * Null-safe blank check (null, empty, or whitespace-only), equivalent to
   * {@code StringUtils.isBlank} but without the commons-lang3 dependency.
   */
  private static boolean isBlank(String s) {
    return s == null || s.trim().isEmpty();
  }

  /** Count lines including trailing empty lines (split with limit -1). */
  private static int countLines(String code) {
    return code.split("\n", -1).length;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.linkis.common.io.Record;
import org.apache.linkis.common.io.resultset.ResultSetWriter;
import org.apache.linkis.common.log.LogUtils;
import org.apache.linkis.common.utils.CodeUtils;
import org.apache.linkis.common.utils.JsonUtils;
import org.apache.linkis.common.utils.OverloadUtils;
import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
Expand All @@ -40,6 +41,7 @@
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.engine.EngineType;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
import org.apache.linkis.protocol.engine.JobProgressInfo;
Expand Down Expand Up @@ -188,7 +190,9 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext,
} else {
realCode = code.trim();
}
logger.info("Doris engine begins to run code:\n {}", realCode);
logger.info(
"Doris engine begins to run code: {}",
CodeUtils.maskCode(realCode, EngineType.DORIS().toString()));

checkRequiredParameter(realCode);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.linkis.engineplugin.elasticsearch.executor.client.impl;

import org.apache.linkis.common.utils.CodeUtils;
import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
import org.apache.linkis.engineplugin.elasticsearch.executor.client.*;
import org.apache.linkis.manager.label.entity.engine.EngineType;
import org.apache.linkis.protocol.constants.TaskConstant;
import org.apache.linkis.storage.utils.StorageUtils;

Expand Down Expand Up @@ -70,7 +72,11 @@ public void open() throws Exception {
@Override
public ElasticSearchResponse executeLine(String code) {
String realCode = code.trim();
logger.info("es client begins to run {} code:\n {}", runType, realCode.trim());
logger.info(
"es client begins to run {} code: {}",
runType,
CodeUtils.maskCode(
realCode.trim(), EngineType.ELASTICSEARCH().toString() + "-" + runType.toUpperCase()));
CountDownLatch countDown = new CountDownLatch(1);
ElasticSearchResponse[] executeResponse = {
new ElasticSearchErrorResponse("INCOMPLETE", null, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
package org.apache.linkis.engineconnplugin.flink.client.factory;

import org.apache.linkis.engineconnplugin.flink.client.utils.YarnConfLoader;
import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.linkis.engineconnplugin.flink.executor

import org.apache.linkis.common.utils.{ByteTimeUtils, Utils, VariableUtils}
import org.apache.linkis.common.utils.{ByteTimeUtils, CodeUtils, Utils, VariableUtils}
import org.apache.linkis.engineconn.once.executor.OnceExecutorExecutionContext
import org.apache.linkis.engineconnplugin.flink.client.deployment.YarnPerJobClusterDescriptorAdapter
import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary._
Expand All @@ -30,6 +30,7 @@ import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.Resu
import org.apache.linkis.engineconnplugin.flink.client.sql.parser.{SqlCommand, SqlCommandParser}
import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
import org.apache.linkis.governance.common.paser.{CodeParserFactory, CodeType}
import org.apache.linkis.manager.label.entity.engine.EngineType
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.scheduler.executer.ErrorExecuteResponse

Expand Down Expand Up @@ -64,14 +65,17 @@ class FlinkCodeOnceExecutor(
if (StringUtils.isBlank(codes)) {
throw new FlinkInitFailedException(SQL_CODE_EMPTY.getErrorDesc)
}
logger.info(s"Ready to submit flink application, sql is: $codes.")
logger.info(s"Ready to submit flink application, sql is: ${CodeUtils
.maskCode(codes, EngineType.FLINK.toString() + "-SQL")}.")
val variableMap =
if (onceExecutorExecutionContext.getOnceExecutorContent.getVariableMap != null) {
onceExecutorExecutionContext.getOnceExecutorContent.getVariableMap
.asInstanceOf[util.Map[String, Any]]
} else new util.HashMap[String, Any]
codes = VariableUtils.replace(codes, variableMap)
logger.info(s"After variable replace, sql is: $codes.")
logger.info(
s"After variable replace, sql is: ${CodeUtils.maskCode(codes, EngineType.FLINK.toString() + "-SQL")}."
)
case runType =>
// Now, only support sql code.
throw new FlinkInitFailedException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,26 @@ import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.constant.ec.ECConstants
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus

import org.apache.commons.lang3.StringUtils
import org.apache.flink
import org.apache.flink.client.program.rest.RestClusterClient
import org.apache.flink.configuration.{HighAvailabilityOptions, JobManagerOptions, RestOptions}
import org.apache.flink.runtime.client.JobStatusMessage
import org.apache.flink.yarn.configuration.YarnConfigOptions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, FinalApplicationStatus, YarnApplicationState}
import org.apache.hadoop.yarn.api.records.{
ApplicationId,
ApplicationReport,
FinalApplicationStatus,
YarnApplicationState
}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils

import java.util

import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import scala.collection.mutable.ArrayBuffer

Expand Down
Loading
Loading