Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
setup:
- do:
indices.create:
index: test_5150
- do:
query.settings:
body:
transient:
plugins.calcite.enabled : true

- do:
bulk:
index: test_5150
refresh: true
body:
- '{"index": {}}'
- '{"category":"X","value":100}'
- '{"index": {}}'
- '{"category":"X","value":200}'
- '{"index": {}}'
- '{"category":"Y","value":300}'
- '{"index": {}}'
- '{"category":"Y","value":400}'

---
teardown:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled : false

---
"5150: Rename non-dedup field then dedup retains renamed values":
- skip:
features:
- headers
- allowed_warnings
- do:
allowed_warnings:
- 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled'
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=test_5150 | rename value as val | dedup category | sort category | fields category, val

- match: { total: 2 }
- match: { schema: [{"name": "category", "type": "string"}, {"name": "val", "type": "bigint"}] }
- length: { datarows: 2 }
- match: { datarows.0.0: "X" }
- match: { datarows.0.1: 100 }
- match: { datarows.1.0: "Y" }
- match: { datarows.1.1: 300 }

---
"5150: Eval new field then dedup on different field retains eval values":
- skip:
features:
- headers
- allowed_warnings
- do:
allowed_warnings:
- 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled'
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=test_5150 | eval doubled = value * 2 | dedup category | sort category | fields category, value, doubled

- match: { total: 2 }
- length: { datarows: 2 }
- match: { datarows.0.0: "X" }
- match: { datarows.0.1: 100 }
- match: { datarows.0.2: 200 }
- match: { datarows.1.0: "Y" }
- match: { datarows.1.1: 300 }
- match: { datarows.1.2: 600 }
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -59,6 +60,8 @@
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.script.Script;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
Expand Down Expand Up @@ -109,6 +112,8 @@
*/
public class AggregateAnalyzer {

private static final Logger LOG = LogManager.getLogger();

/** metadata field used when there is no argument. Only apply to COUNT. */
private static final String METADATA_FIELD = "_index";

Expand Down Expand Up @@ -601,7 +606,32 @@ yield switch (functionName) {
TopHitsAggregationBuilder topHitsAggregationBuilder =
createTopHitsBuilder(
aggCall, args, aggName, helper, dedupNumber, false, false, null, null);
yield Pair.of(topHitsAggregationBuilder, new TopHitsParser(aggName, false, false));
// Build field name mapping for renamed fields (e.g., rename value as val).
// The top_hits response uses original OS field names, but the output schema expects
// the renamed names from the project.
// Known limitation: if multiple project args reference the same original field with
// different output names (e.g., eval pay2 = salary | rename salary as pay | dedup
// dept_id), the later mapping will overwrite the earlier one in this HashMap.
Map<String, String> fieldNameMapping = new HashMap<>();
for (Pair<RexNode, String> arg : args) {
if (arg.getKey() instanceof RexInputRef) {
String originalName = helper.inferNamedField(arg.getKey()).getRootName();
String outputName = arg.getValue();
if (!originalName.equals(outputName)) {
String previousMapping = fieldNameMapping.put(originalName, outputName);
if (previousMapping != null) {
LOG.warn(
"Field name mapping collision: field '{}' was mapped to '{}', now"
+ " overwritten by '{}'",
originalName,
previousMapping,
outputName);
}
}
}
}
yield Pair.of(
topHitsAggregationBuilder, new TopHitsParser(aggName, false, false, fieldNameMapping));
}
default ->
throw new AggregateAnalyzer.AggregateAnalyzerException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,27 @@ public class TopHitsParser implements MetricParser {
private final boolean returnSingleValue;
private final boolean returnMergeValue;

/**
* Mapping from original OpenSearch field names to output field names (e.g., renamed via {@code
* rename} command). When a field is renamed (e.g., {@code rename value as val}), the top_hits
* response still contains the original field name ({@code value}), but the output schema expects
* the renamed name ({@code val}). This mapping enables the translation.
*/
private final Map<String, String> fieldNameMapping;

public TopHitsParser(String name, boolean returnSingleValue, boolean returnMergeValue) {
this(name, returnSingleValue, returnMergeValue, Collections.emptyMap());
}

public TopHitsParser(
String name,
boolean returnSingleValue,
boolean returnMergeValue,
Map<String, String> fieldNameMapping) {
this.name = name;
this.returnSingleValue = returnSingleValue;
this.returnMergeValue = returnMergeValue;
this.fieldNameMapping = fieldNameMapping;
}

@Override
Expand All @@ -43,6 +60,9 @@ public List<Map<String, Object>> parse(Aggregation agg) {
new HashMap<>(Collections.singletonMap(agg.getName(), null)));
}

// Field name mapping is not applied in returnSingleValue or returnMergeValue paths
// because they use the aggregation name (agg.getName()) as the map key, not field names.
// Only the multi-row path below uses actual field names as keys and needs mapping.
if (returnSingleValue) {
Object value = null;
if (!isSourceEmpty(hits)) {
Expand Down Expand Up @@ -129,12 +149,28 @@ public List<Map<String, Object>> parse(Aggregation agg) {
? new LinkedHashMap<>()
: new LinkedHashMap<>(hit.getSourceAsMap());
hit.getFields().values().forEach(f -> map.put(f.getName(), f.getValue()));
return map;
return applyFieldNameMapping(map);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field name mapping is applied only in the multi-row return path (the else branch at line 140). The returnSingleValue path (used by ARG_MAX/ARG_MIN pushdown) and returnMergeValue path skip mapping entirely. If those paths are ever used with renamed fields via a future pushdown rule, they would have the same null-value bug. Worth confirming this is an intentional scope limitation for now, or if those branches should also apply the mapping defensively.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed this is intentional. The returnSingleValue and returnMergeValue paths use agg.getName() as the map key (not field names), so field name mapping is irrelevant for those paths. Added a clarifying comment in the code explaining why mapping is only applied in the multi-row return path.

})
.toList();
}
}

/**
* Apply field name mapping to translate original OpenSearch field names to output field names.
* Fields not present in the mapping are kept as-is.
*/
private Map<String, Object> applyFieldNameMapping(Map<String, Object> map) {
if (fieldNameMapping.isEmpty()) {
return map;
}
Map<String, Object> result = new LinkedHashMap<>();
for (Map.Entry<String, Object> entry : map.entrySet()) {
String mappedName = fieldNameMapping.getOrDefault(entry.getKey(), entry.getKey());
result.put(mappedName, entry.getValue());
}
return result;
}

private boolean isEmptyHits(SearchHit[] hits) {
return isFieldsEmpty(hits) && isSourceEmpty(hits);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,57 @@ void two_bucket_percentiles_should_pass() {
ImmutableMap.of("percentiles", List.of(21.0, 27.0, 30.0, 35.0, 55.0, 58.0, 60.0))));
}

/**
* Test for issue #5150: dedup aggregation pushdown with renamed fields. When a field is renamed
* (e.g., rename value as val), the top_hits response uses original field names. The TopHitsParser
* with fieldNameMapping should translate them to the renamed names.
*/
@Test
void dedup_top_hits_with_field_name_mapping_should_remap_fields() {
String response =
"{\n"
+ " \"composite#composite_buckets\": {\n"
+ " \"buckets\": [\n"
+ " {\n"
+ " \"key\": {\n"
+ " \"category\": \"X\"\n"
+ " },\n"
+ " \"doc_count\": 2,\n"
+ " \"top_hits#dedup\": {\n"
+ " \"hits\": {\n"
+ " \"total\": { \"value\": 1, \"relation\": \"eq\" },\n"
+ " \"max_score\": 1.0,\n"
+ " \"hits\": [\n"
+ " {\n"
+ " \"_index\": \"test\",\n"
+ " \"_id\": \"1\",\n"
+ " \"_score\": 1.0,\n"
+ " \"fields\": {\n"
+ " \"category\": [\"X\"],\n"
+ " \"value\": [100]\n"
+ " }\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ "}";
// "value" is renamed to "val" — the mapping should translate it in the response.
// Use BucketAggregationParser as used by the dedup aggregation pushdown path.
OpenSearchAggregationResponseParser parser =
new BucketAggregationParser(
List.of(new TopHitsParser("dedup", false, false, Map.of("value", "val"))), List.of());
List<Map<String, Object>> result = parse(parser, response);
assertEquals(1, result.size());
Map<String, Object> row = result.get(0);
// The renamed field "val" should be present, not the original "value"
assertEquals(100, row.get("val"));
assertNull(row.get("value"));
assertEquals("X", row.get("category"));
}

public List<Map<String, Object>> parse(OpenSearchAggregationResponseParser parser, String json) {
return parser.parse(fromJson(json));
}
Expand Down
Loading