diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index df21129db6..0b1f1b98c2 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -2027,6 +2027,50 @@ public class ConfigOptions { + "the CoordinatorServer) it is advisable to use a port range " + "like 9990-9999."); + // ------------------------------------------------------------------------ + // ConfigOptions for influxdb reporter + // ------------------------------------------------------------------------ + public static final ConfigOption METRICS_REPORTER_INFLUXDB_VERSION = + key("metrics.reporter.influxdb.version") + .stringType() + .defaultValue("v3") + .withDescription( + "The InfluxDB version to connect to. Supported values are 'v2' and 'v3'. " + + "For InfluxDB v2, the 'org' configuration is required; for InfluxDB v3, it is not needed."); + + public static final ConfigOption METRICS_REPORTER_INFLUXDB_HOST_URL = + key("metrics.reporter.influxdb.host-url") + .stringType() + .noDefaultValue() + .withDescription( + "The InfluxDB server host URL including scheme, host name, and port."); + + public static final ConfigOption METRICS_REPORTER_INFLUXDB_BUCKET = + key("metrics.reporter.influxdb.bucket") + .stringType() + .noDefaultValue() + .withFallbackKeys("metrics.reporter.influxdb.database") + .withDescription("The InfluxDB bucket/database name."); + + public static final ConfigOption METRICS_REPORTER_INFLUXDB_ORG = + key("metrics.reporter.influxdb.org") + .stringType() + .noDefaultValue() + .withDescription( + "The InfluxDB organization name. Required for InfluxDB v2, not needed for InfluxDB v3."); + + public static final ConfigOption METRICS_REPORTER_INFLUXDB_TOKEN = + key("metrics.reporter.influxdb.token") + .stringType() + .noDefaultValue() + .withDescription("The InfluxDB authentication token."); + + public static final ConfigOption METRICS_REPORTER_INFLUXDB_PUSH_INTERVAL = + key("metrics.reporter.influxdb.push-interval") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription("The interval of reporting metrics to InfluxDB."); + // ------------------------------------------------------------------------ // ConfigOptions for lakehouse storage // ------------------------------------------------------------------------ diff --git a/fluss-dist/pom.xml b/fluss-dist/pom.xml index 5c8a00dea5..1c7c96531c 100644 --- a/fluss-dist/pom.xml +++ b/fluss-dist/pom.xml @@ -76,6 +76,13 @@ provided + + org.apache.fluss + fluss-metrics-influxdb + ${project.version} + provided + + org.apache.fluss fluss-lake-paimon diff --git a/fluss-dist/src/main/assemblies/plugins.xml b/fluss-dist/src/main/assemblies/plugins.xml index 8eb05e8a07..dd62655926 100644 --- a/fluss-dist/src/main/assemblies/plugins.xml +++ b/fluss-dist/src/main/assemblies/plugins.xml @@ -80,6 +80,13 @@ 0644 + + ../fluss-metrics/fluss-metrics-influxdb/target/fluss-metrics-influxdb-${project.version}.jar + plugins/influxdb/ + fluss-metrics-influxdb-${project.version}.jar + 0644 + + ../fluss-lake/fluss-lake-paimon/target/fluss-lake-paimon-${project.version}.jar diff --git a/fluss-metrics/fluss-metrics-influxdb/pom.xml b/fluss-metrics/fluss-metrics-influxdb/pom.xml new file mode 100644 index 0000000000..a0e97032d2 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/pom.xml @@ -0,0 +1,154 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss-metrics + 1.0-SNAPSHOT + + + fluss-metrics-influxdb + Fluss : Metrics : InfluxDB + + + 1.8.0 + + + + + org.apache.fluss + fluss-common + ${project.version} + provided + + + + com.influxdb + influxdb3-java + ${influxdb3.version} + + + + + org.apache.fluss + fluss-test-utils + + + + org.apache.fluss + fluss-common + ${project.version} + test + test-jar + + + + com.github.tomakehurst + wiremock-jre8 + 2.32.0 + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + ${skip.on.java8} + + ${skip.on.java8} + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-fluss + package + + shade + + + + + com.influxdb:* + org.apache.arrow:* + io.grpc:* + io.netty:* + com.google.protobuf:* + com.google.guava:* + com.fasterxml.jackson.core:* + com.google.flatbuffers:* + io.perfmark:* + com.google.code.findbugs:* + org.codehaus.mojo:* + + + + + com.influxdb + org.apache.fluss.metrics.influxdb.shaded.com.influxdb + + + org.apache.arrow + org.apache.fluss.metrics.influxdb.shaded.org.apache.arrow + + + io.grpc + org.apache.fluss.metrics.influxdb.shaded.io.grpc + + + io.netty + org.apache.fluss.metrics.influxdb.shaded.io.netty + + + com.google.protobuf + org.apache.fluss.metrics.influxdb.shaded.com.google.protobuf + + + com.google.common + org.apache.fluss.metrics.influxdb.shaded.com.google.common + + + com.fasterxml.jackson + org.apache.fluss.metrics.influxdb.shaded.com.fasterxml.jackson + + + com.google.flatbuffers + org.apache.fluss.metrics.influxdb.shaded.com.google.flatbuffers + + + org.codehaus.mojo.annotations + org.apache.fluss.metrics.influxdb.shaded.org.codehaus.mojo.annotations + + + + + + + + + diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java new file mode 100644 index 0000000000..5ae2b1ebb2 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Histogram; +import org.apache.fluss.metrics.HistogramStatistics; +import org.apache.fluss.metrics.Meter; +import org.apache.fluss.metrics.Metric; + +import com.influxdb.v3.client.Point; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +/** Producer that creates InfluxDB {@link Point Points} from Fluss {@link Metric Metrics}. */ +public class InfluxdbPointProducer { + + private static final InfluxdbPointProducer INSTANCE = new InfluxdbPointProducer(); + + public static InfluxdbPointProducer getInstance() { + return INSTANCE; + } + + public Point createPoint( + Metric metric, String metricName, List> tags, Instant time) { + Point point = Point.measurement(metricName).setTimestamp(time); + + if (tags != null) { + for (Map.Entry tag : tags) { + point.setTag(tag.getKey(), tag.getValue()); + } + } + + if (metric instanceof Counter) { + return createPointForCounter((Counter) metric, point); + } + + if (metric instanceof Gauge) { + return createPointForGauge((Gauge) metric, point); + } + + if (metric instanceof Meter) { + return createPointForMeter((Meter) metric, point); + } + + if (metric instanceof Histogram) { + return createPointForHistogram((Histogram) metric, point); + } + + throw new IllegalArgumentException("Unknown metric type: " + metric.getClass()); + } + + private Point createPointForCounter(Counter counter, Point point) { + return point.setField("count", counter.getCount()); + } + + private Point createPointForGauge(Gauge gauge, Point point) { + Object value = gauge.getValue(); + + if (value instanceof Number) { + return point.setField("value", (Number) value); + } else if (value instanceof Boolean) { + return point.setField("value", ((Boolean) value).booleanValue()); + } else { + return point.setField("value", String.valueOf(value)); + } + } + + private Point createPointForMeter(Meter meter, Point point) { + return point.setField("rate", meter.getRate()).setField("count", meter.getCount()); + } + + private Point createPointForHistogram(Histogram histogram, Point point) { + HistogramStatistics stats = histogram.getStatistics(); + return point.setField("count", histogram.getCount()) + .setField("mean", stats.getMean()) + .setField("stddev", stats.getStdDev()) + .setField("min", stats.getMin()) + .setField("max", stats.getMax()) + .setField("p50", stats.getQuantile(0.5)) + .setField("p75", stats.getQuantile(0.75)) + .setField("p95", stats.getQuantile(0.95)) + .setField("p98", stats.getQuantile(0.98)) + .setField("p99", stats.getQuantile(0.99)) + .setField("p999", stats.getQuantile(0.999)); + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java new file mode 100644 index 0000000000..aa067bd18d --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.Metric; +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.reporter.ScheduledMetricReporter; +import org.apache.fluss.utils.StringUtils; + +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.Point; +import com.influxdb.v3.client.config.ClientConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; + +/** {@link ScheduledMetricReporter} that exports {@link Metric Metrics} via InfluxDB. */ +public class InfluxdbReporter implements ScheduledMetricReporter { + + private static final Logger LOG = LoggerFactory.getLogger(InfluxdbReporter.class); + + private static final char SCOPE_SEPARATOR = '_'; + private static final String SCOPE_PREFIX = "fluss" + SCOPE_SEPARATOR; + private static final Pattern INVALID_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9_:]"); + + final Map metricNames; + final Map>> metricTags; + private final InfluxDBClient client; + private final Duration pushInterval; + private final InfluxdbPointProducer pointProducer; + + public InfluxdbReporter( + String hostUrl, String org, String bucket, String token, Duration pushInterval) { + ClientConfig.Builder clientConfigBuilder = + new ClientConfig.Builder().host(hostUrl).token(token.toCharArray()); + + if (!StringUtils.isNullOrWhitespaceOnly(org)) { + clientConfigBuilder.organization(org); + } + + ClientConfig clientConfig = clientConfigBuilder.database(bucket).build(); + + this.client = InfluxDBClient.getInstance(clientConfig); + this.pushInterval = pushInterval; + this.metricNames = new ConcurrentHashMap<>(); + this.metricTags = new ConcurrentHashMap<>(); + this.pointProducer = InfluxdbPointProducer.getInstance(); + + LOG.info("Started InfluxDB reporter connecting to {}", hostUrl); + } + + @Override + public void open(Configuration config) { + // do nothing + } + + @Override + public void close() { + if (client != null) { + try { + client.close(); + } catch (Exception e) { + LOG.warn("Failed to close InfluxDB client", e); + } + } + } + + @Override + public void report() { + List points = new ArrayList<>(); + Instant now = Instant.now(); + + for (Map.Entry entry : metricNames.entrySet()) { + Metric metric = entry.getKey(); + String metricName = entry.getValue(); + List> tags = + metricTags.getOrDefault(metric, Collections.emptyList()); + + try { + Point point = pointProducer.createPoint(metric, metricName, tags, now); + points.add(point); + } catch (Exception e) { + LOG.warn("Failed to create point for metric {}", metricName, e); + } + } + + if (!points.isEmpty()) { + try { + client.writePoints(points); + } catch (Exception e) { + LOG.warn("Failed to write points to InfluxDB", e); + } + } + } + + @Override + public Duration scheduleInterval() { + return pushInterval; + } + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + String scopedMetricName = getScopedName(metricName, group); + List> tags = getTags(group); + + metricNames.put(metric, scopedMetricName); + metricTags.put(metric, tags); + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + metricNames.remove(metric); + metricTags.remove(metric); + } + + private String getScopedName(String metricName, MetricGroup group) { + return SCOPE_PREFIX + + group.getLogicalScope(this::filterCharacters, SCOPE_SEPARATOR) + + SCOPE_SEPARATOR + + filterCharacters(metricName); + } + + private List> getTags(MetricGroup group) { + List> tags = new ArrayList<>(); + for (Map.Entry entry : group.getAllVariables().entrySet()) { + tags.add( + new AbstractMap.SimpleEntry<>( + filterCharacters(entry.getKey()), filterCharacters(entry.getValue()))); + } + return tags; + } + + private String filterCharacters(String input) { + return INVALID_CHAR_PATTERN.matcher(input).replaceAll(String.valueOf(SCOPE_SEPARATOR)); + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java new file mode 100644 index 0000000000..eccc9f95a7 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.reporter.MetricReporter; +import org.apache.fluss.metrics.reporter.MetricReporterPlugin; +import org.apache.fluss.utils.StringUtils; + +import java.time.Duration; + +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_BUCKET; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_HOST_URL; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_ORG; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_PUSH_INTERVAL; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_TOKEN; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_VERSION; + +/** {@link MetricReporterPlugin} for {@link InfluxdbReporter}. */ +public class InfluxdbReporterPlugin implements MetricReporterPlugin { + + private static final String PLUGIN_NAME = "influxdb"; + + @Override + public MetricReporter createMetricReporter(Configuration configuration) { + String version = configuration.getString(METRICS_REPORTER_INFLUXDB_VERSION); + String hostUrl = configuration.getString(METRICS_REPORTER_INFLUXDB_HOST_URL); + String org = configuration.getString(METRICS_REPORTER_INFLUXDB_ORG); + String bucket = configuration.getString(METRICS_REPORTER_INFLUXDB_BUCKET); + String token = configuration.getString(METRICS_REPORTER_INFLUXDB_TOKEN); + Duration pushInterval = configuration.get(METRICS_REPORTER_INFLUXDB_PUSH_INTERVAL); + + if (StringUtils.isNullOrWhitespaceOnly(hostUrl)) { + throw new IllegalArgumentException( + "InfluxDB host URL must be configured via '" + + METRICS_REPORTER_INFLUXDB_HOST_URL.key() + + "'"); + } + if ("v2".equalsIgnoreCase(version) && StringUtils.isNullOrWhitespaceOnly(org)) { + throw new IllegalArgumentException( + "InfluxDB organization must be configured via '" + + METRICS_REPORTER_INFLUXDB_ORG.key() + + "' when using InfluxDB v2"); + } + if (StringUtils.isNullOrWhitespaceOnly(bucket)) { + throw new IllegalArgumentException( + "InfluxDB bucket must be configured via '" + + METRICS_REPORTER_INFLUXDB_BUCKET.key() + + "'"); + } + if (StringUtils.isNullOrWhitespaceOnly(token)) { + throw new IllegalArgumentException( + "InfluxDB token must be configured via '" + + METRICS_REPORTER_INFLUXDB_TOKEN.key() + + "'"); + } + + return new InfluxdbReporter(hostUrl, org, bucket, token, pushInterval); + } + + @Override + public String identifier() { + return PLUGIN_NAME; + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin b/fluss-metrics/fluss-metrics-influxdb/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin new file mode 100644 index 0000000000..e2b8a6eae2 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +org.apache.fluss.metrics.influxdb.InfluxdbReporterPlugin diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducerTest.java b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducerTest.java new file mode 100644 index 0000000000..9cb999c03b --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducerTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.SimpleCounter; +import org.apache.fluss.metrics.util.TestHistogram; +import org.apache.fluss.metrics.util.TestMeter; + +import com.influxdb.v3.client.Point; +import org.junit.jupiter.api.Test; + +import java.math.BigInteger; +import java.time.Instant; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link InfluxdbPointProducer}. */ +class InfluxdbPointProducerTest { + + private final InfluxdbPointProducer pointProducer = InfluxdbPointProducer.getInstance(); + + @Test + void testCreatePointForCounter() { + Counter counter = new SimpleCounter(); + counter.inc(42L); + + List> tags = new ArrayList<>(); + tags.add(new AbstractMap.SimpleEntry<>("host", "localhost")); + tags.add(new AbstractMap.SimpleEntry<>("server", "test")); + + Instant time = Instant.now(); + Point point = pointProducer.createPoint(counter, "test_counter", tags, time); + + assertThat(point).isNotNull(); + assertThat(point.getMeasurement()).isEqualTo("test_counter"); + assertThat(point.getField("count")).isEqualTo(42L); + assertThat(point.getTimestamp()) + .isEqualTo( + BigInteger.valueOf( + time.getEpochSecond() * 1_000_000_000L + time.getNano())); + } + + @Test + void testCreatePointForGauge() { + List> tags = new ArrayList<>(); + tags.add(new AbstractMap.SimpleEntry<>("host", "localhost")); + Instant time = Instant.now(); + + // Test with Number (Integer) + Gauge intGauge = () -> 123; + Point intPoint = pointProducer.createPoint(intGauge, "test_gauge_int", tags, time); + assertThat(intPoint).isNotNull(); + assertThat(intPoint.getMeasurement()).isEqualTo("test_gauge_int"); + assertThat(intPoint.getField("value")).isEqualTo(123); + assertThat(intPoint.getTimestamp()) + .isEqualTo( + BigInteger.valueOf( + time.getEpochSecond() * 1_000_000_000L + time.getNano())); + + // Test with Number (Double) + Gauge doubleGauge = () -> 123.456; + Point doublePoint = pointProducer.createPoint(doubleGauge, "test_gauge_double", tags, time); + assertThat(doublePoint).isNotNull(); + assertThat(doublePoint.getMeasurement()).isEqualTo("test_gauge_double"); + assertThat(doublePoint.getField("value")).isEqualTo(123.456); + + // Test with Boolean + Gauge boolGauge = () -> true; + Point boolPoint = pointProducer.createPoint(boolGauge, "test_gauge_boolean", tags, time); + assertThat(boolPoint).isNotNull(); + assertThat(boolPoint.getMeasurement()).isEqualTo("test_gauge_boolean"); + assertThat(boolPoint.getField("value")).isEqualTo(true); + + // Test with String + Gauge stringGauge = () -> "test_value"; + Point stringPoint = pointProducer.createPoint(stringGauge, "test_gauge_string", tags, time); + assertThat(stringPoint).isNotNull(); + assertThat(stringPoint.getMeasurement()).isEqualTo("test_gauge_string"); + assertThat(stringPoint.getField("value")).isEqualTo("test_value"); + + // Test with Null + Gauge nullGauge = () -> null; + Point nullPoint = pointProducer.createPoint(nullGauge, "test_gauge_null", tags, time); + assertThat(nullPoint).isNotNull(); + assertThat(nullPoint.getMeasurement()).isEqualTo("test_gauge_null"); + assertThat(nullPoint.getField("value")).isEqualTo("null"); + } + + @Test + void testCreatePointForMeter() { + TestMeter meter = new TestMeter(1000, 5.0); + + List> tags = new ArrayList<>(); + tags.add(new AbstractMap.SimpleEntry<>("host", "localhost")); + + Instant time = Instant.now(); + Point point = pointProducer.createPoint(meter, "test_meter", tags, time); + + assertThat(point).isNotNull(); + assertThat(point.getMeasurement()).isEqualTo("test_meter"); + assertThat(point.getField("rate")).isEqualTo(5.0); + assertThat(point.getField("count")).isEqualTo(1000L); + assertThat(point.getTimestamp()) + .isEqualTo( + BigInteger.valueOf( + time.getEpochSecond() * 1_000_000_000L + time.getNano())); + } + + @Test + void testCreatePointForHistogram() { + TestHistogram histogram = new TestHistogram(); + histogram.setCount(80); + histogram.setMean(50.5); + histogram.setStdDev(10.2); + histogram.setMax(100); + histogram.setMin(1); + + List> tags = new ArrayList<>(); + tags.add(new AbstractMap.SimpleEntry<>("host", "localhost")); + + Instant time = Instant.now(); + Point point = pointProducer.createPoint(histogram, "test_histogram", tags, time); + + assertThat(point).isNotNull(); + assertThat(point.getMeasurement()).isEqualTo("test_histogram"); + assertThat(point.getField("count")).isEqualTo(80L); + assertThat(point.getField("mean")).isEqualTo(50.5); + assertThat(point.getField("stddev")).isEqualTo(10.2); + assertThat(point.getField("max")).isEqualTo(100L); + assertThat(point.getField("min")).isEqualTo(1L); + assertThat(point.getTimestamp()) + .isEqualTo( + BigInteger.valueOf( + time.getEpochSecond() * 1_000_000_000L + time.getNano())); + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPluginTest.java b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPluginTest.java new file mode 100644 index 0000000000..4df8f1abf2 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPluginTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.util.MetricReporterTestUtils; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link InfluxdbReporterPlugin}. */ +class InfluxdbReporterPluginTest { + + @Test + void testCreateMetricReporterWithValidConfig() { + Configuration config = new Configuration(); + config.setString(ConfigOptions.METRICS_REPORTER_INFLUXDB_HOST_URL, "http://localhost:8086"); + config.setString(ConfigOptions.METRICS_REPORTER_INFLUXDB_ORG, "test-org"); + config.setString(ConfigOptions.METRICS_REPORTER_INFLUXDB_BUCKET, "test-bucket"); + config.setString(ConfigOptions.METRICS_REPORTER_INFLUXDB_TOKEN, "test-token"); + config.set(ConfigOptions.METRICS_REPORTER_INFLUXDB_PUSH_INTERVAL, Duration.ofSeconds(15)); + + InfluxdbReporterPlugin plugin = new InfluxdbReporterPlugin(); + InfluxdbReporter reporter = (InfluxdbReporter) plugin.createMetricReporter(config); + + try { + assertThat(reporter).isNotNull(); + assertThat(reporter.scheduleInterval()).isEqualTo(Duration.ofSeconds(15)); + } finally { + reporter.close(); + } + } + + @Test + void testMetricReporterSetupViaSPI() { + MetricReporterTestUtils.testMetricReporterSetupViaSPI(InfluxdbReporterPlugin.class); + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterTest.java b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterTest.java new file mode 100644 index 0000000000..ebcf2dc908 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.SimpleCounter; +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.util.TestMetricGroup; + +import com.github.tomakehurst.wiremock.WireMockServer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.containing; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link InfluxdbReporter}. */ +class InfluxdbReporterTest { + + private static final String METRIC_HOSTNAME = "localhost"; + + private static final Map variables; + private static final MetricGroup metricGroup; + + static { + variables = new HashMap<>(); + variables.put("", METRIC_HOSTNAME); + variables.put("table", "test_table"); + + metricGroup = + TestMetricGroup.newBuilder() + .setLogicalScopeFunction((characterFilter, character) -> "tabletServer") + .setVariables(variables) + .build(); + } + + private WireMockServer wireMockServer; + private InfluxdbReporter reporter; + + @BeforeEach + void setUp() { + wireMockServer = new WireMockServer(wireMockConfig().dynamicPort()); + wireMockServer.start(); + } + + @AfterEach + void tearDown() { + if (reporter != null) { + reporter.close(); + } + if (wireMockServer != null) { + wireMockServer.stop(); + } + } + + @Test + void testMetricRegistration() { + reporter = createReporter(); + + String metricName = "TestCounter"; + Counter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, metricName, metricGroup); + + String name = reporter.metricNames.get(counter); + assertThat(name).isNotNull(); + assertThat(name).isEqualTo("fluss_tabletServer_" + metricName); + + List> tags = reporter.metricTags.get(counter); + assertThat(tags).isNotNull(); + assertThat(tags) + .anyMatch(e -> e.getKey().equals("_host_") && e.getValue().equals(METRIC_HOSTNAME)); + assertThat(tags) + .anyMatch(e -> e.getKey().equals("table") && e.getValue().equals("test_table")); + } + + @Test + void testMetricReporting() { + wireMockServer.stubFor( + post(urlPathEqualTo("/api/v2/write")).willReturn(aResponse().withStatus(200))); + + reporter = createReporter(); + + String metricName = "TestCounter"; + Counter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, metricName, metricGroup); + counter.inc(42); + + reporter.report(); + + wireMockServer.verify( + postRequestedFor(urlPathEqualTo("/api/v2/write")) + .withRequestBody(containing("fluss_tabletServer_" + metricName)) + .withRequestBody(containing("count=42i"))); + } + + private InfluxdbReporter createReporter() { + InfluxdbReporter reporter = + new InfluxdbReporter( + wireMockServer.baseUrl(), + "test-org", + "test-bucket", + "test-token", + Duration.ofSeconds(10)); + reporter.open(new Configuration()); + return reporter; + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/fluss-metrics/fluss-metrics-influxdb/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension new file mode 100644 index 0000000000..ca0e907f6d --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.fluss.testutils.common.TestLoggerExtension \ No newline at end of file diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/resources/log4j2-test.properties b/fluss-metrics/fluss-metrics-influxdb/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000000..288e00f662 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/fluss-metrics/pom.xml b/fluss-metrics/pom.xml index 482cca3faa..202993ea34 100644 --- a/fluss-metrics/pom.xml +++ b/fluss-metrics/pom.xml @@ -34,6 +34,7 @@ fluss-metrics-prometheus fluss-metrics-jmx + fluss-metrics-influxdb