From 93e8a45d37078a83953789527b86b440aa8d21ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Tue, 3 Mar 2026 06:43:06 +0100 Subject: [PATCH 1/5] fix table type through sql round trip --- build.sbt | 2 +- .../app/softnetwork/elastic/sql/package.scala | 2 +- .../elastic/sql/parser/Parser.scala | 2 +- .../elastic/sql/parser/type/package.scala | 14 +- .../elastic/sql/query/package.scala | 14 +- .../elastic/sql/schema/package.scala | 70 +++--- .../elastic/sql/parser/ParserSpec.scala | 223 +++++++++++++++++- 7 files changed, 276 insertions(+), 51 deletions(-) diff --git a/build.sbt b/build.sbt index fa869f5c..73dbe828 100644 --- a/build.sbt +++ b/build.sbt @@ -20,7 +20,7 @@ ThisBuild / organization := "app.softnetwork" name := "softclient4es" -ThisBuild / version := "0.17.2" +ThisBuild / version := "0.17-SNAPSHOT" ThisBuild / scalaVersion := scala213 diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/package.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/package.scala index 55ed2abc..db713684 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/package.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/package.scala @@ -573,7 +573,7 @@ package object sql { override def sql: String = s"""'$value'""" override def baseType: SQLType = SQLTypes.Varchar - override def ddl: String = s""""$value"""" + override def ddl: String = s""""${value.replace("\\", "\\\\").replace("\"", "\\\"")}"""" } case object IdValue extends Value[String]("_id") with TokenRegex { diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/parser/Parser.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/parser/Parser.scala index eb2bbed9..03bfc196 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/parser/Parser.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/parser/Parser.scala @@ -1094,7 +1094,7 @@ trait Parser val endStruct: Parser[String] = "}" def objectValue: PackratParser[ObjectValue] = - lparen ~> rep1sep(option, comma) <~ rparen ^^ { opts => + lparen ~> repsep(option, comma) <~ rparen ^^ { opts => ObjectValue(ListMap(opts: _*)) } diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/parser/type/package.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/parser/type/package.scala index e700d3f0..3ca400b1 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/parser/type/package.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/parser/type/package.scala @@ -24,6 +24,7 @@ import app.softnetwork.elastic.sql.{ Identifier, LongValue, LongValues, + Null, ParamValue, PiValue, RandomValue, @@ -38,9 +39,11 @@ package object `type` { trait TypeParser { self: Parser => def literal: PackratParser[StringValue] = - (("\"" ~> """([^"\\]|\\.)*""".r <~ "\"") | ("'" ~> """([^'\\]|\\.)*""".r <~ "'")) ^^ { str => - StringValue(str) - } + (("\"" ~> """([^"\\]|\\.)*""".r <~ "\"") ^^ { str => + StringValue(str.replace("\\\"", "\"").replace("\\\\", "\\")) + }) | (("'" ~> """([^'\\]|\\.)*""".r <~ "'") ^^ { str => + StringValue(str.replace("\\'", "'").replace("\\\\", "\\")) + }) def long: PackratParser[LongValue] = """(-)?(0|[1-9]\d*)""".r ^^ (str => LongValue(str.toLong)) @@ -59,6 +62,9 @@ package object `type` { def param: PackratParser[ParamValue.type] = "?" ^^ (_ => ParamValue) + def nullValue: PackratParser[Null.type] = + "(?i)NULL\\b".r ^^ (_ => Null) + def literals: PackratParser[Value[_]] = "[" ~> repsep(literal, ",") <~ "]" ^^ { list => StringValues(list) } @@ -78,7 +84,7 @@ package object `type` { def array: PackratParser[Value[_]] = literals | longs | doubles | booleans def value: PackratParser[Value[_]] = - literal | pi | random | double | long | boolean | param | array + literal | pi | random | double | long | boolean | nullValue | param | array def identifierWithValue: Parser[Identifier] = (value ^^ functionAsIdentifier) >> cast diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala index bcca489c..5341802d 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala @@ -833,11 +833,15 @@ package object query { case None => Nil } - lazy val tableType: TableType = (options.get("type") match { - case Some(value) => - value match { - case s: StringValue => Some(TableType(s.value)) - case _ => None + lazy val tableType: TableType = (mappings.get("_meta") match { + case Some(meta) => + meta match { + case o: ObjectValue => + o.value.get("type") match { + case Some(s: StringValue) => Some(TableType(s.value)) + case _ => None + } + case _ => None } case None => None }).getOrElse(TableType.Regular) diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/schema/package.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/schema/package.scala index c1a2f2eb..cc26f771 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/schema/package.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/schema/package.scala @@ -908,29 +908,43 @@ package object schema { ) ) } - .map("script" -> _) ++ ListMap( - "multi_fields" -> ObjectValue( - ListMap(multiFields.map(field => field.name -> ObjectValue(field._meta)): _*) - ) - ) ++ (if (lineage.nonEmpty) { - // ✅ Lineage as map of paths - ListMap( - "lineage" -> ObjectValue( - lineage.map { case (pathId, chain) => - pathId -> ObjectValues( - chain.map { case (table, column) => - ObjectValue( - ListMap( - "table" -> StringValue(table), - "column" -> StringValue(column) - ) - ) - } - ) - } - ) - ) - } else ListMap.empty) + .map("script" -> _) ++ (if (multiFields.nonEmpty) + ListMap( + "multi_fields" -> ObjectValue( + ListMap( + multiFields.map(field => + field.name -> ObjectValue(field._meta) + ): _* + ) + ) + ) + else ListMap.empty[String, Value[_]]) ++ (if (lineage.nonEmpty) { + // ✅ Lineage as map of paths + ListMap( + "lineage" -> ObjectValue( + lineage.map { case (pathId, chain) => + pathId -> ObjectValues( + chain.map { + case ( + table, + column + ) => + ObjectValue( + ListMap( + "table" -> StringValue( + table + ), + "column" -> StringValue( + column + ) + ) + ) + } + ) + } + ) + ) + } else ListMap.empty) } def updateStruct(): Column = { @@ -1443,9 +1457,11 @@ package object schema { "columns" -> ObjectValue(cols.map { case (name, col) => name -> ObjectValue(col._meta) }) ) ++ ListMap( "type" -> StringValue(tableType.name) - ) ++ ListMap( - "materialized_views" -> StringValues(materializedViews.map(StringValue)) - ) + ) ++ (if (materializedViews.nonEmpty) + ListMap( + "materialized_views" -> StringValues(materializedViews.map(StringValue)) + ) + else ListMap.empty[String, Value[_]]) def update(): Table = { val updated = @@ -1490,7 +1506,7 @@ package object schema { "" } val separator = if (partitionBy.nonEmpty) "," else "" - s"$separator\nOPTIONS = ${Seq( + s"$separator\nOPTIONS ${Seq( mappingOpts, settingsOpts, aliasesOpts diff --git a/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala b/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala index 8a386266..41f7fdf5 100644 --- a/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala +++ b/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala @@ -2,10 +2,12 @@ package app.softnetwork.elastic.sql.parser import app.softnetwork.elastic.schema.Index import app.softnetwork.elastic.sql.{ + BooleanValue, DateMathScript, Identifier, IngestTimestampValue, IntValue, + Null, ObjectValue, StringValue, StringValues @@ -19,6 +21,7 @@ import app.softnetwork.elastic.sql.policy.EnrichPolicyType import app.softnetwork.elastic.sql.query._ import app.softnetwork.elastic.sql.schema.{ mapper, + Column, DateIndexNameProcessor, EnrichProcessor, EnrichShapeRelation, @@ -27,7 +30,9 @@ import app.softnetwork.elastic.sql.schema.{ PartitionDate, PrimaryKeyProcessor, ScriptProcessor, - SetProcessor + SetProcessor, + Table, + TableType } import app.softnetwork.elastic.sql.time.TimeUnit.DAYS import app.softnetwork.elastic.sql.time.{CalendarInterval, TimeUnit} @@ -1032,7 +1037,7 @@ class ParserSpec extends AnyFlatSpec with Matchers { json shouldBe """{"description":"CREATE OR REPLACE PIPELINE users_ddl_default_pipeline WITH PROCESSORS (name SET DEFAULT 'anonymous', age INT SCRIPT AS (DATE_DIFF(birthdate, CURRENT_DATE, YEAR)), ingested_at SET DEFAULT _ingest.timestamp, profile.seniority INT SCRIPT AS (DATE_DIFF(profile.join_date, CURRENT_DATE, DAY)), PARTITION BY birthdate (MONTH), PRIMARY KEY (id))","processors":[{"set":{"description":"name SET DEFAULT 'anonymous'","field":"name","ignore_failure":true,"value":"anonymous","if":"ctx.name == null"}},{"script":{"description":"age INT SCRIPT AS (DATE_DIFF(birthdate, CURRENT_DATE, YEAR))","lang":"painless","source":"def param1 = ctx.birthdate; def param2 = ZonedDateTime.ofInstant(Instant.ofEpochMilli(ctx['_ingest']['timestamp']), ZoneId.of('Z')).toLocalDate(); def param3 = Long.valueOf(ChronoUnit.YEARS.between(param1, param2)); ctx.age = (param1 == null) ? null : param3","ignore_failure":true}},{"set":{"description":"ingested_at SET DEFAULT _ingest.timestamp","field":"ingested_at","ignore_failure":true,"value":"{{_ingest.timestamp}}","if":"ctx.ingested_at == null"}},{"script":{"description":"profile.seniority INT SCRIPT AS (DATE_DIFF(profile.join_date, CURRENT_DATE, DAY))","lang":"painless","source":"def param1 = ctx.profile?.join_date; def param2 = ZonedDateTime.ofInstant(Instant.ofEpochMilli(ctx['_ingest']['timestamp']), ZoneId.of('Z')).toLocalDate(); def param3 = Long.valueOf(ChronoUnit.DAYS.between(param1, param2)); ctx.profile.seniority = (param1 == null) ? null : param3","ignore_failure":true}},{"date_index_name":{"description":"PARTITION BY birthdate (MONTH)","field":"birthdate","date_rounding":"M","date_formats":["yyyy-MM"],"index_name_prefix":"users-","ignore_failure":true}},{"set":{"description":"PRIMARY KEY (id)","field":"_id","value":"{{id}}","ignore_failure":false,"ignore_empty_value":false}}]}""" val indexMappings = schema.indexMappings println(indexMappings) - indexMappings.toString shouldBe """{"properties":{"id":{"type":"integer"},"name":{"type":"text","fields":{"raw":{"type":"keyword"}},"analyzer":"french","search_analyzer":"french"},"birthdate":{"type":"date"},"age":{"type":"integer"},"ingested_at":{"type":"date"},"profile":{"type":"object","properties":{"bio":{"type":"text"},"followers":{"type":"integer"},"join_date":{"type":"date"},"seniority":{"type":"integer"}}}},"dynamic":false,"_meta":{"primary_key":["id"],"partition_by":{"column":"birthdate","granularity":"M"},"columns":{"id":{"data_type":"INT","not_null":"true","comment":"user identifier"},"name":{"data_type":"VARCHAR","not_null":"false","default_value":"anonymous","multi_fields":{"raw":{"data_type":"KEYWORD","not_null":"false","comment":"sortable"}}},"birthdate":{"data_type":"DATE","not_null":"false"},"age":{"data_type":"INT","not_null":"false","script":{"sql":"DATE_DIFF(birthdate, CURRENT_DATE, YEAR)","column":"age","painless":"def param1 = ctx.birthdate; def param2 = ZonedDateTime.ofInstant(Instant.ofEpochMilli(ctx['_ingest']['timestamp']), ZoneId.of('Z')).toLocalDate(); def param3 = Long.valueOf(ChronoUnit.YEARS.between(param1, param2)); ctx.age = (param1 == null) ? null : param3"}},"ingested_at":{"data_type":"TIMESTAMP","not_null":"false","default_value":"_ingest.timestamp"},"profile":{"data_type":"STRUCT","not_null":"false","comment":"user profile","multi_fields":{"bio":{"data_type":"VARCHAR","not_null":"false"},"followers":{"data_type":"INT","not_null":"false"},"join_date":{"data_type":"DATE","not_null":"false"},"seniority":{"data_type":"INT","not_null":"false","script":{"sql":"DATE_DIFF(profile.join_date, CURRENT_DATE, DAY)","column":"profile.seniority","painless":"def param1 = ctx.profile?.join_date; def param2 = ZonedDateTime.ofInstant(Instant.ofEpochMilli(ctx['_ingest']['timestamp']), ZoneId.of('Z')).toLocalDate(); def param3 = Long.valueOf(ChronoUnit.DAYS.between(param1, param2)); ctx.profile.seniority = (param1 == null) ? null : param3"}}}}},"type":"regular","materialized_views":[]}}""".stripMargin + indexMappings.toString shouldBe """{"properties":{"id":{"type":"integer"},"name":{"type":"text","fields":{"raw":{"type":"keyword"}},"analyzer":"french","search_analyzer":"french"},"birthdate":{"type":"date"},"age":{"type":"integer"},"ingested_at":{"type":"date"},"profile":{"type":"object","properties":{"bio":{"type":"text"},"followers":{"type":"integer"},"join_date":{"type":"date"},"seniority":{"type":"integer"}}}},"dynamic":false,"_meta":{"primary_key":["id"],"partition_by":{"column":"birthdate","granularity":"M"},"columns":{"id":{"data_type":"INT","not_null":"true","comment":"user identifier"},"name":{"data_type":"VARCHAR","not_null":"false","default_value":"anonymous","multi_fields":{"raw":{"data_type":"KEYWORD","not_null":"false","comment":"sortable"}}},"birthdate":{"data_type":"DATE","not_null":"false"},"age":{"data_type":"INT","not_null":"false","script":{"sql":"DATE_DIFF(birthdate, CURRENT_DATE, YEAR)","column":"age","painless":"def param1 = ctx.birthdate; def param2 = ZonedDateTime.ofInstant(Instant.ofEpochMilli(ctx['_ingest']['timestamp']), ZoneId.of('Z')).toLocalDate(); def param3 = Long.valueOf(ChronoUnit.YEARS.between(param1, param2)); ctx.age = (param1 == null) ? null : param3"}},"ingested_at":{"data_type":"TIMESTAMP","not_null":"false","default_value":"_ingest.timestamp"},"profile":{"data_type":"STRUCT","not_null":"false","comment":"user profile","multi_fields":{"bio":{"data_type":"VARCHAR","not_null":"false"},"followers":{"data_type":"INT","not_null":"false"},"join_date":{"data_type":"DATE","not_null":"false"},"seniority":{"data_type":"INT","not_null":"false","script":{"sql":"DATE_DIFF(profile.join_date, CURRENT_DATE, DAY)","column":"profile.seniority","painless":"def param1 = ctx.profile?.join_date; def param2 = ZonedDateTime.ofInstant(Instant.ofEpochMilli(ctx['_ingest']['timestamp']), ZoneId.of('Z')).toLocalDate(); def param3 = Long.valueOf(ChronoUnit.DAYS.between(param1, param2)); ctx.profile.seniority = (param1 == null) ? null : param3"}}}}},"type":"regular"}}""".stripMargin val indexSettings = schema.indexSettings println(indexSettings) indexSettings.toString shouldBe """{"index":{}}""" @@ -1094,7 +1099,7 @@ class ParserSpec extends AnyFlatSpec with Matchers { | zip_code KEYWORD COMMENT 'Required for enrichment (aggregated from customers.department.zip_code)'), | _last_updated TIMESTAMP DEFAULT _ingest.timestamp NOT NULL COMMENT 'Last update timestamp', | PRIMARY KEY (id) - |) OPTIONS = (mappings = (_meta = (created_by = "materialized_view_generator", join_keys = ["id"], source_table = "customers", purpose = "Captures field values for enrichment via Transform", required_fields = ["name","email","department.zip_code"])), settings = (number_of_shards = "1", refresh_interval = "30s"))""".stripMargin + |) OPTIONS (mappings = (_meta = (created_by = "materialized_view_generator", join_keys = ["id"], source_table = "customers", purpose = "Captures field values for enrichment via Transform", required_fields = ["name","email","department.zip_code"])), settings = (number_of_shards = "1", refresh_interval = "30s"))""".stripMargin val result = Parser(sql) result.isRight shouldBe true val stmt = result.toOption.get @@ -1113,6 +1118,200 @@ class ParserSpec extends AnyFlatSpec with Matchers { } } + it should "preserve tableType through sql round-trip" in { + val tableTypes = Seq( + TableType.MaterializedView, + TableType.Changelog, + TableType.Enrichment, + TableType.View, + TableType.External + ) + tableTypes.foreach { expectedType => + val table = Table( + name = "test_table", + columns = List( + Column("id", SQLTypes.Int), + Column("name", SQLTypes.Varchar) + ), + primaryKey = List("id"), + tableType = expectedType + ).update() + + // verify _meta contains the type + table.mappings.get("_meta") match { + case Some(ObjectValue(meta)) => + meta.get("type") match { + case Some(StringValue(t)) => t shouldBe expectedType.name + case other => + fail(s"Expected StringValue(${expectedType.name}) in _meta.type, got $other") + } + case other => fail(s"Expected ObjectValue for _meta, got $other") + } + + // round-trip through Index (ES indexMappings JSON → schema reconstruction) + val indexMappings = table.indexMappings + val indexSettings = table.indexSettings + val mappingsNode = mapper.createObjectNode() + mappingsNode.set("mappings", indexMappings) + val settingsNode = mapper.createObjectNode() + settingsNode.set("settings", indexSettings) + val esIndex = Index( + name = "test_table", + mappings = mappingsNode, + settings = settingsNode + ) + val reconstructed = esIndex.schema + reconstructed.tableType shouldBe expectedType + } + } + + it should "parse tableType from _meta in DDL OPTIONS" in { + val tableTypes = Seq( + ("materialized_view", TableType.MaterializedView), + ("changelog", TableType.Changelog), + ("enrichment", TableType.Enrichment), + ("view", TableType.View), + ("external", TableType.External) + ) + tableTypes.foreach { case (typeName, expectedType) => + val sql = + s"""CREATE OR REPLACE TABLE test_table ( + | id INT, + | name VARCHAR, + | PRIMARY KEY (id) + |) OPTIONS (mappings = (_meta = (type = "$typeName", created_by = "test")))""".stripMargin + val result = Parser(sql) + result match { + case Right(ct: CreateTable) => + ct.tableType shouldBe expectedType + ct.schema.tableType shouldBe expectedType + case Right(other) => + fail(s"Expected CreateTable for $typeName, got ${other.getClass.getSimpleName}") + case Left(err) => + fail(s"Failed to parse SQL for $typeName: $err") + } + } + } + + it should "default tableType to Regular when _meta has no type" in { + val sql = + """CREATE OR REPLACE TABLE test_table ( + | id INT, + | name VARCHAR + |) OPTIONS (mappings = (_meta = (created_by = "test")))""".stripMargin + val result = Parser(sql) + result.isRight shouldBe true + result.toOption.get match { + case ct: CreateTable => + ct.tableType shouldBe TableType.Regular + ct.schema.tableType shouldBe TableType.Regular + case _ => fail("Expected CreateTable") + } + } + + it should "preserve tableType through full DDL round-trip" in { + // painless script with double-quotes inside (reproduces the MV computed fields bug) + val painlessWithQuotes = + """def param1 = ctx.createdAt; def param2 = LocalDate.parse("2025-09-11", DateTimeFormatter.ofPattern("yyyy-MM-dd")); ctx.effective_date = param1""" + val table = Table( + name = "orders_with_customers_mv", + columns = List( + Column("id", SQLTypes.Int, notNull = true), + Column("amount", SQLTypes.Double), + Column("customer_name", SQLTypes.Varchar), + Column("email", SQLTypes.Varchar), + Column("customer_zip", SQLTypes.Keyword), + Column( + "effective_date", + SQLTypes.Date, + script = Some( + ScriptProcessor( + script = + "COALESCE(NULLIF(createdAt, DATE_PARSE('2025-09-11', '%Y-%m-%d') - INTERVAL 2 DAY), CURRENT_DATE)", + column = "effective_date", + dataType = SQLTypes.Date, + source = painlessWithQuotes + ) + ) + ), + Column("status", SQLTypes.Keyword) + ), + primaryKey = List("id"), + tableType = TableType.MaterializedView + ).update() + + table.tableType shouldBe TableType.MaterializedView + + val ddl = table.sql + println(s"Generated DDL:\n$ddl") + + val result = Parser(ddl) + result match { + case Right(ct: CreateTable) => + println(s"Parsed tableType: ${ct.tableType}") + println(s"Parsed mappings keys: ${ct.mappings.keys}") + ct.tableType shouldBe TableType.MaterializedView + // verify the script value round-trips correctly + ct.mappings.get("_meta") match { + case Some(meta: ObjectValue) => + meta.value.get("columns") match { + case Some(cols: ObjectValue) => + cols.value.get("effective_date") match { + case Some(colMeta: ObjectValue) => + colMeta.value.get("script") match { + case Some(scriptObj: ObjectValue) => + val painless = scriptObj.value.get("painless") + painless match { + case Some(sv: StringValue) => + sv.value shouldBe painlessWithQuotes + case other => + fail(s"Expected StringValue for painless, got $other") + } + case other => fail(s"Expected script ObjectValue, got $other") + } + case other => fail(s"Expected effective_date column, got $other") + } + case other => fail(s"Expected columns ObjectValue, got $other") + } + case other => fail(s"Expected _meta ObjectValue, got $other") + } + case Right(other) => + fail(s"Expected CreateTable, got ${other.getClass.getSimpleName}") + case Left(err) => + fail(s"Failed to parse generated DDL: $err") + } + } + + it should "parse OPTIONS with NULL values and empty arrays" in { + val sql = + """CREATE OR REPLACE TABLE orders_with_customers_mv ( + | id INT, + | name VARCHAR, + | PRIMARY KEY (id) + |) OPTIONS (mappings = (_meta = (group_by = NULL, aggregations = [], has_where = true, query = "SELECT * FROM orders", type = "materialized_view")))""".stripMargin + val result = Parser(sql) + result match { + case Right(ct: CreateTable) => + ct.tableType shouldBe TableType.MaterializedView + ct.mappings.get("_meta") match { + case Some(meta: ObjectValue) => + meta.value.get("group_by") match { + case Some(Null) => // expected + case other => fail(s"Expected Null for group_by, got $other") + } + meta.value.get("has_where") match { + case Some(BooleanValue(true)) => // expected + case other => fail(s"Expected BooleanValue(true) for has_where, got $other") + } + case other => fail(s"Expected _meta ObjectValue, got $other") + } + case Right(other) => + fail(s"Expected CreateTable, got ${other.getClass.getSimpleName}") + case Left(err) => + fail(s"Failed to parse SQL: $err") + } + } + it should "parse DROP TABLE if exists" in { val sql = "DROP TABLE IF EXISTS users" val result = Parser(sql) @@ -1772,7 +1971,7 @@ class ParserSpec extends AnyFlatSpec with Matchers { create.input shouldBe simpleInput val json = create.watcher.node println(json.toPrettyString) - json.toString shouldBe """{"trigger":{"schedule":{"interval":"5m"}},"input":{"simple":{"keys":["value1","value2"]}},"condition":{"never":{}},"actions":{"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\\\"message\\\": \\\"Watcher triggered with {{ctx.payload._value}}\\\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" + json.toString shouldBe """{"trigger":{"schedule":{"interval":"5m"}},"input":{"simple":{"keys":["value1","value2"]}},"condition":{"never":{}},"actions":{"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\"message\": \"Watcher triggered with {{ctx.payload._value}}\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" case _ => fail("Expected CreateWatcher") } } @@ -1820,7 +2019,7 @@ class ParserSpec extends AnyFlatSpec with Matchers { create.input shouldBe simpleInput val json = create.watcher.node println(json.toPrettyString) - json.toString shouldBe """{"trigger":{"schedule":{"interval":"5m"}},"input":{"simple":{"keys":["value1","value2"]}},"condition":{"always":{}},"actions":{"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\\\"message\\\": \\\"Watcher triggered with {{ctx.payload._value}}\\\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" + json.toString shouldBe """{"trigger":{"schedule":{"interval":"5m"}},"input":{"simple":{"keys":["value1","value2"]}},"condition":{"always":{}},"actions":{"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\"message\": \"Watcher triggered with {{ctx.payload._value}}\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" case _ => fail("Expected CreateWatcher") } } @@ -1870,7 +2069,7 @@ class ParserSpec extends AnyFlatSpec with Matchers { create.input shouldBe simpleInput val json = create.watcher.node println(json.toPrettyString) - json.toString shouldBe """{"trigger":{"schedule":{"interval":"5m"}},"input":{"simple":{"keys":["value1","value2"]}},"condition":{"compare":{"ctx.payload.hits.total":{"gt":10}}},"actions":{"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\\\"message\\\": \\\"Watcher triggered with {{ctx.payload._value}}\\\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" + json.toString shouldBe """{"trigger":{"schedule":{"interval":"5m"}},"input":{"simple":{"keys":["value1","value2"]}},"condition":{"compare":{"ctx.payload.hits.total":{"gt":10}}},"actions":{"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\"message\": \"Watcher triggered with {{ctx.payload._value}}\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" case _ => fail("Expected CreateWatcher") } } @@ -1920,7 +2119,7 @@ class ParserSpec extends AnyFlatSpec with Matchers { create.trigger shouldBe intervalTrigger val json = create.watcher.node println(json.toPrettyString) - json.toString shouldBe """{"trigger":{"schedule":{"interval":"5m"}},"input":{"simple":{"keys":["value1","value2"]}},"condition":{"compare":{"ctx.execution_time":{"gt":"now-5d/d"}}},"actions":{"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\\\"message\\\": \\\"Watcher triggered with {{ctx.payload._value}}\\\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" + json.toString shouldBe """{"trigger":{"schedule":{"interval":"5m"}},"input":{"simple":{"keys":["value1","value2"]}},"condition":{"compare":{"ctx.execution_time":{"gt":"now-5d/d"}}},"actions":{"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\"message\": \"Watcher triggered with {{ctx.payload._value}}\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" case _ => fail("Expected CreateWatcher") } } @@ -1970,7 +2169,7 @@ class ParserSpec extends AnyFlatSpec with Matchers { create.input shouldBe simpleInput val json = create.watcher.node println(json.toPrettyString) - json.toString shouldBe """{"trigger":{"schedule":{"interval":"5m"}},"input":{"simple":{"keys":["value1","value2"]}},"condition":{"script":{"source":"ctx.payload.keys.size > params.threshold","lang":"painless","params":{"threshold":1}}},"actions":{"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\\\"message\\\": \\\"Watcher triggered with {{ctx.payload._value}}\\\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" + json.toString shouldBe """{"trigger":{"schedule":{"interval":"5m"}},"input":{"simple":{"keys":["value1","value2"]}},"condition":{"script":{"source":"ctx.payload.keys.size > params.threshold","lang":"painless","params":{"threshold":1}}},"actions":{"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\"message\": \"Watcher triggered with {{ctx.payload._value}}\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" case _ => fail("Expected CreateWatcher") } } @@ -2020,7 +2219,7 @@ class ParserSpec extends AnyFlatSpec with Matchers { create.input shouldBe simpleInput val json = create.watcher.node println(json.toPrettyString) - json.toString shouldBe """{"trigger":{"schedule":{"cron":"0 */5 * * * ?"}},"input":{"simple":{"keys":["value1","value2"]}},"condition":{"script":{"source":"ctx.payload.keys.size > params.threshold","lang":"painless","params":{"threshold":1}}},"actions":{"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\\\"message\\\": \\\"Watcher triggered with {{ctx.payload._value}}\\\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" + json.toString shouldBe """{"trigger":{"schedule":{"cron":"0 */5 * * * ?"}},"input":{"simple":{"keys":["value1","value2"]}},"condition":{"script":{"source":"ctx.payload.keys.size > params.threshold","lang":"painless","params":{"threshold":1}}},"actions":{"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\"message\": \"Watcher triggered with {{ctx.payload._value}}\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" case _ => fail("Expected CreateWatcher") } } @@ -2047,7 +2246,7 @@ class ParserSpec extends AnyFlatSpec with Matchers { create.input shouldBe simpleInput val json = create.watcher.node println(json.toPrettyString) - json.toString shouldBe """{"trigger":{"schedule":{"cron":"0 */5 * * * ?"}},"input":{"simple":{"keys":["value1","value2"]}},"condition":{"script":{"source":"ctx.payload.keys.size > params.threshold","lang":"painless","params":{"threshold":1}}},"actions":{"log_action":{"foreach":"ctx.payload.hits.hits","max_iterations":500,"logging":{"text":"Watcher triggered with {{ctx.payload.hits.total}} hits","level":"info"}},"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\\\"message\\\": \\\"Watcher triggered with {{ctx.payload._value}}\\\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" + json.toString shouldBe """{"trigger":{"schedule":{"cron":"0 */5 * * * ?"}},"input":{"simple":{"keys":["value1","value2"]}},"condition":{"script":{"source":"ctx.payload.keys.size > params.threshold","lang":"painless","params":{"threshold":1}}},"actions":{"log_action":{"foreach":"ctx.payload.hits.hits","max_iterations":500,"logging":{"text":"Watcher triggered with {{ctx.payload.hits.total}} hits","level":"info"}},"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\"message\": \"Watcher triggered with {{ctx.payload._value}}\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" case _ => fail("Expected CreateWatcher") } } @@ -2074,7 +2273,7 @@ class ParserSpec extends AnyFlatSpec with Matchers { create.input shouldBe httpInput val json = create.watcher.node println(json.toPrettyString) - json.toString shouldBe """{"trigger":{"schedule":{"cron":"0 */5 * * * ?"}},"input":{"http":{"request":{"scheme":"https","host":"www.example.com","port":443,"method":"get","path":"/api/data","headers":{"Authorization":"Bearer token"},"connection_timeout":"5s"}}},"condition":{"script":{"source":"ctx.payload.keys.size > params.threshold","lang":"painless","params":{"threshold":1}}},"actions":{"log_action":{"foreach":"ctx.payload.hits.hits","max_iterations":500,"logging":{"text":"Watcher triggered with {{ctx.payload.hits.total}} hits","level":"info"}},"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\\\"message\\\": \\\"Watcher triggered with {{ctx.payload._value}}\\\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" + json.toString shouldBe """{"trigger":{"schedule":{"cron":"0 */5 * * * ?"}},"input":{"http":{"request":{"scheme":"https","host":"www.example.com","port":443,"method":"get","path":"/api/data","headers":{"Authorization":"Bearer token"},"connection_timeout":"5s"}}},"condition":{"script":{"source":"ctx.payload.keys.size > params.threshold","lang":"painless","params":{"threshold":1}}},"actions":{"log_action":{"foreach":"ctx.payload.hits.hits","max_iterations":500,"logging":{"text":"Watcher triggered with {{ctx.payload.hits.total}} hits","level":"info"}},"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\"message\": \"Watcher triggered with {{ctx.payload._value}}\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" case _ => fail("Expected CreateWatcher") } } @@ -2101,7 +2300,7 @@ class ParserSpec extends AnyFlatSpec with Matchers { create.input shouldBe chainInput val json = create.watcher.node println(json.toPrettyString) - json.toString shouldBe """{"trigger":{"schedule":{"cron":"0 */5 * * * ?"}},"input":{"chain":{"inputs":[{"search_data":{"search":{"request":{"indices":["my_index"],"body":{"query":{"match_all":{}}}},"timeout":"2m"}}},{"http_data":{"http":{"request":{"scheme":"https","host":"www.example.com","port":443,"method":"get","path":"/api/data","headers":{"Authorization":"Bearer token"},"connection_timeout":"5s"}}}}]}},"condition":{"script":{"source":"ctx.payload.keys.size > params.threshold","lang":"painless","params":{"threshold":1}}},"actions":{"log_action":{"foreach":"ctx.payload.hits.hits","max_iterations":500,"logging":{"text":"Watcher triggered with {{ctx.payload.hits.total}} hits","level":"info"}},"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\\\"message\\\": \\\"Watcher triggered with {{ctx.payload._value}}\\\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" + json.toString shouldBe """{"trigger":{"schedule":{"cron":"0 */5 * * * ?"}},"input":{"chain":{"inputs":[{"search_data":{"search":{"request":{"indices":["my_index"],"body":{"query":{"match_all":{}}}},"timeout":"2m"}}},{"http_data":{"http":{"request":{"scheme":"https","host":"www.example.com","port":443,"method":"get","path":"/api/data","headers":{"Authorization":"Bearer token"},"connection_timeout":"5s"}}}}]}},"condition":{"script":{"source":"ctx.payload.keys.size > params.threshold","lang":"painless","params":{"threshold":1}}},"actions":{"log_action":{"foreach":"ctx.payload.hits.hits","max_iterations":500,"logging":{"text":"Watcher triggered with {{ctx.payload.hits.total}} hits","level":"info"}},"webhook_action":{"foreach":"ctx.payload.keys","max_iterations":2,"webhook":{"scheme":"https","host":"example.com","port":443,"method":"post","path":"/webhook","headers":{"Content-Type":"application/json"},"params":{"watch_id":"{{ctx.watch_id}}"},"body":"{\"message\": \"Watcher triggered with {{ctx.payload._value}}\"}","connection_timeout":"10s","read_timeout":"30s"}}}}""" case _ => fail("Expected CreateWatcher") } } From 032bce2af3d4de64c42646f21b43ab7da7005c2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Thu, 5 Mar 2026 08:26:25 +0100 Subject: [PATCH 2/5] invalidate schema cache after deletion of an index --- .../main/scala/app/softnetwork/elastic/client/IndicesApi.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala index aa1617bf..a5aa2afc 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala @@ -443,6 +443,7 @@ trait IndicesApi extends ElasticClientHelpers { executeDeleteIndex(index) match { case success @ ElasticSuccess(true) => + invalidateSchema(index) logger.info(s"✅ Index '$index' deleted successfully") success case success @ ElasticSuccess(_) => From 2d65c3a74354d6e7aa04bb9d7a9c9223fd63d629 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Thu, 5 Mar 2026 08:26:45 +0100 Subject: [PATCH 3/5] minor documentation fix --- documentation/sql/dml_statements.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/documentation/sql/dml_statements.md b/documentation/sql/dml_statements.md index 30e7f985..3cf361f9 100644 --- a/documentation/sql/dml_statements.md +++ b/documentation/sql/dml_statements.md @@ -329,13 +329,13 @@ ON CONFLICT (uuid) DO UPDATE; `COPY INTO` transparently supports remote file systems by auto-detecting the URI scheme in the `FROM` path. No SQL syntax change is required — simply use the appropriate URI scheme. -| URI scheme | File system | Required JAR | -| --- | --- | --- | -| `s3a://` or `s3://` | AWS S3 | `hadoop-aws` | -| `abfs://`, `abfss://`, `wasb://`, `wasbs://` | Azure ADLS Gen2 / Blob Storage | `hadoop-azure` | -| `gs://` | Google Cloud Storage | `gcs-connector-hadoop3` | -| `hdfs://` | HDFS | _(bundled with hadoop-client)_ | -| _(no scheme / local path)_ | Local filesystem | _(no extra JAR needed)_ | +| URI scheme | File system | Required JAR | +|----------------------------------------------|--------------------------------|--------------------------------| +| `s3a://` or `s3://` | AWS S3 | `hadoop-aws` | +| `abfs://`, `abfss://`, `wasb://`, `wasbs://` | Azure ADLS Gen2 / Blob Storage | `hadoop-azure` | +| `gs://` | Google Cloud Storage | `gcs-connector-hadoop3` | +| `hdfs://` | HDFS | _(bundled with hadoop-client)_ | +| _(no scheme / local path)_ | Local filesystem | _(no extra JAR needed)_ | > **Important:** Cloud connector JARs are declared as `provided` dependencies and are **not bundled** in the library. > They must be present in the runtime classpath (e.g. added to the CLI assembly or the application's fat-jar). From 3d516c28b5e76139f171062b49d1d8be3a43e28e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Thu, 5 Mar 2026 09:59:23 +0100 Subject: [PATCH 4/5] add support for SQL UPDATE with scripts --- .../elastic/sql/parser/Parser.scala | 61 ++++--------------- .../elastic/sql/query/package.scala | 27 +++++--- .../elastic/sql/schema/package.scala | 29 +++++++++ .../elastic/sql/parser/ParserSpec.scala | 47 +++++++++++++- .../client/GatewayApiIntegrationSpec.scala | 59 ++++++++++++++++++ .../client/GatewayIntegrationTestKit.scala | 23 +++++++ 6 files changed, 187 insertions(+), 59 deletions(-) diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/parser/Parser.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/parser/Parser.scala index 03bfc196..15719298 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/parser/Parser.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/parser/Parser.scala @@ -16,7 +16,6 @@ package app.softnetwork.elastic.sql.parser -import app.softnetwork.elastic.sql.PainlessContextType.Processor import app.softnetwork.elastic.sql._ import app.softnetwork.elastic.sql.function.time.DateTimeFunction import app.softnetwork.elastic.sql.function._ @@ -230,11 +229,13 @@ object Parser case None => None } - def script: PackratParser[PainlessScript] = - ("SCRIPT" ~ "AS") ~ start ~ (identifierWithArithmeticExpression | + def scriptValue: PackratParser[PainlessScript] = identifierWithArithmeticExpression | identifierWithTransformation | identifierWithIntervalFunction | - identifierWithFunction) ~ end ^^ { case _ ~ _ ~ s ~ _ => s } + identifierWithFunction + + def script: PackratParser[PainlessScript] = + ("SCRIPT" ~ "AS") ~ start ~ scriptValue ~ end ^^ { case _ ~ _ ~ s ~ _ => s } def column: PackratParser[Column] = ident ~ extension_type ~ (script | multiFields) ~ defaultVal ~ notNull ~ comment ~ (options | success( @@ -242,30 +243,10 @@ object Parser )) ^^ { case name ~ dt ~ mfs ~ dv ~ nn ~ ct ~ opts => mfs match { case script: PainlessScript => - val ctx = PainlessContext(Processor) - val scr = script.painless(Some(ctx)) - val temp = s"$ctx$scr" - val ret = - temp.split(";") match { - case Array(single) if single.trim.startsWith("return ") => - val stripReturn = single.trim.stripPrefix("return ").trim - s"ctx.$name = $stripReturn" - case multiple => - val last = multiple.last.trim - val temp = multiple.dropRight(1) :+ s" ctx.$name = $last" - temp.mkString(";") - } Column( name, dt, - Some( - ScriptProcessor( - script = script.sql, - column = name, - dataType = dt, - source = ret - ) - ), + Some(ScriptProcessor.fromScript(name, script, Some(dt))), Nil, dv, nn, @@ -521,27 +502,9 @@ object Parser def alterColumnScript: PackratParser[AlterColumnScript] = alterColumnIfExists ~ ident ~ "SET" ~ script ^^ { case ie ~ name ~ _ ~ ns => - val ctx = PainlessContext(Processor) - val scr = ns.painless(Some(ctx)) - val temp = s"$ctx$scr" - val ret = - temp.split(";") match { - case Array(single) if single.trim.startsWith("return ") => - val stripReturn = single.trim.stripPrefix("return ").trim - s"ctx.$name = $stripReturn" - case multiple => - val last = multiple.last.trim - val temp = multiple.dropRight(1) :+ s" ctx.$name = $last" - temp.mkString(";") - } AlterColumnScript( name, - ScriptProcessor( - script = ns.sql, - column = name, - dataType = ns.out, - source = ret - ), + ScriptProcessor.fromScript(name, ns, Some(ns.out)), ifExists = ie ) } @@ -1025,10 +988,12 @@ object Parser /** UPDATE table SET col1 = v1, col2 = v2 [WHERE ...] */ def update: PackratParser[Update] = - ("UPDATE" ~> ident) ~ ("SET" ~> repsep(ident ~ "=" ~ value, separator)) ~ where.? ^^ { - case table ~ assigns ~ w => - val values = ListMap(assigns.map { case col ~ _ ~ v => col -> v }: _*) - Update(table, values, w) + ("UPDATE" ~> ident) ~ ("SET" ~> repsep( + ident ~ "=" ~ (value | scriptValue), + separator + )) ~ where.? ^^ { case table ~ assigns ~ w => + val values = ListMap(assigns.map { case col ~ _ ~ v => col -> v }: _*) + Update(table, values, w) } /** DELETE FROM table [WHERE ...] */ diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala index 5341802d..060b26b3 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala @@ -453,20 +453,31 @@ package object query { } } - case class Update(table: String, values: ListMap[String, Value[_]], where: Option[Where]) + case class Update(table: String, values: ListMap[String, PainlessScript], where: Option[Where]) extends DmlStatement { override def sql: String = s"UPDATE $table SET ${values - .map { case (k, v) => s"$k = ${v.value}" } - .mkString(", ")}${where.map(w => s" ${w.sql}").getOrElse("")}" + .map { case (k, v) => + v match { + case value: Value[_] => s"$k = ${value.value}" + case painlessScript => s"$k = ${painlessScript.sql}" + } + } + .mkString(", ")}${where.map(w => s"${w.sql}").getOrElse("")}" lazy val customPipeline: IngestPipeline = IngestPipeline( - s"update-$table-${Instant.now}", + s"update-$table-${Instant.now.toEpochMilli}", IngestPipelineType.Custom, values.map { case (k, v) => - SetProcessor( - column = k, - value = v - ) + v match { + case value: Value[_] => + SetProcessor( + pipelineType = IngestPipelineType.Custom, + column = k, + value = value + ) + case script => + ScriptProcessor.fromScript(k, script, pipelineType = IngestPipelineType.Custom) + } }.toSeq ) diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/schema/package.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/schema/package.scala index cc26f771..ba4324f4 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/schema/package.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/schema/package.scala @@ -436,6 +436,35 @@ package object schema { } + object ScriptProcessor { + def fromScript( + column: String, + script: PainlessScript, + dataType: Option[SQLType] = None, + pipelineType: IngestPipelineType = IngestPipelineType.Default + ): ScriptProcessor = { + val ctx = PainlessContext(PainlessContextType.Processor) + val scr = script.painless(Some(ctx)) + val painless = s"$ctx$scr" + val source = painless.split(";") match { + case Array(single) if single.trim.startsWith("return ") => + val stripped = single.trim.stripPrefix("return ").trim + s"ctx.$column = $stripped" + case parts => + val last = parts.last.trim + val updated = parts.dropRight(1) :+ s" ctx.$column = $last" + updated.mkString(";") + } + ScriptProcessor( + pipelineType = pipelineType, + script = script.sql, + column = column, + dataType = dataType.getOrElse(script.out), + source = source + ) + } + } + case class RenameProcessor( pipelineType: IngestPipelineType = IngestPipelineType.Default, description: Option[String] = None, diff --git a/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala b/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala index 41f7fdf5..9fcfdfe0 100644 --- a/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala +++ b/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala @@ -10,7 +10,8 @@ import app.softnetwork.elastic.sql.{ Null, ObjectValue, StringValue, - StringValues + StringValues, + Value } import app.softnetwork.elastic.sql.http._ import app.softnetwork.elastic.sql.`type`.SQLTypes @@ -2740,13 +2741,53 @@ class ParserSpec extends AnyFlatSpec with Matchers { val stmt = result.toOption.get stmt match { case Update("users", values, Some(where)) => - values("name").value shouldBe "Bob" - values("age").value shouldBe 42 + values("name").asInstanceOf[Value[_]].value shouldBe "Bob" + values("age").asInstanceOf[Value[_]].value shouldBe 42 where.sql should include("id = 1") case _ => fail("Expected Update") } } + it should "parse UPDATE with scripts" in { + val sql = + "UPDATE products SET price = price * 1.1, updated_at = CURRENT_TIMESTAMP WHERE category = 'Electronics'" + val result = Parser(sql) + result.isRight shouldBe true + val stmt = result.toOption.get + stmt match { + case u @ Update("products", values, Some(where)) => + // CURRENT_TIMESTAMP must not be captured as a plain Value[_] + values("price") should not be a[Value[_]] + values("updated_at") should not be a[Value[_]] + // SQL representation + values("price").sql shouldBe "price * 1.1" + values("updated_at").sql shouldBe "CURRENT_TIMESTAMP" + where.sql should include("category = 'Electronics'") + // SQL round-trip + u.sql shouldBe sql + // Painless pipeline + val pipeline = u.customPipeline + pipeline.name should startWith("update-products-") + pipeline.processors should have size 2 + pipeline.processors.foreach(_ shouldBe a[ScriptProcessor]) + pipeline.processors.find(_.column == "price") match { + case Some(priceProc) => + priceProc + .asInstanceOf[ScriptProcessor] + .source shouldBe "def param1 = ctx.price; ctx.price = (param1 == null) ? null : (param1 * 1.1)" + case None => fail("Expected processor for price") + } + pipeline.processors.find(_.column == "updated_at") match { + case Some(updatedAtProc) => + updatedAtProc + .asInstanceOf[ScriptProcessor] + .source shouldBe "def param1 = ZonedDateTime.ofInstant(Instant.ofEpochMilli(ctx['_ingest']['timestamp']), ZoneId.of('Z')); ctx.updated_at = param1" + case None => fail("Expected processor for updated_at") + } + case _ => fail("Expected Update") + } + } + it should "parse DELETE" in { val sql = "DELETE FROM users WHERE age > 30" val result = Parser(sql) diff --git a/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayApiIntegrationSpec.scala b/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayApiIntegrationSpec.scala index e988cb41..39e2dab8 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayApiIntegrationSpec.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayApiIntegrationSpec.scala @@ -597,6 +597,65 @@ trait GatewayApiIntegrationSpec extends GatewayIntegrationTestKit { assertSelectResult(System.nanoTime(), q) } + // --------------------------------------------------------------------------- + // UPDATE with scripts + // --------------------------------------------------------------------------- + + it should "update rows using scripted SET clauses and return a DmlResult" in { + val create = + """CREATE TABLE IF NOT EXISTS dml_products ( + | id INT NOT NULL, + | name KEYWORD, + | price DOUBLE, + | category KEYWORD, + | updated_at DATETIME + |);""".stripMargin + + assertDdl(System.nanoTime(), client.run(create).futureValue) + + val insert = + """INSERT INTO dml_products (id, name, price, category) VALUES + | (1, 'Laptop', 1000.00, 'Electronics'), + | (2, 'T-Shirt', 25.00, 'Clothing'), + | (3, 'Phone', 800.00, 'Electronics');""".stripMargin + + assertDml(System.nanoTime(), client.run(insert).futureValue) + + val update = + """UPDATE dml_products + |SET price = price * 1.1, updated_at = CURRENT_TIMESTAMP + |WHERE category = 'Electronics';""".stripMargin + + val res = client.run(update).futureValue + assertDml(System.nanoTime(), res) + + val dml = res.toOption.get.asInstanceOf[DmlResult] + dml.updated should be >= 1L + + // Verify via SELECT that prices were increased and updated_at is set + val select = + """SELECT id, price, updated_at + |FROM dml_products + |WHERE category = 'Electronics' + |ORDER BY id ASC;""".stripMargin + + val rows = collectRows(System.nanoTime(), client.run(select).futureValue) + rows should have size 2 + + // id=1 Laptop: 1000.0 * 1.1 = 1100.0 — id=3 Phone: 800.0 * 1.1 = 880.0 + rows.zipWithIndex.foreach { case (row, i) => + val price = row("price") match { + case d: Double => d + case n: Number => n.doubleValue() + } + val originalPrice = if (i == 0) 1000.0 else 800.0 + price shouldBe (originalPrice * 1.1) +- 0.01 + + // updated_at must be set by the CURRENT_TIMESTAMP script + row.get("updated_at") shouldBe defined + } + } + // --------------------------------------------------------------------------- // DELETE // --------------------------------------------------------------------------- diff --git a/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayIntegrationTestKit.scala b/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayIntegrationTestKit.scala index 1eb8e180..b3bc2a81 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayIntegrationTestKit.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayIntegrationTestKit.scala @@ -205,6 +205,29 @@ trait GatewayIntegrationTestKit extends AnyFlatSpecLike with Matchers with Scala res.toOption.get.asInstanceOf[QueryRows].rows } + // ------------------------------------------------------------------------- + // Helper: collect rows from any QueryResult type (QueryRows or QueryStream) + // ------------------------------------------------------------------------- + + def collectRows(startTime: Long, res: ElasticResult[QueryResult]): Seq[Map[String, Any]] = { + renderResults(startTime, res) + res.isSuccess shouldBe true + res.toOption.get match { + case QueryStream(stream) => + val sink = + Sink.fold[Seq[ListMap[String, Any]], (ListMap[String, Any], ScrollMetrics)](Seq.empty) { + case (acc, (row, _)) => acc :+ normalizeRow(row) + } + stream.runWith(sink).futureValue + case q: QueryRows => + q.rows.map(normalizeRow) + case QueryStructured(response) => + response.results.map(normalizeRow) + case other => + fail(s"Unexpected QueryResult type for SELECT: $other") + } + } + // ------------------------------------------------------------------------- // Helper: assert SHOW TABLE result type // ------------------------------------------------------------------------- From 97d5a6ddb31b6d726112db1419228090df54e306 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Thu, 5 Mar 2026 10:11:30 +0100 Subject: [PATCH 5/5] prepare release 0.17.3 --- README.md | 2 +- build.sbt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 09b48551..1232ed3b 100644 --- a/README.md +++ b/README.md @@ -205,7 +205,7 @@ For programmatic access, add SoftClient4ES to your project: resolvers += "Softnetwork" at "https://softnetwork.jfrog.io/artifactory/releases/" // Choose your Elasticsearch version -libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-java-client" % "0.17.2" +libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-java-client" % "0.17.3" // Add the community extensions for materialized views (optional) libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-community-extensions" % "0.1.0" ``` diff --git a/build.sbt b/build.sbt index 73dbe828..a614445c 100644 --- a/build.sbt +++ b/build.sbt @@ -20,7 +20,7 @@ ThisBuild / organization := "app.softnetwork" name := "softclient4es" -ThisBuild / version := "0.17-SNAPSHOT" +ThisBuild / version := "0.17.3" ThisBuild / scalaVersion := scala213