diff --git a/docs/ingestion/ingestion-spec.md b/docs/ingestion/ingestion-spec.md index d1f901cc9538..cd35872f0dd7 100644 --- a/docs/ingestion/ingestion-spec.md +++ b/docs/ingestion/ingestion-spec.md @@ -334,9 +334,21 @@ A `granularitySpec` can have the following components: ### `transformSpec` The `transformSpec` is located in `dataSchema` → `transformSpec` and is responsible for transforming and filtering -records during ingestion time. It is optional. An example `transformSpec` is: +records during ingestion time. It is optional. There are two types of transform specs: the default expression-based +transform spec and the [scan transform spec](#scan-transform-spec). -``` +:::info + Conceptually, after input data records are read, Druid applies ingestion spec components in a particular order: + first [`flattenSpec`](data-formats.md#flattenspec) (if any), then [`timestampSpec`](#timestampspec), then [`transformSpec`](#transformspec), + and finally [`dimensionsSpec`](#dimensionsspec) and [`metricsSpec`](#metricsspec). Keep this in mind when writing + your ingestion spec. +::: + +#### Expression transform spec + +The default `transformSpec` uses expression-based transforms and an optional filter: + +```json "transformSpec": { "transforms": [ { "type": "expression", "name": "countryUpper", "expression": "upper(country)" } @@ -349,14 +361,7 @@ records during ingestion time. It is optional. An example `transformSpec` is: } ``` -:::info - Conceptually, after input data records are read, Druid applies ingestion spec components in a particular order: - first [`flattenSpec`](data-formats.md#flattenspec) (if any), then [`timestampSpec`](#timestampspec), then [`transformSpec`](#transformspec), - and finally [`dimensionsSpec`](#dimensionsspec) and [`metricsSpec`](#metricsspec). Keep this in mind when writing - your ingestion spec. -::: - -#### Transforms +##### Transforms The `transforms` list allows you to specify a set of expressions to evaluate on top of input data. Each transform has a "name" which can be referred to by your `dimensionsSpec`, `metricsSpec`, etc. @@ -368,7 +373,7 @@ Transforms do have some limitations. They can only refer to fields present in th they cannot refer to other transforms. And they cannot remove fields, only add them. However, they can shadow a field with another field containing all nulls, which will act similarly to removing the field. -Druid currently includes one kind of built-in transform, the expression transform. It has the following syntax: +The expression transform has the following syntax: ``` { @@ -380,19 +385,93 @@ Druid currently includes one kind of built-in transform, the expression transfor The `expression` is a [Druid query expression](../querying/math-expr.md). -:::info - Conceptually, after input data records are read, Druid applies ingestion spec components in a particular order: - first [`flattenSpec`](data-formats.md#flattenspec) (if any), then [`timestampSpec`](#timestampspec), then [`transformSpec`](#transformspec), - and finally [`dimensionsSpec`](#dimensionsspec) and [`metricsSpec`](#metricsspec). Keep this in mind when writing - your ingestion spec. -::: - -#### Filter +##### Filter The `filter` conditionally filters input rows during ingestion. Only rows that pass the filter will be ingested. Any of Druid's standard [query filters](../querying/filters.md) can be used. Note that within a `transformSpec`, the `transforms` are applied before the `filter`, so the filter can refer to a transform. 
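The shadowing and ordering rules above are easier to see in a small example. The sketch below is hypothetical: it assumes an input column named `debugPayload` that should be hidden, and uses the expression language's `null` literal. It shadows that column with all nulls and filters on the output of another transform, which works because `transforms` are applied before `filter`:

```json
"transformSpec": {
  "transforms": [
    { "type": "expression", "name": "countryUpper", "expression": "upper(country)" },
    { "type": "expression", "name": "debugPayload", "expression": "null" }
  ],
  "filter": { "type": "selector", "dimension": "countryUpper", "value": "US" }
}
```

Rows whose `country` does not upper-case to `US` are dropped, and the ingested `debugPayload` column contains only nulls, which acts much like removing the field.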
+#### Scan transform spec + +The scan transform spec (`"type": "scan"`) processes each input row through an embedded [scan query](../querying/scan-query.md). Its primary use case is unnesting array-valued columns into individual rows during streaming ingestion (Kafka, Kinesis), similar to existing UNNEST functionality with Druid SQL and the MSQ engine. + +The scan query uses `"__input__"` as the base table name and can include [unnest data sources](../querying/datasource.md#unnest), [virtual columns](../querying/virtual-columns.md) (for expression-based column derivations), and [filters](../querying/filters.md). + +**Example: Unnesting a string array** + +Given input rows with a `tags` column containing `["sports", "news"]`, this `transformSpec` produces one output row per tag: + +```json +"transformSpec": { + "type": "scan", + "query": { + "queryType": "scan", + "dataSource": { + "type": "unnest", + "base": { "type": "table", "name": "__input__" }, + "virtualColumn": { + "type": "expression", + "name": "tag", + "expression": "\"tags\"", + "outputType": "STRING" + } + }, + "intervals": { "type": "intervals", "intervals": ["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"] }, + "resultFormat": "list" + } +} +``` + +**Example: Unnesting with virtual columns and a filter** + +This example unnests both `tags` and `services` arrays (via nested unnest data sources), computes derived columns (`upper_user`, `user_tag`) via virtual columns, and filters with `unnestFilter`: + +```json +"transformSpec": { + "type": "scan", + "query": { + "queryType": "scan", + "dataSource": { + "type": "unnest", + "base": { + "type": "unnest", + "base": { "type": "table", "name": "__input__" }, + "virtualColumn": { + "type": "expression", + "name": "tag", + "expression": "\"tags\"", + "outputType": "STRING" + } + }, + "virtualColumn": { + "type": "expression", + "name": "service", + "expression": "\"services\"", + "outputType": "COMPLEX" + }, + "unnestFilter": { + "type": "selector", + "dimension": "service", + "value": "web" + } + }, + "virtualColumns": [ + { "type": "expression", "name": "upper_user", "expression": "upper(\"user\")", "outputType": "STRING" }, + { "type": "expression", "name": "user_tag", "expression": "concat(\"user\", '_', \"tag\")", "outputType": "STRING" } + ], + "intervals": { "type": "intervals", "intervals": ["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"] }, + "resultFormat": "list" + } +} +``` + +|Property|Description|Required| +|--------|-----------|--------| +|`type`|Must be `"scan"`.|Yes| +|`query`|A [scan query](../querying/scan-query.md) that defines how to process each input row. Use an [unnest data source](../querying/datasource.md#unnest) with `"__input__"` as the base table to unnest arrays. Nest multiple unnest data sources for cross-join unnesting. Add `virtualColumns` on the scan query for expression-based column derivations. Set `intervals` to eternity and `resultFormat` to `"list"`.|Yes| + +If an unnest column is missing or the array is empty, the input row passes through with the unnest output columns set to null. Virtual columns are still evaluated on passthrough rows. + ### Projections Projections are ingestion/compaction time aggregations that Druid computes on a subset of dimensions and metrics of a segment. They are stored within a segment. The pre-aggregated data reduces the number of rows the query engine needs to process when you run a query. This can speed up queries for query shapes that match a projection. 
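The embedded Kafka test added below exercises the scan transform semantics end to end. As a concrete sketch of the fan-out, using the test's fixtures (where the unnest virtual columns are named `tag` and `svc`), one input record:

```json
{ "__time": "2024-01-01T00:00:00Z", "user": "alice", "tags": ["sports", "news"], "services": [ { "type": "web", "dc": "us-east1" }, { "type": "api", "dc": "us-west2" } ] }
```

expands to four output rows (2 tags x 2 services; timestamp omitted for brevity):

```json
{ "user": "alice", "tag": "sports", "svc": { "type": "web", "dc": "us-east1" } }
{ "user": "alice", "tag": "sports", "svc": { "type": "api", "dc": "us-west2" } }
{ "user": "alice", "tag": "news", "svc": { "type": "web", "dc": "us-east1" } }
{ "user": "alice", "tag": "news", "svc": { "type": "api", "dc": "us-west2" } }
```

A record whose `tags` and `services` are null or absent passes through as a single row with `tag` and `svc` set to null, as the `carol` and `dave` fixtures in the test verify.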
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaScanTransformTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaScanTransformTest.java new file mode 100644 index 000000000000..6b5ee6764991 --- /dev/null +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaScanTransformTest.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.indexing; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.granularity.UniformGranularitySpec; +import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; +import org.apache.druid.indexing.kafka.simulate.KafkaResource; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.query.Druids; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnnestDataSource; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.metadata.Metric; +import org.apache.druid.segment.transform.ScanTransformSpec; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedClusterApis; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedIndexer; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.joda.time.Period; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +/** + * Verifies ScanTransform during Kafka ingestion: + * + */ +public class KafkaScanTransformTest extends EmbeddedClusterTestBase +{ + /** + * alice: 2 tags x 2 services = 4, bob: 1 tag x 3 services = 3 = 7 unnested rows + * carol (null arrays) and dave (missing columns) each 
produce 1 passthrough row = 2 + * total: 9 + */ + private static final int EXPECTED_ROWS = 9; + + private final KafkaResource kafka = new KafkaResource(); + private final EmbeddedBroker broker = new EmbeddedBroker(); + private final EmbeddedIndexer indexer = new EmbeddedIndexer(); + private final EmbeddedOverlord overlord = new EmbeddedOverlord(); + private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator(); + + private String topic; + + @Override + public EmbeddedDruidCluster createCluster() + { + coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always"); + + indexer.setServerMemory(300_000_000) + .addProperty("druid.segment.handoff.pollDuration", "PT0.1s") + .addProperty("druid.processing.numThreads", "2") + .addProperty("druid.worker.capacity", "2"); + + return EmbeddedDruidCluster + .withEmbeddedDerbyAndZookeeper() + .addExtension(KafkaIndexTaskModule.class) + .addResource(kafka) + .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s") + .useLatchableEmitter() + .useDefaultTimeoutForLatchableEmitter(30) + .addServer(coordinator) + .addServer(overlord) + .addServer(broker) + .addServer(indexer); + } + + @Override + protected void refreshDatasourceName() + { + // Do not refresh — datasource is set once in setupAll + } + + @BeforeAll + void setupAll() throws JsonProcessingException + { + topic = EmbeddedClusterApis.createTestDatasourceName(); + kafka.createTopicWithPartitions(topic, 1); + + super.refreshDatasourceName(); + submitSupervisor(); + publishTestData(); + + indexer.latchableEmitter().waitForEventAggregate( + event -> event.hasMetricName("ingest/events/processed") + .hasDimension(DruidMetrics.DATASOURCE, dataSource), + agg -> agg.hasSumAtLeast(EXPECTED_ROWS) + ); + + broker.latchableEmitter().waitForEvent( + event -> event.hasMetricName(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT) + .hasDimension(DruidMetrics.DATASOURCE, dataSource) + ); + } + + private void submitSupervisor() + { + final ScanTransformSpec transformSpec = new ScanTransformSpec( + Druids.newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + UnnestDataSource.create( + new TableDataSource("__input__"), + new ExpressionVirtualColumn("tag", "\"tags\"", ColumnType.STRING, ExprMacroTable.nil()), + null + ), + new ExpressionVirtualColumn("svc", "\"services\"", ColumnType.NESTED_DATA, ExprMacroTable.nil()), + null + )) + .virtualColumns( + new ExpressionVirtualColumn( + "upper_user", + "upper(\"user\")", + ColumnType.STRING, + ExprMacroTable.nil() + ), + new ExpressionVirtualColumn( + "user_tag", + "concat(\"user\", '_', \"tag\")", + ColumnType.STRING, + ExprMacroTable.nil() + ) + ) + .eternityInterval() + .columns((List) null) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST) + .build() + ); + + final KafkaSupervisorSpec spec = new KafkaSupervisorSpecBuilder() + .withDataSchema( + schema -> schema + .withTimestamp(new TimestampSpec("__time", "auto", null)) + .withGranularity(new UniformGranularitySpec(Granularities.DAY, null, null)) + .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()) + .withTransform(transformSpec) + ) + .withIoConfig( + ioConfig -> ioConfig + .withJsonInputFormat() + .withTaskCount(1) + .withTaskDuration(Period.hours(1)) + .withConsumerProperties(kafka.consumerProperties()) + .withStartDelay(Period.millis(10)) + .withSupervisorRunPeriod(Period.millis(500)) + .withUseEarliestSequenceNumber(true) + .withCompletionTimeout(Period.seconds(5)) + ) + .build(dataSource, topic); + + Assertions.assertEquals( + dataSource, + 
cluster.callApi().postSupervisor(spec) + ); + } + + private void publishTestData() throws JsonProcessingException + { + // alice: 2 tags x 2 services = 4 rows + // bob: 1 tag x 3 services = 3 rows + // carol: null tags x null services => 1 passthrough row + // dave: missing tags & services columns => 1 passthrough row + // total: 9 rows + final List> records = new ArrayList<>(); + records.add(Map.of( + "__time", "2024-01-01T00:00:00Z", + "user", "alice", + "tags", List.of("sports", "news"), + "services", List.of( + Map.of("type", "web", "dc", "us-east1"), + Map.of("type", "api", "dc", "us-west2") + ) + )); + records.add(Map.of( + "__time", "2024-01-01T00:01:00Z", + "user", "bob", + "tags", List.of("music"), + "services", List.of( + Map.of("type", "cdn", "dc", "eu-west1"), + Map.of("type", "cache", "dc", "eu-west1"), + Map.of("type", "db", "dc", "us-east1") + ) + )); + + // carol: explicit null values for both array columns + final HashMap carolRecord = new HashMap<>(); + carolRecord.put("__time", "2024-01-01T00:02:00Z"); + carolRecord.put("user", "carol"); + carolRecord.put("tags", null); + carolRecord.put("services", null); + records.add(carolRecord); + + // dave: columns not present at all + records.add(Map.of( + "__time", "2024-01-01T00:03:00Z", + "user", "dave" + )); + + final List recordBytes = new ArrayList<>(); + for (Map record : records) { + recordBytes.add(TestHelper.JSON_MAPPER.writeValueAsBytes(record)); + } + kafka.publishRecordsToTopic(topic, recordBytes); + } + + @Test + @Timeout(60) + public void test_countRows() + { + Assertions.assertEquals( + String.valueOf(EXPECTED_ROWS), + cluster.runSql(StringUtils.format("SELECT COUNT(*) FROM \"%s\"", dataSource)).trim() + ); + } + + @Test + @Timeout(60) + public void test_crossJoinUnnest() + { + // Use GROUP BY to get deterministic, order-independent results. + // Each user+tag pair count reflects the number of services it was crossed with. 
+ final String result = cluster.runSql( + StringUtils.format( + "SELECT \"user\", \"tag\", COUNT(*) AS cnt FROM \"%s\" GROUP BY 1, 2 ORDER BY 1, 2", + dataSource + ) + ); + final Set actual = new TreeSet<>(List.of(result.trim().split("\n"))); + final Set expected = new TreeSet<>(List.of( + "alice,news,2", // news x 2 services (web, api) + "alice,sports,2", // sports x 2 services (web, api) + "bob,music,3", // music x 3 services (cdn, cache, db) + "carol,,1", // passthrough (null tag, null svc) + "dave,,1" // passthrough (missing tag, missing svc) + )); + Assertions.assertEquals(expected, actual); + } + + @Test + @Timeout(60) + public void test_groupByTag() + { + final String result = cluster.runSql( + StringUtils.format( + "SELECT \"tag\", COUNT(*) AS cnt FROM \"%s\" WHERE \"tag\" IS NOT NULL GROUP BY 1 ORDER BY 1", + dataSource + ) + ); + + Assertions.assertEquals( + "music,3\nnews,2\nsports,2", + result.trim() + ); + + // music: 1 x 3 services = 3, news: 1 x 2 services = 2, sports: 1 x 2 services = 2 + // carol/dave have null tags so they don't appear in this grouping + Assertions.assertEquals( + "music,3\nnews,2\nsports,2", + cluster.runSql( + StringUtils.format( + "SELECT \"tag\", COUNT(*) AS cnt FROM \"%s\" WHERE \"tag\" IS NOT NULL GROUP BY 1 ORDER BY 1", + dataSource + ) + ) + ); + } + + @Test + @Timeout(60) + public void test_groupByUser() + { + Assertions.assertEquals( + "alice,4\nbob,3\ncarol,1\ndave,1", + cluster.runSql( + StringUtils.format( + "SELECT \"user\", COUNT(*) AS cnt FROM \"%s\" GROUP BY 1 ORDER BY 1", + dataSource + ) + ) + ); + } + + @Test + @Timeout(60) + public void test_groupByServiceType() + { + // Extract the "type" field from the unnested service objects using JSON_VALUE + final String result = cluster.runSql( + StringUtils.format( + "SELECT JSON_VALUE(\"svc\", '$.type'), COUNT(*) AS cnt" + + " FROM \"%s\"" + + " WHERE \"svc\" IS NOT NULL" + + " GROUP BY 1 ORDER BY 1", + dataSource + ) + ); + // alice has 2 tags so each of her services appears twice (cross join) + // bob has 1 tag so each of his services appears once + final Set actual = new TreeSet<>(List.of(result.trim().split("\n"))); + final Set expected = new TreeSet<>(List.of( + "api,2", // alice: api x (sports, news) + "cache,1", // bob: cache x music + "cdn,1", // bob: cdn x music + "db,1", // bob: db x music + "web,2" // alice: web x (sports, news) + )); + Assertions.assertEquals(expected, actual); + } + + @Test + @Timeout(60) + public void test_groupByServiceDc() + { + // Extract the "dc" field from the unnested service objects + final String result = cluster.runSql( + StringUtils.format( + "SELECT JSON_VALUE(\"svc\", '$.dc'), COUNT(*) AS cnt" + + " FROM \"%s\"" + + " WHERE \"svc\" IS NOT NULL" + + " GROUP BY 1 ORDER BY 1", + dataSource + ) + ); + final Set actual = new TreeSet<>(List.of(result.trim().split("\n"))); + final Set expected = new TreeSet<>(List.of( + "eu-west1,2", // bob: cdn + cache (both eu-west1) x 1 tag + "us-east1,3", // alice: web(us-east1) x 2 tags + bob: db(us-east1) x 1 tag + "us-west2,2" // alice: api(us-west2) x 2 tags + )); + Assertions.assertEquals(expected, actual); + } + + @Test + @Timeout(60) + public void test_upperCaseVirtualColumn() + { + final String result = cluster.runSql( + StringUtils.format( + "SELECT \"upper_user\", COUNT(*) AS cnt FROM \"%s\" GROUP BY 1 ORDER BY 1", + dataSource + ) + ); + Assertions.assertEquals( + "ALICE,4\nBOB,3\nCAROL,1\nDAVE,1", + result.trim() + ); + } + + @Test + @Timeout(60) + public void test_concatVirtualColumn() + { + // user_tag = 
concat(user, '_', tag) — computed at ingest time via scan query virtual column + final String result = cluster.runSql( + StringUtils.format( + "SELECT \"user_tag\", COUNT(*) AS cnt" + + " FROM \"%s\"" + + " WHERE \"tag\" IS NOT NULL" + + " GROUP BY 1 ORDER BY 1", + dataSource + ) + ); + final Set actual = new TreeSet<>(List.of(result.trim().split("\n"))); + final Set expected = new TreeSet<>(List.of( + "alice_news,2", // alice_news x 2 services + "alice_sports,2", // alice_sports x 2 services + "bob_music,3" // bob_music x 3 services + )); + Assertions.assertEquals(expected, actual); + } + + @Test + @Timeout(60) + public void test_filterByServiceType() + { + // Filter to only rows where the service type is "web" + final String result = cluster.runSql( + StringUtils.format( + "SELECT \"user\", \"tag\", JSON_VALUE(\"svc\", '$.type'), JSON_VALUE(\"svc\", '$.dc')" + + " FROM \"%s\"" + + " WHERE JSON_VALUE(\"svc\", '$.type') = 'web'", + dataSource + ) + ); + final Set actual = new TreeSet<>(List.of(result.trim().split("\n"))); + final Set expected = new TreeSet<>(List.of( + "alice,news,web,us-east1", + "alice,sports,web,us-east1" + )); + Assertions.assertEquals(expected, actual); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java index a5f95c2e15f7..4ed007d4f83c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java @@ -25,8 +25,8 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.transform.BaseTransformSpec; import org.apache.druid.segment.transform.Transform; -import org.apache.druid.segment.transform.TransformSpec; import java.util.Arrays; import java.util.HashSet; @@ -71,7 +71,7 @@ public static InputRowSchema fromDataSchema(final DataSchema dataSchema) public static ColumnsFilter createColumnsFilter( final TimestampSpec timestampSpec, final DimensionsSpec dimensionsSpec, - final TransformSpec transformSpec, + final BaseTransformSpec transformSpec, final AggregatorFactory[] aggregators ) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java index 2314d7408425..80f27de75f67 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java @@ -28,7 +28,7 @@ import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.java.util.common.parsers.CloseableIterator; -import org.apache.druid.segment.transform.TransformSpec; +import org.apache.druid.segment.transform.BaseTransformSpec; import org.apache.druid.segment.transform.TransformingInputEntityReader; import java.io.File; @@ -46,7 +46,7 @@ class SettableByteEntityReader implements InputEntityReade SettableByteEntityReader( InputFormat inputFormat, InputRowSchema inputRowSchema, - TransformSpec transformSpec, + BaseTransformSpec transformSpec, File indexingTmpDir ) { diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java index a0ac1f01ea5a..e58609c6f084 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java @@ -31,7 +31,7 @@ import org.apache.druid.segment.incremental.InputRowFilterResult; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; -import org.apache.druid.segment.transform.TransformSpec; +import org.apache.druid.segment.transform.BaseTransformSpec; import javax.annotation.Nullable; import java.io.File; @@ -56,7 +56,7 @@ class StreamChunkReader StreamChunkReader( InputFormat inputFormat, InputRowSchema inputRowSchema, - TransformSpec transformSpec, + BaseTransformSpec transformSpec, File indexingTmpDir, InputRowFilter rowFilter, RowIngestionMeters rowIngestionMeters, diff --git a/processing/src/main/java/org/apache/druid/segment/transform/BaseTransformSpec.java b/processing/src/main/java/org/apache/druid/segment/transform/BaseTransformSpec.java new file mode 100644 index 000000000000..35d68e63a71e --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/transform/BaseTransformSpec.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.transform; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.query.filter.DimFilter; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Set; + +/** + * Specification for how input rows should be transformed during ingestion. This is the base interface + * for the {@code transformSpec} field in {@link org.apache.druid.segment.indexing.DataSchema}. + * + *

<p>Two implementations are provided:
+ * <ul>
+ *   <li>{@link TransformSpec} — the default, for expression-based transforms and filters</li>
+ *   <li>{@link ScanTransformSpec} — for scan-query-based transforms (unnest, virtual columns, filters)</li>
+ * </ul>
+ *
+ * <p>
When no {@code "type"} is specified in JSON, the default {@link TransformSpec} is used for backward + * compatibility. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = TransformSpec.class) +@JsonSubTypes({ + @JsonSubTypes.Type(name = "scan", value = ScanTransformSpec.class) +}) +public interface BaseTransformSpec +{ + /** + * Creates a {@link BaseTransformer} that applies this spec's transforms to input rows. + */ + BaseTransformer toTransformer(); + + /** + * Wraps an {@link InputSourceReader} with this spec's transforms applied to each row. + */ + default InputSourceReader decorate(InputSourceReader reader) + { + return new TransformingInputSourceReader(reader, toTransformer()); + } + + /** + * Returns the names of all input columns required by this spec's transforms and filters. + */ + Set getRequiredColumns(); + + /** + * Returns the list of individual {@link Transform} objects, if applicable. + * Defaults to an empty list for specs that don't use the transforms list (e.g., {@link ScanTransformSpec}). + */ + default List getTransforms() + { + return List.of(); + } + + /** + * Returns the filter applied to input rows, if applicable. + * Defaults to null for specs that handle filtering internally (e.g., {@link ScanTransformSpec}). + */ + @Nullable + default DimFilter getFilter() + { + return null; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/transform/BaseTransformer.java b/processing/src/main/java/org/apache/druid/segment/transform/BaseTransformer.java new file mode 100644 index 000000000000..335b09c4ca2d --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/transform/BaseTransformer.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.transform; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * Interface for transforming input rows during ingestion. Created by {@link BaseTransformSpec#toTransformer()}. + * + * @see Transformer for expression-based transforms + * @see ScanTransformer for scan-query-based transforms + */ +public interface BaseTransformer extends Closeable +{ + /** + * Whether this transformer can produce multiple output rows from a single input row. + * When true, readers use {@link #transformToList} with flatMap iteration. + * When false, readers use {@link #transform(InputRow)} with map iteration. + */ + boolean hasMultiRowTransform(); + + /** + * Transforms a single input row, or returns null if the row should be filtered out. + * Only called when {@link #hasMultiRowTransform()} is false. 
+ */ + @Nullable + InputRow transform(@Nullable InputRow row); + + /** + * Transforms a single input row into zero or more output rows. + * Returns an empty list if the row is null or filtered out. + */ + List transformToList(@Nullable InputRow row); + + /** + * Transforms a batch of input rows with their associated raw values, used by the sampling path. + * Applies transforms and filtering while maintaining the correspondence between input rows and raw values. + */ + @Nullable + InputRowListPlusRawValues transform(@Nullable InputRowListPlusRawValues row); + + /** + * Releases any resources held by this transformer. The default implementation is a no-op. + */ + @Override + default void close() throws IOException + { + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/transform/CompactionTransformSpec.java b/processing/src/main/java/org/apache/druid/segment/transform/CompactionTransformSpec.java index 0a0c2243875d..bd0fafe016e7 100644 --- a/processing/src/main/java/org/apache/druid/segment/transform/CompactionTransformSpec.java +++ b/processing/src/main/java/org/apache/druid/segment/transform/CompactionTransformSpec.java @@ -39,7 +39,7 @@ public class CompactionTransformSpec { @Nullable - public static CompactionTransformSpec of(@Nullable TransformSpec transformSpec) + public static CompactionTransformSpec of(@Nullable BaseTransformSpec transformSpec) { if (transformSpec == null) { return null; @@ -47,6 +47,9 @@ public static CompactionTransformSpec of(@Nullable TransformSpec transformSpec) if (TransformSpec.NONE.equals(transformSpec)) { return null; } + if (!(transformSpec instanceof TransformSpec)) { + return null; + } return new CompactionTransformSpec(transformSpec.getFilter(), VirtualColumns.EMPTY); } diff --git a/processing/src/main/java/org/apache/druid/segment/transform/ScanTransformSpec.java b/processing/src/main/java/org/apache/druid/segment/transform/ScanTransformSpec.java new file mode 100644 index 000000000000..c2bac7e4e8cc --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/transform/ScanTransformSpec.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.transform; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.UnnestDataSource; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.segment.VirtualColumn; + +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +/** + * A {@link BaseTransformSpec} that processes input rows through an embedded {@link ScanQuery}. 
+ * The scan query can include unnest data sources, virtual columns, and filters. + * + *

<p>Example JSON:
+ * <pre>{@code
+ * "transformSpec": {
+ *   "type": "scan",
+ *   "query": {
+ *     "queryType": "scan",
+ *     "dataSource": {
+ *       "type": "unnest",
+ *       "base": { "type": "table", "name": "__input__" },
+ *       "virtualColumn": { "type": "expression", "name": "tag", "expression": "\"tags\"", "outputType": "STRING" }
+ *     },
+ *     "intervals": { "type": "intervals", "intervals": ["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"] },
+ *     "resultFormat": "list"
+ *   }
+ * }
+ * }</pre>
+ */ +@JsonTypeName("scan") +public class ScanTransformSpec implements BaseTransformSpec +{ + private final ScanQuery query; + + @JsonCreator + public ScanTransformSpec(@JsonProperty("query") final ScanQuery query) + { + this.query = query; + } + + @JsonProperty + public ScanQuery getQuery() + { + return query; + } + + @Override + public BaseTransformer toTransformer() + { + return new ScanTransformer(query); + } + + @Override + public Set getRequiredColumns() + { + final Set columns = new HashSet<>(); + collectRequiredColumns(query.getDataSource(), columns); + for (final VirtualColumn vc : query.getVirtualColumns().getVirtualColumns()) { + columns.addAll(vc.requiredColumns()); + } + if (query.getFilter() != null) { + columns.addAll(query.getFilter().getRequiredColumns()); + } + return columns; + } + + private static void collectRequiredColumns(final DataSource dataSource, final Set columns) + { + if (dataSource instanceof UnnestDataSource) { + final UnnestDataSource unnest = (UnnestDataSource) dataSource; + columns.addAll(unnest.getVirtualColumn().requiredColumns()); + collectRequiredColumns(unnest.getBase(), columns); + } + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ScanTransformSpec that = (ScanTransformSpec) o; + return Objects.equals(query, that.query); + } + + @Override + public int hashCode() + { + return Objects.hash(query); + } + + @Override + public String toString() + { + return "ScanTransformSpec{query=" + query + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/transform/ScanTransformer.java b/processing/src/main/java/org/apache/druid/segment/transform/ScanTransformer.java new file mode 100644 index 000000000000..624a206ac30b --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/transform/ScanTransformer.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.transform; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.UnnestDataSource; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.CursorBuildSpec; +import org.apache.druid.segment.CursorFactory; +import org.apache.druid.segment.CursorHolder; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentMapFunction; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.filter.Filters; +import org.apache.druid.timeline.SegmentId; +import org.joda.time.Interval; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * A {@link BaseTransformer} that processes input rows through a reusable scan query cursor pipeline. + * + *

<p>The pipeline is built once at construction: a {@link SettableRowCursorFactory} is wrapped by the
+ * scan query's {@link SegmentMapFunction} (e.g., unnest, filter). For each input row, the row is set
+ * on the factory and the cursor is {@link Cursor#reset reset} — no per-row segment or cursor allocation.
+ *
+ * <p>When an unnest column is null, missing, or empty, the input row passes through with the unnest
+ * output columns set to null, matching the documented scan transform spec semantics. Rows rejected by
+ * a filter are dropped.
+ *
+ * <p>
This class is not thread-safe. Each reader thread should have its own instance. + */ +public class ScanTransformer implements BaseTransformer +{ + private final ScanQuery query; + private final SettableRowCursorFactory baseCursorFactory; + private final CursorHolder cursorHolder; + private Cursor cursor; + + ScanTransformer(final ScanQuery scanQuery) + { + this.query = scanQuery.withOverriddenContext( + Map.of(QueryContexts.TIMEOUT_KEY, 0) + ); + + final RowSignature broadSignature = RowSignature.builder() + .add(ColumnHolder.TIME_COLUMN_NAME, ColumnType.LONG) + .build(); + + final CursorBuildSpec cursorBuildSpec = CursorBuildSpec.builder() + .setInterval(query.getSingleInterval()) + .setFilter(Filters.toFilter(query.getFilter())) + .setVirtualColumns(query.getVirtualColumns()) + .build(); + + this.baseCursorFactory = new SettableRowCursorFactory(broadSignature); + final SegmentMapFunction segmentMapFunction = query.getDataSource().createSegmentMapFunction(query); + final Segment mappedSegment = segmentMapFunction.apply(Optional.of(new CursorFactorySegment(baseCursorFactory))) + .orElseThrow(() -> new ISE("SegmentMapFunction returned empty")); + final CursorFactory mappedCursorFactory = mappedSegment.as(CursorFactory.class); + this.cursorHolder = mappedCursorFactory.makeCursorHolder(cursorBuildSpec); + } + + @Override + public boolean hasMultiRowTransform() + { + return true; + } + + @Override + @Nullable + public InputRow transform(@Nullable final InputRow row) + { + throw new UnsupportedOperationException( + "ScanTransformer does not support single-row transform; use transformToList()" + ); + } + + @Override + public List transformToList(@Nullable final InputRow row) + { + if (row == null) { + return List.of(); + } + + return process(row); + } + + @Override + @Nullable + public InputRowListPlusRawValues transform(@Nullable final InputRowListPlusRawValues row) + { + if (row == null || row.getInputRows() == null) { + return row; + } + + final List inputRows = row.getInputRows(); + final List> inputRawValues = row.getRawValuesList(); + final List outputRows = new ArrayList<>(); + final List> outputRawValues = inputRawValues == null ? 
null : new ArrayList<>(); + + for (int i = 0; i < inputRows.size(); i++) { + final List expandedRows = transformToList(inputRows.get(i)); + outputRows.addAll(expandedRows); + if (outputRawValues != null) { + for (int j = 0; j < expandedRows.size(); j++) { + outputRawValues.add(inputRawValues.get(i)); + } + } + } + + return InputRowListPlusRawValues.ofList(outputRawValues, outputRows, row.getParseException()); + } + + @Override + public void close() throws IOException + { + cursorHolder.close(); + } + + private List process(final InputRow inputRow) + { + baseCursorFactory.set(inputRow); + + if (cursor == null) { + cursor = cursorHolder.asCursor(); + } else { + cursor.reset(); + } + + if (cursor == null || cursor.isDone()) { + return List.of(); + } + + final List columns = resolveColumnsForRow(inputRow); + final List dimensionColumns = resolveDimensionColumns(inputRow, columns); + final ColumnSelectorFactory selectorFactory = cursor.getColumnSelectorFactory(); + + final List result = new ArrayList<>(); + while (!cursor.isDone()) { + final Map event = new LinkedHashMap<>(); + for (final String col : columns) { + event.put(col, selectorFactory.makeColumnValueSelector(col).getObject()); + } + result.add(new MapBasedInputRow(inputRow.getTimestampFromEpoch(), dimensionColumns, event)); + cursor.advance(); + } + + return result; + } + + private List resolveColumnsForRow(final InputRow inputRow) + { + final Set columns = new LinkedHashSet<>(); + columns.add(ColumnHolder.TIME_COLUMN_NAME); + columns.addAll(inputRow.getDimensions()); + for (final VirtualColumn vc : query.getVirtualColumns().getVirtualColumns()) { + columns.add(vc.getOutputName()); + } + collectOutputColumnNames(query.getDataSource(), columns); + return new ArrayList<>(columns); + } + + private static void collectOutputColumnNames(final DataSource dataSource, final Set columns) + { + if (dataSource instanceof UnnestDataSource) { + final UnnestDataSource unnest = (UnnestDataSource) dataSource; + columns.add(unnest.getVirtualColumn().getOutputName()); + } + for (final DataSource child : dataSource.getChildren()) { + collectOutputColumnNames(child, columns); + } + } + + private static List resolveDimensionColumns( + final InputRow inputRow, + @Nullable final List resultColumns + ) + { + final LinkedHashSet dims = new LinkedHashSet<>(inputRow.getDimensions()); + if (resultColumns != null) { + for (final String col : resultColumns) { + if (!ColumnHolder.TIME_COLUMN_NAME.equals(col)) { + dims.add(col); + } + } + } + return new ArrayList<>(dims); + } + + private static class CursorFactorySegment implements Segment + { + private final CursorFactory cursorFactory; + + CursorFactorySegment(final CursorFactory cursorFactory) + { + this.cursorFactory = cursorFactory; + } + + @Nullable + @Override + public SegmentId getId() + { + return null; + } + + @Nonnull + @Override + public Interval getDataInterval() + { + return Intervals.ETERNITY; + } + + @Nullable + @Override + public T as(final Class clazz) + { + if (CursorFactory.class.equals(clazz)) { + return (T) cursorFactory; + } + return null; + } + + @Override + public void close() + { + } + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/transform/SettableRowCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/transform/SettableRowCursorFactory.java new file mode 100644 index 000000000000..f71da27e31e0 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/transform/SettableRowCursorFactory.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.transform; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.CursorBuildSpec; +import org.apache.druid.segment.CursorFactory; +import org.apache.druid.segment.CursorHolder; +import org.apache.druid.segment.RowAdapters; +import org.apache.druid.segment.RowBasedColumnSelectorFactory; +import org.apache.druid.segment.RowIdSupplier; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.filter.ValueMatchers; + +import javax.annotation.Nullable; + +/** + * A {@link CursorFactory} backed by a mutable {@link InputRow} holder. Each call to + * {@link #makeCursorHolder(CursorBuildSpec)} returns a cursor that applies the spec's virtual columns + * on top of the current row. The underlying row holder is shared — call {@link #set(InputRow)} to + * swap the current row, then {@link Cursor#reset()} on the returned cursor. + */ +class SettableRowCursorFactory implements CursorFactory +{ + private final RowSignature rowSignature; + private final ColumnSelectorFactory baseSelectorFactory; + private InputRow currentRow; + private long rowId = RowIdSupplier.INIT; + + SettableRowCursorFactory(final RowSignature rowSignature) + { + this.rowSignature = rowSignature; + this.baseSelectorFactory = new RowBasedColumnSelectorFactory<>( + this::getCurrentRow, + this::getRowId, + RowAdapters.standardRow(), + rowSignature, + false + ); + } + + void set(final InputRow row) + { + this.currentRow = row; + this.rowId++; + } + + @Override + public CursorHolder makeCursorHolder(final CursorBuildSpec spec) + { + final ColumnSelectorFactory selectorFactory = spec.getVirtualColumns().wrap(baseSelectorFactory); + final Filter filter = spec.getFilter(); + final ValueMatcher filterMatcher = filter == null + ? 
ValueMatchers.allTrue() + : filter.makeMatcher(selectorFactory); + + return new CursorHolder() + { + @Override + public Cursor asCursor() + { + return new Cursor() + { + private boolean done = currentRow == null || !filterMatcher.matches(false); + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return selectorFactory; + } + + @Override + public void advance() + { + done = true; + } + + @Override + public void advanceUninterruptibly() + { + done = true; + } + + @Override + public boolean isDone() + { + return done; + } + + @Override + public boolean isDoneOrInterrupted() + { + return done || Thread.currentThread().isInterrupted(); + } + + @Override + public void reset() + { + done = currentRow == null || !filterMatcher.matches(false); + } + }; + } + + @Override + public void close() + { + } + }; + } + + private InputRow getCurrentRow() + { + return currentRow; + } + + private long getRowId() + { + return rowId; + } + + @Override + public RowSignature getRowSignature() + { + return rowSignature; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(final String column) + { + return rowSignature.getColumnCapabilities(column); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/transform/Transform.java b/processing/src/main/java/org/apache/druid/segment/transform/Transform.java index 8b6f75fa2d81..098f1b3a41e8 100644 --- a/processing/src/main/java/org/apache/druid/segment/transform/Transform.java +++ b/processing/src/main/java/org/apache/druid/segment/transform/Transform.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.druid.guice.annotations.ExtensionPoint; +import javax.annotation.Nullable; import java.util.Set; /** @@ -53,6 +54,7 @@ public interface Transform * Returns the function for this transform. The RowFunction takes an entire row as input and returns a column value * as output. */ + @Nullable RowFunction getRowFunction(); /** diff --git a/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java b/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java index 30324524d6ca..8fcfd10a474b 100644 --- a/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java +++ b/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java @@ -37,9 +37,10 @@ * input rows) and "transforms" (which can add fields to input rows). Filters may refer to fields generated by * a transform. * - * See {@link Transform} for details on how each transform works. + * See {@link Transform} for details on how each transform works and only works for {@link Transform} like + * {@link ExpressionTransform}. For scan query transform, see {@link ScanTransformSpec}. */ -public class TransformSpec +public class TransformSpec implements BaseTransformSpec { public static final TransformSpec NONE = new TransformSpec(null, null); @@ -64,6 +65,7 @@ public TransformSpec( } } + @Override @JsonProperty @Nullable public DimFilter getFilter() @@ -71,26 +73,26 @@ public DimFilter getFilter() return filter; } + @Override @JsonProperty public List getTransforms() { return transforms; } + @Override public InputSourceReader decorate(InputSourceReader reader) { return new TransformingInputSourceReader(reader, toTransformer()); } - /** - * Create a {@link Transformer} from this TransformSpec, when the rows to be transformed do not have a known - * signature. 
- */ - public Transformer toTransformer() + @Override + public BaseTransformer toTransformer() { return new Transformer(this); } + @Override public Set getRequiredColumns() { final Set requiredColumns = new HashSet<>(); diff --git a/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java b/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java index 2ff263a64738..efa8ace6e699 100644 --- a/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java +++ b/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java @@ -35,9 +35,9 @@ import java.util.Map; /** - * + * Expression-based transformer {@link ExpressionTransform} that accepts {@link TransformSpec}. */ -public class Transformer +public class Transformer implements BaseTransformer { private final Map transforms = new HashMap<>(); private final ThreadLocal rowSupplierForValueMatcher = new ThreadLocal<>(); @@ -64,11 +64,18 @@ public class Transformer } } + @Override + public boolean hasMultiRowTransform() + { + return false; + } + /** * Transforms an input row, or returns null if the row should be filtered out. * * @param row the input row */ + @Override @Nullable public InputRow transform(@Nullable final InputRow row) { @@ -94,6 +101,14 @@ public InputRow transform(@Nullable final InputRow row) return transformedRow; } + @Override + public List transformToList(@Nullable final InputRow row) + { + final InputRow result = transform(row); + return result == null ? List.of() : List.of(result); + } + + @Override @Nullable public InputRowListPlusRawValues transform(@Nullable final InputRowListPlusRawValues row) { diff --git a/processing/src/main/java/org/apache/druid/segment/transform/TransformingInputEntityReader.java b/processing/src/main/java/org/apache/druid/segment/transform/TransformingInputEntityReader.java index 33bed4658691..28845f473273 100644 --- a/processing/src/main/java/org/apache/druid/segment/transform/TransformingInputEntityReader.java +++ b/processing/src/main/java/org/apache/druid/segment/transform/TransformingInputEntityReader.java @@ -22,16 +22,18 @@ import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.parsers.CloseableIterator; import java.io.IOException; +import java.util.List; public class TransformingInputEntityReader implements InputEntityReader { private final InputEntityReader delegate; - private final Transformer transformer; + private final BaseTransformer transformer; - public TransformingInputEntityReader(InputEntityReader delegate, Transformer transformer) + public TransformingInputEntityReader(InputEntityReader delegate, BaseTransformer transformer) { this.delegate = delegate; this.transformer = transformer; @@ -40,9 +42,16 @@ public TransformingInputEntityReader(InputEntityReader delegate, Transformer tra @Override public CloseableIterator read() throws IOException { + if (transformer.hasMultiRowTransform()) { + return delegate.read().flatMap(row -> { + final List rows = transformer.transformToList(row); + return CloseableIterators.withEmptyBaggage(rows.iterator()); + }); + } return delegate.read().map(transformer::transform); } + @Override public CloseableIterator sample() throws IOException { diff --git a/processing/src/main/java/org/apache/druid/segment/transform/TransformingInputSourceReader.java 
b/processing/src/main/java/org/apache/druid/segment/transform/TransformingInputSourceReader.java index fe1353c32d33..ef557a207656 100644 --- a/processing/src/main/java/org/apache/druid/segment/transform/TransformingInputSourceReader.java +++ b/processing/src/main/java/org/apache/druid/segment/transform/TransformingInputSourceReader.java @@ -23,16 +23,18 @@ import org.apache.druid.data.input.InputRowListPlusRawValues; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputStats; +import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.parsers.CloseableIterator; import java.io.IOException; +import java.util.List; public class TransformingInputSourceReader implements InputSourceReader { private final InputSourceReader delegate; - private final Transformer transformer; + private final BaseTransformer transformer; - TransformingInputSourceReader(InputSourceReader delegate, Transformer transformer) + TransformingInputSourceReader(InputSourceReader delegate, BaseTransformer transformer) { this.delegate = delegate; this.transformer = transformer; @@ -41,6 +43,12 @@ public class TransformingInputSourceReader implements InputSourceReader @Override public CloseableIterator read(InputStats inputStats) throws IOException { + if (transformer.hasMultiRowTransform()) { + return delegate.read(inputStats).flatMap(row -> { + final List rows = transformer.transformToList(row); + return CloseableIterators.withEmptyBaggage(rows.iterator()); + }); + } return delegate.read(inputStats).map(transformer::transform); } diff --git a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java index f30b04c2c717..6d6ac6b7a1b6 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java @@ -41,6 +41,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.OnheapIncrementalIndex; +import org.apache.druid.segment.transform.BaseTransformSpec; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; @@ -106,7 +107,7 @@ public static IndexBuilder create(ObjectMapper jsonMapper, ColumnConfig columnCo @Nullable private InputFormat inputFormat = null; @Nullable - private TransformSpec transformSpec = null; + private BaseTransformSpec transformSpec = null; @Nullable private File inputSourceTmpDir = null; @@ -185,7 +186,7 @@ public IndexBuilder inputFormat(InputFormat inputFormat) return this; } - public IndexBuilder transform(TransformSpec transformSpec) + public IndexBuilder transform(BaseTransformSpec transformSpec) { this.transformSpec = transformSpec; return this; @@ -201,7 +202,7 @@ public IndexBuilder rows( InputSource inputSource, InputFormat inputFormat, InputRowSchema rowSchema, - TransformSpec transformSpec, + BaseTransformSpec transformSpec, File tmp ) throws IOException @@ -330,7 +331,7 @@ public QueryableIndex buildMMappedMergedIndex() Preconditions.checkNotNull(inputFormat, "inputFormat"); Preconditions.checkNotNull(inputSourceTmpDir, "inputSourceTmpDir"); - TransformSpec transformer = transformSpec != null ? 
+    if (transformer.hasMultiRowTransform()) {
+      return delegate.read(inputStats).flatMap(row -> {
+        final List<InputRow> rows = transformer.transformToList(row);
+        return CloseableIterators.withEmptyBaggage(rows.iterator());
+      });
+    }
     return delegate.read(inputStats).map(transformer::transform);
   }
 
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
index f30b04c2c717..6d6ac6b7a1b6 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
@@ -41,6 +41,7 @@
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import org.apache.druid.segment.transform.BaseTransformSpec;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -106,7 +107,7 @@ public static IndexBuilder create(ObjectMapper jsonMapper, ColumnConfig columnCo
   @Nullable
   private InputFormat inputFormat = null;
   @Nullable
-  private TransformSpec transformSpec = null;
+  private BaseTransformSpec transformSpec = null;
   @Nullable
   private File inputSourceTmpDir = null;
 
@@ -185,7 +186,7 @@ public IndexBuilder inputFormat(InputFormat inputFormat)
     return this;
   }
 
-  public IndexBuilder transform(TransformSpec transformSpec)
+  public IndexBuilder transform(BaseTransformSpec transformSpec)
   {
     this.transformSpec = transformSpec;
     return this;
@@ -201,7 +202,7 @@ public IndexBuilder rows(
       InputSource inputSource,
       InputFormat inputFormat,
       InputRowSchema rowSchema,
-      TransformSpec transformSpec,
+      BaseTransformSpec transformSpec,
       File tmp
   )
       throws IOException
@@ -330,7 +331,7 @@ public QueryableIndex buildMMappedMergedIndex()
     Preconditions.checkNotNull(inputFormat, "inputFormat");
     Preconditions.checkNotNull(inputSourceTmpDir, "inputSourceTmpDir");
 
-    TransformSpec transformer = transformSpec != null ? transformSpec : TransformSpec.NONE;
+    BaseTransformSpec transformer = transformSpec != null ? transformSpec : TransformSpec.NONE;
     InputRowSchema rowSchema = new InputRowSchema(schema.getTimestampSpec(), schema.getDimensionsSpec(), null);
     InputSourceReader reader = inputSource.reader(rowSchema, inputFormat, inputSourceTmpDir);
     InputSourceReader transformingReader = transformer.decorate(reader);
@@ -475,14 +476,14 @@ public static InputSourceReader buildIncrementalIndexWithInputSource(
       IncrementalIndexSchema schema,
       InputSource inputSource,
       InputFormat inputFormat,
-      @Nullable TransformSpec transformSpec,
+      @Nullable BaseTransformSpec transformSpec,
       File inputSourceTmpDir)
   {
     Preconditions.checkNotNull(schema, "schema");
     Preconditions.checkNotNull(inputSource, "inputSource");
     Preconditions.checkNotNull(inputFormat, "inputFormat");
     Preconditions.checkNotNull(inputSourceTmpDir, "inputSourceTmpDir");
-    TransformSpec transformer = transformSpec != null ? transformSpec : TransformSpec.NONE;
+    BaseTransformSpec transformer = transformSpec != null ? transformSpec : TransformSpec.NONE;
     InputRowSchema rowSchema = new InputRowSchema(schema.getTimestampSpec(), schema.getDimensionsSpec(), null);
     InputSourceReader reader = inputSource.reader(rowSchema, inputFormat, inputSourceTmpDir);
     InputSourceReader transformingReader = transformer.decorate(reader);
diff --git a/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java
index 97a9d25afd26..ffc4bd4e58d9 100644
--- a/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java
+++ b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java
@@ -44,8 +44,8 @@
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.serde.ComplexMetrics;
+import org.apache.druid.segment.transform.BaseTransformer;
 import org.apache.druid.segment.transform.TransformSpec;
-import org.apache.druid.segment.transform.Transformer;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
@@ -250,7 +250,7 @@ public QueryableIndex generate(
     final List<InputRow> rows = new ArrayList<>();
     final List<QueryableIndex> indexes = new ArrayList<>();
 
-    final Transformer transformer = transformSpec.toTransformer();
+    final BaseTransformer transformer = transformSpec.toTransformer();
     final InputRowSchema rowSchema = new InputRowSchema(
         TimestampSpec.DEFAULT,
         dimensionsSpec,
@@ -368,7 +368,7 @@ public IncrementalIndex generateIncrementalIndex(
     final List<InputRow> rows = new ArrayList<>();
 
-    final Transformer transformer = transformSpec.toTransformer();
+    final BaseTransformer transformer = transformSpec.toTransformer();
     final InputRowSchema rowSchema = new InputRowSchema(
         TimestampSpec.DEFAULT,
         dimensionsSpec,
diff --git a/processing/src/test/java/org/apache/druid/segment/transform/ScanTransformTest.java b/processing/src/test/java/org/apache/druid/segment/transform/ScanTransformTest.java
new file mode 100644
index 000000000000..003f9f53bd57
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/transform/ScanTransformTest.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.transform;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnnestDataSource;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
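+/**
+ * Tests for the scan-query-based transform: unnest expansion of string, object, and nested
+ * arrays, unnest and top-level query filters, passthrough of other columns and timestamps,
+ * and JSON serde of {@code ScanTransformSpec}.
+ */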
+public class ScanTransformTest extends InitializedNullHandlingTest
+{
+  private static final long TIMESTAMP = DateTimes.of("2024-01-01").getMillis();
+
+  private static InputRow makeRow(Object... kvPairs)
+  {
+    Preconditions.checkArgument(kvPairs.length % 2 == 0, "kvPairs must have even length");
+    final LinkedHashMap<String, Object> event = new LinkedHashMap<>();
+    final List<String> dimensions = new ArrayList<>();
+    for (int i = 0; i < kvPairs.length; i += 2) {
+      final String key = (String) kvPairs[i];
+      event.put(key, kvPairs[i + 1]);
+      if (!ColumnHolder.TIME_COLUMN_NAME.equals(key)) {
+        dimensions.add(key);
+      }
+    }
+    return new MapBasedInputRow(TIMESTAMP, dimensions, event);
+  }
+
+  private static ScanQuery makeUnnestQuery(String inputColumn, String outputName)
+  {
+    return makeUnnestQuery(inputColumn, outputName, ColumnType.STRING, null);
+  }
+
+  private static ScanQuery makeUnnestQuery(
+      String inputColumn,
+      String outputName,
+      ColumnType outputType,
+      SelectorDimFilter unnestFilter
+  )
+  {
+    return Druids.newScanQueryBuilder()
+                 .dataSource(UnnestDataSource.create(
+                     new TableDataSource("__input__"),
+                     new ExpressionVirtualColumn(outputName, "\"" + inputColumn + "\"", outputType, ExprMacroTable.nil()),
+                     unnestFilter
+                 ))
+                 .eternityInterval()
+                 .columns((List<String>) null)
+                 .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
+                 .build();
+  }
+
+  private static BaseTransformer makeTransformer(ScanQuery query)
+  {
+    return new ScanTransformSpec(query).toTransformer();
+  }
+
+  @Test
+  public void testBasicUnnest()
+  {
+    final BaseTransformer transformer = makeTransformer(makeUnnestQuery("tags", "tag"));
+    final InputRow input = makeRow("user", "alice", "tags", List.of("a", "b", "c"));
+
+    final List<InputRow> result = transformer.transformToList(input);
+    Assert.assertEquals(3, result.size());
+
+    Assert.assertEquals("a", result.get(0).getRaw("tag"));
+    Assert.assertEquals("alice", result.get(0).getRaw("user"));
+    Assert.assertEquals(TIMESTAMP, result.get(0).getTimestampFromEpoch());
+
+    Assert.assertEquals("b", result.get(1).getRaw("tag"));
+    Assert.assertEquals("c", result.get(2).getRaw("tag"));
+  }
+
+  @Test
+  public void testUnnestEmptyArray()
+  {
+    final BaseTransformer transformer = makeTransformer(makeUnnestQuery("tags", "tag"));
+    final InputRow input = makeRow("user", "alice", "tags", List.of());
+
+    final List<InputRow> result = transformer.transformToList(input);
+    // Empty array produces 0 rows, matching native CROSS JOIN UNNEST semantics
+    Assert.assertEquals(0, result.size());
+  }
+
+  @Test
+  public void testUnnestMissingColumn()
+  {
+    final BaseTransformer transformer = makeTransformer(makeUnnestQuery("services", "svc"));
+    final InputRow input = makeRow("user", "alice", "host", "web-01");
+
+    final List<InputRow> result = transformer.transformToList(input);
+    // Missing column produces 0 rows, matching native CROSS JOIN UNNEST semantics
+    Assert.assertEquals(0, result.size());
+  }
+
+  @Test
+  public void testUnnestSingleElement()
+  {
+    final BaseTransformer transformer = makeTransformer(makeUnnestQuery("tags", "tag"));
+    final InputRow input = makeRow("user", "alice", "tags", List.of("only"));
+
+    final List<InputRow> result = transformer.transformToList(input);
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("only", result.get(0).getRaw("tag"));
+  }
+
+  @Test
+  public void testUnnestScalarValue()
+  {
+    final BaseTransformer transformer = makeTransformer(makeUnnestQuery("tags", "tag"));
+    final InputRow input = makeRow("user", "alice", "tags", "scalar");
+
+    final List<InputRow> result = transformer.transformToList(input);
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("scalar", result.get(0).getRaw("tag"));
+  }
+
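+  // Object-typed array elements need a NESTED_DATA output type on the unnest virtual column;
+  // each unnested element then comes through as a Map.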
+  @Test
+  public void testUnnestArrayOfJsonObjects()
+  {
+    final BaseTransformer transformer = makeTransformer(
+        makeUnnestQuery("items", "item", ColumnType.NESTED_DATA, null)
+    );
+    final InputRow input = makeRow("user", "alice", "items", List.of(
+        Map.of("product", "shirt", "price", 25),
+        Map.of("product", "pants", "price", 40),
+        Map.of("product", "hat", "price", 15)
+    ));
+
+    final List<InputRow> result = transformer.transformToList(input);
+    Assert.assertEquals(3, result.size());
+
+    final Object item0 = result.get(0).getRaw("item");
+    Assert.assertNotNull(item0);
+    Assert.assertTrue("Expected a Map, got " + item0.getClass(), item0 instanceof Map);
+    Assert.assertEquals("shirt", ((Map<?, ?>) item0).get("product"));
+
+    final Object item2 = result.get(2).getRaw("item");
+    Assert.assertTrue(item2 instanceof Map);
+    Assert.assertEquals("hat", ((Map<?, ?>) item2).get("product"));
+  }
+
+  @Test
+  public void testUnnestNestedArrays()
+  {
+    final BaseTransformer transformer = makeTransformer(
+        makeUnnestQuery("data", "element", ColumnType.NESTED_DATA, null)
+    );
+    final InputRow input = makeRow(
+        "user", "alice",
+        "data", List.of(List.of(1, 2), List.of(3))
+    );
+
+    final List<InputRow> result = transformer.transformToList(input);
+    Assert.assertEquals(2, result.size());
+
+    final Object elem0 = result.get(0).getRaw("element");
+    Assert.assertNotNull(elem0);
+    Assert.assertArrayEquals(new Object[]{1L, 2L}, (Object[]) elem0);
+
+    final Object elem1 = result.get(1).getRaw("element");
+    Assert.assertNotNull(elem1);
+    Assert.assertArrayEquals(new Object[]{3L}, (Object[]) elem1);
+
+    Assert.assertEquals("alice", result.get(0).getRaw("user"));
+    Assert.assertEquals("alice", result.get(1).getRaw("user"));
+  }
+
+  @Test
+  public void testTimestampPreservation()
+  {
+    final BaseTransformer transformer = makeTransformer(makeUnnestQuery("tags", "tag"));
+    final InputRow input = makeRow("tags", List.of("a", "b"));
+
+    final List<InputRow> result = transformer.transformToList(input);
+    for (final InputRow row : result) {
+      Assert.assertEquals(TIMESTAMP, row.getTimestampFromEpoch());
+    }
+  }
+
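+  // unnestFilter filters individual array elements rather than whole input rows: only matching
+  // elements produce output rows.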
+  @Test
+  public void testWithUnnestFilter()
+  {
+    final BaseTransformer transformer = makeTransformer(
+        makeUnnestQuery("tags", "tag", ColumnType.STRING, new SelectorDimFilter("tag", "b", null))
+    );
+    final InputRow input = makeRow("user", "alice", "tags", List.of("a", "b", "c"));
+
+    final List<InputRow> result = transformer.transformToList(input);
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("b", result.get(0).getRaw("tag"));
+  }
+
+  // --- Transformer integration tests ---
+
+  @Test
+  public void testTransformerWithSingleScanTransform()
+  {
+    final ScanTransformSpec spec = new ScanTransformSpec(
+        makeUnnestQuery("tags", "tag")
+    );
+
+    final BaseTransformer transformer = spec.toTransformer();
+    Assert.assertTrue(transformer.hasMultiRowTransform());
+    Assert.assertTrue(transformer instanceof ScanTransformer);
+
+    final InputRow input = makeRow("user", "alice", "tags", List.of("x", "y"));
+    final List<InputRow> result = transformer.transformToList(input);
+
+    Assert.assertEquals(2, result.size());
+    Assert.assertEquals("x", result.get(0).getRaw("tag"));
+    Assert.assertEquals("y", result.get(1).getRaw("tag"));
+  }
+
+  @Test
+  public void testNestedUnnestCrossJoin()
+  {
+    final BaseTransformer transformer = new ScanTransformSpec(
+        Druids.newScanQueryBuilder()
+              .dataSource(UnnestDataSource.create(
+                  UnnestDataSource.create(
+                      new TableDataSource("__input__"),
+                      new ExpressionVirtualColumn("tag", "\"tags\"", ColumnType.STRING, ExprMacroTable.nil()),
+                      null
+                  ),
+                  new ExpressionVirtualColumn("color", "\"colors\"", ColumnType.STRING, ExprMacroTable.nil()),
+                  null
+              ))
+              .eternityInterval()
+              .columns((List<String>) null)
+              .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
+              .build()
+    ).toTransformer();
+    Assert.assertTrue(transformer.hasMultiRowTransform());
+
+    final InputRow input = makeRow(
+        "user", "alice",
+        "tags", List.of("a", "b"),
+        "colors", List.of("red", "blue", "green")
+    );
+    final List<InputRow> result = transformer.transformToList(input);
+
+    // 2 tags x 3 colors = 6 rows (cross join)
+    Assert.assertEquals(6, result.size());
+  }
+
+  @Test
+  public void testNestedUnnestWithMissingOuterColumn()
+  {
+    final BaseTransformer transformer = new ScanTransformSpec(
+        Druids.newScanQueryBuilder()
+              .dataSource(UnnestDataSource.create(
+                  UnnestDataSource.create(
+                      new TableDataSource("__input__"),
+                      new ExpressionVirtualColumn("tag", "\"tags\"", ColumnType.STRING, ExprMacroTable.nil()),
+                      null
+                  ),
+                  new ExpressionVirtualColumn("svc", "\"services\"", ColumnType.NESTED_DATA, ExprMacroTable.nil()),
+                  null
+              ))
+              .eternityInterval()
+              .columns((List<String>) null)
+              .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
+              .build()
+    ).toTransformer();
+
+    // tags present (2 elements), services missing
+    final InputRow input = makeRow("trace_id", "abc", "tags", List.of("music", "blll"));
+    final List<InputRow> result = transformer.transformToList(input);
+
+    // Nested unnest is a cross join: tags x services. With services missing, the cross join
+    // produces 0 rows — matching native CROSS JOIN UNNEST semantics.
+    Assert.assertEquals(0, result.size());
+  }
+
+  @Test
+  public void testNestedUnnestFlattensNestedArrays()
+  {
+    final BaseTransformer transformer = new ScanTransformSpec(
+        Druids.newScanQueryBuilder()
+              .dataSource(UnnestDataSource.create(
+                  UnnestDataSource.create(
+                      new TableDataSource("__input__"),
+                      new ExpressionVirtualColumn("inner", "\"data\"", ColumnType.NESTED_DATA, ExprMacroTable.nil()),
+                      null
+                  ),
+                  new ExpressionVirtualColumn("val", "\"inner\"", ColumnType.LONG, ExprMacroTable.nil()),
+                  null
+              ))
+              .eternityInterval()
+              .columns((List<String>) null)
+              .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
+              .build()
+    ).toTransformer();
+
+    final InputRow input = makeRow(
+        "user", "alice",
+        "data", List.of(List.of(1, 2), List.of(3))
+    );
+    final List<InputRow> result = transformer.transformToList(input);
+
+    // First unnest: [[1,2],[3]] -> [1,2], [3] (2 rows)
+    // Second unnest: [1,2] -> 1, 2 and [3] -> 3 (3 rows total)
+    Assert.assertEquals(3, result.size());
+
+    final List<Object> values = new ArrayList<>();
+    for (final InputRow row : result) {
+      values.add(row.getRaw("val"));
+      Assert.assertEquals("alice", row.getRaw("user"));
+    }
+    Assert.assertEquals(3, values.size());
+    Assert.assertEquals(1, ((Number) values.get(0)).intValue());
+    Assert.assertEquals(2, ((Number) values.get(1)).intValue());
+    Assert.assertEquals(3, ((Number) values.get(2)).intValue());
+  }
+
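+  // The scan query's own top-level filter runs against each post-unnest row, so a non-matching
+  // input row is dropped entirely (unlike unnestFilter, which drops individual elements).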
+  @Test
+  public void testScanTransformWithQueryFilter()
+  {
+    final BaseTransformer transformer = new ScanTransformSpec(
+        Druids.newScanQueryBuilder()
+              .dataSource(UnnestDataSource.create(
+                  new TableDataSource("__input__"),
+                  new ExpressionVirtualColumn("tag", "\"tags\"", ColumnType.STRING, ExprMacroTable.nil()),
+                  null
+              ))
+              .eternityInterval()
+              .filters(new SelectorDimFilter("user", "not_alice", null))
+              .columns((List<String>) null)
+              .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
+              .build()
+    ).toTransformer();
+    final InputRow input = makeRow("user", "alice", "tags", List.of("a", "b"));
+    final List<InputRow> result = transformer.transformToList(input);
+    // Filter rejects the row (user != "not_alice"), so 0 rows — matching native scan query semantics
+    Assert.assertEquals(0, result.size());
+  }
+
+  @Test
+  public void testTransformerWithoutScanTransform()
+  {
+    final TransformSpec spec = new TransformSpec(null, null);
+    final BaseTransformer transformer = spec.toTransformer();
+    Assert.assertFalse(transformer.hasMultiRowTransform());
+    Assert.assertTrue(transformer instanceof Transformer);
+
+    final InputRow input = makeRow("user", "alice");
+    final List<InputRow> result = transformer.transformToList(input);
+    Assert.assertEquals(1, result.size());
+  }
+
+  @Test
+  public void testTransformerTransformToListWithNull()
+  {
+    final TransformSpec spec = new TransformSpec(null, null);
+    final BaseTransformer transformer = spec.toTransformer();
+    Assert.assertTrue(transformer.transformToList(null).isEmpty());
+  }
+
+  // --- Serde tests ---
+
+  @Test
+  public void testScanTransformSpecSerde() throws Exception
+  {
+    final ScanTransformSpec spec = new ScanTransformSpec(
+        makeUnnestQuery("tags", "tag", ColumnType.STRING, new SelectorDimFilter("tag", "a", null))
+    );
+
+    final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
+    final String json = jsonMapper.writeValueAsString(spec);
+    final BaseTransformSpec deserialized = jsonMapper.readValue(json, BaseTransformSpec.class);
+    Assert.assertTrue(deserialized instanceof ScanTransformSpec);
+    Assert.assertEquals(spec, deserialized);
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java b/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
index 2583b81deabb..9d032cfd5701 100644
--- a/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
@@ -28,8 +28,14 @@
 import org.apache.druid.data.input.Row;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnnestDataSource;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.DateTime;
 import org.junit.Assert;
@@ -315,6 +321,38 @@ public void testInputRowListPlusRawValuesTransformWithFilter()
     Assert.assertEquals("val1", actual.getRawValuesList().get(0).get("dim"));
   }
 
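+  // The sampler path (InputRowListPlusRawValues) expands as well: each output row carries a
+  // copy of the original raw values so it can be traced back to its source record.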
+  @Test
+  public void testInputRowListPlusRawValuesTransformWithScanTransformExpandsRowsAndRawValues()
+  {
+    final BaseTransformer transformer = new ScanTransformSpec(
+        Druids.newScanQueryBuilder()
+              .dataSource(UnnestDataSource.create(
+                  new TableDataSource("__input__"),
+                  new ExpressionVirtualColumn("tag", "\"tags\"", ColumnType.STRING, TestExprMacroTable.INSTANCE),
+                  null
+              ))
+              .eternityInterval()
+              .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
+              .build()
+    ).toTransformer();
+
+    final InputRow inputRow = new MapBasedInputRow(
+        DateTimes.nowUtc(),
+        ImmutableList.of("user", "tags"),
+        ImmutableMap.of("user", "alice", "tags", ImmutableList.of("a", "b"))
+    );
+    final Map<String, Object> rawValues = ImmutableMap.of("user", "alice", "tags", ImmutableList.of("a", "b"));
+
+    final InputRowListPlusRawValues transformed =
+        transformer.transform(InputRowListPlusRawValues.of(inputRow, rawValues));
+    Assert.assertNotNull(transformed);
+    Assert.assertEquals(2, transformed.getInputRows().size());
+    Assert.assertEquals(2, transformed.getRawValuesList().size());
+    Assert.assertEquals(rawValues, transformed.getRawValuesList().get(0));
+    Assert.assertEquals(rawValues, transformed.getRawValuesList().get(1));
+    Assert.assertEquals("a", transformed.getInputRows().get(0).getRaw("tag"));
+    Assert.assertEquals("b", transformed.getInputRows().get(1).getRaw("tag"));
+  }
+
   @Test
   public void testTransformWithArrayStringInputsExpr()
   {
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
index dbe0836c4dd8..3cf470e3270c 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
@@ -43,6 +43,7 @@
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.projections.AggregateProjectionSchema;
+import org.apache.druid.segment.transform.BaseTransformSpec;
 import org.apache.druid.segment.transform.TransformSpec;
 
 import javax.annotation.Nullable;
@@ -77,7 +78,7 @@ public static Builder builder(DataSchema schema)
   private final String dataSource;
   private final AggregatorFactory[] aggregators;
   private final GranularitySpec granularitySpec;
-  private final TransformSpec transformSpec;
+  private final BaseTransformSpec transformSpec;
   @Nullable
   private final TimestampSpec timestampSpec;
   @Nullable
@@ -92,7 +93,7 @@ public DataSchema(
       @JsonProperty("dimensionsSpec") @Nullable DimensionsSpec dimensionsSpec,
       @JsonProperty("metricsSpec") @Nullable AggregatorFactory[] aggregators,
       @JsonProperty("granularitySpec") @Nullable GranularitySpec granularitySpec,
-      @JsonProperty("transformSpec") TransformSpec transformSpec,
+      @JsonProperty("transformSpec") BaseTransformSpec transformSpec,
       @JsonProperty("projections") @Nullable List<AggregateProjectionSchema> projections,
       @Deprecated @JsonProperty("parser") @Nullable Map<String, Object> parserMap
   )
@@ -168,7 +169,7 @@ public GranularitySpec getGranularitySpec()
   }
 
   @JsonProperty
-  public TransformSpec getTransformSpec()
+  public BaseTransformSpec getTransformSpec()
   {
     return transformSpec;
   }
@@ -195,7 +196,7 @@ public DataSchema withGranularitySpec(GranularitySpec granularitySpec)
     return builder(this).withGranularity(granularitySpec).build();
   }
 
-  public DataSchema withTransformSpec(TransformSpec transformSpec)
+  public DataSchema withTransformSpec(BaseTransformSpec transformSpec)
   {
     return builder(this).withTransform(transformSpec).build();
   }
@@ -492,7 +493,7 @@ public static class Builder
     private String dataSource;
     private AggregatorFactory[] aggregators;
    private GranularitySpec granularitySpec;
-    private TransformSpec transformSpec;
+    private BaseTransformSpec transformSpec;
     private TimestampSpec timestampSpec;
     private DimensionsSpec dimensionsSpec;
     private List<AggregateProjectionSchema> projections;
@@ -554,7 +555,7 @@ public Builder withGranularity(GranularitySpec granularitySpec)
       return this;
     }
 
-    public Builder withTransform(TransformSpec transformSpec)
+    public Builder withTransform(BaseTransformSpec transformSpec)
     {
       this.transformSpec = transformSpec;
       return this;
diff --git a/website/.spelling b/website/.spelling
index 28701817f362..8ed5ac2da309 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -2646,3 +2646,4 @@
 nginx
 - ../docs/development/extensions-core/s3.md
 NIO
+passthrough