Override WORKLOAD_DURATION when > 0
+```
+
+Unknown flags are ignored, so the workload accepts `workload_current_command` strings designed for other SDKs without erroring.
+
+## How CI uses this module
+
+The CI lives in [`ydb-java-sdk/.github/workflows/slo.yml`](https://github.com/ydb-platform/ydb-java-sdk/blob/master/.github/workflows/slo.yml), not here. The flow is:
+
+1. Check out the SDK PR (`current`) and the merge-base SDK commit (`baseline`).
+2. Check out `ydb-java-examples` for the workload sources.
+3. For each version, run `.github/scripts/build-slo-image.sh` from the SDK repo. The script assembles a build context with the SDK and examples checkouts side by side and feeds it to [`slo/Dockerfile`](Dockerfile), which:
+ - Builds the SDK from source and installs it into an in-image local Maven repository.
+ - Pins `ydb.sdk.version` in the examples parent pom to that version.
+ - Builds the `slo` module against the freshly-installed SDK.
+4. Pass the two images (`ydb-app-current`, `ydb-app-baseline`) to `ydb-platform/ydb-slo-action/init@v2`.
+5. After the run, [`ydb-platform/ydb-slo-action/report@v2`](https://github.com/ydb-platform/ydb-slo-action) compares the two and posts a summary to the PR.
+
+The build is fully self-contained — the SDK under test does not need to be published to a remote Maven repository.
+
+## Building locally
+
+The workload can be built standalone against a published SDK version. From the `ydb-java-examples` repository root:
+
+```bash
+mvn -pl slo -am -DskipTests package
+```
+
+The resulting jar is at `slo/target/ydb-slo-workload.jar`. To run it against a local YDB:
+
+```bash
+export YDB_CONNECTION_STRING="grpc://localhost:2136?database=/local"
+export WORKLOAD_REF=local
+export WORKLOAD_NAME=java-query-kv
+export WORKLOAD_DURATION=60
+
+java -jar slo/target/ydb-slo-workload.jar --read-rps 100 --write-rps 10 --prefill-count 100
+```
+
+If `OTEL_EXPORTER_OTLP_ENDPOINT` is not set, metrics are still recorded in-process but never exported — handy for verifying that the workload itself runs cleanly before pushing to CI.
+
+## Files
+
+```
+slo/
+├── Dockerfile Multi-stage build (SDK + workload)
+├── pom.xml Maven module descriptor
+├── README.md This file
+└── src/main/
+ ├── java/tech/ydb/slo/
+ │ ├── Config.java Reads action env vars
+ │ ├── Main.java Entry point
+ │ ├── Metrics.java OTLP metrics + HDR histograms
+ │ └── kv/
+ │ ├── KvWorkload.java Setup/run/teardown loop
+ │ ├── KvWorkloadParams.java JCommander-bound CLI flags
+ │ ├── Row.java Row data class
+ │ └── RowGenerator.java Random payload generator
+ └── resources/
+ └── log4j2.xml Console logging config
+```
diff --git a/slo/pom.xml b/slo/pom.xml
new file mode 100644
index 0000000..8c3f49c
--- /dev/null
+++ b/slo/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>tech.ydb.examples</groupId>
+        <artifactId>ydb-sdk-examples</artifactId>
+        <version>1.1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>ydb-slo-workload</artifactId>
+    <name>YDB SLO workload</name>
+    <description>SLO workload application for testing YDB Java SDK reliability under load and chaos</description>
+
+    <properties>
+        <jcommander.version>1.82</jcommander.version>
+        <opentelemetry.version>1.59.0</opentelemetry.version>
+        <hdrhistogram.version>2.2.2</hdrhistogram.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>tech.ydb</groupId>
+            <artifactId>ydb-sdk-query</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.beust</groupId>
+            <artifactId>jcommander</artifactId>
+            <version>${jcommander.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hdrhistogram</groupId>
+            <artifactId>HdrHistogram</artifactId>
+            <version>${hdrhistogram.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-api</artifactId>
+            <version>${opentelemetry.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-sdk</artifactId>
+            <version>${opentelemetry.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-sdk-metrics</artifactId>
+            <version>${opentelemetry.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-exporter-otlp</artifactId>
+            <version>${opentelemetry.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j2-impl</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>ydb-slo-workload</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <classpathPrefix>libs/</classpathPrefix>
+                            <mainClass>tech.ydb.slo.Main</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/slo/src/main/java/tech/ydb/slo/Config.java b/slo/src/main/java/tech/ydb/slo/Config.java
new file mode 100644
index 0000000..dc7838c
--- /dev/null
+++ b/slo/src/main/java/tech/ydb/slo/Config.java
@@ -0,0 +1,113 @@
+package tech.ydb.slo;
+
+/**
+ * Configuration for the SLO workload, populated from environment variables
+ * provided by the YDB SLO action runtime.
+ *
+ * The action sets these variables on the workload container:
+ *
+ * - {@code YDB_CONNECTION_STRING} or {@code YDB_ENDPOINT} + {@code YDB_DATABASE} — YDB connection
+ * - {@code WORKLOAD_REF} — value used as the {@code ref} label on all metrics
+ * - {@code WORKLOAD_NAME} — workload name (also used as part of the table path)
+ * - {@code WORKLOAD_DURATION} — workload run duration in seconds (0 = unlimited)
+ * - {@code OTEL_EXPORTER_OTLP_ENDPOINT} — OTLP endpoint for pushing metrics
+ *
+ */
+public final class Config {
+ private final String connectionString;
+ private final String ref;
+ private final String workloadName;
+ private final int durationSeconds;
+ private final String otlpEndpoint;
+
+ private Config(
+ String connectionString,
+ String ref,
+ String workloadName,
+ int durationSeconds,
+ String otlpEndpoint
+ ) {
+ this.connectionString = connectionString;
+ this.ref = ref;
+ this.workloadName = workloadName;
+ this.durationSeconds = durationSeconds;
+ this.otlpEndpoint = otlpEndpoint;
+ }
+
+ public String connectionString() {
+ return connectionString;
+ }
+
+ public String ref() {
+ return ref;
+ }
+
+ public String workloadName() {
+ return workloadName;
+ }
+
+ public int durationSeconds() {
+ return durationSeconds;
+ }
+
+ public String otlpEndpoint() {
+ return otlpEndpoint;
+ }
+
+ /**
+ * Loads configuration from environment variables.
+ *
+ * @throws IllegalStateException if required variables are missing or invalid
+ */
+ public static Config fromEnv() {
+ String connectionString = resolveConnectionString();
+ if (connectionString == null || connectionString.isEmpty()) {
+ throw new IllegalStateException(
+ "YDB connection is not configured: set YDB_CONNECTION_STRING or YDB_ENDPOINT + YDB_DATABASE"
+ );
+ }
+
+ String ref = envOrDefault("WORKLOAD_REF", "unknown");
+ String workloadName = envOrDefault("WORKLOAD_NAME", "java-slo-workload");
+ int durationSeconds = parseInt(envOrDefault("WORKLOAD_DURATION", "600"), 600);
+ String otlpEndpoint = envOrDefault("OTEL_EXPORTER_OTLP_ENDPOINT", "");
+
+ return new Config(connectionString, ref, workloadName, durationSeconds, otlpEndpoint);
+ }
+
+ private static String resolveConnectionString() {
+ String cs = System.getenv("YDB_CONNECTION_STRING");
+ if (cs != null && !cs.isEmpty()) {
+ return cs;
+ }
+
+ String endpoint = System.getenv("YDB_ENDPOINT");
+ String database = System.getenv("YDB_DATABASE");
+ if (endpoint == null || endpoint.isEmpty() || database == null || database.isEmpty()) {
+ return null;
+ }
+
+ // Compose connection string in the form expected by GrpcTransport.forConnectionString:
+ // grpc://host:port/database
+ if (endpoint.endsWith("/") && database.startsWith("/")) {
+ return endpoint + database.substring(1);
+ }
+ if (!endpoint.endsWith("/") && !database.startsWith("/")) {
+ return endpoint + "/" + database;
+ }
+ return endpoint + database;
+ }
+
+ private static String envOrDefault(String name, String defaultValue) {
+ String value = System.getenv(name);
+ return (value == null || value.isEmpty()) ? defaultValue : value;
+ }
+
+ private static int parseInt(String value, int defaultValue) {
+ try {
+ return Integer.parseInt(value);
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+}
diff --git a/slo/src/main/java/tech/ydb/slo/Main.java b/slo/src/main/java/tech/ydb/slo/Main.java
new file mode 100644
index 0000000..ffe40dd
--- /dev/null
+++ b/slo/src/main/java/tech/ydb/slo/Main.java
@@ -0,0 +1,151 @@
+package tech.ydb.slo;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.ParameterException;
+import org.apache.logging.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import tech.ydb.core.grpc.GrpcTransport;
+import tech.ydb.query.QueryClient;
+import tech.ydb.slo.kv.KvWorkload;
+import tech.ydb.slo.kv.KvWorkloadParams;
+
+/**
+ * Entry point of the SLO workload.
+ *
+ * Reads connection details and run parameters from environment variables
+ * (see {@link Config}), parses workload-specific flags from the command line
+ * (see {@link KvWorkloadParams}), and runs the KV workload phases — setup,
+ * run, teardown — pushing metrics to the OTLP endpoint configured by the YDB
+ * SLO action runtime.
+ *
+ *
+ * <p>Exit codes:
+ *
+ * - {@code 0} — workload completed successfully
+ * - {@code 1} — workload failed (an unhandled exception or interrupted run)
+ * - {@code 2} — invalid CLI arguments
+ *
+ */
+public final class Main {
+ private static final Logger logger = LoggerFactory.getLogger(Main.class);
+
+ private Main() {
+ // utility class
+ }
+
+ public static void main(String[] args) {
+ Config config;
+ try {
+ config = Config.fromEnv();
+ } catch (IllegalStateException e) {
+ logger.error("invalid environment configuration: {}", e.getMessage());
+ System.exit(2);
+ return;
+ }
+
+ KvWorkloadParams params = new KvWorkloadParams();
+ try {
+ JCommander.newBuilder()
+ .programName("ydb-slo-workload")
+ .acceptUnknownOptions(true)
+ .addObject(params)
+ .build()
+ .parse(args);
+ } catch (ParameterException e) {
+ logger.error("invalid CLI arguments: {}", e.getMessage());
+ System.exit(2);
+ return;
+ }
+
+ // CLI duration takes precedence over WORKLOAD_DURATION when supplied.
+ if (params.durationSeconds() <= 0) {
+ params.setDurationSeconds(config.durationSeconds());
+ }
+
+ logger.info("starting SLO workload: name={}, ref={}, duration={}s, readRps={}, writeRps={}",
+ config.workloadName(),
+ config.ref(),
+ params.durationSeconds(),
+ params.readRps(),
+ params.writeRps());
+
+ // The table path embeds workload name and ref so concurrent runs of
+ // the current and baseline images don't step on each other. Both
+ // components are sanitized: WORKLOAD_NAME comes from the action input
+ // and is normally already safe, but we don't trust user input to be
+ // a valid YDB identifier.
+ String tablePath = sanitize(config.workloadName()) + "_" + sanitize(config.ref());
+
+ int exitCode = 0;
+ Metrics metrics = Metrics.create(config);
+ GrpcTransport transport = GrpcTransport.forConnectionString(config.connectionString())
+ .build();
+ QueryClient queryClient = QueryClient.newClient(transport).build();
+
+ KvWorkload workload = new KvWorkload(queryClient, metrics, params, tablePath);
+
+ try {
+ workload.setup();
+ workload.run();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("workload interrupted");
+ exitCode = 1;
+ } catch (Throwable t) {
+ logger.error("workload failed", t);
+ exitCode = 1;
+ } finally {
+ try {
+ workload.teardown();
+ } catch (Throwable t) {
+ logger.warn("teardown failed", t);
+ }
+
+ try {
+ metrics.flush();
+ } catch (Throwable t) {
+ logger.warn("metrics flush failed", t);
+ }
+
+ closeQuietly(metrics, "metrics");
+ closeQuietly(queryClient, "query client");
+ closeQuietly(transport, "transport");
+ }
+
+ // Flush and stop log4j2 before exit so async appenders don't drop
+ // the last messages. Replaces the previous Thread.sleep(100) hack.
+ LogManager.shutdown();
+
+ System.exit(exitCode);
+ }
+
+ private static void closeQuietly(AutoCloseable closeable, String name) {
+ if (closeable == null) {
+ return;
+ }
+ try {
+ closeable.close();
+ } catch (Throwable t) {
+ logger.warn("failed to close {}: {}", name, t.toString());
+ }
+ }
+
+ /**
+ * Replaces characters that aren't valid in YDB table names with underscores.
+ * Refs from CI may include slashes ({@code release/1.2}) or dots, which
+ * the action permits in metrics labels but YDB rejects in table paths.
+ */
+ private static String sanitize(String value) {
+ StringBuilder sb = new StringBuilder(value.length());
+ for (int i = 0; i < value.length(); i++) {
+ char c = value.charAt(i);
+ if (Character.isLetterOrDigit(c) || c == '_') {
+ sb.append(c);
+ } else {
+ sb.append('_');
+ }
+ }
+ return sb.toString();
+ }
+}
diff --git a/slo/src/main/java/tech/ydb/slo/Metrics.java b/slo/src/main/java/tech/ydb/slo/Metrics.java
new file mode 100644
index 0000000..ebb343b
--- /dev/null
+++ b/slo/src/main/java/tech/ydb/slo/Metrics.java
@@ -0,0 +1,384 @@
+package tech.ydb.slo;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongUpDownCounter;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
+import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import io.opentelemetry.sdk.resources.Resource;
+import org.HdrHistogram.Histogram;
+
+/**
+ * Collects and pushes SLO workload metrics to the OTLP endpoint configured by
+ * the YDB SLO action runtime.
+ *
+ * Metrics emitted (matching the contract from
+ * {@code ydb-platform/ydb-slo-action}):
+ *
+ * - {@code sdk.operations.total} — counter, labeled by
+ * {@code operation_type} and {@code operation_status}
+ * - {@code sdk.errors.total} — counter, labeled by
+ * {@code operation_type} and {@code error_kind}
+ * - {@code sdk.retry.attempts.total} — counter, labeled by
+ * {@code operation_type} and {@code operation_status}
+ * - {@code sdk.pending.operations} — up/down counter, labeled by
+ * {@code operation_type}
+ * - {@code sdk.operation.latency.p50.seconds} /
+ * {@code .p95.seconds} / {@code .p99.seconds} —
+ * observable gauges fed from per-operation HDR histograms
+ *
+ *
+ * Every metric carries the {@code ref} label so the report action can
+ * separate current and baseline series.
+ */
+public final class Metrics implements AutoCloseable {
+
+ public enum OperationType {
+ READ("read"),
+ WRITE("write");
+
+ private final String label;
+
+ OperationType(String label) {
+ this.label = label;
+ }
+
+ public String label() {
+ return label;
+ }
+ }
+
+ public enum OperationStatus {
+ SUCCESS("success"),
+ ERROR("error");
+
+ private final String label;
+
+ OperationStatus(String label) {
+ this.label = label;
+ }
+
+ public String label() {
+ return label;
+ }
+ }
+
+ private static final AttributeKey<String> ATTR_OPERATION_TYPE =
+ AttributeKey.stringKey("operation_type");
+ private static final AttributeKey<String> ATTR_OPERATION_STATUS =
+ AttributeKey.stringKey("operation_status");
+ private static final AttributeKey<String> ATTR_ERROR_KIND =
+ AttributeKey.stringKey("error_kind");
+ private static final AttributeKey<String> ATTR_REF =
+ AttributeKey.stringKey("ref");
+
+ // HDR histograms record latencies in microseconds with high precision up to 60 s.
+ private static final long HDR_MIN_MICROS = 1L;
+ private static final long HDR_MAX_MICROS = 60L * 1_000_000L;
+ private static final int HDR_SIGNIFICANT_DIGITS = 3;
+
+ private final SdkMeterProvider meterProvider;
+ private final String ref;
+ private final LongCounter operationsTotal;
+ private final LongCounter errorsTotal;
+ private final LongCounter retryAttemptsTotal;
+ private final LongUpDownCounter pendingOperations;
+
+ private final Map<OperationType, Histogram> histograms = new ConcurrentHashMap<>();
+
+ private Metrics(
+ SdkMeterProvider meterProvider,
+ String ref,
+ LongCounter operationsTotal,
+ LongCounter errorsTotal,
+ LongCounter retryAttemptsTotal,
+ LongUpDownCounter pendingOperations
+ ) {
+ this.meterProvider = meterProvider;
+ this.ref = ref;
+ this.operationsTotal = operationsTotal;
+ this.errorsTotal = errorsTotal;
+ this.retryAttemptsTotal = retryAttemptsTotal;
+ this.pendingOperations = pendingOperations;
+ }
+
+ /**
+ * Builds a {@code Metrics} instance configured to push OTLP metrics every
+ * second to the endpoint from {@code config.otlpEndpoint()}. If the
+ * endpoint is empty, all metrics are still observable in-process but never
+ * exported.
+ */
+ public static Metrics create(Config config) {
+ String ref = config.ref();
+
+ Resource resource = Resource.getDefault().toBuilder()
+ .put("service.name", config.workloadName())
+ .put("ref", ref)
+ .put("sdk", "java")
+ .build();
+
+ SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder()
+ .setResource(resource);
+
+ if (config.otlpEndpoint() != null && !config.otlpEndpoint().isEmpty()) {
+ OtlpHttpMetricExporter exporter = OtlpHttpMetricExporter.builder()
+ .setEndpoint(metricsEndpoint(config.otlpEndpoint()))
+ .setTimeout(Duration.ofSeconds(10))
+ .build();
+ providerBuilder.registerMetricReader(
+ PeriodicMetricReader.builder(exporter)
+ .setInterval(Duration.ofSeconds(1))
+ .build()
+ );
+ }
+
+ SdkMeterProvider provider = providerBuilder.build();
+ Meter meter = provider.get("slo-workload-" + config.workloadName());
+
+ LongCounter operationsTotal = meter.counterBuilder("sdk.operations.total")
+ .setDescription("Total number of operations")
+ .setUnit("{operation}")
+ .build();
+
+ LongCounter errorsTotal = meter.counterBuilder("sdk.errors.total")
+ .setDescription("Total number of errors")
+ .setUnit("{error}")
+ .build();
+
+ LongCounter retryAttemptsTotal = meter.counterBuilder("sdk.retry.attempts.total")
+ .setDescription("Total number of retry attempts")
+ .setUnit("{attempt}")
+ .build();
+
+ LongUpDownCounter pendingOperations = meter.upDownCounterBuilder("sdk.pending.operations")
+ .setDescription("Currently in-flight operations")
+ .build();
+
+ Map<OperationType, Histogram> histograms = new ConcurrentHashMap<>();
+
+ // Pre-create one histogram per operation_type so the first export
+ // already produces gauge series. We only track successful operations:
+ // failure latency is dominated by retry budgets / timeouts and would
+ // skew the percentiles without telling us anything useful about SDK
+ // performance. The SLO action's metrics.yaml filters by
+ // operation_status="success" anyway.
+ for (OperationType type : OperationType.values()) {
+ histograms.put(type, newHistogram());
+ }
+
+ // Build the three percentile gauges as raw observers — their values
+ // are produced by a single batch callback below, which reads
+ // p50/p95/p99 from the same histogram snapshot and then resets the
+ // histogram. Reading all three percentiles from one snapshot avoids
+ // races where p99 could be observed against a freshly-reset histogram
+ // populated by p50, and resetting after each export means the gauge
+ // reflects only latencies recorded during the last export interval —
+ // matching the JS SDK's behaviour and avoiding cold-start tail drag
+ // on the JVM (without reset, JIT-warmup outliers stick to p99 for
+ // the rest of the run).
+ ObservableDoubleMeasurement p50Observer = meter.gaugeBuilder("sdk.operation.latency.p50.seconds")
+ .setUnit("s")
+ .setDescription("p50 operation latency in seconds")
+ .buildObserver();
+
+ ObservableDoubleMeasurement p95Observer = meter.gaugeBuilder("sdk.operation.latency.p95.seconds")
+ .setUnit("s")
+ .setDescription("p95 operation latency in seconds")
+ .buildObserver();
+
+ ObservableDoubleMeasurement p99Observer = meter.gaugeBuilder("sdk.operation.latency.p99.seconds")
+ .setUnit("s")
+ .setDescription("p99 operation latency in seconds")
+ .buildObserver();
+
+ meter.batchCallback(
+ () -> observeAndResetPercentiles(histograms, ref, p50Observer, p95Observer, p99Observer),
+ p50Observer, p95Observer, p99Observer
+ );
+
+ Metrics metrics = new Metrics(
+ provider,
+ ref,
+ operationsTotal,
+ errorsTotal,
+ retryAttemptsTotal,
+ pendingOperations
+ );
+ metrics.histograms.putAll(histograms);
+ return metrics;
+ }
+
+ private static String metricsEndpoint(String otlpEndpoint) {
+ // OTLP HTTP exporter expects the full /v1/metrics path. The SLO action
+ // sets OTEL_EXPORTER_OTLP_ENDPOINT to the base URL (e.g.
+ // http://ydb-prometheus:9090/api/v1/otlp), so we append the suffix
+ // unless the user has already provided it.
+ String trimmed = otlpEndpoint.endsWith("/")
+ ? otlpEndpoint.substring(0, otlpEndpoint.length() - 1)
+ : otlpEndpoint;
+ if (trimmed.endsWith("/v1/metrics")) {
+ return trimmed;
+ }
+ return trimmed + "/v1/metrics";
+ }
+
+ /**
+ * Records a started operation and returns a span used to record the
+ * outcome.
+ */
+ public Span startOperation(OperationType type) {
+ pendingOperations.add(1, Attributes.of(
+ ATTR_REF, ref,
+ ATTR_OPERATION_TYPE, type.label()
+ ));
+ return new Span(this, type, System.nanoTime());
+ }
+
+ /**
+ * Forces a final flush of pending metrics. Should be called before exit
+ * to make sure the report action sees the last seconds of data.
+ */
+ public void flush() {
+ meterProvider.forceFlush().join(10, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void close() {
+ meterProvider.shutdown().join(10, TimeUnit.SECONDS);
+ }
+
+ private void recordOutcome(
+ OperationType type,
+ OperationStatus status,
+ int attempts,
+ long latencyMicros,
+ String errorKind
+ ) {
+ Attributes opAttrs = Attributes.of(
+ ATTR_REF, ref,
+ ATTR_OPERATION_TYPE, type.label(),
+ ATTR_OPERATION_STATUS, status.label()
+ );
+
+ operationsTotal.add(1, opAttrs);
+ retryAttemptsTotal.add(Math.max(0L, attempts), opAttrs);
+ pendingOperations.add(-1, Attributes.of(
+ ATTR_REF, ref,
+ ATTR_OPERATION_TYPE, type.label()
+ ));
+
+ // Latency is recorded only for successful operations. Failed
+ // operations spend most of their time inside the retry budget /
+ // timeout machinery, so their latency reflects the retry policy
+ // rather than the SDK's performance. Mixing those samples into the
+ // percentile gauges produces noisy spikes during chaos scenarios
+ // and tells us nothing actionable.
+ if (status == OperationStatus.SUCCESS) {
+ Histogram histogram = histograms.computeIfAbsent(type, k -> newHistogram());
+ long clamped = Math.max(HDR_MIN_MICROS, Math.min(HDR_MAX_MICROS, latencyMicros));
+ synchronized (histogram) {
+ histogram.recordValue(clamped);
+ }
+ } else {
+ errorsTotal.add(1, Attributes.of(
+ ATTR_REF, ref,
+ ATTR_OPERATION_TYPE, type.label(),
+ ATTR_ERROR_KIND, errorKind == null ? "unknown" : errorKind
+ ));
+ }
+ }
+
+ /**
+ * Observes p50/p95/p99 for every populated histogram in one go and then
+ * resets the histogram. Called from a single OTel batch callback so all
+ * three percentiles are read from a consistent snapshot — without that,
+ * a concurrent record could land between the p50 and p99 reads and
+ * produce inconsistent values across gauges.
+ */
+ private static void observeAndResetPercentiles(
+ Map<OperationType, Histogram> histograms,
+ String ref,
+ ObservableDoubleMeasurement p50Out,
+ ObservableDoubleMeasurement p95Out,
+ ObservableDoubleMeasurement p99Out
+ ) {
+ for (Map.Entry<OperationType, Histogram> entry : histograms.entrySet()) {
+ OperationType type = entry.getKey();
+ Histogram histogram = entry.getValue();
+
+ long p50Micros;
+ long p95Micros;
+ long p99Micros;
+ synchronized (histogram) {
+ if (histogram.getTotalCount() == 0) {
+ continue;
+ }
+ p50Micros = histogram.getValueAtPercentile(50.0);
+ p95Micros = histogram.getValueAtPercentile(95.0);
+ p99Micros = histogram.getValueAtPercentile(99.0);
+ histogram.reset();
+ }
+
+ // Percentile gauges are always tagged with operation_status="success"
+ // because we only record successful samples (see recordOutcome).
+ // The SLO action's metrics.yaml filters on this same label, so the
+ // gauges line up with what the report expects.
+ Attributes attrs = Attributes.of(
+ ATTR_REF, ref,
+ ATTR_OPERATION_TYPE, type.label(),
+ ATTR_OPERATION_STATUS, OperationStatus.SUCCESS.label()
+ );
+ p50Out.record(p50Micros / 1_000_000.0, attrs);
+ p95Out.record(p95Micros / 1_000_000.0, attrs);
+ p99Out.record(p99Micros / 1_000_000.0, attrs);
+ }
+ }
+
+ private static Histogram newHistogram() {
+ return new Histogram(HDR_MIN_MICROS, HDR_MAX_MICROS, HDR_SIGNIFICANT_DIGITS);
+ }
+
+ /**
+ * One in-flight operation. Call exactly one of the {@code finish} methods.
+ */
+ public static final class Span {
+ private final Metrics metrics;
+ private final OperationType type;
+ private final long startNanos;
+ private boolean finished;
+
+ private Span(Metrics metrics, OperationType type, long startNanos) {
+ this.metrics = metrics;
+ this.type = type;
+ this.startNanos = startNanos;
+ }
+
+ public void finishSuccess(int attempts) {
+ finish(OperationStatus.SUCCESS, attempts, null);
+ }
+
+ public void finishError(int attempts, String errorKind) {
+ finish(OperationStatus.ERROR, attempts, errorKind);
+ }
+
+ private void finish(OperationStatus status, int attempts, String errorKind) {
+ if (finished) {
+ return;
+ }
+ finished = true;
+ long latencyMicros = (System.nanoTime() - startNanos) / 1_000L;
+ metrics.recordOutcome(type, status, attempts, latencyMicros, errorKind);
+ }
+ }
+
+}
diff --git a/slo/src/main/java/tech/ydb/slo/kv/KvWorkload.java b/slo/src/main/java/tech/ydb/slo/kv/KvWorkload.java
new file mode 100644
index 0000000..d38db63
--- /dev/null
+++ b/slo/src/main/java/tech/ydb/slo/kv/KvWorkload.java
@@ -0,0 +1,463 @@
+package tech.ydb.slo.kv;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import tech.ydb.common.transaction.TxMode;
+import tech.ydb.core.Result;
+import tech.ydb.core.Status;
+import tech.ydb.query.QueryClient;
+import tech.ydb.query.settings.ExecuteQuerySettings;
+import tech.ydb.query.tools.QueryReader;
+import tech.ydb.query.tools.SessionRetryContext;
+import tech.ydb.slo.Metrics;
+import tech.ydb.table.query.Params;
+import tech.ydb.table.result.ResultSetReader;
+import tech.ydb.table.values.PrimitiveValue;
+
+/**
+ * Key-value workload for the SLO test.
+ *
+ * The workload creates a partitioned table, prefills it with rows, and then
+ * runs read and write loops at fixed RPS for the configured duration. Each
+ * operation is timed and retried via {@link SessionRetryContext}; the outcome
+ * is recorded into {@link Metrics} so the SLO action can compare current and
+ * baseline runs.
+ *
+ * <p>Schema and queries mirror the KV workloads in the Go and JavaScript SDKs
+ * so the produced metrics are directly comparable across SDKs.
+ *
+ * <p>Concurrency model: each operation type (read / write) gets a dedicated
+ * thread pool sized to the configured RPS. Every worker thread pulls a permit
+ * from a shared Guava {@link RateLimiter} and executes the operation inline.
+ * There is no separate driver thread and no work queue, which removes the
+ * unbounded backlog risk under chaos and keeps the worker count proportional
+ * to the actual concurrency budget.
+ */
+public final class KvWorkload {
+ private static final Logger logger = LoggerFactory.getLogger(KvWorkload.class);
+
+ private static final String CREATE_TABLE_QUERY_TEMPLATE = ""
+ + "CREATE TABLE IF NOT EXISTS `%s` ("
+ + " hash Uint64,"
+ + " id Uint64,"
+ + " payload_str Utf8,"
+ + " payload_double Double,"
+ + " payload_timestamp Timestamp,"
+ + " payload_hash Uint64,"
+ + " PRIMARY KEY (hash, id)"
+ + ") WITH ("
+ + " UNIFORM_PARTITIONS = %d,"
+ + " AUTO_PARTITIONING_BY_SIZE = ENABLED,"
+ + " AUTO_PARTITIONING_PARTITION_SIZE_MB = %d,"
+ + " AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d,"
+ + " AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %d"
+ + ")";
+
+ private static final String DROP_TABLE_QUERY_TEMPLATE = "DROP TABLE `%s`";
+
+ private static final String WRITE_QUERY_TEMPLATE = ""
+ + "DECLARE $id AS Uint64;"
+ + "DECLARE $payload_str AS Utf8;"
+ + "DECLARE $payload_double AS Double;"
+ + "DECLARE $payload_timestamp AS Timestamp;"
+ + "DECLARE $payload_hash AS Uint64;"
+ + "UPSERT INTO `%s` ("
+ + " id, hash, payload_str, payload_double, payload_timestamp, payload_hash"
+ + ") VALUES ("
+ + " $id,"
+ + " Digest::NumericHash($id),"
+ + " $payload_str,"
+ + " $payload_double,"
+ + " $payload_timestamp,"
+ + " $payload_hash"
+ + ");";
+
+ private static final String READ_QUERY_TEMPLATE = ""
+ + "DECLARE $id AS Uint64;"
+ + "SELECT id, payload_str, payload_double, payload_timestamp, payload_hash"
+ + " FROM `%s`"
+ + " WHERE id = $id AND hash = Digest::NumericHash($id);";
+
+ /**
+ * Hard cap on the number of worker threads spawned for a single operation
+ * type. The SLO targets a few hundred RPS in CI; allowing more workers
+ * than this just wastes threads on JIT-warmup contention without
+ * improving throughput.
+ */
+ private static final int MAX_WORKERS = 64;
+
+ /**
+ * Extra time, on top of the workload duration, given to worker pools to
+ * complete their last in-flight operations before {@link #run()} forces
+ * shutdown. Picked to be larger than the default per-attempt timeout so
+ * a request that started just before the deadline can finish cleanly.
+ */
+ private static final long SHUTDOWN_GRACE_SECONDS = 30L;
+
+ private final SessionRetryContext retryCtx;
+ private final Metrics metrics;
+ private final KvWorkloadParams params;
+ private final String tablePath;
+
+ private final RowGenerator generator;
+
+ public KvWorkload(QueryClient queryClient, Metrics metrics, KvWorkloadParams params, String tablePath) {
+ this.retryCtx = SessionRetryContext.create(queryClient).build();
+ this.metrics = metrics;
+ this.params = params;
+ this.tablePath = tablePath;
+ this.generator = new RowGenerator(params.prefillCount());
+ }
+
+ /**
+ * Creates the table (if missing) and prefills it with
+ * {@code params.prefillCount()} rows. Prefill uses a fixed-size thread pool
+ * so we don't open thousands of sessions in parallel on slow runners.
+ */
+ public void setup() throws InterruptedException {
+ logger.info("creating table {}", tablePath);
+ Status createStatus = retryCtx.supplyResult(session ->
+ session.createQuery(
+ String.format(
+ CREATE_TABLE_QUERY_TEMPLATE,
+ tablePath,
+ params.minPartitionCount(),
+ params.partitionSizeMb(),
+ params.minPartitionCount(),
+ params.maxPartitionCount()
+ ),
+ TxMode.NONE
+ ).execute()
+ ).join().getStatus();
+ createStatus.expectSuccess("failed to create table " + tablePath);
+ logger.info("table {} created", tablePath);
+
+ logger.info("prefilling {} rows into {}", params.prefillCount(), tablePath);
+ int parallelism = Math.min(MAX_WORKERS, Math.max(1, (int) Math.min(params.prefillCount(), MAX_WORKERS)));
+ ExecutorService prefillPool = Executors.newFixedThreadPool(
+ parallelism, namedThreadFactory("slo-prefill-")
+ );
+ try {
+ List<CompletableFuture<Status>> futures = new ArrayList<>();
+ for (long i = 0; i < params.prefillCount(); i++) {
+ final long id = i;
+ futures.add(CompletableFuture.supplyAsync(
+ () -> writeRowSilently(generator.generate(id)),
+ prefillPool
+ ));
+ }
+
+ int failed = 0;
+ for (CompletableFuture<Status> f : futures) {
+ Status s = f.join();
+ if (!s.isSuccess()) {
+ failed++;
+ if (failed <= 5) {
+ logger.warn("prefill row failed: {}", s);
+ }
+ }
+ }
+ if (failed > 0) {
+ logger.warn("prefill completed with {} failed rows out of {}", failed, params.prefillCount());
+ } else {
+ logger.info("prefill completed");
+ }
+ } finally {
+ prefillPool.shutdown();
+ if (!prefillPool.awaitTermination(30, TimeUnit.SECONDS)) {
+ prefillPool.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Runs the workload until the configured deadline or thread interruption.
+ *
+ * Read and write workers run concurrently on dedicated thread pools.
+ * Each worker pulls a permit from its rate limiter and executes the
+ * operation inline, so there is no shared work queue and no driver
+ * thread. Zero or negative RPS disables the corresponding loop entirely (useful
+ * for write-only or read-only smoke tests).
+ */
+ public void run() throws InterruptedException {
+ long durationSeconds = params.durationSeconds();
+ long endNanos = durationSeconds > 0
+ ? System.nanoTime() + TimeUnit.SECONDS.toNanos(durationSeconds)
+ : Long.MAX_VALUE;
+
+ // Track how many writes have completed so reads target a key-space
+ // that's actually been populated. The generator itself was
+ // constructed with nextId = prefillCount, so writes pick up where
+ // prefill left off.
+ AtomicLong writesIssued = new AtomicLong();
+
+ int readWorkers = workerCount(params.readRps());
+ int writeWorkers = workerCount(params.writeRps());
+
+ if (readWorkers == 0 && writeWorkers == 0) {
+ logger.warn("both read and write RPS are <= 0, run phase has nothing to do");
+ return;
+ }
+
+ ExecutorService readPool = null;
+ ExecutorService writePool = null;
+ try {
+ if (readWorkers > 0) {
+ readPool = Executors.newFixedThreadPool(readWorkers, namedThreadFactory("slo-read-"));
+ RateLimiter readLimiter = RateLimiter.create(params.readRps());
+ for (int i = 0; i < readWorkers; i++) {
+ readPool.execute(() -> workerLoop(
+ endNanos, readLimiter,
+ () -> readOnce(writesIssued.get()),
+ "read"
+ ));
+ }
+ } else {
+ logger.info("read RPS <= 0, skipping read workers");
+ }
+
+ if (writeWorkers > 0) {
+ writePool = Executors.newFixedThreadPool(writeWorkers, namedThreadFactory("slo-write-"));
+ RateLimiter writeLimiter = RateLimiter.create(params.writeRps());
+ for (int i = 0; i < writeWorkers; i++) {
+ writePool.execute(() -> workerLoop(
+ endNanos, writeLimiter,
+ () -> {
+ writeOnce(generator.generate());
+ writesIssued.incrementAndGet();
+ },
+ "write"
+ ));
+ }
+ } else {
+ logger.info("write RPS <= 0, skipping write workers");
+ }
+
+ // Wait for workers to drain naturally as they hit the deadline.
+ // shutdown() lets in-flight ops finish; awaitTermination caps the
+ // wait at duration + grace so the run phase can't hang past the
+ // configured budget. Workers are stopped via shutdownNow() in
+ // the finally block if they exceed the grace window.
+ long graceNanos = TimeUnit.SECONDS.toNanos(SHUTDOWN_GRACE_SECONDS);
+ long waitNanos = durationSeconds > 0
+ ? Math.max(0L, endNanos - System.nanoTime()) + graceNanos
+ : Long.MAX_VALUE;
+
+ if (readPool != null) {
+ readPool.shutdown();
+ }
+ if (writePool != null) {
+ writePool.shutdown();
+ }
+
+ long readWaitNanos = waitNanos;
+ if (readPool != null) {
+ long started = System.nanoTime();
+ if (!readPool.awaitTermination(readWaitNanos, TimeUnit.NANOSECONDS)) {
+ logger.warn("read pool did not drain within deadline, forcing shutdown");
+ readPool.shutdownNow();
+ }
+ waitNanos = Math.max(0L, waitNanos - (System.nanoTime() - started));
+ }
+ if (writePool != null) {
+ if (!writePool.awaitTermination(waitNanos, TimeUnit.NANOSECONDS)) {
+ logger.warn("write pool did not drain within deadline, forcing shutdown");
+ writePool.shutdownNow();
+ }
+ }
+ } finally {
+ forceShutdown(readPool, "read pool");
+ forceShutdown(writePool, "write pool");
+ }
+ }
+
+ /**
+ * Drops the workload table. Called from the {@code finally} block in
+ * {@code Main} so the database is left clean even on failure.
+ */
+ public void teardown() {
+ logger.info("dropping table {}", tablePath);
+ Status status = retryCtx.supplyResult(session ->
+ session.createQuery(
+ String.format(DROP_TABLE_QUERY_TEMPLATE, tablePath),
+ TxMode.NONE
+ ).execute()
+ ).join().getStatus();
+ if (!status.isSuccess()) {
+ logger.warn("failed to drop table {}: {}", tablePath, status);
+ } else {
+ logger.info("table {} dropped", tablePath);
+ }
+ }
+
+ // --- internals ---------------------------------------------------------
+
+ /**
+ * Loops on a single worker thread until the deadline or interruption,
+ * pacing each iteration through the shared rate limiter and running the
+ * operation inline. No work queue is involved — backpressure comes
+ * naturally from the limiter blocking the worker.
+ */
+ private void workerLoop(long endNanos, RateLimiter limiter, Runnable singleOp, String name) {
+ while (System.nanoTime() < endNanos && !Thread.currentThread().isInterrupted()) {
+ limiter.acquire();
+ try {
+ singleOp.run();
+ } catch (Throwable t) {
+ logger.warn("{} op threw unexpectedly: {}", name, t.toString());
+ }
+ }
+ }
+
+ /**
+ * Computes the number of worker threads for a given RPS target.
+ * Returns 0 for non-positive RPS so the caller skips the loop entirely.
+ */
+ private static int workerCount(int rps) {
+ if (rps <= 0) {
+ return 0;
+ }
+ return Math.min(MAX_WORKERS, Math.max(1, rps));
+ }
+
+ /**
+ * Picks a random id in [0, keyspaceUpper) and reads it back from the table.
+ * Reads target only ids known to exist (the prefilled range plus rows
+ * written so far during this run), so a successful read always returns
+ * a row and exercises the deserialization path.
+ */
+ private void readOnce(long writesObserved) {
+ long upperBound = Math.max(1L, params.prefillCount() + writesObserved);
+ long id = ThreadLocalRandom.current().nextLong(upperBound);
+
+ Metrics.Span span = metrics.startOperation(Metrics.OperationType.READ);
+ AtomicInteger attempts = new AtomicInteger();
+ ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder()
+ .withRequestTimeout(Duration.ofMillis(params.readTimeoutMs()))
+ .build();
+
+ Result<QueryReader> result = retryCtx.supplyResult(session -> {
+ attempts.incrementAndGet();
+ return QueryReader.readFrom(session.createQuery(
+ String.format(READ_QUERY_TEMPLATE, tablePath),
+ TxMode.SNAPSHOT_RO,
+ Params.of("$id", PrimitiveValue.newUint64(id)),
+ settings
+ ));
+ }).join();
+
+ int retryAttempts = Math.max(0, attempts.get() - 1);
+
+ if (!result.getStatus().isSuccess()) {
+ span.finishError(retryAttempts, classifyStatus(result.getStatus()));
+ return;
+ }
+
+ // Touch the result set so we exercise the deserialization path.
+ // For ids in the prefilled range the row is guaranteed to exist;
+ // for ids in the just-written range it almost always exists, so
+ // the absence branch is rare but harmless.
+ QueryReader reader = result.getValue();
+ if (reader.getResultSetCount() > 0) {
+ ResultSetReader rs = reader.getResultSet(0);
+ while (rs.next()) {
+ rs.getColumn("id").getUint64();
+ }
+ }
+
+ span.finishSuccess(retryAttempts);
+ }
+
+ private void writeOnce(Row row) {
+ Metrics.Span span = metrics.startOperation(Metrics.OperationType.WRITE);
+ AtomicInteger attempts = new AtomicInteger();
+
+ Status status = writeRowInternal(row, attempts);
+ int retryAttempts = Math.max(0, attempts.get() - 1);
+
+ if (status.isSuccess()) {
+ span.finishSuccess(retryAttempts);
+ } else {
+ span.finishError(retryAttempts, classifyStatus(status));
+ logger.debug("write {} failed: {}", row.id(), status);
+ }
+ }
+
+ /**
+ * Writes a single row without recording metrics. Used during prefill so
+ * the histogram of operation latencies is not polluted with bulk-load
+ * timings.
+ */
+ private Status writeRowSilently(Row row) {
+ return writeRowInternal(row, new AtomicInteger());
+ }
+
+ private Status writeRowInternal(Row row, AtomicInteger attempts) {
+ ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder()
+ .withRequestTimeout(Duration.ofMillis(params.writeTimeoutMs()))
+ .build();
+ return retryCtx.supplyStatus(session -> {
+ attempts.incrementAndGet();
+ return session.createQuery(
+ String.format(WRITE_QUERY_TEMPLATE, tablePath),
+ TxMode.SERIALIZABLE_RW,
+ Params.of(
+ "$id", PrimitiveValue.newUint64(row.id()),
+ "$payload_str", PrimitiveValue.newText(row.payloadStr()),
+ "$payload_double", PrimitiveValue.newDouble(row.payloadDouble()),
+ "$payload_timestamp", PrimitiveValue.newTimestamp(row.payloadTimestamp()),
+ "$payload_hash", PrimitiveValue.newUint64(row.payloadHash())
+ ),
+ settings
+ ).execute().thenApply(Result::getStatus);
+ }).join();
+ }
+
+ private static String classifyStatus(Status status) {
+ return "ydb/" + status.getCode().name().toLowerCase();
+ }
+
+ private static ThreadFactory namedThreadFactory(String prefix) {
+ AtomicInteger counter = new AtomicInteger();
+ return r -> {
+ Thread t = new Thread(r, prefix + counter.getAndIncrement());
+ t.setDaemon(true);
+ return t;
+ };
+ }
+
+ /**
+ * Final cleanup for an executor service. The graceful shutdown is done
+ * inline in {@link #run()} so deadlines line up with workload duration;
+ * this method is the safety net invoked from the {@code finally} block,
+ * forcing shutdown if the pool somehow survived.
+ */
+ private static void forceShutdown(ExecutorService pool, String name) {
+ if (pool == null || pool.isTerminated()) {
+ return;
+ }
+ logger.warn("{} still active in cleanup, forcing shutdown", name);
+ pool.shutdownNow();
+ try {
+ if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
+ logger.warn("{} did not terminate after shutdownNow", name);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/slo/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java b/slo/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java
new file mode 100644
index 0000000..e6346e4
--- /dev/null
+++ b/slo/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java
@@ -0,0 +1,113 @@
+package tech.ydb.slo.kv;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Tunable parameters for the KV workload.
+ *
+ * Defaults match the SLO workloads in the Go and JavaScript SDKs so the
+ * three runs are comparable. JCommander annotations let the operator override
+ * any field from the command line, e.g.
+ * {@code --read-rps 500 --write-rps 50}.
+ */
+public final class KvWorkloadParams {
+
+ @Parameter(
+ names = {"--read-rps"},
+ description = "Target read operations per second"
+ )
+ private int readRps = 1000;
+
+ @Parameter(
+ names = {"--write-rps"},
+ description = "Target write operations per second"
+ )
+ private int writeRps = 100;
+
+ @Parameter(
+ names = {"--read-timeout-ms"},
+ description = "Per-attempt read timeout in milliseconds"
+ )
+ private int readTimeoutMs = 10_000;
+
+ @Parameter(
+ names = {"--write-timeout-ms"},
+ description = "Per-attempt write timeout in milliseconds"
+ )
+ private int writeTimeoutMs = 10_000;
+
+ @Parameter(
+ names = {"--prefill-count"},
+ description = "Number of rows to prefill before the run phase"
+ )
+ private long prefillCount = 1_000L;
+
+ @Parameter(
+ names = {"--partition-size"},
+ description = "Auto-partitioning partition size in MB"
+ )
+ private int partitionSizeMb = 1;
+
+ @Parameter(
+ names = {"--min-partition-count"},
+ description = "Minimum number of table partitions"
+ )
+ private int minPartitionCount = 6;
+
+ @Parameter(
+ names = {"--max-partition-count"},
+ description = "Maximum number of table partitions"
+ )
+ private int maxPartitionCount = 1_000;
+
+ @Parameter(
+ names = {"--duration"},
+ description = "Run duration in seconds (overrides WORKLOAD_DURATION when > 0)"
+ )
+ private int durationSeconds = 0;
+
+ public int readRps() {
+ return readRps;
+ }
+
+ public int writeRps() {
+ return writeRps;
+ }
+
+ public int readTimeoutMs() {
+ return readTimeoutMs;
+ }
+
+ public int writeTimeoutMs() {
+ return writeTimeoutMs;
+ }
+
+ public long prefillCount() {
+ return prefillCount;
+ }
+
+ public int partitionSizeMb() {
+ return partitionSizeMb;
+ }
+
+ public int minPartitionCount() {
+ return minPartitionCount;
+ }
+
+ public int maxPartitionCount() {
+ return maxPartitionCount;
+ }
+
+ /**
+ * Effective run duration. If the CLI flag was omitted (left at 0), falls
+ * back to the value supplied via the {@code WORKLOAD_DURATION} environment
+ * variable through {@code Config}.
+ */
+ public int durationSeconds() {
+ return durationSeconds;
+ }
+
+ public void setDurationSeconds(int durationSeconds) {
+ this.durationSeconds = durationSeconds;
+ }
+}
diff --git a/slo/src/main/java/tech/ydb/slo/kv/Row.java b/slo/src/main/java/tech/ydb/slo/kv/Row.java
new file mode 100644
index 0000000..9f1c292
--- /dev/null
+++ b/slo/src/main/java/tech/ydb/slo/kv/Row.java
@@ -0,0 +1,62 @@
+package tech.ydb.slo.kv;
+
+import java.time.Instant;
+
+/**
+ * A single row of the KV workload table.
+ *
+ * <p>The schema mirrors the one used by SLO workloads in other YDB SDKs
+ * (Go, JavaScript) so reports across SDKs are comparable:
+ * <pre>
+ * hash              Uint64 (primary key, computed server-side via Digest::NumericHash(id))
+ * id                Uint64 (primary key)
+ * payload_str       Utf8
+ * payload_double    Double
+ * payload_timestamp Timestamp
+ * payload_hash      Uint64
+ * </pre>
+ *
+ * <p>The {@code hash} column is computed by YDB at insert time via
+ * {@code Digest::NumericHash($id)}, so we don't carry it on the client.
+ */
+public final class Row {
+ private final long id;
+ private final String payloadStr;
+ private final double payloadDouble;
+ private final Instant payloadTimestamp;
+ private final long payloadHash;
+
+ public Row(
+ long id,
+ String payloadStr,
+ double payloadDouble,
+ Instant payloadTimestamp,
+ long payloadHash
+ ) {
+ this.id = id;
+ this.payloadStr = payloadStr;
+ this.payloadDouble = payloadDouble;
+ this.payloadTimestamp = payloadTimestamp;
+ this.payloadHash = payloadHash;
+ }
+
+ public long id() {
+ return id;
+ }
+
+ public String payloadStr() {
+ return payloadStr;
+ }
+
+ public double payloadDouble() {
+ return payloadDouble;
+ }
+
+ public Instant payloadTimestamp() {
+ return payloadTimestamp;
+ }
+
+ public long payloadHash() {
+ return payloadHash;
+ }
+}
diff --git a/slo/src/main/java/tech/ydb/slo/kv/RowGenerator.java b/slo/src/main/java/tech/ydb/slo/kv/RowGenerator.java
new file mode 100644
index 0000000..df8c63e
--- /dev/null
+++ b/slo/src/main/java/tech/ydb/slo/kv/RowGenerator.java
@@ -0,0 +1,57 @@
+package tech.ydb.slo.kv;
+
+import java.security.SecureRandom;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Generates rows for the KV workload.
+ *
+ * <p>Each row gets a monotonically increasing {@code id} and a random payload.
+ * The {@code hash} column is computed server-side via
+ * {@code Digest::NumericHash($id)} at insert time, so it is not carried on
+ * the client. The format mirrors the SLO workloads in the Go and JS SDKs so
+ * the resulting tables are interchangeable.
+ */
+public final class RowGenerator {
+ private static final int MIN_PAYLOAD_LENGTH = 20;
+ private static final int MAX_PAYLOAD_LENGTH = 40;
+
+ private static final SecureRandom SECURE_RANDOM = new SecureRandom();
+
+ private final AtomicLong nextId;
+
+ public RowGenerator(long startId) {
+ this.nextId = new AtomicLong(startId);
+ }
+
+ /**
+ * Generates a new row with a fresh monotonically increasing id.
+ */
+ public Row generate() {
+ long id = nextId.getAndIncrement();
+ return generate(id);
+ }
+
+ /**
+ * Generates a row with an explicit id (used during prefill to control IDs).
+ */
+ public Row generate(long id) {
+ long payloadHash = ThreadLocalRandom.current().nextLong();
+ double payloadDouble = ThreadLocalRandom.current().nextDouble();
+ String payloadStr = randomPayloadString();
+ Instant payloadTimestamp = Instant.now();
+
+ return new Row(id, payloadStr, payloadDouble, payloadTimestamp, payloadHash);
+ }
+
+ private static String randomPayloadString() {
+ int length = MIN_PAYLOAD_LENGTH
+ + ThreadLocalRandom.current().nextInt(MAX_PAYLOAD_LENGTH - MIN_PAYLOAD_LENGTH + 1);
+ byte[] bytes = new byte[length];
+ SECURE_RANDOM.nextBytes(bytes);
+ return Base64.getEncoder().withoutPadding().encodeToString(bytes);
+ }
+}
diff --git a/slo/src/main/resources/log4j2.xml b/slo/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..77b24cd
--- /dev/null
+++ b/slo/src/main/resources/log4j2.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  NOTE(review): the original content of this file was lost in extraction
+  (only the patch "+" markers survived). The configuration below is a
+  conventional console-only Log4j2 setup reconstructed to match the SLF4J
+  usage in the workload sources. Verify against the repository before merge.
+-->
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Logger name="tech.ydb.slo" level="info" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+        <Logger name="tech.ydb" level="warn" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+        <Logger name="io.grpc" level="warn" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+        <Root level="warn">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>