From 1cafd2eddd1760170f8876e4db5caaa47136e460 Mon Sep 17 00:00:00 2001 From: zhuyufeng0809 <1547107965@qq.com> Date: Wed, 11 Mar 2026 11:02:39 +0800 Subject: [PATCH 1/3] [metric] Support influxdb reporter --- .../apache/fluss/config/ConfigOptions.java | 35 +++ fluss-metrics/fluss-metrics-influxdb/pom.xml | 79 ++++++ .../influxdb/InfluxdbPointProducer.java | 104 ++++++++ .../metrics/influxdb/InfluxdbReporter.java | 150 ++++++++++++ .../influxdb/InfluxdbReporterPlugin.java | 79 ++++++ ...luss.metrics.reporter.MetricReporterPlugin | 20 ++ .../influxdb/InfluxdbPointProducerTest.java | 157 ++++++++++++ .../influxdb/InfluxdbReporterPluginTest.java | 58 +++++ .../influxdb/InfluxdbReporterTest.java | 228 ++++++++++++++++++ .../org.junit.jupiter.api.extension.Extension | 19 ++ .../src/test/resources/log4j2-test.properties | 28 +++ fluss-metrics/pom.xml | 1 + 12 files changed, 958 insertions(+) create mode 100644 fluss-metrics/fluss-metrics-influxdb/pom.xml create mode 100644 fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java create mode 100644 fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java create mode 100644 fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java create mode 100644 fluss-metrics/fluss-metrics-influxdb/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin create mode 100644 fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducerTest.java create mode 100644 fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPluginTest.java create mode 100644 fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterTest.java create mode 100644 fluss-metrics/fluss-metrics-influxdb/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension create mode 100644 fluss-metrics/fluss-metrics-influxdb/src/test/resources/log4j2-test.properties diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 90f262b07d..aa065a2ed5 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1939,6 +1939,41 @@ public class ConfigOptions { + "the CoordinatorServer) it is advisable to use a port range " + "like 9990-9999."); + // ------------------------------------------------------------------------ + // ConfigOptions for influxdb reporter + // ------------------------------------------------------------------------ + public static final ConfigOption METRICS_REPORTER_INFLUXDB_HOST_URL = + key("metrics.reporter.influxdb.host-url") + .stringType() + .noDefaultValue() + .withDescription( + "The InfluxDB server host URL including scheme, host name, and port."); + + public static final ConfigOption METRICS_REPORTER_INFLUXDB_BUCKET = + key("metrics.reporter.influxdb.bucket") + .stringType() + .noDefaultValue() + .withFallbackKeys("metrics.reporter.influxdb.database") + .withDescription("The InfluxDB bucket/database name."); + + public static final ConfigOption METRICS_REPORTER_INFLUXDB_ORG = + key("metrics.reporter.influxdb.org") + .stringType() + .noDefaultValue() + .withDescription("The InfluxDB organization name."); + + public static final ConfigOption METRICS_REPORTER_INFLUXDB_TOKEN = + key("metrics.reporter.influxdb.token") + .stringType() + .noDefaultValue() + .withDescription("The InfluxDB authentication token."); + + public static final ConfigOption METRICS_REPORTER_INFLUXDB_PUSH_INTERVAL = + key("metrics.reporter.influxdb.push-interval") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription("The interval of reporting metrics to InfluxDB."); + // ------------------------------------------------------------------------ // ConfigOptions for lakehouse storage // ------------------------------------------------------------------------ diff --git a/fluss-metrics/fluss-metrics-influxdb/pom.xml b/fluss-metrics/fluss-metrics-influxdb/pom.xml new file mode 100644 index 0000000000..eef4ed8c34 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/pom.xml @@ -0,0 +1,79 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss-metrics + 0.10-SNAPSHOT + + + fluss-metrics-influxdb + Fluss : Metrics : InfluxDB + + + 1.8.0 + + + + + org.apache.fluss + fluss-common + ${project.version} + provided + + + + com.influxdb + influxdb3-java + ${influxdb3.version} + provided + + + + + org.apache.fluss + fluss-test-utils + + + org.apache.fluss + fluss-common + ${project.version} + test + test-jar + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + ${skip.on.java8} + + ${skip.on.java8} + + + + + diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java new file mode 100644 index 0000000000..04ee1f30ed --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Histogram; +import org.apache.fluss.metrics.HistogramStatistics; +import org.apache.fluss.metrics.Meter; +import org.apache.fluss.metrics.Metric; + +import com.influxdb.v3.client.Point; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +/** Producer that creates InfluxDB {@link Point Points} from Fluss {@link Metric Metrics}. */ +public class InfluxdbPointProducer { + + private static final InfluxdbPointProducer INSTANCE = new InfluxdbPointProducer(); + + public static InfluxdbPointProducer getInstance() { + return INSTANCE; + } + + public Point createPoint( + Metric metric, String metricName, List> tags, Instant time) { + Point point = Point.measurement(metricName).setTimestamp(time); + + for (Map.Entry tag : tags) { + point.setTag(tag.getKey(), tag.getValue()); + } + + if (metric instanceof Counter) { + return createPointForCounter((Counter) metric, point); + } + + if (metric instanceof Gauge) { + return createPointForGauge((Gauge) metric, point); + } + + if (metric instanceof Meter) { + return createPointForMeter((Meter) metric, point); + } + + if (metric instanceof Histogram) { + return createPointForHistogram((Histogram) metric, point); + } + + throw new IllegalArgumentException("Unknown metric type: " + metric.getClass()); + } + + private Point createPointForCounter(Counter counter, Point point) { + return point.setField("count", counter.getCount()); + } + + private Point createPointForGauge(Gauge gauge, Point point) { + Object value = gauge.getValue(); + + if (value instanceof Number) { + return point.setField("value", ((Number) value)); + } else if (value instanceof Boolean) { + return point.setField("value", ((boolean) value)); + } else { + return point.setField("value", String.valueOf(value)); + } + } + + private Point createPointForMeter(Meter meter, Point point) { + return point.setField("rate", meter.getRate()).setField("count", meter.getCount()); + } + + private Point createPointForHistogram(Histogram histogram, Point point) { + HistogramStatistics stats = histogram.getStatistics(); + return point.setField("count", histogram.getCount()) + .setField("mean", stats.getMean()) + .setField("stddev", stats.getStdDev()) + .setField("min", stats.getMin()) + .setField("max", stats.getMax()) + .setField("p50", stats.getQuantile(0.5)) + .setField("p75", stats.getQuantile(0.75)) + .setField("p95", stats.getQuantile(0.95)) + .setField("p98", stats.getQuantile(0.98)) + .setField("p99", stats.getQuantile(0.99)) + .setField("p999", stats.getQuantile(0.999)); + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java new file mode 100644 index 0000000000..ba43551f42 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.Metric; +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.reporter.ScheduledMetricReporter; +import org.apache.fluss.utils.MapUtils; + +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.Point; +import com.influxdb.v3.client.config.ClientConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** {@link ScheduledMetricReporter} that exports {@link Metric Metrics} via InfluxDB. */ +public class InfluxdbReporter implements ScheduledMetricReporter { + + private static final Logger LOG = LoggerFactory.getLogger(InfluxdbReporter.class); + + private static final char SCOPE_SEPARATOR = '_'; + private static final String SCOPE_PREFIX = "fluss" + SCOPE_SEPARATOR; + + private final Map metricNames; + private final Map>> metricTags; + private final InfluxDBClient client; + private final Duration pushInterval; + private final InfluxdbPointProducer pointProducer; + + public InfluxdbReporter( + String hostUrl, String org, String bucket, String token, Duration pushInterval) { + ClientConfig clientConfig = + new ClientConfig.Builder() + .host(hostUrl) + .token(token.toCharArray()) + .organization(org) + .database(bucket) + .build(); + + this.client = InfluxDBClient.getInstance(clientConfig); + this.pushInterval = pushInterval; + this.metricNames = MapUtils.newConcurrentHashMap(); + this.metricTags = MapUtils.newConcurrentHashMap(); + this.pointProducer = InfluxdbPointProducer.getInstance(); + + LOG.info("Started InfluxDB reporter connecting to {}", hostUrl); + } + + @Override + public void open(Configuration config) { + // do nothing + } + + @Override + public void close() { + if (client != null) { + try { + client.close(); + } catch (Exception e) { + LOG.warn("Failed to close InfluxDB client", e); + } + } + } + + @Override + public void report() { + List points = new ArrayList<>(); + Instant now = Instant.now(); + + for (Map.Entry entry : metricNames.entrySet()) { + Metric metric = entry.getKey(); + String metricName = entry.getValue(); + List> tags = metricTags.get(metric); + + try { + Point point = pointProducer.createPoint(metric, metricName, tags, now); + points.add(point); + } catch (Exception e) { + LOG.warn("Failed to create point for metric {}", metricName, e); + } + } + + client.writePoints(points); + } + + @Override + public Duration scheduleInterval() { + return pushInterval; + } + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + String scopedMetricName = getScopedName(metricName, group); + List> tags = getTags(group); + + metricNames.put(metric, scopedMetricName); + metricTags.put(metric, tags); + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + metricNames.remove(metric); + metricTags.remove(metric); + } + + private String getScopedName(String metricName, MetricGroup group) { + return SCOPE_PREFIX + + group.getLogicalScope(this::filterCharacters, SCOPE_SEPARATOR) + + SCOPE_SEPARATOR + + filterCharacters(metricName); + } + + private List> getTags(MetricGroup group) { + List> tags = new ArrayList<>(); + for (Map.Entry entry : group.getAllVariables().entrySet()) { + tags.add( + new HashMap.SimpleEntry<>( + filterCharacters(entry.getKey()), filterCharacters(entry.getValue()))); + } + return tags; + } + + private String filterCharacters(String input) { + return input.replaceAll("[^a-zA-Z0-9_:]", String.valueOf(SCOPE_SEPARATOR)); + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java new file mode 100644 index 0000000000..0f502cea0d --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.reporter.MetricReporter; +import org.apache.fluss.metrics.reporter.MetricReporterPlugin; +import org.apache.fluss.utils.StringUtils; + +import java.time.Duration; + +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_BUCKET; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_HOST_URL; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_ORG; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_PUSH_INTERVAL; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_TOKEN; + +/** {@link MetricReporterPlugin} for {@link InfluxdbReporter}. */ +public class InfluxdbReporterPlugin implements MetricReporterPlugin { + + private static final String PLUGIN_NAME = "influxdb"; + + @Override + public MetricReporter createMetricReporter(Configuration configuration) { + String hostUrl = configuration.getString(METRICS_REPORTER_INFLUXDB_HOST_URL); + String org = configuration.getString(METRICS_REPORTER_INFLUXDB_ORG); + String bucket = configuration.getString(METRICS_REPORTER_INFLUXDB_BUCKET); + String token = configuration.getString(METRICS_REPORTER_INFLUXDB_TOKEN); + Duration pushInterval = configuration.get(METRICS_REPORTER_INFLUXDB_PUSH_INTERVAL); + + if (StringUtils.isNullOrWhitespaceOnly(hostUrl)) { + throw new IllegalArgumentException( + "InfluxDB host URL must be configured via '" + + METRICS_REPORTER_INFLUXDB_HOST_URL.key() + + "'"); + } + if (StringUtils.isNullOrWhitespaceOnly(org)) { + throw new IllegalArgumentException( + "InfluxDB organization must be configured via '" + + METRICS_REPORTER_INFLUXDB_ORG.key() + + "'"); + } + if (StringUtils.isNullOrWhitespaceOnly(bucket)) { + throw new IllegalArgumentException( + "InfluxDB bucket must be configured via '" + + METRICS_REPORTER_INFLUXDB_BUCKET.key() + + "'"); + } + if (StringUtils.isNullOrWhitespaceOnly(token)) { + throw new IllegalArgumentException( + "InfluxDB token must be configured via '" + + METRICS_REPORTER_INFLUXDB_TOKEN.key() + + "'"); + } + + return new InfluxdbReporter(hostUrl, org, bucket, token, pushInterval); + } + + @Override + public String identifier() { + return PLUGIN_NAME; + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin b/fluss-metrics/fluss-metrics-influxdb/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin new file mode 100644 index 0000000000..e2b8a6eae2 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +org.apache.fluss.metrics.influxdb.InfluxdbReporterPlugin diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducerTest.java b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducerTest.java new file mode 100644 index 0000000000..9cb999c03b --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducerTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.SimpleCounter; +import org.apache.fluss.metrics.util.TestHistogram; +import org.apache.fluss.metrics.util.TestMeter; + +import com.influxdb.v3.client.Point; +import org.junit.jupiter.api.Test; + +import java.math.BigInteger; +import java.time.Instant; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link InfluxdbPointProducer}. */ +class InfluxdbPointProducerTest { + + private final InfluxdbPointProducer pointProducer = InfluxdbPointProducer.getInstance(); + + @Test + void testCreatePointForCounter() { + Counter counter = new SimpleCounter(); + counter.inc(42L); + + List> tags = new ArrayList<>(); + tags.add(new AbstractMap.SimpleEntry<>("host", "localhost")); + tags.add(new AbstractMap.SimpleEntry<>("server", "test")); + + Instant time = Instant.now(); + Point point = pointProducer.createPoint(counter, "test_counter", tags, time); + + assertThat(point).isNotNull(); + assertThat(point.getMeasurement()).isEqualTo("test_counter"); + assertThat(point.getField("count")).isEqualTo(42L); + assertThat(point.getTimestamp()) + .isEqualTo( + BigInteger.valueOf( + time.getEpochSecond() * 1_000_000_000L + time.getNano())); + } + + @Test + void testCreatePointForGauge() { + List> tags = new ArrayList<>(); + tags.add(new AbstractMap.SimpleEntry<>("host", "localhost")); + Instant time = Instant.now(); + + // Test with Number (Integer) + Gauge intGauge = () -> 123; + Point intPoint = pointProducer.createPoint(intGauge, "test_gauge_int", tags, time); + assertThat(intPoint).isNotNull(); + assertThat(intPoint.getMeasurement()).isEqualTo("test_gauge_int"); + assertThat(intPoint.getField("value")).isEqualTo(123); + assertThat(intPoint.getTimestamp()) + .isEqualTo( + BigInteger.valueOf( + time.getEpochSecond() * 1_000_000_000L + time.getNano())); + + // Test with Number (Double) + Gauge doubleGauge = () -> 123.456; + Point doublePoint = pointProducer.createPoint(doubleGauge, "test_gauge_double", tags, time); + assertThat(doublePoint).isNotNull(); + assertThat(doublePoint.getMeasurement()).isEqualTo("test_gauge_double"); + assertThat(doublePoint.getField("value")).isEqualTo(123.456); + + // Test with Boolean + Gauge boolGauge = () -> true; + Point boolPoint = pointProducer.createPoint(boolGauge, "test_gauge_boolean", tags, time); + assertThat(boolPoint).isNotNull(); + assertThat(boolPoint.getMeasurement()).isEqualTo("test_gauge_boolean"); + assertThat(boolPoint.getField("value")).isEqualTo(true); + + // Test with String + Gauge stringGauge = () -> "test_value"; + Point stringPoint = pointProducer.createPoint(stringGauge, "test_gauge_string", tags, time); + assertThat(stringPoint).isNotNull(); + assertThat(stringPoint.getMeasurement()).isEqualTo("test_gauge_string"); + assertThat(stringPoint.getField("value")).isEqualTo("test_value"); + + // Test with Null + Gauge nullGauge = () -> null; + Point nullPoint = pointProducer.createPoint(nullGauge, "test_gauge_null", tags, time); + assertThat(nullPoint).isNotNull(); + assertThat(nullPoint.getMeasurement()).isEqualTo("test_gauge_null"); + assertThat(nullPoint.getField("value")).isEqualTo("null"); + } + + @Test + void testCreatePointForMeter() { + TestMeter meter = new TestMeter(1000, 5.0); + + List> tags = new ArrayList<>(); + tags.add(new AbstractMap.SimpleEntry<>("host", "localhost")); + + Instant time = Instant.now(); + Point point = pointProducer.createPoint(meter, "test_meter", tags, time); + + assertThat(point).isNotNull(); + assertThat(point.getMeasurement()).isEqualTo("test_meter"); + assertThat(point.getField("rate")).isEqualTo(5.0); + assertThat(point.getField("count")).isEqualTo(1000L); + assertThat(point.getTimestamp()) + .isEqualTo( + BigInteger.valueOf( + time.getEpochSecond() * 1_000_000_000L + time.getNano())); + } + + @Test + void testCreatePointForHistogram() { + TestHistogram histogram = new TestHistogram(); + histogram.setCount(80); + histogram.setMean(50.5); + histogram.setStdDev(10.2); + histogram.setMax(100); + histogram.setMin(1); + + List> tags = new ArrayList<>(); + tags.add(new AbstractMap.SimpleEntry<>("host", "localhost")); + + Instant time = Instant.now(); + Point point = pointProducer.createPoint(histogram, "test_histogram", tags, time); + + assertThat(point).isNotNull(); + assertThat(point.getMeasurement()).isEqualTo("test_histogram"); + assertThat(point.getField("count")).isEqualTo(80L); + assertThat(point.getField("mean")).isEqualTo(50.5); + assertThat(point.getField("stddev")).isEqualTo(10.2); + assertThat(point.getField("max")).isEqualTo(100L); + assertThat(point.getField("min")).isEqualTo(1L); + assertThat(point.getTimestamp()) + .isEqualTo( + BigInteger.valueOf( + time.getEpochSecond() * 1_000_000_000L + time.getNano())); + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPluginTest.java b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPluginTest.java new file mode 100644 index 0000000000..4df8f1abf2 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPluginTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.util.MetricReporterTestUtils; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link InfluxdbReporterPlugin}. */ +class InfluxdbReporterPluginTest { + + @Test + void testCreateMetricReporterWithValidConfig() { + Configuration config = new Configuration(); + config.setString(ConfigOptions.METRICS_REPORTER_INFLUXDB_HOST_URL, "http://localhost:8086"); + config.setString(ConfigOptions.METRICS_REPORTER_INFLUXDB_ORG, "test-org"); + config.setString(ConfigOptions.METRICS_REPORTER_INFLUXDB_BUCKET, "test-bucket"); + config.setString(ConfigOptions.METRICS_REPORTER_INFLUXDB_TOKEN, "test-token"); + config.set(ConfigOptions.METRICS_REPORTER_INFLUXDB_PUSH_INTERVAL, Duration.ofSeconds(15)); + + InfluxdbReporterPlugin plugin = new InfluxdbReporterPlugin(); + InfluxdbReporter reporter = (InfluxdbReporter) plugin.createMetricReporter(config); + + try { + assertThat(reporter).isNotNull(); + assertThat(reporter.scheduleInterval()).isEqualTo(Duration.ofSeconds(15)); + } finally { + reporter.close(); + } + } + + @Test + void testMetricReporterSetupViaSPI() { + MetricReporterTestUtils.testMetricReporterSetupViaSPI(InfluxdbReporterPlugin.class); + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterTest.java b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterTest.java new file mode 100644 index 0000000000..450c617f61 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.SimpleCounter; +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.util.TestHistogram; +import org.apache.fluss.metrics.util.TestMeter; +import org.apache.fluss.metrics.util.TestMetricGroup; + +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.Point; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** Tests for the {@link InfluxdbReporter}. */ +class InfluxdbReporterTest { + + private InfluxdbReporter reporter; + private MetricGroup metricGroup; + private InfluxDBClient mockClient; + + @BeforeEach + void setUp() throws Exception { + mockClient = mock(InfluxDBClient.class); + doNothing().when(mockClient).writePoints(any()); + + reporter = + new InfluxdbReporter( + "http://localhost:8086", + "test-org", + "test-bucket", + "test-token", + Duration.ofSeconds(10)) { + @Override + public void open(Configuration config) { + // Override to inject mock client + try { + Field clientField = InfluxdbReporter.class.getDeclaredField("client"); + clientField.setAccessible(true); + clientField.set(this, mockClient); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + reporter.open(new Configuration()); + + Map variables = new HashMap<>(); + variables.put("", "localhost"); + variables.put("table", "test_table"); + + metricGroup = + TestMetricGroup.newBuilder() + .setLogicalScopeFunction((characterFilter, character) -> "tabletServer") + .setVariables(variables) + .build(); + } + + @AfterEach + void tearDown() { + if (reporter != null) { + reporter.close(); + } + } + + // -------------------- Tests for notifyOfAddedMetric -------------------- + + @Test + void testNotifyOfAddedMetric() throws Exception { + Counter counter = new SimpleCounter(); + counter.inc(42); + + reporter.notifyOfAddedMetric(counter, "test_counter", metricGroup); + + Map metricNames = getMetricNames(); + assertThat(metricNames).containsKey(counter); + assertThat(metricNames.get(counter)).isEqualTo("fluss_tabletServer_test_counter"); + + Map>> metricTags = + getMetricTags(); + assertThat(metricTags).containsKey(counter); + assertThat(metricTags.get(counter)) + .containsExactlyInAnyOrder( + Map.entry("_host_", "localhost"), Map.entry("table", "test_table")); + } + + @Test + void testNotifyOfAddedMetricWithDifferentTypes() throws Exception { + Counter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, "counter_type_test", metricGroup); + + Gauge gauge = () -> 123; + reporter.notifyOfAddedMetric(gauge, "gauge_type_test", metricGroup); + + TestMeter meter = new TestMeter(100, 5.0); + reporter.notifyOfAddedMetric(meter, "meter_type_test", metricGroup); + + TestHistogram histogram = new TestHistogram(); + reporter.notifyOfAddedMetric(histogram, "histogram_type_test", metricGroup); + + Map metricNames = getMetricNames(); + assertThat(metricNames).containsKeys(counter, gauge, meter, histogram); + assertThat(metricNames.get(counter)).isEqualTo("fluss_tabletServer_counter_type_test"); + assertThat(metricNames.get(gauge)).isEqualTo("fluss_tabletServer_gauge_type_test"); + assertThat(metricNames.get(meter)).isEqualTo("fluss_tabletServer_meter_type_test"); + assertThat(metricNames.get(histogram)).isEqualTo("fluss_tabletServer_histogram_type_test"); + } + + // -------------------- Tests for notifyOfRemovedMetric -------------------- + + @Test + void testNotifyOfRemovedMetric() throws Exception { + Counter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, "test_counter", metricGroup); + + Map metricNames = getMetricNames(); + assertThat(metricNames).containsKey(counter); + + reporter.notifyOfRemovedMetric(counter, "test_counter", metricGroup); + + assertThat(metricNames).doesNotContainKey(counter); + + Map>> metricTags = + getMetricTags(); + assertThat(metricTags).doesNotContainKey(counter); + } + + // -------------------- Tests for report -------------------- + + @Test + void testReport() throws Exception { + // Test report with no metrics + reporter.report(); + verify(mockClient, times(1)).writePoints(any()); + + // Test report with Counter + Counter counter = new SimpleCounter(); + counter.inc(42); + reporter.notifyOfAddedMetric(counter, "report_test_counter", metricGroup); + reporter.report(); + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + verify(mockClient, times(2)).writePoints(captor.capture()); + List points = captor.getValue(); + assertThat(points).hasSize(1); + + // Test report with Gauge + Gauge gauge = () -> 123; + reporter.notifyOfAddedMetric(gauge, "report_test_gauge", metricGroup); + reporter.report(); + verify(mockClient, times(3)).writePoints(any()); + + // Test report with Meter + TestMeter meter = new TestMeter(1000, 5.0); + reporter.notifyOfAddedMetric(meter, "report_test_meter", metricGroup); + reporter.report(); + verify(mockClient, times(4)).writePoints(any()); + + // Test report with Histogram + TestHistogram histogram = new TestHistogram(); + histogram.setCount(50); + histogram.setMean(100.0); + reporter.notifyOfAddedMetric(histogram, "report_test_histogram", metricGroup); + reporter.report(); + verify(mockClient, times(5)).writePoints(any()); + + // Test report with multiple metrics + Counter counter2 = new SimpleCounter(); + counter2.inc(10); + reporter.notifyOfAddedMetric(counter2, "report_test_counter2", metricGroup); + + Gauge gauge2 = () -> 3.14; + reporter.notifyOfAddedMetric(gauge2, "report_test_gauge2", metricGroup); + reporter.report(); + verify(mockClient, times(6)).writePoints(any()); + } + + // -------------------- Helper methods -------------------- + + @SuppressWarnings("unchecked") + private Map getMetricNames() throws Exception { + Field field = InfluxdbReporter.class.getDeclaredField("metricNames"); + field.setAccessible(true); + return (Map) field.get(reporter); + } + + @SuppressWarnings("unchecked") + private Map>> getMetricTags() + throws Exception { + Field field = InfluxdbReporter.class.getDeclaredField("metricTags"); + field.setAccessible(true); + return (Map>>) + field.get(reporter); + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/fluss-metrics/fluss-metrics-influxdb/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension new file mode 100644 index 0000000000..ca0e907f6d --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.fluss.testutils.common.TestLoggerExtension \ No newline at end of file diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/resources/log4j2-test.properties b/fluss-metrics/fluss-metrics-influxdb/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000000..460cb8070a --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +w# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/fluss-metrics/pom.xml b/fluss-metrics/pom.xml index 9e5bc44871..7a7f7a3762 100644 --- a/fluss-metrics/pom.xml +++ b/fluss-metrics/pom.xml @@ -34,6 +34,7 @@ fluss-metrics-prometheus fluss-metrics-jmx + fluss-metrics-influxdb ../fluss-lake/fluss-lake-paimon/target/fluss-lake-paimon-${project.version}.jar diff --git a/fluss-metrics/fluss-metrics-influxdb/pom.xml b/fluss-metrics/fluss-metrics-influxdb/pom.xml index eef4ed8c34..89f39d4208 100644 --- a/fluss-metrics/fluss-metrics-influxdb/pom.xml +++ b/fluss-metrics/fluss-metrics-influxdb/pom.xml @@ -45,7 +45,6 @@ com.influxdb influxdb3-java ${influxdb3.version} - provided @@ -53,6 +52,7 @@ org.apache.fluss fluss-test-utils + org.apache.fluss fluss-common @@ -60,6 +60,13 @@ test test-jar + + + com.github.tomakehurst + wiremock-jre8 + 2.32.0 + test + @@ -74,6 +81,74 @@ ${skip.on.java8} + + org.apache.maven.plugins + maven-shade-plugin + + + shade-fluss + package + + shade + + + + + com.influxdb:* + org.apache.arrow:* + io.grpc:* + io.netty:* + com.google.protobuf:* + com.google.guava:* + com.fasterxml.jackson.core:* + com.google.flatbuffers:* + io.perfmark:* + com.google.code.findbugs:* + org.codehaus.mojo:* + + + + + com.influxdb + org.apache.fluss.metrics.influxdb.shaded.com.influxdb + + + org.apache.arrow + org.apache.fluss.metrics.influxdb.shaded.org.apache.arrow + + + io.grpc + org.apache.fluss.metrics.influxdb.shaded.io.grpc + + + io.netty + org.apache.fluss.metrics.influxdb.shaded.io.netty + + + com.google.protobuf + org.apache.fluss.metrics.influxdb.shaded.com.google.protobuf + + + com.google.common + org.apache.fluss.metrics.influxdb.shaded.com.google.common + + + com.fasterxml.jackson + org.apache.fluss.metrics.influxdb.shaded.com.fasterxml.jackson + + + com.google.flatbuffers + org.apache.fluss.metrics.influxdb.shaded.com.google.flatbuffers + + + org.codehaus.mojo.annotations + org.apache.fluss.metrics.influxdb.shaded.org.codehaus.mojo.annotations + + + + + + diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java index 04ee1f30ed..5ae2b1ebb2 100644 --- a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java @@ -44,8 +44,10 @@ public Point createPoint( Metric metric, String metricName, List> tags, Instant time) { Point point = Point.measurement(metricName).setTimestamp(time); - for (Map.Entry tag : tags) { - point.setTag(tag.getKey(), tag.getValue()); + if (tags != null) { + for (Map.Entry tag : tags) { + point.setTag(tag.getKey(), tag.getValue()); + } } if (metric instanceof Counter) { @@ -75,9 +77,9 @@ private Point createPointForGauge(Gauge gauge, Point point) { Object value = gauge.getValue(); if (value instanceof Number) { - return point.setField("value", ((Number) value)); + return point.setField("value", (Number) value); } else if (value instanceof Boolean) { - return point.setField("value", ((boolean) value)); + return point.setField("value", ((Boolean) value).booleanValue()); } else { return point.setField("value", String.valueOf(value)); } diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java index ba43551f42..0781793f6d 100644 --- a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java @@ -23,6 +23,7 @@ import org.apache.fluss.metrics.groups.MetricGroup; import org.apache.fluss.metrics.reporter.ScheduledMetricReporter; import org.apache.fluss.utils.MapUtils; +import org.apache.fluss.utils.StringUtils; import com.influxdb.v3.client.InfluxDBClient; import com.influxdb.v3.client.Point; @@ -32,10 +33,12 @@ import java.time.Duration; import java.time.Instant; +import java.util.AbstractMap; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; /** {@link ScheduledMetricReporter} that exports {@link Metric Metrics} via InfluxDB. */ public class InfluxdbReporter implements ScheduledMetricReporter { @@ -44,22 +47,24 @@ public class InfluxdbReporter implements ScheduledMetricReporter { private static final char SCOPE_SEPARATOR = '_'; private static final String SCOPE_PREFIX = "fluss" + SCOPE_SEPARATOR; + private static final Pattern INVALID_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9_:]"); - private final Map metricNames; - private final Map>> metricTags; + final Map metricNames; + final Map>> metricTags; private final InfluxDBClient client; private final Duration pushInterval; private final InfluxdbPointProducer pointProducer; public InfluxdbReporter( String hostUrl, String org, String bucket, String token, Duration pushInterval) { - ClientConfig clientConfig = - new ClientConfig.Builder() - .host(hostUrl) - .token(token.toCharArray()) - .organization(org) - .database(bucket) - .build(); + ClientConfig.Builder clientConfigBuilder = + new ClientConfig.Builder().host(hostUrl).token(token.toCharArray()); + + if (!StringUtils.isNullOrWhitespaceOnly(org)) { + clientConfigBuilder.organization(org); + } + + ClientConfig clientConfig = clientConfigBuilder.database(bucket).build(); this.client = InfluxDBClient.getInstance(clientConfig); this.pushInterval = pushInterval; @@ -94,7 +99,8 @@ public void report() { for (Map.Entry entry : metricNames.entrySet()) { Metric metric = entry.getKey(); String metricName = entry.getValue(); - List> tags = metricTags.get(metric); + List> tags = + metricTags.getOrDefault(metric, Collections.emptyList()); try { Point point = pointProducer.createPoint(metric, metricName, tags, now); @@ -104,7 +110,13 @@ public void report() { } } - client.writePoints(points); + if (!points.isEmpty()) { + try { + client.writePoints(points); + } catch (Exception e) { + LOG.warn("Failed to write points to InfluxDB", e); + } + } } @Override @@ -138,13 +150,13 @@ private List> getTags(MetricGroup group) { List> tags = new ArrayList<>(); for (Map.Entry entry : group.getAllVariables().entrySet()) { tags.add( - new HashMap.SimpleEntry<>( + new AbstractMap.SimpleEntry<>( filterCharacters(entry.getKey()), filterCharacters(entry.getValue()))); } return tags; } private String filterCharacters(String input) { - return input.replaceAll("[^a-zA-Z0-9_:]", String.valueOf(SCOPE_SEPARATOR)); + return INVALID_CHAR_PATTERN.matcher(input).replaceAll(String.valueOf(SCOPE_SEPARATOR)); } } diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java index 0f502cea0d..eccc9f95a7 100644 --- a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java @@ -30,6 +30,7 @@ import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_ORG; import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_PUSH_INTERVAL; import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_TOKEN; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_VERSION; /** {@link MetricReporterPlugin} for {@link InfluxdbReporter}. */ public class InfluxdbReporterPlugin implements MetricReporterPlugin { @@ -38,6 +39,7 @@ public class InfluxdbReporterPlugin implements MetricReporterPlugin { @Override public MetricReporter createMetricReporter(Configuration configuration) { + String version = configuration.getString(METRICS_REPORTER_INFLUXDB_VERSION); String hostUrl = configuration.getString(METRICS_REPORTER_INFLUXDB_HOST_URL); String org = configuration.getString(METRICS_REPORTER_INFLUXDB_ORG); String bucket = configuration.getString(METRICS_REPORTER_INFLUXDB_BUCKET); @@ -50,11 +52,11 @@ public MetricReporter createMetricReporter(Configuration configuration) { + METRICS_REPORTER_INFLUXDB_HOST_URL.key() + "'"); } - if (StringUtils.isNullOrWhitespaceOnly(org)) { + if ("v2".equalsIgnoreCase(version) && StringUtils.isNullOrWhitespaceOnly(org)) { throw new IllegalArgumentException( "InfluxDB organization must be configured via '" + METRICS_REPORTER_INFLUXDB_ORG.key() - + "'"); + + "' when using InfluxDB v2"); } if (StringUtils.isNullOrWhitespaceOnly(bucket)) { throw new IllegalArgumentException( diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterTest.java b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterTest.java index 450c617f61..ebcf2dc908 100644 --- a/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterTest.java +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterTest.java @@ -1,12 +1,13 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,68 +20,39 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.metrics.Counter; -import org.apache.fluss.metrics.Gauge; import org.apache.fluss.metrics.SimpleCounter; import org.apache.fluss.metrics.groups.MetricGroup; -import org.apache.fluss.metrics.util.TestHistogram; -import org.apache.fluss.metrics.util.TestMeter; import org.apache.fluss.metrics.util.TestMetricGroup; -import com.influxdb.v3.client.InfluxDBClient; -import com.influxdb.v3.client.Point; +import com.github.tomakehurst.wiremock.WireMockServer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import java.lang.reflect.Field; import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.containing; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; /** Tests for the {@link InfluxdbReporter}. */ class InfluxdbReporterTest { - private InfluxdbReporter reporter; - private MetricGroup metricGroup; - private InfluxDBClient mockClient; + private static final String METRIC_HOSTNAME = "localhost"; - @BeforeEach - void setUp() throws Exception { - mockClient = mock(InfluxDBClient.class); - doNothing().when(mockClient).writePoints(any()); + private static final Map variables; + private static final MetricGroup metricGroup; - reporter = - new InfluxdbReporter( - "http://localhost:8086", - "test-org", - "test-bucket", - "test-token", - Duration.ofSeconds(10)) { - @Override - public void open(Configuration config) { - // Override to inject mock client - try { - Field clientField = InfluxdbReporter.class.getDeclaredField("client"); - clientField.setAccessible(true); - clientField.set(this, mockClient); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - reporter.open(new Configuration()); - - Map variables = new HashMap<>(); - variables.put("", "localhost"); + static { + variables = new HashMap<>(); + variables.put("", METRIC_HOSTNAME); variables.put("table", "test_table"); metricGroup = @@ -90,139 +62,74 @@ public void open(Configuration config) { .build(); } + private WireMockServer wireMockServer; + private InfluxdbReporter reporter; + + @BeforeEach + void setUp() { + wireMockServer = new WireMockServer(wireMockConfig().dynamicPort()); + wireMockServer.start(); + } + @AfterEach void tearDown() { if (reporter != null) { reporter.close(); } + if (wireMockServer != null) { + wireMockServer.stop(); + } } - // -------------------- Tests for notifyOfAddedMetric -------------------- - @Test - void testNotifyOfAddedMetric() throws Exception { - Counter counter = new SimpleCounter(); - counter.inc(42); - - reporter.notifyOfAddedMetric(counter, "test_counter", metricGroup); - - Map metricNames = getMetricNames(); - assertThat(metricNames).containsKey(counter); - assertThat(metricNames.get(counter)).isEqualTo("fluss_tabletServer_test_counter"); + void testMetricRegistration() { + reporter = createReporter(); - Map>> metricTags = - getMetricTags(); - assertThat(metricTags).containsKey(counter); - assertThat(metricTags.get(counter)) - .containsExactlyInAnyOrder( - Map.entry("_host_", "localhost"), Map.entry("table", "test_table")); - } - - @Test - void testNotifyOfAddedMetricWithDifferentTypes() throws Exception { + String metricName = "TestCounter"; Counter counter = new SimpleCounter(); - reporter.notifyOfAddedMetric(counter, "counter_type_test", metricGroup); - - Gauge gauge = () -> 123; - reporter.notifyOfAddedMetric(gauge, "gauge_type_test", metricGroup); - - TestMeter meter = new TestMeter(100, 5.0); - reporter.notifyOfAddedMetric(meter, "meter_type_test", metricGroup); - - TestHistogram histogram = new TestHistogram(); - reporter.notifyOfAddedMetric(histogram, "histogram_type_test", metricGroup); - - Map metricNames = getMetricNames(); - assertThat(metricNames).containsKeys(counter, gauge, meter, histogram); - assertThat(metricNames.get(counter)).isEqualTo("fluss_tabletServer_counter_type_test"); - assertThat(metricNames.get(gauge)).isEqualTo("fluss_tabletServer_gauge_type_test"); - assertThat(metricNames.get(meter)).isEqualTo("fluss_tabletServer_meter_type_test"); - assertThat(metricNames.get(histogram)).isEqualTo("fluss_tabletServer_histogram_type_test"); + reporter.notifyOfAddedMetric(counter, metricName, metricGroup); + + String name = reporter.metricNames.get(counter); + assertThat(name).isNotNull(); + assertThat(name).isEqualTo("fluss_tabletServer_" + metricName); + + List> tags = reporter.metricTags.get(counter); + assertThat(tags).isNotNull(); + assertThat(tags) + .anyMatch(e -> e.getKey().equals("_host_") && e.getValue().equals(METRIC_HOSTNAME)); + assertThat(tags) + .anyMatch(e -> e.getKey().equals("table") && e.getValue().equals("test_table")); } - // -------------------- Tests for notifyOfRemovedMetric -------------------- - @Test - void testNotifyOfRemovedMetric() throws Exception { - Counter counter = new SimpleCounter(); - reporter.notifyOfAddedMetric(counter, "test_counter", metricGroup); - - Map metricNames = getMetricNames(); - assertThat(metricNames).containsKey(counter); + void testMetricReporting() { + wireMockServer.stubFor( + post(urlPathEqualTo("/api/v2/write")).willReturn(aResponse().withStatus(200))); - reporter.notifyOfRemovedMetric(counter, "test_counter", metricGroup); - - assertThat(metricNames).doesNotContainKey(counter); - - Map>> metricTags = - getMetricTags(); - assertThat(metricTags).doesNotContainKey(counter); - } + reporter = createReporter(); - // -------------------- Tests for report -------------------- - - @Test - void testReport() throws Exception { - // Test report with no metrics - reporter.report(); - verify(mockClient, times(1)).writePoints(any()); - - // Test report with Counter + String metricName = "TestCounter"; Counter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, metricName, metricGroup); counter.inc(42); - reporter.notifyOfAddedMetric(counter, "report_test_counter", metricGroup); - reporter.report(); - ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); - verify(mockClient, times(2)).writePoints(captor.capture()); - List points = captor.getValue(); - assertThat(points).hasSize(1); - - // Test report with Gauge - Gauge gauge = () -> 123; - reporter.notifyOfAddedMetric(gauge, "report_test_gauge", metricGroup); - reporter.report(); - verify(mockClient, times(3)).writePoints(any()); - // Test report with Meter - TestMeter meter = new TestMeter(1000, 5.0); - reporter.notifyOfAddedMetric(meter, "report_test_meter", metricGroup); reporter.report(); - verify(mockClient, times(4)).writePoints(any()); - // Test report with Histogram - TestHistogram histogram = new TestHistogram(); - histogram.setCount(50); - histogram.setMean(100.0); - reporter.notifyOfAddedMetric(histogram, "report_test_histogram", metricGroup); - reporter.report(); - verify(mockClient, times(5)).writePoints(any()); - - // Test report with multiple metrics - Counter counter2 = new SimpleCounter(); - counter2.inc(10); - reporter.notifyOfAddedMetric(counter2, "report_test_counter2", metricGroup); - - Gauge gauge2 = () -> 3.14; - reporter.notifyOfAddedMetric(gauge2, "report_test_gauge2", metricGroup); - reporter.report(); - verify(mockClient, times(6)).writePoints(any()); - } - - // -------------------- Helper methods -------------------- - - @SuppressWarnings("unchecked") - private Map getMetricNames() throws Exception { - Field field = InfluxdbReporter.class.getDeclaredField("metricNames"); - field.setAccessible(true); - return (Map) field.get(reporter); + wireMockServer.verify( + postRequestedFor(urlPathEqualTo("/api/v2/write")) + .withRequestBody(containing("fluss_tabletServer_" + metricName)) + .withRequestBody(containing("count=42i"))); } - @SuppressWarnings("unchecked") - private Map>> getMetricTags() - throws Exception { - Field field = InfluxdbReporter.class.getDeclaredField("metricTags"); - field.setAccessible(true); - return (Map>>) - field.get(reporter); + private InfluxdbReporter createReporter() { + InfluxdbReporter reporter = + new InfluxdbReporter( + wireMockServer.baseUrl(), + "test-org", + "test-bucket", + "test-token", + Duration.ofSeconds(10)); + reporter.open(new Configuration()); + return reporter; } } diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/resources/log4j2-test.properties b/fluss-metrics/fluss-metrics-influxdb/src/test/resources/log4j2-test.properties index 460cb8070a..288e00f662 100644 --- a/fluss-metrics/fluss-metrics-influxdb/src/test/resources/log4j2-test.properties +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/resources/log4j2-test.properties @@ -1,4 +1,4 @@ -w# +# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information diff --git a/website/docs/maintenance/observability/metric-reporters.md b/website/docs/maintenance/observability/metric-reporters.md index 43ce9473b0..4afdca5b44 100644 --- a/website/docs/maintenance/observability/metric-reporters.md +++ b/website/docs/maintenance/observability/metric-reporters.md @@ -105,3 +105,40 @@ metrics.reporter.prometheus-push.random-job-name-suffix: true metrics.reporter.prometheus-push.delete-on-shutdown: true metrics.reporter.prometheus-push.grouping-key: instance=instance01;cluster=clusterA ``` + +### InfluxDB + +Type: push + +InfluxDB reporter supports both InfluxDB v2 and InfluxDB v3. The default version is v3. + +Parameters: + +- `metrics.reporter.influxdb.version` - (Optional) The InfluxDB version to connect to, defaults to `v3`. Supported values are `v2` and `v3`. +- `metrics.reporter.influxdb.host-url` - The InfluxDB server host URL including scheme, host name, and port. +- `metrics.reporter.influxdb.bucket` - The InfluxDB bucket/database name. +- `metrics.reporter.influxdb.org` - The InfluxDB organization name. Required for InfluxDB v2, not needed for InfluxDB v3. +- `metrics.reporter.influxdb.token` - The InfluxDB authentication token. +- `metrics.reporter.influxdb.push-interval` - (Optional) The interval of reporting metrics to InfluxDB, defaults to 10 SECONDS. + +Example configuration for InfluxDB v3 (default): + +```yaml +metrics.reporters: influxdb +metrics.reporter.influxdb.host-url: http://localhost:8181 +metrics.reporter.influxdb.bucket: fluss_metrics +metrics.reporter.influxdb.token: your-influxdb-token +metrics.reporter.influxdb.push-interval: 10 SECONDS +``` + +Example configuration for InfluxDB v2: + +```yaml +metrics.reporters: influxdb +metrics.reporter.influxdb.version: v2 +metrics.reporter.influxdb.host-url: http://localhost:8086 +metrics.reporter.influxdb.bucket: fluss_metrics +metrics.reporter.influxdb.org: fluss_org +metrics.reporter.influxdb.token: your-influxdb-token +metrics.reporter.influxdb.push-interval: 10 SECONDS +``` From 27f87a58674b8bd0fb8b271df0ab026de68775b5 Mon Sep 17 00:00:00 2001 From: zhuyufeng0809 <1547107965@qq.com> Date: Mon, 4 May 2026 15:25:06 +0800 Subject: [PATCH 3/3] [metric] Support influxdb reporter --- fluss-metrics/fluss-metrics-influxdb/pom.xml | 2 +- .../org/apache/fluss/metrics/influxdb/InfluxdbReporter.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/fluss-metrics/fluss-metrics-influxdb/pom.xml b/fluss-metrics/fluss-metrics-influxdb/pom.xml index 89f39d4208..a0e97032d2 100644 --- a/fluss-metrics/fluss-metrics-influxdb/pom.xml +++ b/fluss-metrics/fluss-metrics-influxdb/pom.xml @@ -23,7 +23,7 @@ org.apache.fluss fluss-metrics - 0.10-SNAPSHOT + 1.0-SNAPSHOT fluss-metrics-influxdb diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java index 0781793f6d..aa067bd18d 100644 --- a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java @@ -22,7 +22,6 @@ import org.apache.fluss.metrics.Metric; import org.apache.fluss.metrics.groups.MetricGroup; import org.apache.fluss.metrics.reporter.ScheduledMetricReporter; -import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.StringUtils; import com.influxdb.v3.client.InfluxDBClient; @@ -38,6 +37,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; /** {@link ScheduledMetricReporter} that exports {@link Metric Metrics} via InfluxDB. */ @@ -68,8 +68,8 @@ public InfluxdbReporter( this.client = InfluxDBClient.getInstance(clientConfig); this.pushInterval = pushInterval; - this.metricNames = MapUtils.newConcurrentHashMap(); - this.metricTags = MapUtils.newConcurrentHashMap(); + this.metricNames = new ConcurrentHashMap<>(); + this.metricTags = new ConcurrentHashMap<>(); this.pointProducer = InfluxdbPointProducer.getInstance(); LOG.info("Started InfluxDB reporter connecting to {}", hostUrl);