diff --git a/pom.xml b/pom.xml index 621dae8..be8e522 100644 --- a/pom.xml +++ b/pom.xml @@ -30,6 +30,7 @@ url-shortener-demo jdbc project-course + slo diff --git a/slo/Dockerfile b/slo/Dockerfile new file mode 100644 index 0000000..6e14af3 --- /dev/null +++ b/slo/Dockerfile @@ -0,0 +1,90 @@ +# Multi-stage Dockerfile for the YDB Java SDK SLO workload. +# +# The image is consumed by the YDB SLO action (`ydb-platform/ydb-slo-action`). +# Two instances run in parallel — current and baseline — under the same chaos +# conditions, and their metrics are compared by the report action. +# +# Build context: a directory that contains TWO checkouts side by side: +# ./ydb-java-sdk the SDK source under test (current or baseline) +# ./ydb-java-examples the SLO workload sources +# +# The CI workflow assembles this layout in a temp directory and passes it as +# the docker build context, so this Dockerfile can build the SDK from source +# and then the workload against that exact build — without ever needing the +# SDK to be published to a remote Maven repository. +# +# Optional build args: +# MAVEN_IMAGE Builder image. Defaults to `maven:3.9-eclipse-temurin-17`. +# RUNTIME_IMAGE Runtime image. Defaults to `eclipse-temurin:17-jre`. + +ARG MAVEN_IMAGE=maven:3.9-eclipse-temurin-17 +ARG RUNTIME_IMAGE=eclipse-temurin:17-jre + +# ---------- builder: install the SDK --------------------------------------- +FROM ${MAVEN_IMAGE} AS sdk-build + +WORKDIR /src + +# Copy only the SDK checkout into the builder so changes elsewhere in the +# context don't invalidate this layer's cache. +COPY ydb-java-sdk /src/ydb-java-sdk + +# Install the SDK (and its BOM) into the in-image local Maven repository at +# /root/.m2/repository. Tests are skipped — the SDK has its own CI for that; +# here we only need the artifacts. We also skip javadoc/source jars because +# the workload doesn't need them. 
+RUN cd /src/ydb-java-sdk && \ + mvn -B -q \ + -DskipTests \ + -Dmaven.javadoc.skip=true \ + -Dmaven.source.skip=true \ + -Dgpg.skip=true \ + install + +# Capture the SDK version into a small file so the next stage can read it +# without parsing the pom again. `help:evaluate` is quiet enough to be safe in +# scripts. +RUN cd /src/ydb-java-sdk && \ + mvn -B -q help:evaluate -Dexpression=project.version -DforceStdout > /tmp/sdk.version && \ + echo "Built SDK version: $(cat /tmp/sdk.version)" + +# ---------- builder: build the workload ------------------------------------ +FROM sdk-build AS workload-build + +# Copy the examples checkout. We do this in a separate stage so changes to +# the workload code don't invalidate the SDK install layer above. +COPY ydb-java-examples /src/ydb-java-examples + +# Override the SDK version pinned in the examples parent pom to point at the +# version we just installed. This lets us test SDK SNAPSHOTs without +# publishing anywhere. +RUN cd /src/ydb-java-examples && \ + SDK_VERSION="$(cat /tmp/sdk.version)" && \ + echo "Pinning ydb-java-examples to SDK ${SDK_VERSION}" && \ + mvn -B -q versions:set-property \ + -Dproperty=ydb.sdk.version \ + -DnewVersion="${SDK_VERSION}" \ + -DgenerateBackupPoms=false + +# Build only the slo module (and its required parent/BOM context). The +# examples parent pom lists many modules; `-pl slo -am` keeps the build +# focused on what the workload actually needs. +RUN cd /src/ydb-java-examples && \ + mvn -B -q \ + -pl slo -am \ + -DskipTests \ + -Dmaven.javadoc.skip=true \ + package + +# ---------- runtime -------------------------------------------------------- +FROM ${RUNTIME_IMAGE} + +WORKDIR /app + +# Copy the executable jar plus its transitive dependencies. The slo pom is +# configured to drop dependencies into target/libs and to set the manifest +# Class-Path to libs/, so a single `java -jar` call is enough. 
+COPY --from=workload-build /src/ydb-java-examples/slo/target/ydb-slo-workload.jar /app/ydb-slo-workload.jar +COPY --from=workload-build /src/ydb-java-examples/slo/target/libs /app/libs + +ENTRYPOINT ["java", "-jar", "/app/ydb-slo-workload.jar"] diff --git a/slo/README.md b/slo/README.md new file mode 100644 index 0000000..9fdb08c --- /dev/null +++ b/slo/README.md @@ -0,0 +1,119 @@ +# YDB Java SDK SLO workload + +This module contains the workload application used by the [YDB SLO Action](https://github.com/ydb-platform/ydb-slo-action) to test the reliability of the YDB Java SDK under load and chaos. + +It is a sibling of the SLO workloads in [`ydb-go-sdk`](https://github.com/ydb-platform/ydb-go-sdk/tree/master/tests/slo) and [`ydb-js-sdk`](https://github.com/ydb-platform/ydb-js-sdk/tree/main/tests/slo): the schema, queries and metrics are kept compatible so reports across SDKs are directly comparable. + +## What it does + +The workload runs three phases: + +1. **Setup** — creates a partitioned KV table and prefills it with rows. +2. **Run** — drives concurrent read and write loops at fixed RPS for the configured duration. Each operation is timed and retried via `tech.ydb.query.tools.SessionRetryContext`; the outcome is recorded as Prometheus-compatible metrics that the action scrapes via OTLP. +3. **Teardown** — drops the workload table even if the run failed, so the cluster is left clean. + +While the workload runs, the SLO action injects chaos (node restarts, network black holes, container pauses). The metrics show how well the SDK copes with those failures. + +## Metrics + +Every metric carries a `ref` label whose value is taken from the `WORKLOAD_REF` environment variable. This is how the report action separates the **current** PR run from the **baseline** run. + +Names below are shown in Prometheus form (with underscores). Internally the workload uses the OpenTelemetry naming convention with dots (e.g. 
`sdk.operations.total`); the OTLP → Prometheus conversion replaces dots with underscores automatically, so this is what you see when you query Prometheus or write rules in `metrics.yaml`. + +| Metric | Type | Labels | +| ----------------------------------- | --------------- | ------------------------------------------------------- | +| `sdk_operations_total` | counter | `operation_type`, `operation_status` | +| `sdk_errors_total` | counter | `operation_type`, `error_kind` | +| `sdk_retry_attempts_total` | counter | `operation_type`, `operation_status` | +| `sdk_pending_operations` | up/down counter | `operation_type` | +| `sdk_operation_latency_p50_seconds` | gauge | `operation_type`, `operation_status` (always `success`) | +| `sdk_operation_latency_p95_seconds` | gauge | `operation_type`, `operation_status` (always `success`) | +| `sdk_operation_latency_p99_seconds` | gauge | `operation_type`, `operation_status` (always `success`) | + +Latency percentiles are computed from per-operation HDR histograms and reflect only successful operations — failure latency is dominated by retry budgets and timeouts and would mask real SDK regressions during chaos. Counters (`sdk_operations_total`, `sdk_errors_total`) cover both branches, so availability is computed correctly. 
+ +## Inputs + +The workload reads connection details and run parameters from environment variables provided by the action: + +| Variable | Description | +| ------------------------------- | ------------------------------------------------ | +| `YDB_CONNECTION_STRING` | YDB connection string (preferred) | +| `YDB_ENDPOINT` + `YDB_DATABASE` | Legacy, used if `YDB_CONNECTION_STRING` is unset | +| `WORKLOAD_REF` | Value of the `ref` label on every metric | +| `WORKLOAD_NAME` | Workload name (used to compose the table name) | +| `WORKLOAD_DURATION` | Run duration in seconds | +| `OTEL_EXPORTER_OTLP_ENDPOINT` | OTLP HTTP endpoint to push metrics to | + +KV-specific tunables are passed via the command line and parsed by JCommander: + +``` +--read-rps Target read RPS (default 1000) +--write-rps Target write RPS (default 100) +--read-timeout-ms Per-attempt read timeout in milliseconds (default 10000) +--write-timeout-ms Per-attempt write timeout in milliseconds (default 10000) +--prefill-count Rows to prefill before the run phase (default 1000) +--partition-size Auto-partitioning partition size in MB (default 1) +--min-partition-count Minimum number of table partitions (default 6) +--max-partition-count Maximum number of table partitions (default 1000) +--duration Override WORKLOAD_DURATION when > 0 +``` + +Unknown flags are ignored, so the workload accepts `workload_current_command` strings designed for other SDKs without erroring. + +## How CI uses this module + +The CI lives in [`ydb-java-sdk/.github/workflows/slo.yml`](https://github.com/ydb-platform/ydb-java-sdk/blob/master/.github/workflows/slo.yml), not here. The flow is: + +1. Check out the SDK PR (`current`) and the merge-base SDK commit (`baseline`). +2. Check out `ydb-java-examples` for the workload sources. +3. For each version, run `.github/scripts/build-slo-image.sh` from the SDK repo. 
The script assembles a build context with the SDK and examples checkouts side by side and feeds it to [`slo/Dockerfile`](Dockerfile), which: + - Builds the SDK from source and installs it into an in-image local Maven repository. + - Pins `ydb.sdk.version` in the examples parent pom to that version. + - Builds the `slo` module against the freshly-installed SDK. +4. Pass the two images (`ydb-app-current`, `ydb-app-baseline`) to `ydb-platform/ydb-slo-action/init@v2`. +5. After the run, [`ydb-platform/ydb-slo-action/report@v2`](https://github.com/ydb-platform/ydb-slo-action) compares the two and posts a summary to the PR. + +The build is fully self-contained — the SDK under test does not need to be published to a remote Maven repository. + +## Building locally + +The workload can be built standalone against a published SDK version. From the `ydb-java-examples` repository root: + +```bash +mvn -pl slo -am -DskipTests package +``` + +The resulting jar is at `slo/target/ydb-slo-workload.jar`. To run it against a local YDB: + +```bash +export YDB_CONNECTION_STRING="grpc://localhost:2136?database=/local" +export WORKLOAD_REF=local +export WORKLOAD_NAME=java-query-kv +export WORKLOAD_DURATION=60 + +java -jar slo/target/ydb-slo-workload.jar --read-rps 100 --write-rps 10 --prefill-count 100 +``` + +If `OTEL_EXPORTER_OTLP_ENDPOINT` is not set, metrics are still recorded in-process but never exported — handy for verifying that the workload itself runs cleanly before pushing to CI. 
+ +## Files + +``` +slo/ +├── Dockerfile Multi-stage build (SDK + workload) +├── pom.xml Maven module descriptor +├── README.md This file +└── src/main/ + ├── java/tech/ydb/slo/ + │ ├── Config.java Reads action env vars + │ ├── Main.java Entry point + │ ├── Metrics.java OTLP metrics + HDR histograms + │ └── kv/ + │ ├── KvWorkload.java Setup/run/teardown loop + │ ├── KvWorkloadParams.java JCommander-bound CLI flags + │ ├── Row.java Row data class + │ └── RowGenerator.java Random payload generator + └── resources/ + └── log4j2.xml Console logging config +``` diff --git a/slo/pom.xml b/slo/pom.xml new file mode 100644 index 0000000..8c3f49c --- /dev/null +++ b/slo/pom.xml @@ -0,0 +1,90 @@ + + + 4.0.0 + + + tech.ydb.examples + ydb-sdk-examples + 1.1.0-SNAPSHOT + + + ydb-slo-workload + YDB SLO workload + SLO workload application for testing YDB Java SDK reliability under load and chaos + + + 1.82 + 1.59.0 + 2.2.2 + + + + + tech.ydb + ydb-sdk-query + + + + com.beust + jcommander + ${jcommander.version} + + + + org.hdrhistogram + HdrHistogram + ${hdrhistogram.version} + + + + io.opentelemetry + opentelemetry-api + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk-metrics + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-exporter-otlp + ${opentelemetry.version} + + + + org.apache.logging.log4j + log4j-slf4j2-impl + + + + + ydb-slo-workload + + + org.apache.maven.plugins + maven-dependency-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + + true + libs/ + tech.ydb.slo.Main + + + + + + + diff --git a/slo/src/main/java/tech/ydb/slo/Config.java b/slo/src/main/java/tech/ydb/slo/Config.java new file mode 100644 index 0000000..dc7838c --- /dev/null +++ b/slo/src/main/java/tech/ydb/slo/Config.java @@ -0,0 +1,113 @@ +package tech.ydb.slo; + +/** + * Configuration for the SLO workload, populated from environment variables + * provided by the YDB SLO action runtime. 
+ *
+ * <p>The action sets these variables on the workload container:
+ * <ul>
+ *   <li>{@code YDB_CONNECTION_STRING} or {@code YDB_ENDPOINT} + {@code YDB_DATABASE} — YDB connection</li>
+ *   <li>{@code WORKLOAD_REF} — value used as the {@code ref} label on all metrics</li>
+ *   <li>{@code WORKLOAD_NAME} — workload name (also used as part of the table path)</li>
+ *   <li>{@code WORKLOAD_DURATION} — workload run duration in seconds (0 = unlimited)</li>
+ *   <li>{@code OTEL_EXPORTER_OTLP_ENDPOINT} — OTLP endpoint for pushing metrics</li>
+ * </ul>
+ */ +public final class Config { + private final String connectionString; + private final String ref; + private final String workloadName; + private final int durationSeconds; + private final String otlpEndpoint; + + private Config( + String connectionString, + String ref, + String workloadName, + int durationSeconds, + String otlpEndpoint + ) { + this.connectionString = connectionString; + this.ref = ref; + this.workloadName = workloadName; + this.durationSeconds = durationSeconds; + this.otlpEndpoint = otlpEndpoint; + } + + public String connectionString() { + return connectionString; + } + + public String ref() { + return ref; + } + + public String workloadName() { + return workloadName; + } + + public int durationSeconds() { + return durationSeconds; + } + + public String otlpEndpoint() { + return otlpEndpoint; + } + + /** + * Loads configuration from environment variables. + * + * @throws IllegalStateException if required variables are missing or invalid + */ + public static Config fromEnv() { + String connectionString = resolveConnectionString(); + if (connectionString == null || connectionString.isEmpty()) { + throw new IllegalStateException( + "YDB connection is not configured: set YDB_CONNECTION_STRING or YDB_ENDPOINT + YDB_DATABASE" + ); + } + + String ref = envOrDefault("WORKLOAD_REF", "unknown"); + String workloadName = envOrDefault("WORKLOAD_NAME", "java-slo-workload"); + int durationSeconds = parseInt(envOrDefault("WORKLOAD_DURATION", "600"), 600); + String otlpEndpoint = envOrDefault("OTEL_EXPORTER_OTLP_ENDPOINT", ""); + + return new Config(connectionString, ref, workloadName, durationSeconds, otlpEndpoint); + } + + private static String resolveConnectionString() { + String cs = System.getenv("YDB_CONNECTION_STRING"); + if (cs != null && !cs.isEmpty()) { + return cs; + } + + String endpoint = System.getenv("YDB_ENDPOINT"); + String database = System.getenv("YDB_DATABASE"); + if (endpoint == null || endpoint.isEmpty() || database == null || 
database.isEmpty()) { + return null; + } + + // Compose connection string in the form expected by GrpcTransport.forConnectionString: + // grpc://host:port/database + if (endpoint.endsWith("/") && database.startsWith("/")) { + return endpoint + database.substring(1); + } + if (!endpoint.endsWith("/") && !database.startsWith("/")) { + return endpoint + "/" + database; + } + return endpoint + database; + } + + private static String envOrDefault(String name, String defaultValue) { + String value = System.getenv(name); + return (value == null || value.isEmpty()) ? defaultValue : value; + } + + private static int parseInt(String value, int defaultValue) { + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + return defaultValue; + } + } +} diff --git a/slo/src/main/java/tech/ydb/slo/Main.java b/slo/src/main/java/tech/ydb/slo/Main.java new file mode 100644 index 0000000..ffe40dd --- /dev/null +++ b/slo/src/main/java/tech/ydb/slo/Main.java @@ -0,0 +1,151 @@ +package tech.ydb.slo; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.ParameterException; +import org.apache.logging.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.query.QueryClient; +import tech.ydb.slo.kv.KvWorkload; +import tech.ydb.slo.kv.KvWorkloadParams; + +/** + * Entry point of the SLO workload. + * + *

+ * <p>Reads connection details and run parameters from environment variables
+ * (see {@link Config}), parses workload-specific flags from the command line
+ * (see {@link KvWorkloadParams}), and runs the KV workload phases — setup,
+ * run, teardown — pushing metrics to the OTLP endpoint configured by the YDB
+ * SLO action runtime.
+ *
+ * <p>Exit codes:
+ * <ul>
+ *   <li>{@code 0} — workload completed successfully</li>
+ *   <li>{@code 1} — workload failed (an unhandled exception or interrupted run)</li>
+ *   <li>{@code 2} — invalid CLI arguments</li>
+ * </ul>
+ */ +public final class Main { + private static final Logger logger = LoggerFactory.getLogger(Main.class); + + private Main() { + // utility class + } + + public static void main(String[] args) { + Config config; + try { + config = Config.fromEnv(); + } catch (IllegalStateException e) { + logger.error("invalid environment configuration: {}", e.getMessage()); + System.exit(2); + return; + } + + KvWorkloadParams params = new KvWorkloadParams(); + try { + JCommander.newBuilder() + .programName("ydb-slo-workload") + .acceptUnknownOptions(true) + .addObject(params) + .build() + .parse(args); + } catch (ParameterException e) { + logger.error("invalid CLI arguments: {}", e.getMessage()); + System.exit(2); + return; + } + + // CLI duration takes precedence over WORKLOAD_DURATION when supplied. + if (params.durationSeconds() <= 0) { + params.setDurationSeconds(config.durationSeconds()); + } + + logger.info("starting SLO workload: name={}, ref={}, duration={}s, readRps={}, writeRps={}", + config.workloadName(), + config.ref(), + params.durationSeconds(), + params.readRps(), + params.writeRps()); + + // The table path embeds workload name and ref so concurrent runs of + // the current and baseline images don't step on each other. Both + // components are sanitized: WORKLOAD_NAME comes from the action input + // and is normally already safe, but we don't trust user input to be + // a valid YDB identifier. 
+ String tablePath = sanitize(config.workloadName()) + "_" + sanitize(config.ref()); + + int exitCode = 0; + Metrics metrics = Metrics.create(config); + GrpcTransport transport = GrpcTransport.forConnectionString(config.connectionString()) + .build(); + QueryClient queryClient = QueryClient.newClient(transport).build(); + + KvWorkload workload = new KvWorkload(queryClient, metrics, params, tablePath); + + try { + workload.setup(); + workload.run(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("workload interrupted"); + exitCode = 1; + } catch (Throwable t) { + logger.error("workload failed", t); + exitCode = 1; + } finally { + try { + workload.teardown(); + } catch (Throwable t) { + logger.warn("teardown failed", t); + } + + try { + metrics.flush(); + } catch (Throwable t) { + logger.warn("metrics flush failed", t); + } + + closeQuietly(metrics, "metrics"); + closeQuietly(queryClient, "query client"); + closeQuietly(transport, "transport"); + } + + // Flush and stop log4j2 before exit so async appenders don't drop + // the last messages. Replaces the previous Thread.sleep(100) hack. + LogManager.shutdown(); + + System.exit(exitCode); + } + + private static void closeQuietly(AutoCloseable closeable, String name) { + if (closeable == null) { + return; + } + try { + closeable.close(); + } catch (Throwable t) { + logger.warn("failed to close {}: {}", name, t.toString()); + } + } + + /** + * Replaces characters that aren't valid in YDB table names with underscores. + * Refs from CI may include slashes ({@code release/1.2}) or dots, which + * the action permits in metrics labels but YDB rejects in table paths. 
+ */ + private static String sanitize(String value) { + StringBuilder sb = new StringBuilder(value.length()); + for (int i = 0; i < value.length(); i++) { + char c = value.charAt(i); + if (Character.isLetterOrDigit(c) || c == '_') { + sb.append(c); + } else { + sb.append('_'); + } + } + return sb.toString(); + } +} diff --git a/slo/src/main/java/tech/ydb/slo/Metrics.java b/slo/src/main/java/tech/ydb/slo/Metrics.java new file mode 100644 index 0000000..ebb343b --- /dev/null +++ b/slo/src/main/java/tech/ydb/slo/Metrics.java @@ -0,0 +1,384 @@ +package tech.ydb.slo; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongUpDownCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; +import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; +import org.HdrHistogram.Histogram; + +/** + * Collects and pushes SLO workload metrics to the OTLP endpoint configured by + * the YDB SLO action runtime. + * + *

+ * <p>Metrics emitted (matching the contract from
+ * {@code ydb-platform/ydb-slo-action}):
+ * <ul>
+ *   <li>{@code sdk.operations.total} — counter, labeled by
+ *       {@code operation_type} and {@code operation_status}</li>
+ *   <li>{@code sdk.errors.total} — counter, labeled by
+ *       {@code operation_type} and {@code error_kind}</li>
+ *   <li>{@code sdk.retry.attempts.total} — counter, labeled by
+ *       {@code operation_type} and {@code operation_status}</li>
+ *   <li>{@code sdk.pending.operations} — up/down counter, labeled by
+ *       {@code operation_type}</li>
+ *   <li>{@code sdk.operation.latency.p50.seconds} /
+ *       {@code .p95.seconds} / {@code .p99.seconds} —
+ *       observable gauges fed from per-operation HDR histograms</li>
+ * </ul>
+ *
+ * <p>
Every metric carries the {@code ref} label so the report action can + * separate current and baseline series. + */ +public final class Metrics implements AutoCloseable { + + public enum OperationType { + READ("read"), + WRITE("write"); + + private final String label; + + OperationType(String label) { + this.label = label; + } + + public String label() { + return label; + } + } + + public enum OperationStatus { + SUCCESS("success"), + ERROR("error"); + + private final String label; + + OperationStatus(String label) { + this.label = label; + } + + public String label() { + return label; + } + } + + private static final AttributeKey ATTR_OPERATION_TYPE = + AttributeKey.stringKey("operation_type"); + private static final AttributeKey ATTR_OPERATION_STATUS = + AttributeKey.stringKey("operation_status"); + private static final AttributeKey ATTR_ERROR_KIND = + AttributeKey.stringKey("error_kind"); + private static final AttributeKey ATTR_REF = + AttributeKey.stringKey("ref"); + + // HDR histograms record latencies in microseconds with high precision up to 60 s. 
+ private static final long HDR_MIN_MICROS = 1L; + private static final long HDR_MAX_MICROS = 60L * 1_000_000L; + private static final int HDR_SIGNIFICANT_DIGITS = 3; + + private final SdkMeterProvider meterProvider; + private final String ref; + private final LongCounter operationsTotal; + private final LongCounter errorsTotal; + private final LongCounter retryAttemptsTotal; + private final LongUpDownCounter pendingOperations; + + private final Map histograms = new ConcurrentHashMap<>(); + + private Metrics( + SdkMeterProvider meterProvider, + String ref, + LongCounter operationsTotal, + LongCounter errorsTotal, + LongCounter retryAttemptsTotal, + LongUpDownCounter pendingOperations + ) { + this.meterProvider = meterProvider; + this.ref = ref; + this.operationsTotal = operationsTotal; + this.errorsTotal = errorsTotal; + this.retryAttemptsTotal = retryAttemptsTotal; + this.pendingOperations = pendingOperations; + } + + /** + * Builds a {@code Metrics} instance configured to push OTLP metrics every + * second to the endpoint from {@code config.otlpEndpoint()}. If the + * endpoint is empty, all metrics are still observable in-process but never + * exported. 
+ */ + public static Metrics create(Config config) { + String ref = config.ref(); + + Resource resource = Resource.getDefault().toBuilder() + .put("service.name", config.workloadName()) + .put("ref", ref) + .put("sdk", "java") + .build(); + + SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder() + .setResource(resource); + + if (config.otlpEndpoint() != null && !config.otlpEndpoint().isEmpty()) { + OtlpHttpMetricExporter exporter = OtlpHttpMetricExporter.builder() + .setEndpoint(metricsEndpoint(config.otlpEndpoint())) + .setTimeout(Duration.ofSeconds(10)) + .build(); + providerBuilder.registerMetricReader( + PeriodicMetricReader.builder(exporter) + .setInterval(Duration.ofSeconds(1)) + .build() + ); + } + + SdkMeterProvider provider = providerBuilder.build(); + Meter meter = provider.get("slo-workload-" + config.workloadName()); + + LongCounter operationsTotal = meter.counterBuilder("sdk.operations.total") + .setDescription("Total number of operations") + .setUnit("{operation}") + .build(); + + LongCounter errorsTotal = meter.counterBuilder("sdk.errors.total") + .setDescription("Total number of errors") + .setUnit("{error}") + .build(); + + LongCounter retryAttemptsTotal = meter.counterBuilder("sdk.retry.attempts.total") + .setDescription("Total number of retry attempts") + .setUnit("{attempt}") + .build(); + + LongUpDownCounter pendingOperations = meter.upDownCounterBuilder("sdk.pending.operations") + .setDescription("Currently in-flight operations") + .build(); + + Map histograms = new ConcurrentHashMap<>(); + + // Pre-create one histogram per operation_type so the first export + // already produces gauge series. We only track successful operations: + // failure latency is dominated by retry budgets / timeouts and would + // skew the percentiles without telling us anything useful about SDK + // performance. The SLO action's metrics.yaml filters by + // operation_status="success" anyway. 
+ for (OperationType type : OperationType.values()) { + histograms.put(type, newHistogram()); + } + + // Build the three percentile gauges as raw observers — their values + // are produced by a single batch callback below, which reads + // p50/p95/p99 from the same histogram snapshot and then resets the + // histogram. Reading all three percentiles from one snapshot avoids + // races where p99 could be observed against a freshly-reset histogram + // populated by p50, and resetting after each export means the gauge + // reflects only latencies recorded during the last export interval — + // matching the JS SDK's behaviour and avoiding cold-start tail drag + // on the JVM (without reset, JIT-warmup outliers stick to p99 for + // the rest of the run). + ObservableDoubleMeasurement p50Observer = meter.gaugeBuilder("sdk.operation.latency.p50.seconds") + .setUnit("s") + .setDescription("p50 operation latency in seconds") + .buildObserver(); + + ObservableDoubleMeasurement p95Observer = meter.gaugeBuilder("sdk.operation.latency.p95.seconds") + .setUnit("s") + .setDescription("p95 operation latency in seconds") + .buildObserver(); + + ObservableDoubleMeasurement p99Observer = meter.gaugeBuilder("sdk.operation.latency.p99.seconds") + .setUnit("s") + .setDescription("p99 operation latency in seconds") + .buildObserver(); + + meter.batchCallback( + () -> observeAndResetPercentiles(histograms, ref, p50Observer, p95Observer, p99Observer), + p50Observer, p95Observer, p99Observer + ); + + Metrics metrics = new Metrics( + provider, + ref, + operationsTotal, + errorsTotal, + retryAttemptsTotal, + pendingOperations + ); + metrics.histograms.putAll(histograms); + return metrics; + } + + private static String metricsEndpoint(String otlpEndpoint) { + // OTLP HTTP exporter expects the full /v1/metrics path. The SLO action + // sets OTEL_EXPORTER_OTLP_ENDPOINT to the base URL (e.g. 
+ // http://ydb-prometheus:9090/api/v1/otlp), so we append the suffix + // unless the user has already provided it. + String trimmed = otlpEndpoint.endsWith("/") + ? otlpEndpoint.substring(0, otlpEndpoint.length() - 1) + : otlpEndpoint; + if (trimmed.endsWith("/v1/metrics")) { + return trimmed; + } + return trimmed + "/v1/metrics"; + } + + /** + * Records a started operation and returns a span used to record the + * outcome. + */ + public Span startOperation(OperationType type) { + pendingOperations.add(1, Attributes.of( + ATTR_REF, ref, + ATTR_OPERATION_TYPE, type.label() + )); + return new Span(this, type, System.nanoTime()); + } + + /** + * Forces a final flush of pending metrics. Should be called before exit + * to make sure the report action sees the last seconds of data. + */ + public void flush() { + meterProvider.forceFlush().join(10, TimeUnit.SECONDS); + } + + @Override + public void close() { + meterProvider.shutdown().join(10, TimeUnit.SECONDS); + } + + private void recordOutcome( + OperationType type, + OperationStatus status, + int attempts, + long latencyMicros, + String errorKind + ) { + Attributes opAttrs = Attributes.of( + ATTR_REF, ref, + ATTR_OPERATION_TYPE, type.label(), + ATTR_OPERATION_STATUS, status.label() + ); + + operationsTotal.add(1, opAttrs); + retryAttemptsTotal.add(Math.max(0L, attempts), opAttrs); + pendingOperations.add(-1, Attributes.of( + ATTR_REF, ref, + ATTR_OPERATION_TYPE, type.label() + )); + + // Latency is recorded only for successful operations. Failed + // operations spend most of their time inside the retry budget / + // timeout machinery, so their latency reflects the retry policy + // rather than the SDK's performance. Mixing those samples into the + // percentile gauges produces noisy spikes during chaos scenarios + // and tells us nothing actionable. 
+ if (status == OperationStatus.SUCCESS) { + Histogram histogram = histograms.computeIfAbsent(type, k -> newHistogram()); + long clamped = Math.max(HDR_MIN_MICROS, Math.min(HDR_MAX_MICROS, latencyMicros)); + synchronized (histogram) { + histogram.recordValue(clamped); + } + } else { + errorsTotal.add(1, Attributes.of( + ATTR_REF, ref, + ATTR_OPERATION_TYPE, type.label(), + ATTR_ERROR_KIND, errorKind == null ? "unknown" : errorKind + )); + } + } + + /** + * Observes p50/p95/p99 for every populated histogram in one go and then + * resets the histogram. Called from a single OTel batch callback so all + * three percentiles are read from a consistent snapshot — without that, + * a concurrent record could land between the p50 and p99 reads and + * produce inconsistent values across gauges. + */ + private static void observeAndResetPercentiles( + Map histograms, + String ref, + ObservableDoubleMeasurement p50Out, + ObservableDoubleMeasurement p95Out, + ObservableDoubleMeasurement p99Out + ) { + for (Map.Entry entry : histograms.entrySet()) { + OperationType type = entry.getKey(); + Histogram histogram = entry.getValue(); + + long p50Micros; + long p95Micros; + long p99Micros; + synchronized (histogram) { + if (histogram.getTotalCount() == 0) { + continue; + } + p50Micros = histogram.getValueAtPercentile(50.0); + p95Micros = histogram.getValueAtPercentile(95.0); + p99Micros = histogram.getValueAtPercentile(99.0); + histogram.reset(); + } + + // Percentile gauges are always tagged with operation_status="success" + // because we only record successful samples (see recordOutcome). + // The SLO action's metrics.yaml filters on this same label, so the + // gauges line up with what the report expects. 
+ Attributes attrs = Attributes.of( + ATTR_REF, ref, + ATTR_OPERATION_TYPE, type.label(), + ATTR_OPERATION_STATUS, OperationStatus.SUCCESS.label() + ); + p50Out.record(p50Micros / 1_000_000.0, attrs); + p95Out.record(p95Micros / 1_000_000.0, attrs); + p99Out.record(p99Micros / 1_000_000.0, attrs); + } + } + + private static Histogram newHistogram() { + return new Histogram(HDR_MIN_MICROS, HDR_MAX_MICROS, HDR_SIGNIFICANT_DIGITS); + } + + /** + * One in-flight operation. Call exactly one of the {@code finish} methods. + */ + public static final class Span { + private final Metrics metrics; + private final OperationType type; + private final long startNanos; + private boolean finished; + + private Span(Metrics metrics, OperationType type, long startNanos) { + this.metrics = metrics; + this.type = type; + this.startNanos = startNanos; + } + + public void finishSuccess(int attempts) { + finish(OperationStatus.SUCCESS, attempts, null); + } + + public void finishError(int attempts, String errorKind) { + finish(OperationStatus.ERROR, attempts, errorKind); + } + + private void finish(OperationStatus status, int attempts, String errorKind) { + if (finished) { + return; + } + finished = true; + long latencyMicros = (System.nanoTime() - startNanos) / 1_000L; + metrics.recordOutcome(type, status, attempts, latencyMicros, errorKind); + } + } + +} diff --git a/slo/src/main/java/tech/ydb/slo/kv/KvWorkload.java b/slo/src/main/java/tech/ydb/slo/kv/KvWorkload.java new file mode 100644 index 0000000..d38db63 --- /dev/null +++ b/slo/src/main/java/tech/ydb/slo/kv/KvWorkload.java @@ -0,0 +1,463 @@ +package tech.ydb.slo.kv; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.util.concurrent.RateLimiter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.common.transaction.TxMode; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.query.QueryClient; +import tech.ydb.query.settings.ExecuteQuerySettings; +import tech.ydb.query.tools.QueryReader; +import tech.ydb.query.tools.SessionRetryContext; +import tech.ydb.slo.Metrics; +import tech.ydb.table.query.Params; +import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.values.PrimitiveValue; + +/** + * Key-value workload for the SLO test. + * + *

The workload creates a partitioned table, prefills it with rows, and then + * runs read and write loops at fixed RPS for the configured duration. Each + * operation is timed and retried via {@link SessionRetryContext}; the outcome + * is recorded into {@link Metrics} so the SLO action can compare current and + * baseline runs. + * + *

Schema and queries mirror the KV workloads in the Go and JavaScript SDKs + * so the produced metrics are directly comparable across SDKs. + * + *

Concurrency model: each operation type (read / write) gets a dedicated + * thread pool sized to the configured RPS. Every worker thread pulls a permit + * from a shared Guava {@link RateLimiter} and executes the operation inline. + * There is no separate driver thread and no work queue, which removes the + * unbounded backlog risk under chaos and keeps the worker count proportional + * to the actual concurrency budget. + */ +public final class KvWorkload { + private static final Logger logger = LoggerFactory.getLogger(KvWorkload.class); + + private static final String CREATE_TABLE_QUERY_TEMPLATE = "" + + "CREATE TABLE IF NOT EXISTS `%s` (" + + " hash Uint64," + + " id Uint64," + + " payload_str Utf8," + + " payload_double Double," + + " payload_timestamp Timestamp," + + " payload_hash Uint64," + + " PRIMARY KEY (hash, id)" + + ") WITH (" + + " UNIFORM_PARTITIONS = %d," + + " AUTO_PARTITIONING_BY_SIZE = ENABLED," + + " AUTO_PARTITIONING_PARTITION_SIZE_MB = %d," + + " AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d," + + " AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %d" + + ")"; + + private static final String DROP_TABLE_QUERY_TEMPLATE = "DROP TABLE `%s`"; + + private static final String WRITE_QUERY_TEMPLATE = "" + + "DECLARE $id AS Uint64;" + + "DECLARE $payload_str AS Utf8;" + + "DECLARE $payload_double AS Double;" + + "DECLARE $payload_timestamp AS Timestamp;" + + "DECLARE $payload_hash AS Uint64;" + + "UPSERT INTO `%s` (" + + " id, hash, payload_str, payload_double, payload_timestamp, payload_hash" + + ") VALUES (" + + " $id," + + " Digest::NumericHash($id)," + + " $payload_str," + + " $payload_double," + + " $payload_timestamp," + + " $payload_hash" + + ");"; + + private static final String READ_QUERY_TEMPLATE = "" + + "DECLARE $id AS Uint64;" + + "SELECT id, payload_str, payload_double, payload_timestamp, payload_hash" + + " FROM `%s`" + + " WHERE id = $id AND hash = Digest::NumericHash($id);"; + + /** + * Hard cap on the number of worker threads spawned for a single 
operation + * type. The SLO targets a few hundred RPS in CI; allowing more workers + * than this just wastes threads on JIT-warmup contention without + * improving throughput. + */ + private static final int MAX_WORKERS = 64; + + /** + * Extra time, on top of the workload duration, given to worker pools to + * complete their last in-flight operations before {@link #run()} forces + * shutdown. Picked to be larger than the default per-attempt timeout so + * a request that started just before the deadline can finish cleanly. + */ + private static final long SHUTDOWN_GRACE_SECONDS = 30L; + + private final SessionRetryContext retryCtx; + private final Metrics metrics; + private final KvWorkloadParams params; + private final String tablePath; + + private final RowGenerator generator; + + public KvWorkload(QueryClient queryClient, Metrics metrics, KvWorkloadParams params, String tablePath) { + this.retryCtx = SessionRetryContext.create(queryClient).build(); + this.metrics = metrics; + this.params = params; + this.tablePath = tablePath; + this.generator = new RowGenerator(params.prefillCount()); + } + + /** + * Creates the table (if missing) and prefills it with + * {@code params.prefillCount()} rows. Prefill uses a fixed-size thread pool + * so we don't open thousands of sessions in parallel on slow runners. 
+ */ + public void setup() throws InterruptedException { + logger.info("creating table {}", tablePath); + Status createStatus = retryCtx.supplyResult(session -> + session.createQuery( + String.format( + CREATE_TABLE_QUERY_TEMPLATE, + tablePath, + params.minPartitionCount(), + params.partitionSizeMb(), + params.minPartitionCount(), + params.maxPartitionCount() + ), + TxMode.NONE + ).execute() + ).join().getStatus(); + createStatus.expectSuccess("failed to create table " + tablePath); + logger.info("table {} created", tablePath); + + logger.info("prefilling {} rows into {}", params.prefillCount(), tablePath); + int parallelism = Math.min(MAX_WORKERS, Math.max(1, (int) Math.min(params.prefillCount(), MAX_WORKERS))); + ExecutorService prefillPool = Executors.newFixedThreadPool( + parallelism, namedThreadFactory("slo-prefill-") + ); + try { + List> futures = new ArrayList<>(); + for (long i = 0; i < params.prefillCount(); i++) { + final long id = i; + futures.add(CompletableFuture.supplyAsync( + () -> writeRowSilently(generator.generate(id)), + prefillPool + )); + } + + int failed = 0; + for (CompletableFuture f : futures) { + Status s = f.join(); + if (!s.isSuccess()) { + failed++; + if (failed <= 5) { + logger.warn("prefill row failed: {}", s); + } + } + } + if (failed > 0) { + logger.warn("prefill completed with {} failed rows out of {}", failed, params.prefillCount()); + } else { + logger.info("prefill completed"); + } + } finally { + prefillPool.shutdown(); + if (!prefillPool.awaitTermination(30, TimeUnit.SECONDS)) { + prefillPool.shutdownNow(); + } + } + } + + /** + * Runs the workload until the configured deadline or thread interruption. + * + *

Read and write workers run concurrently on dedicated thread pools. + * Each worker pulls a permit from its rate limiter and executes the + * operation inline, so there is no shared work queue and no driver + * thread. Sub-zero RPS disables the corresponding loop entirely (useful + * for write-only or read-only smoke tests). + */ + public void run() throws InterruptedException { + long durationSeconds = params.durationSeconds(); + long endNanos = durationSeconds > 0 + ? System.nanoTime() + TimeUnit.SECONDS.toNanos(durationSeconds) + : Long.MAX_VALUE; + + // Track how many writes have completed so reads target a key-space + // that's actually been populated. The generator itself was + // constructed with nextId = prefillCount, so writes pick up where + // prefill left off. + AtomicLong writesIssued = new AtomicLong(); + + int readWorkers = workerCount(params.readRps()); + int writeWorkers = workerCount(params.writeRps()); + + if (readWorkers == 0 && writeWorkers == 0) { + logger.warn("both read and write RPS are <= 0, run phase has nothing to do"); + return; + } + + ExecutorService readPool = null; + ExecutorService writePool = null; + try { + if (readWorkers > 0) { + readPool = Executors.newFixedThreadPool(readWorkers, namedThreadFactory("slo-read-")); + RateLimiter readLimiter = RateLimiter.create(params.readRps()); + for (int i = 0; i < readWorkers; i++) { + readPool.execute(() -> workerLoop( + endNanos, readLimiter, + () -> readOnce(writesIssued.get()), + "read" + )); + } + } else { + logger.info("read RPS <= 0, skipping read workers"); + } + + if (writeWorkers > 0) { + writePool = Executors.newFixedThreadPool(writeWorkers, namedThreadFactory("slo-write-")); + RateLimiter writeLimiter = RateLimiter.create(params.writeRps()); + for (int i = 0; i < writeWorkers; i++) { + writePool.execute(() -> workerLoop( + endNanos, writeLimiter, + () -> { + writeOnce(generator.generate()); + writesIssued.incrementAndGet(); + }, + "write" + )); + } + } else { + logger.info("write 
RPS <= 0, skipping write workers"); + } + + // Wait for workers to drain naturally as they hit the deadline. + // shutdown() lets in-flight ops finish; awaitTermination caps the + // wait at duration + grace so the run phase can't hang past the + // configured budget. Workers are stopped via shutdownNow() in + // the finally block if they exceed the grace window. + long graceNanos = TimeUnit.SECONDS.toNanos(SHUTDOWN_GRACE_SECONDS); + long waitNanos = durationSeconds > 0 + ? Math.max(0L, endNanos - System.nanoTime()) + graceNanos + : Long.MAX_VALUE; + + if (readPool != null) { + readPool.shutdown(); + } + if (writePool != null) { + writePool.shutdown(); + } + + long readWaitNanos = waitNanos; + if (readPool != null) { + long started = System.nanoTime(); + if (!readPool.awaitTermination(readWaitNanos, TimeUnit.NANOSECONDS)) { + logger.warn("read pool did not drain within deadline, forcing shutdown"); + readPool.shutdownNow(); + } + waitNanos = Math.max(0L, waitNanos - (System.nanoTime() - started)); + } + if (writePool != null) { + if (!writePool.awaitTermination(waitNanos, TimeUnit.NANOSECONDS)) { + logger.warn("write pool did not drain within deadline, forcing shutdown"); + writePool.shutdownNow(); + } + } + } finally { + forceShutdown(readPool, "read pool"); + forceShutdown(writePool, "write pool"); + } + } + + /** + * Drops the workload table. Called from the {@code finally} block in + * {@code Main} so the database is left clean even on failure. 
+ */ + public void teardown() { + logger.info("dropping table {}", tablePath); + Status status = retryCtx.supplyResult(session -> + session.createQuery( + String.format(DROP_TABLE_QUERY_TEMPLATE, tablePath), + TxMode.NONE + ).execute() + ).join().getStatus(); + if (!status.isSuccess()) { + logger.warn("failed to drop table {}: {}", tablePath, status); + } else { + logger.info("table {} dropped", tablePath); + } + } + + // --- internals --------------------------------------------------------- + + /** + * Loops on a single worker thread until the deadline or interruption, + * pacing each iteration through the shared rate limiter and running the + * operation inline. No work queue is involved — backpressure comes + * naturally from the limiter blocking the worker. + */ + private void workerLoop(long endNanos, RateLimiter limiter, Runnable singleOp, String name) { + while (System.nanoTime() < endNanos && !Thread.currentThread().isInterrupted()) { + limiter.acquire(); + try { + singleOp.run(); + } catch (Throwable t) { + logger.warn("{} op threw unexpectedly: {}", name, t.toString()); + } + } + } + + /** + * Computes the number of worker threads for a given RPS target. + * Returns 0 for non-positive RPS so the caller skips the loop entirely. + */ + private static int workerCount(int rps) { + if (rps <= 0) { + return 0; + } + return Math.min(MAX_WORKERS, Math.max(1, rps)); + } + + /** + * Picks a random id in [0, keyspaceUpper) and reads it back from the table. + * Reads target only ids known to exist (the prefilled range plus rows + * written so far during this run), so a successful read always returns + * a row and exercises the deserialization path. 
+ */ + private void readOnce(long writesObserved) { + long upperBound = Math.max(1L, params.prefillCount() + writesObserved); + long id = ThreadLocalRandom.current().nextLong(upperBound); + + Metrics.Span span = metrics.startOperation(Metrics.OperationType.READ); + AtomicInteger attempts = new AtomicInteger(); + ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder() + .withRequestTimeout(Duration.ofMillis(params.readTimeoutMs())) + .build(); + + Result result = retryCtx.supplyResult(session -> { + attempts.incrementAndGet(); + return QueryReader.readFrom(session.createQuery( + String.format(READ_QUERY_TEMPLATE, tablePath), + TxMode.SNAPSHOT_RO, + Params.of("$id", PrimitiveValue.newUint64(id)), + settings + )); + }).join(); + + int retryAttempts = Math.max(0, attempts.get() - 1); + + if (!result.getStatus().isSuccess()) { + span.finishError(retryAttempts, classifyStatus(result.getStatus())); + return; + } + + // Touch the result set so we exercise the deserialization path. + // For ids in the prefilled range the row is guaranteed to exist; + // for ids in the just-written range it almost always exists, so + // the absence branch is rare but harmless. + QueryReader reader = result.getValue(); + if (reader.getResultSetCount() > 0) { + ResultSetReader rs = reader.getResultSet(0); + while (rs.next()) { + rs.getColumn("id").getUint64(); + } + } + + span.finishSuccess(retryAttempts); + } + + private void writeOnce(Row row) { + Metrics.Span span = metrics.startOperation(Metrics.OperationType.WRITE); + AtomicInteger attempts = new AtomicInteger(); + + Status status = writeRowInternal(row, attempts); + int retryAttempts = Math.max(0, attempts.get() - 1); + + if (status.isSuccess()) { + span.finishSuccess(retryAttempts); + } else { + span.finishError(retryAttempts, classifyStatus(status)); + logger.debug("write {} failed: {}", row.id(), status); + } + } + + /** + * Writes a single row without recording metrics. 
Used during prefill so + * the histogram of operation latencies is not polluted with bulk-load + * timings. + */ + private Status writeRowSilently(Row row) { + return writeRowInternal(row, new AtomicInteger()); + } + + private Status writeRowInternal(Row row, AtomicInteger attempts) { + ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder() + .withRequestTimeout(Duration.ofMillis(params.writeTimeoutMs())) + .build(); + return retryCtx.supplyStatus(session -> { + attempts.incrementAndGet(); + return session.createQuery( + String.format(WRITE_QUERY_TEMPLATE, tablePath), + TxMode.SERIALIZABLE_RW, + Params.of( + "$id", PrimitiveValue.newUint64(row.id()), + "$payload_str", PrimitiveValue.newText(row.payloadStr()), + "$payload_double", PrimitiveValue.newDouble(row.payloadDouble()), + "$payload_timestamp", PrimitiveValue.newTimestamp(row.payloadTimestamp()), + "$payload_hash", PrimitiveValue.newUint64(row.payloadHash()) + ), + settings + ).execute().thenApply(Result::getStatus); + }).join(); + } + + private static String classifyStatus(Status status) { + return "ydb/" + status.getCode().name().toLowerCase(); + } + + private static ThreadFactory namedThreadFactory(String prefix) { + AtomicInteger counter = new AtomicInteger(); + return r -> { + Thread t = new Thread(r, prefix + counter.getAndIncrement()); + t.setDaemon(true); + return t; + }; + } + + /** + * Final cleanup for an executor service. The graceful shutdown is done + * inline in {@link #run()} so deadlines line up with workload duration; + * this method is the safety net invoked from the {@code finally} block, + * forcing shutdown if the pool somehow survived. 
+ */ + private static void forceShutdown(ExecutorService pool, String name) { + if (pool == null || pool.isTerminated()) { + return; + } + logger.warn("{} still active in cleanup, forcing shutdown", name); + pool.shutdownNow(); + try { + if (!pool.awaitTermination(5, TimeUnit.SECONDS)) { + logger.warn("{} did not terminate after shutdownNow", name); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/slo/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java b/slo/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java new file mode 100644 index 0000000..e6346e4 --- /dev/null +++ b/slo/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java @@ -0,0 +1,113 @@ +package tech.ydb.slo.kv; + +import com.beust.jcommander.Parameter; + +/** + * Tunable parameters for the KV workload. + * + *

Defaults match the SLO workloads in the Go and JavaScript SDKs so the + * three runs are comparable. JCommander annotations let the operator override + * any field from the command line, e.g. + * {@code --read-rps 500 --write-rps 50}. + */ +public final class KvWorkloadParams { + + @Parameter( + names = {"--read-rps"}, + description = "Target read operations per second" + ) + private int readRps = 1000; + + @Parameter( + names = {"--write-rps"}, + description = "Target write operations per second" + ) + private int writeRps = 100; + + @Parameter( + names = {"--read-timeout-ms"}, + description = "Per-attempt read timeout in milliseconds" + ) + private int readTimeoutMs = 10_000; + + @Parameter( + names = {"--write-timeout-ms"}, + description = "Per-attempt write timeout in milliseconds" + ) + private int writeTimeoutMs = 10_000; + + @Parameter( + names = {"--prefill-count"}, + description = "Number of rows to prefill before the run phase" + ) + private long prefillCount = 1_000L; + + @Parameter( + names = {"--partition-size"}, + description = "Auto-partitioning partition size in MB" + ) + private int partitionSizeMb = 1; + + @Parameter( + names = {"--min-partition-count"}, + description = "Minimum number of table partitions" + ) + private int minPartitionCount = 6; + + @Parameter( + names = {"--max-partition-count"}, + description = "Maximum number of table partitions" + ) + private int maxPartitionCount = 1_000; + + @Parameter( + names = {"--duration"}, + description = "Run duration in seconds (overrides WORKLOAD_DURATION when > 0)" + ) + private int durationSeconds = 0; + + public int readRps() { + return readRps; + } + + public int writeRps() { + return writeRps; + } + + public int readTimeoutMs() { + return readTimeoutMs; + } + + public int writeTimeoutMs() { + return writeTimeoutMs; + } + + public long prefillCount() { + return prefillCount; + } + + public int partitionSizeMb() { + return partitionSizeMb; + } + + public int minPartitionCount() { + return 
minPartitionCount; + } + + public int maxPartitionCount() { + return maxPartitionCount; + } + + /** + * Effective run duration. If the CLI flag was omitted (left at 0), falls + * back to the value supplied via the {@code WORKLOAD_DURATION} environment + * variable through {@code Config}. + */ + public int durationSeconds() { + return durationSeconds; + } + + public void setDurationSeconds(int durationSeconds) { + this.durationSeconds = durationSeconds; + } +} diff --git a/slo/src/main/java/tech/ydb/slo/kv/Row.java b/slo/src/main/java/tech/ydb/slo/kv/Row.java new file mode 100644 index 0000000..9f1c292 --- /dev/null +++ b/slo/src/main/java/tech/ydb/slo/kv/Row.java @@ -0,0 +1,62 @@ +package tech.ydb.slo.kv; + +import java.time.Instant; + +/** + * A single row of the KV workload table. + * + *

The schema mirrors the one used by SLO workloads in other YDB SDKs + * (Go, JavaScript) so reports across SDKs are comparable: + *

+ * hash              Uint64 (primary key, computed server-side via Digest::NumericHash(id))
+ * id                Uint64 (primary key)
+ * payload_str       Utf8
+ * payload_double    Double
+ * payload_timestamp Timestamp
+ * payload_hash      Uint64
+ * 
+ * + *

The {@code hash} column is computed by YDB at insert time via + * {@code Digest::NumericHash($id)}, so we don't carry it on the client. + */ +public final class Row { + private final long id; + private final String payloadStr; + private final double payloadDouble; + private final Instant payloadTimestamp; + private final long payloadHash; + + public Row( + long id, + String payloadStr, + double payloadDouble, + Instant payloadTimestamp, + long payloadHash + ) { + this.id = id; + this.payloadStr = payloadStr; + this.payloadDouble = payloadDouble; + this.payloadTimestamp = payloadTimestamp; + this.payloadHash = payloadHash; + } + + public long id() { + return id; + } + + public String payloadStr() { + return payloadStr; + } + + public double payloadDouble() { + return payloadDouble; + } + + public Instant payloadTimestamp() { + return payloadTimestamp; + } + + public long payloadHash() { + return payloadHash; + } +} diff --git a/slo/src/main/java/tech/ydb/slo/kv/RowGenerator.java b/slo/src/main/java/tech/ydb/slo/kv/RowGenerator.java new file mode 100644 index 0000000..df8c63e --- /dev/null +++ b/slo/src/main/java/tech/ydb/slo/kv/RowGenerator.java @@ -0,0 +1,57 @@ +package tech.ydb.slo.kv; + +import java.security.SecureRandom; +import java.time.Instant; +import java.util.Base64; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Generates rows for the KV workload. + * + *

Each row gets a monotonically increasing {@code id} and a random payload. + * The {@code hash} column is computed server-side via + * {@code Digest::NumericHash($id)} at insert time, so it is not carried on + * the client. The format mirrors the SLO workloads in the Go and JS SDKs so + * the resulting tables are interchangeable. + */ +public final class RowGenerator { + private static final int MIN_PAYLOAD_LENGTH = 20; + private static final int MAX_PAYLOAD_LENGTH = 40; + + private static final SecureRandom SECURE_RANDOM = new SecureRandom(); + + private final AtomicLong nextId; + + public RowGenerator(long startId) { + this.nextId = new AtomicLong(startId); + } + + /** + * Generates a new row with a fresh monotonically increasing id. + */ + public Row generate() { + long id = nextId.getAndIncrement(); + return generate(id); + } + + /** + * Generates a row with an explicit id (used during prefill to control IDs). + */ + public Row generate(long id) { + long payloadHash = ThreadLocalRandom.current().nextLong(); + double payloadDouble = ThreadLocalRandom.current().nextDouble(); + String payloadStr = randomPayloadString(); + Instant payloadTimestamp = Instant.now(); + + return new Row(id, payloadStr, payloadDouble, payloadTimestamp, payloadHash); + } + + private static String randomPayloadString() { + int length = MIN_PAYLOAD_LENGTH + + ThreadLocalRandom.current().nextInt(MAX_PAYLOAD_LENGTH - MIN_PAYLOAD_LENGTH + 1); + byte[] bytes = new byte[length]; + SECURE_RANDOM.nextBytes(bytes); + return Base64.getEncoder().withoutPadding().encodeToString(bytes); + } +} diff --git a/slo/src/main/resources/log4j2.xml b/slo/src/main/resources/log4j2.xml new file mode 100644 index 0000000..77b24cd --- /dev/null +++ b/slo/src/main/resources/log4j2.xml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +