diff --git a/.github/workflows/maven.yaml b/.github/workflows/maven.yaml index 4eacd65b846..aaf165420c3 100644 --- a/.github/workflows/maven.yaml +++ b/.github/workflows/maven.yaml @@ -27,7 +27,13 @@ jobs: distribution: "corretto" cache: "maven" - name: Build with Maven - run: mvn -B package --file pom.xml + shell: bash + run: | + JACOCO_FLAG="" + if [ "${{ matrix.os }}" != "ubuntu-latest" ]; then + JACOCO_FLAG="-Djacoco.skip=true" + fi + mvn -B package --file pom.xml $JACOCO_FLAG - name: Upload Auth JVM crash logs if: failure() diff --git a/common/pom.xml b/common/pom.xml index eec71184143..b931afcb89c 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -113,8 +113,8 @@ rocketmq-logback-classic - org.apache.rocketmq - rocketmq-rocksdb + org.rocksdb + rocksdbjni diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index bc4a18006f8..4875ce43e22 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -427,22 +427,35 @@ protected void manualCompactionDefaultCfRange(CompactRangeOptions compactRangeOp if (!hold()) { return; } - long s1 = System.currentTimeMillis(); + long before = getEstimateNumKeys(); + long startMs = System.currentTimeMillis(); boolean result = true; try { - LOGGER.info("manualCompaction Start. {}", this.dbPath); + LOGGER.info("ManualCompaction started, dbPath={}, estimateNumKeys={}", this.dbPath, before); this.db.compactRange(this.defaultCFHandle, null, null, compactRangeOptions); } catch (RocksDBException e) { result = false; scheduleReloadRocksdb(e); - LOGGER.error("manualCompaction Failed. {}, {}", this.dbPath, getStatusError(e)); + LOGGER.error("ManualCompaction failed, dbPath={}, error={}", this.dbPath, getStatusError(e)); } finally { release(); - LOGGER.info("manualCompaction End. {}, rt: {}(ms), result: {}", this.dbPath, System.currentTimeMillis() - s1, result); + long after = getEstimateNumKeys(); + long elapsed = System.currentTimeMillis() - startMs; + String ratio = before > 0 ? String.format("%.1f", (1.0 - (double) after / before) * 100) : "0.0"; + LOGGER.info("ManualCompaction finished, dbPath={}, elapsed={}ms, success={}, before={}, after={}, reduced={}%", + this.dbPath, elapsed, result, before, after, ratio); } } - protected void manualCompaction(long minPhyOffset, final CompactRangeOptions compactRangeOptions) { + private long getEstimateNumKeys() { + try { + return this.db.getLongProperty(this.defaultCFHandle, "rocksdb.estimate-num-keys"); + } catch (RocksDBException e) { + return -1L; + } + } + + protected void manualCompaction(final CompactRangeOptions compactRangeOptions) { this.manualCompactionThread.submit(new Runnable() { @Override public void run() { diff --git a/docs/en/Native_RocksDB.md b/docs/en/Native_RocksDB.md new file mode 100644 index 00000000000..f0d43a8137a --- /dev/null +++ b/docs/en/Native_RocksDB.md @@ -0,0 +1,290 @@ +# Native RocksDB ConsumeQueue Compaction Filter + +## Background + +RocketMQ previously depended on a custom-forked RocksDB Java binding published as `org.apache.rocketmq:rocketmq-rocksdb:1.0.6`. This fork was maintained in the `apache/rocketmq-externals` repository and was essentially a republished copy of `org.rocksdb:rocksdbjni` with exactly **one** additional class: + +- `org.rocksdb.RemoveConsumeQueueCompactionFilter` — a RocksDB compaction filter that removes stale consume queue entries during compaction. Its C++ implementation and JNI glue lived in the forked C++ source tree under `utilities/compaction_filters/remove_consumequeue_compactionfilter.*` and `java/rocksjni/remove_consumequeue_compactionfilterjni.cc`. + +All other RocketMQ subsystems using RocksDB (Pop consumption state, config storage, index storage, timer storage, transaction half-message storage) used only standard RocksDB Java APIs and had no dependency on the fork's custom code. + +## Problem + +Maintaining a fork of RocksDB's Java bindings has several costs: + +1. **Upgrade friction** — every RocksDB upstream release requires rebuilding the entire fork to pick up the new native library and Java API +2. **Native build complexity** — the fork bundles a full C++ build pipeline for multiple platforms (Linux glibc/musl, macOS, Windows) +3. **Dependency duplication** — the `rocksdb/` module in the RocketMQ source tree duplicates ~190 classes that are identical to upstream `rocksdbjni` +4. **License ambiguity** — the fork republishes Facebook's RocksDB code under the Apache group + +## Solution + +Replace `rocketmq-rocksdb` with the official `org.rocksdb:rocksdbjni:8.4.4` and move the single custom compaction filter into a standalone native shim. + +### Why a native shim is needed + +The official `rocksdbjni` provides a `ColumnFamilyOptions.setCompactionFilter(AbstractCompactionFilter)` method, but its `AbstractCompactionFilter` Java class requires a native handle (raw `rocksdb::CompactionFilter*` pointer) passed to its constructor. The Java `filter()` method callback goes through a C++ trampoline that RocksDB's JNI layer manages internally — you can only subclass it from within the same JNI compilation unit. + +To implement a custom compaction filter outside the `rocksdbjni` build, we create a standalone C++ shared library that: +- Directly subclasses `rocksdb::CompactionFilter` in C++ +- Exposes JNI methods to create filter instances and update the `minPhyOffset` threshold +- Returns the raw `CompactionFilter*` pointer as a `jlong` to Java + +### Architecture + +``` +┌──────────────────────────────────────────────────────┐ +│ ConsumeQueueRocksDBStorage (Java) │ +│ - CqCompactionFilterJni.createAndSetFilter(cqCfOpts)│ +│ - CqCompactionFilterJni.setMinPhyOffset(offset) │ +└──────────────────┬───────────────────────────────────┘ + │ + ▼ +┌──────────────────────────────────────────────────────┐ +│ CqCompactionFilterJni.java │ +│ - Extracts libcq_compaction_filter.so to the same │ +│ temp dir as the already-loaded rocksdbjni .so │ +│ - Uses NativeCqCompactionFilter wrapper with │ +│ disOwnNativeHandle() + public setCompactionFilter │ +│ - Calls native createNativeFilter0() → raw pointer │ +└──────────────────┬───────────────────────────────────┘ + │ + ▼ +┌──────────────────────────────────────────────────────┐ +│ libcq_compaction_filter.so (native shim) │ +│ │ +│ class CqCompactionFilter │ +│ : public rocksdb::CompactionFilter { ... } │ +│ │ +│ JNI: createNativeFilter0() → new CqCompactionFilter │ +│ JNI: setMinPhyOffset0(ptr, offset) │ +│ │ +│ NEEDED: librocksdbjni-linux64.so ($ORIGIN RPATH) │ +└──────────────────┬───────────────────────────────────┘ + │ + ▼ +┌──────────────────────────────────────────────────────┐ +│ librocksdbjni-linux64.so (official rocksdbjni) │ +│ - All RocksDB C++ classes (CompactionFilter, etc.) │ +│ - JNI glue for all Java↔C++ bindings │ +│ - Compiled with -fno-rtti -D_GLIBCXX_USE_CXX11_ABI=0│ +└──────────────────────────────────────────────────────┘ +``` + +### Key design decisions + +**1. Direct C++ subclassing with explicit linking** + +The shim directly subclasses `rocksdb::CompactionFilter` in C++ and is compiled with matching ABI flags (`-fno-rtti -D_GLIBCXX_USE_CXX11_ABI=0`) to match how `librocksdbjni` was built. It is explicitly linked against `librocksdbjni-linux64.so` (extracted from the `rocksdbjni` jar) with `$ORIGIN` RPATH so the dynamic linker resolves symbols from the same directory. + +This replaced an earlier dlopen/RTLD_GLOBAL approach that caused C++ `double free` crashes — loading the same `.so` twice (once via JVM's `RTLD_LOCAL` and once via `RTLD_GLOBAL`) creates conflicting C++ global state (memory allocators, static singletons, vtables). + +**2. Raw pointer as jlong, wrapped with disOwnNativeHandle()** + +The native shim creates `new CqCompactionFilter()` and returns the raw C++ pointer as a `jlong`. A thin Java wrapper `NativeCqCompactionFilter extends AbstractCompactionFilter` passes this pointer to the protected `AbstractCompactionFilter(long)` constructor, then calls `disOwnNativeHandle()` so that `close()` does not free the native memory. This is critical because `AbstractRocksDBStorage.shutdown()` closes `ColumnFamilyOptions` (step 2) before closing the DB (step 4) — without `disOwnNativeHandle()`, background compaction threads would access a freed filter. The filter is then set via the public `ColumnFamilyOptions.setCompactionFilter()` API, avoiding reflection and ensuring JDK 17+ compatibility. + +**3. Shared temp directory for .so resolution** + +At runtime, `CqCompactionFilterJni` loads `librocksdbjni-linux64.so` from the rocksdbjni JAR first (via `System.loadLibrary` or extraction to a temp dir), then extracts `libcq_compaction_filter.so` to the same temp directory. This ensures the `$ORIGIN` RPATH in the shim correctly resolves its `NEEDED` dependency on `librocksdbjni-linux64.so`. The rocksdbjni native library is NOT bundled in the RocketMQ repository — it is sourced from the `org.rocksdb:rocksdbjni:8.4.4` JAR at runtime. + +**4. Thread-safe minPhyOffset with std::atomic** + +The `CqCompactionFilter` uses `std::atomic` with `memory_order_relaxed` for `min_phy_offset_`. This is sufficient because there is a single writer (Java main thread via JNI) and one reader (compaction background thread), and eventual consistency is acceptable — a slightly stale threshold only means a few extra entries survive one compaction cycle. This replaces the earlier `pthread_mutex` approach, eliminating per-entry lock/unlock overhead during full compaction over hundreds of millions of entries. + +## Changed files + +| File | Change | +|------|--------| +| `pom.xml` | `rocksdb.version` → `rocksdbjni.version=8.4.4`; dependency changed to `org.rocksdb:rocksdbjni` | +| `common/pom.xml` | `rocketmq-rocksdb` → `org.rocksdb:rocksdbjni` | +| `common/.../config/AbstractRocksDBStorage.java` | `manualCompactionDefaultCfRange` enhanced with `estimateNumKeys` logging (before/after key count, elapsed time, reduction ratio); `manualCompaction` removed unused `minPhyOffset` parameter | +| `store/.../rocksdb/ConsumeQueueCompactionFilterFactory.java` | **Deleted** — replaced by native shim | +| `store/.../rocksdb/ConsumeQueueRocksDBStorage.java` | Use `CqCompactionFilterJni.createAndSetFilter()` instead of `CompactionFilterFactory`; added `triggerCompactionSync()` and `countEntries()` helpers | +| `store/.../rocksdb/RocksDBOptionsFactory.java` | Remove `setCompactionFilterFactory()` call from `createCQCFOptions()` | +| `store/.../rocksdb/CqCompactionFilterJni.java` | **Rewritten** — uses raw JNI pointer + `NativeCqCompactionFilter` wrapper via public API; added platform-aware library name detection (macOS `.dylib` / Linux `.so` / Windows `.dll`) | +| `store/.../rocksdb/NativeCqCompactionFilter.java` | **New** — thin `AbstractCompactionFilter` wrapper with `disOwnNativeHandle()` | +| `store/.../resources/native/cq_compaction_filter.cpp` | **Rewritten** — direct C++ subclassing, explicit linking, `std::atomic` for thread safety | +| `store/.../resources/native/libcq_compaction_filter.so` | **New** — pre-compiled native library (Linux x86_64) | +| `store/.../resources/native/libcq_compaction_filter.dylib` | **New** — pre-compiled native library (macOS arm64) | +| `store/.../resources/native/cq_compaction_filter.dll` | **New** — pre-compiled native library (Windows x86_64, MSVC v14.29) | +| `store/.../rocksdb/CqCompactionFilterJniTest.java` | **New** — integration test for compaction filter | + +## Building the native shim + +Prerequisites: `g++` / `clang++`, RocksDB C++ headers matching `rocksdbjni` version (8.4.4), JNI headers from your JDK. + +### Linux (x86_64) + +```bash +# 1. Extract librocksdbjni from the rocksdbjni jar +ROCKSDB_JAR=~/.m2/repository/org/rocksdb/rocksdbjni/8.4.4/rocksdbjni-8.4.4.jar +unzip -j "$ROCKSDB_JAR" librocksdbjni-linux64.so -d /tmp/rocksdb-native/ + +# 2. Download matching RocksDB headers +wget https://github.com/facebook/rocksdb/archive/refs/tags/v8.4.4.tar.gz +tar xzf v8.4.4.tar.gz rocksdb-8.4.4/include --strip-components=1 + +# 3. Compile the shim with explicit linking +export JAVA_HOME=/usr/lib/jvm/java-8 # or your JDK path +g++ -shared -fPIC -O2 -std=c++17 -fno-rtti -D_GLIBCXX_USE_CXX11_ABI=0 \ + -I./include \ + -I${JAVA_HOME}/include \ + -I${JAVA_HOME}/include/linux \ + -Wl,--no-undefined \ + -Wl,-rpath,\$ORIGIN \ + -L/tmp/rocksdb-native \ + -l:librocksdbjni-linux64.so \ + -o libcq_compaction_filter.so \ + store/src/main/resources/native/cq_compaction_filter.cpp + +# 4. Verify NEEDED and RPATH +readelf -d libcq_compaction_filter.so | grep -E "NEEDED|RPATH" +# Should show: NEEDED librocksdbjni-linux64.so, RPATH $ORIGIN + +# 5. Replace the pre-built .so +cp libcq_compaction_filter.so store/src/main/resources/native/ +``` + +### macOS (arm64 / x86_64) + +On macOS, the rocksdbjni jar uses `.jnilib` extension (not `.so` or bare names) and the native library names differ from Linux. Key gotchas: +- Apple Silicon uses `librocksdbjni-osx-arm64.jnilib` (not `librocksdbjni-osx-aarch64` as the filename pattern might suggest) +- macOS `ld` does NOT support the `-l:` syntax used on Linux — pass the `.jnilib` file directly +- After linking, use `install_name_tool` to fix the absolute install name to `@loader_path`, otherwise the shim fails to resolve the rocksdbjni dependency at runtime +- GitHub downloads may be blocked by corporate firewalls; use a mirror (e.g. `ghproxy.net`) or a local RocksDB checkout for headers + +```bash +# 1. Extract the macOS native library from rocksdbjni jar +# The jar contains librocksdbjni-osx-arm64.jnilib (arm64) or librocksdbjni-osx-x86_64.jnilib +ROCKSDB_JAR=~/.m2/repository/org/rocksdb/rocksdbjni/8.4.4/rocksdbjni-8.4.4.jar +mkdir -p /tmp/rocksdb-native + +# For Apple Silicon (arm64): +unzip -j "$ROCKSDB_JAR" librocksdbjni-osx-arm64.jnilib -d /tmp/rocksdb-native/ + +# For Intel Mac (x86_64): +unzip -j "$ROCKSDB_JAR" librocksdbjni-osx-x86_64.jnilib -d /tmp/rocksdb-native/ + +# 2. Download matching RocksDB headers +# Use ghproxy.net mirror if github.com is blocked: +curl -sL "https://ghproxy.net/https://github.com/facebook/rocksdb/archive/refs/tags/v8.4.4.tar.gz" -o /tmp/rocksdb-8.4.4.tar.gz +tar xzf /tmp/rocksdb-8.4.4.tar.gz -C /tmp rocksdb-8.4.4/include --strip-components=1 +# Or use a local RocksDB checkout if available: +# ROCKSDB_INCLUDE=/path/to/rocksdb/include + +# 3. Compile the shim — pass .jnilib directly (macOS ld does NOT support -l: syntax) +export JAVA_HOME=$(/usr/libexec/java_home) +ROCKSDB_INCLUDE=${ROCKSDB_INCLUDE:-./include} # adjust to your headers location +ROCKSDB_JNILIB=/tmp/rocksdb-native/librocksdbjni-osx-arm64.jnilib # or -x86_64.jnilib +clang++ -shared -fPIC -O2 -std=c++17 -fno-rtti \ + -I"$ROCKSDB_INCLUDE" \ + -I${JAVA_HOME}/include \ + -I${JAVA_HOME}/include/darwin \ + -Wl,-undefined,error \ + "$ROCKSDB_JNILIB" \ + -o /tmp/rocksdb-native/libcq_compaction_filter.dylib \ + store/src/main/resources/native/cq_compaction_filter.cpp + +# 4. Fix the install_name to use @loader_path for runtime resolution +# Without this, otool -L shows an absolute path to the build directory +install_name_tool -change "$ROCKSDB_JNILIB" "@loader_path/$(basename $ROCKSDB_JNILIB)" \ + /tmp/rocksdb-native/libcq_compaction_filter.dylib + +# 5. Verify dependencies +otool -L /tmp/rocksdb-native/libcq_compaction_filter.dylib +# Should show @loader_path/librocksdbjni-osx-arm64.jnilib (or -x86_64.jnilib) + +# 6. Place the output +cp /tmp/rocksdb-native/libcq_compaction_filter.dylib store/src/main/resources/native/ +``` + +### Windows (x86_64) + +**⚠ Must use MSVC — MinGW is NOT compatible** + +The official `librocksdbjni-win64.dll` is compiled with MSVC. MinGW-w64 produces incompatible C++ binaries (different vtable layout, name mangling, exception handling). Attempting to link a MinGW-compiled shim against the MSVC-compiled rocksdbjni DLL will cause undefined symbol errors at link time and crashes at runtime. + +**Option A: Native MSVC build (required for Windows)** + +1. Install Visual Studio Build Tools 2019 (v14.29, matching the rocksdbjni linker version 14.29.30159). +2. Use the x64 Native Tools Command Prompt or set up the environment manually. + +```powershell +# 1. Set up environment (run vcvarsall.bat first, or use the VS Dev Command Prompt) +set "VCTools=C:\Program Files\Microsoft Visual Studio\2022\BuildTools\VC\Tools\MSVC\14.29.30133" +set "SDK=C:\Program Files (x86)\Windows Kits\10" +set "JAVA_HOME=C:\path\to\jdk8" + +# 2. Extract RocksDB headers +curl -LO https://github.com/facebook/rocksdb/archive/refs/tags/v8.4.4.tar.gz +tar xzf v8.4.4.tar.gz rocksdb-8.4.4/include --strip-components=1 + +# 3. Compile with MSVC cl.exe (must use /GR- to disable RTTI, matching rocksdbjni) +cl.exe /LD /O2 /std:c++17 /GR- /EHsc /utf-8 ^ + /I"%JAVA_HOME%\include" ^ + /I"%JAVA_HOME%\include\win32" ^ + /I"rocksdb-8.4.4\include" ^ + /I"%VCTools%\include" ^ + /I"%SDK%\Include\10.0.19041.0\ucrt" ^ + /I"%SDK%\Include\10.0.19041.0\shared" ^ + /I"%SDK%\Include\10.0.19041.0\um" ^ + /Fecq_compaction_filter.dll ^ + store\src\main\resources\native\cq_compaction_filter.cpp ^ + /link /MACHINE:X64 ^ + /LIBPATH:"%VCTools%\lib\x64" ^ + /LIBPATH:"%SDK%\Lib\10.0.19041.0\ucrt\x64" ^ + /LIBPATH:"%SDK%\Lib\10.0.19041.0\um\x64" + +# 4. Verify exports +dumpbin /exports cq_compaction_filter.dll +# Should show: Java_org_apache_rocketmq_store_rocksdb_CqCompactionFilterJni_createNativeFilter0 +# Java_org_apache_rocketmq_store_rocksdb_CqCompactionFilterJni_setMinPhyOffset0 + +# 5. Place the output +copy cq_compaction_filter.dll store\src\main\resources\native\ +``` + +> **Note on Git Bash / MSYS2**: When running `cl.exe` from Git Bash, MSYS2's automatic path conversion will corrupt `/LD`, `/O2` etc. into `C:/Program/LD` etc. Use `MSYS2_ARG_CONV_EXCL='*'` to disable this, or run from `cmd.exe` / PowerShell directly. + +**Windows build troubleshooting** + +| Problem | Cause | Solution | +|---------|-------|----------| +| `cl: warning D9024: cannot recognize source file type` | MSYS2/Git Bash converts `/LD` to `C:/Program/LD` | Prefix command with `MSYS2_ARG_CONV_EXCL='*'` or use `cmd.exe` | +| `fatal error C1083: cannot open include file 'atomic'` | MSVC C++ headers directory not in include path | Add `/I"%VCTools%\include"` (from VS Build Tools) | +| `LNK2019: unresolved external symbol ... Configurable::GetOption` | `CompactionFilter` inherits from `Configurable`/`Customizable`, whose virtual methods are not exported by `librocksdbjni-win64.dll` | Provide inline stub implementations in your `.cpp` for all unexported pure virtual methods | +| `LNK2019: unresolved external symbol ... Status::Status(Code,SubCode,Slice,Slice,Severity)` | `Status::NotSupported("msg")` calls the non-inline 5-parameter constructor (defined in `status.cc`, not exported by DLL) | Use `return Status();` instead of `return Status::NotSupported("...");` — `Status()` default constructor is fully inline | + +**Option B: Run on WSL (recommended for development)** + +Run the entire RocketMQ build and test under WSL (Windows Subsystem for Linux). This uses the native Linux toolchain and pre-built `.so` with zero code changes: + +```bash +# In WSL (Ubuntu) +java -version # should show WSL JDK +mvn test -pl store -Dtest=CqCompactionFilterJniTest -Djacoco.skip=true +``` + +## Platform support + +`CqCompactionFilterJni.java` automatically detects the OS and architecture at runtime, selecting the correct library name and extension. + +| Platform | Library name | Architecture | Status | +|----------|-------------|--------------|--------| +| Linux (glibc) | `libcq_compaction_filter.so` | x86_64 | Pre-built | +| Linux (glibc) | `libcq_compaction_filter_aarch64.so` | aarch64 | Pre-built | +| macOS | `libcq_compaction_filter.dylib` | arm64 | Pre-built | +| macOS | `libcq_compaction_filter.dylib` | x86_64 | Requires rebuild | +| Windows | `cq_compaction_filter.dll` | x86_64 | Pre-built | + +## Limitations + +1. **Jacoco incompatibility** — The jacoco Java agent can cause native crashes when combined with dynamically loaded native libraries. Unit tests should be run with `-Djacoco.skip=true` when testing RocksDB functionality. + +2. **Global singleton filter** — `CqCompactionFilterJni` stores the native filter pointer in a static `AtomicLong NATIVE_FILTER_PTR`. Only one filter instance is tracked globally per JVM. If multiple `ConsumeQueueRocksDBStorage` instances exist (e.g., in tests or multi-Broker processes), `setMinPhyOffset()` always updates the last-created filter. Earlier instances lose their threshold updates silently. + +3. **C++17 required** — The C++ source uses `std::atomic` which requires a C++17-capable compiler. All modern compilers (GCC 7+, Clang 5+, MSVC 2017+) support this. + +4. **Shim depends on rocksdbjni native library at runtime** — The `libcq_compaction_filter.so` has a `DT_NEEDED` entry for `librocksdbjni-linux64.so` (~13 MB). The `CqCompactionFilterJni` class handles this by extracting the shim to the same temp directory as the rocksdbjni native library, so the `$ORIGIN` RPATH resolves correctly without requiring `LD_LIBRARY_PATH`. + +5. **Windows requires MSVC** — `librocksdbjni-win64.dll` is compiled with MSVC and does not export C++ base class symbols (`Configurable`/`Customizable` vtable methods, `Status` constructors). A MinGW-compiled shim cannot link against it. Must use the same MSVC version (v14.29 for rocksdbjni 8.4.4) and provide inline stubs for unexported virtual methods. diff --git a/pom.xml b/pom.xml index 893e58b4960..868faa57d10 100644 --- a/pom.xml +++ b/pom.xml @@ -139,7 +139,7 @@ 1.47.0-alpha 2.0.6 2.20.29 - 1.0.6 + 8.4.4 2.13.4.2 1.3.14 @@ -761,9 +761,9 @@ ${slf4j-api.version} - org.apache.rocketmq - rocketmq-rocksdb - ${rocksdb.version} + org.rocksdb + rocksdbjni + ${rocksdbjni.version} io.github.aliyunmq diff --git a/store/pom.xml b/store/pom.xml index 1600a007e09..70ecafe4282 100644 --- a/store/pom.xml +++ b/store/pom.xml @@ -71,5 +71,9 @@ io.github.aliyunmq rocketmq-shaded-slf4j-api-bridge + + org.rocksdb + rocksdbjni + diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java deleted file mode 100644 index f19fb9e2036..00000000000 --- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.store.rocksdb; - -import java.util.function.LongSupplier; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.rocksdb.AbstractCompactionFilter; -import org.rocksdb.AbstractCompactionFilterFactory; -import org.rocksdb.RemoveConsumeQueueCompactionFilter; - -public class ConsumeQueueCompactionFilterFactory extends AbstractCompactionFilterFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME); - private final LongSupplier minPhyOffsetSupplier; - - public ConsumeQueueCompactionFilterFactory(final LongSupplier minPhyOffsetSupplier) { - this.minPhyOffsetSupplier = minPhyOffsetSupplier; - } - - @Override - public String name() { - return "ConsumeQueueCompactionFilterFactory"; - } - - @Override - public RemoveConsumeQueueCompactionFilter createCompactionFilter(final AbstractCompactionFilter.Context context) { - long minPhyOffset = this.minPhyOffsetSupplier.getAsLong(); - LOGGER.info("manualCompaction minPhyOffset: {}, isFull: {}, isManual: {}", - minPhyOffset, context.isFullCompaction(), context.isManualCompaction()); - return new RemoveConsumeQueueCompactionFilter(minPhyOffset); - } -} diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java index 4392283c67c..925d8f91c7f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java +++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java @@ -21,10 +21,14 @@ import java.util.List; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.config.AbstractRocksDBStorage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.MessageStore; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.FlushOptions; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -33,13 +37,13 @@ public class ConsumeQueueRocksDBStorage extends AbstractRocksDBStorage { + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME); + public static final byte[] OFFSET_COLUMN_FAMILY = "offset".getBytes(StandardCharsets.UTF_8); private final MessageStore messageStore; private volatile ColumnFamilyHandle offsetCFHandle; - private ConsumeQueueCompactionFilterFactory compactionFilterFactory; - public ConsumeQueueRocksDBStorage(final MessageStore messageStore, final String dbPath) { super(dbPath); this.messageStore = messageStore; @@ -67,20 +71,27 @@ protected boolean postLoad() { final List cfDescriptors = new ArrayList<>(); - this.compactionFilterFactory = new ConsumeQueueCompactionFilterFactory(messageStore::getMinPhyOffset); - - ColumnFamilyOptions cqCfOptions = RocksDBOptionsFactory.createCQCFOptions(this.messageStore, this.compactionFilterFactory); + ColumnFamilyOptions cqCfOptions = RocksDBOptionsFactory.createCQCFOptions(this.messageStore); this.cfOptions.add(cqCfOptions); cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cqCfOptions)); ColumnFamilyOptions offsetCfOptions = RocksDBOptionsFactory.createOffsetCFOptions(); this.cfOptions.add(offsetCfOptions); cfDescriptors.add(new ColumnFamilyDescriptor(OFFSET_COLUMN_FAMILY, offsetCfOptions)); + + if (CqCompactionFilterJni.isLoaded()) { + CqCompactionFilterJni.createAndSetFilter(cqCfOptions); + CqCompactionFilterJni.setMinPhyOffset(messageStore.getMinPhyOffset()); + log.info("CqCompactionFilter created and set, minPhyOffset: {}", messageStore.getMinPhyOffset()); + } else { + log.warn("CqCompactionFilterJni native library not loaded, compaction filter will not be installed"); + } + open(cfDescriptors); this.defaultCFHandle = cfHandles.get(0); this.offsetCFHandle = cfHandles.get(1); } catch (final Exception e) { - LOGGER.error("postLoad Failed. {}", this.dbPath, e); + log.error("postLoad Failed. {}", this.dbPath, e); return false; } return true; @@ -91,11 +102,6 @@ protected void preShutdown() { if (this.offsetCFHandle != null) { this.offsetCFHandle.close(); } - - if (this.compactionFilterFactory != null) { - this.compactionFilterFactory.close(); - } - } public byte[] getCQ(final byte[] keyBytes) throws RocksDBException { @@ -116,10 +122,13 @@ public void batchPut(final WriteBatch batch) throws RocksDBException { } public void manualCompaction(final long minPhyOffset) { + if (CqCompactionFilterJni.isLoaded()) { + CqCompactionFilterJni.setMinPhyOffset(minPhyOffset); + } try { - manualCompaction(minPhyOffset, this.compactRangeOptions); + super.manualCompaction(this.compactRangeOptions); } catch (Exception e) { - LOGGER.error("manualCompaction Failed. minPhyOffset: {}", minPhyOffset, e); + log.error("manualCompaction Failed. minPhyOffset: {}", minPhyOffset, e); } } @@ -130,4 +139,41 @@ public RocksIterator seekOffsetCF() { public ColumnFamilyHandle getOffsetCFHandle() { return this.offsetCFHandle; } + + /** + * Synchronously trigger compaction with an updated compaction filter threshold. + * This method updates the native compaction filter's minPhyOffset and then + * performs a full compaction on the default column family. + */ + public void triggerCompactionSync(long minPhyOffset) throws RocksDBException { + if (CqCompactionFilterJni.isLoaded()) { + CqCompactionFilterJni.setMinPhyOffset(minPhyOffset); + } + db.compactRange(this.defaultCFHandle); + } + + /** + * Flush all memtables to SST files. + */ + public void flushAll() throws RocksDBException { + try (FlushOptions flushOpts = new FlushOptions()) { + flushOpts.setWaitForFlush(true); + flush(flushOpts); + } + } + + /** + * Count all entries in the default column family by iterating. O(N), use only in tests. + */ + public long countEntries() { + long count = 0; + try (RocksIterator iter = db.newIterator(this.defaultCFHandle)) { + iter.seekToFirst(); + while (iter.isValid()) { + count++; + iter.next(); + } + } + return count; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJni.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJni.java new file mode 100644 index 00000000000..69d74e3364b --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJni.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.rocksdb; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.common.constant.LoggerName; +import org.rocksdb.ColumnFamilyOptions; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + +public class CqCompactionFilterJni { + + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME); + + private static final AtomicLong NATIVE_FILTER_PTR = new AtomicLong(0); + private static volatile boolean loaded = false; + + /** Platform-specific shim library name and extension. */ + private static final String SHIM_LIB_NAME; + private static final String SHIM_LIB_EXTENSION; + private static final String ROCKSDB_JNI_LIB_NAME; + + static { + String os = System.getProperty("os.name").toLowerCase(); + String arch = System.getProperty("os.arch"); + if (os.contains("mac") || os.contains("darwin") || os.contains("osx")) { + SHIM_LIB_NAME = "libcq_compaction_filter.dylib"; + SHIM_LIB_EXTENSION = ".dylib"; + ROCKSDB_JNI_LIB_NAME = arch.contains("aarch") || arch.contains("arm") + ? "librocksdbjni-osx-aarch64" + : "librocksdbjni-osx-x86_64"; + } else if (os.contains("win")) { + SHIM_LIB_NAME = "cq_compaction_filter.dll"; + SHIM_LIB_EXTENSION = ".dll"; + ROCKSDB_JNI_LIB_NAME = "librocksdbjni-win64.dll"; + } else { + SHIM_LIB_NAME = arch.contains("aarch") || arch.contains("arm") + ? "libcq_compaction_filter_aarch64.so" + : "libcq_compaction_filter.so"; + SHIM_LIB_EXTENSION = ".so"; + ROCKSDB_JNI_LIB_NAME = arch.contains("aarch") || arch.contains("arm") + ? "librocksdbjni-linux-aarch64.so" + : "librocksdbjni-linux64.so"; + } + } + + static { + loadNativeShim(); + } + + private static synchronized void loadNativeShim() { + if (loaded) { + return; + } + + // Preload RocksDB's native library so that linked symbols are available + // when our compaction filter shim is loaded. + String rocksdbDir = ensureRocksDBNativeLoaded(); + + String libName = SHIM_LIB_NAME; + try (InputStream is = CqCompactionFilterJni.class + .getClassLoader().getResourceAsStream("native/" + libName)) { + if (is == null) { + log.error("[CqCompactionFilterJni] Native library '{}' not found on classpath", libName); + return; + } + File tempLib; + if (rocksdbDir != null) { + // Extract our shim to the same temp directory as the RocksDB JNI library, + // so that the DT_NEEDED / LC_LOAD_DYLIB dependency can be resolved. + tempLib = new File(rocksdbDir, libName); + } else { + // RocksDB was loaded from java.library.path; our shim can go anywhere. + tempLib = File.createTempFile("cq_compaction_filter_", SHIM_LIB_EXTENSION); + } + Files.copy(is, tempLib.toPath(), StandardCopyOption.REPLACE_EXISTING); + tempLib.deleteOnExit(); + System.load(tempLib.getAbsolutePath()); + loaded = true; + log.info("[CqCompactionFilterJni] Native library loaded from classpath: {}", tempLib.getAbsolutePath()); + } catch (IOException e) { + log.error("[CqCompactionFilterJni] Failed to load native shim", e); + } + } + + /** + * Returns whether the native compaction filter shim was successfully loaded. + */ + public static boolean isLoaded() { + return loaded; + } + + /** + * Locates and loads the RocksDB native JNI library, returning the temporary + * directory in which it was extracted (or null if loaded from java.library.path). + *

+ * This method deliberately uses {@code System.loadLibrary("rocksdbjni")} + * rather than {@code RocksDB.loadLibrary()} for the following reasons: + *

    + *
  1. Avoid unnecessary side effects — {@code RocksDB.loadLibrary()} + * iterates over all compression types (snappy, lz4, zstd, bzip2, etc.) + * and attempts to load each one. Those libraries are not needed by this + * compaction filter, and the resulting {@code UnsatisfiedLinkError}s slow + * down startup and pollute logs.
  2. + *
  3. Control the temp directory location — The caller needs to know + * the directory where the native JNI library was extracted so that + * {@code libcq_compaction_filter.so} can be placed alongside it. This is + * required for the dynamic linker to resolve the {@code DT_NEEDED} + * dependency of the custom shim. {@code RocksDB.loadLibrary()} extracts + * to an internal temp directory that is not exposed to callers.
  4. + *
  5. Avoid class-loading coupling — {@code RocksDB.loadLibrary()} + * triggers the full initialization chain of the rocksdbjni Java bindings + * (including {@code CompressionType.values()} iteration and a singleton + * {@code NativeLibraryLoader} state machine). Loading the custom shim + * must complete before any RocksDB Java classes are exercised, to avoid + * native symbol resolution race conditions.
  6. + *
+ * + * @return the absolute path of the temporary directory containing the + * extracted RocksDB JNI library, or null if the library was loaded + * from {@code java.library.path} (in which case no temp directory + * is needed for the shim). + */ + private static String ensureRocksDBNativeLoaded() { + // Try System.loadLibrary first (works if on java.library.path) + try { + System.loadLibrary("rocksdbjni"); + // No temp dir needed since it's on java.library.path + return null; + } catch (UnsatisfiedLinkError ignored) { + // Not on java.library.path, try from JAR + } + + // Determine the platform-specific JNI library name from RocksDB's Environment + String jniLibName; + try { + jniLibName = org.rocksdb.util.Environment.getJniLibraryFileName("rocksdb"); + } catch (Exception e) { + jniLibName = ROCKSDB_JNI_LIB_NAME; + } + + try (InputStream is = CqCompactionFilterJni.class.getClassLoader().getResourceAsStream(jniLibName)) { + if (is == null) { + log.error("[CqCompactionFilterJni] RocksDB native library '{}' not found on classpath", jniLibName); + return null; + } + // Create a temp directory and extract the library there. + // Our shim will be placed in the same directory so the DT_NEEDED + // dependency resolves correctly. + File tempDir = Files.createTempDirectory("rocksdb-native").toFile(); + tempDir.deleteOnExit(); + File tempLib = new File(tempDir, jniLibName); + Files.copy(is, tempLib.toPath(), StandardCopyOption.REPLACE_EXISTING); + tempLib.deleteOnExit(); + System.load(tempLib.getAbsolutePath()); + return tempDir.getAbsolutePath(); + } catch (IOException e) { + log.error("[CqCompactionFilterJni] Failed to extract RocksDB native library", e); + return null; + } + } + + /** + * Create a native CqCompactionFilter instance. + * Returns the raw C++ pointer as a jlong. + */ + public static native long createNativeFilter0(); + + /** + * Update the minPhyOffset threshold on an existing native filter. + */ + public static native void setMinPhyOffset0(long filterPtr, long minPhyOffset); + + /** + * Set the native compaction filter on the ColumnFamilyOptions via the + * public {@code setCompactionFilter} API. + *

+ * The wrapper uses {@code disOwnNativeHandle()} so that closing the + * ColumnFamilyOptions does not free the native filter — this prevents + * use-after-free when AbstractRocksDBStorage closes options before the DB. + */ + public static void setNativeFilter(ColumnFamilyOptions options, long filterPtr) { + NativeCqCompactionFilter filter = new NativeCqCompactionFilter(filterPtr); + options.setCompactionFilter(filter); + } + + /** + * Create the native filter and set it on the ColumnFamilyOptions. + * Returns the native pointer for later threshold updates. + */ + @SuppressWarnings("UnusedReturnValue") + public static long createAndSetFilter(ColumnFamilyOptions options) { + long ptr = createNativeFilter0(); + NATIVE_FILTER_PTR.set(ptr); + setNativeFilter(options, ptr); + return ptr; + } + + /** + * Update the minPhyOffset on the current native filter. + */ + public static void setMinPhyOffset(long minPhyOffset) { + long ptr = NATIVE_FILTER_PTR.get(); + if (ptr != 0) { + setMinPhyOffset0(ptr, minPhyOffset); + log.info("CqCompactionFilter setMinPhyOffset={}", minPhyOffset); + } + } +} \ No newline at end of file diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/NativeCqCompactionFilter.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/NativeCqCompactionFilter.java new file mode 100644 index 00000000000..6a3101c261a --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/NativeCqCompactionFilter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.rocksdb; + +import org.rocksdb.AbstractCompactionFilter; +import org.rocksdb.Slice; + +/** + * Thin Java wrapper around a native CqCompactionFilter C++ pointer. + *

+ * The native filter is allocated by {@link CqCompactionFilterJni#createNativeFilter0()} + * and its lifetime is managed externally (it lives for the entire JVM session). + * {@link #disOwnNativeHandle()} is called so that {@code close()} does not + * free the native memory — this is critical because {@code AbstractRocksDBStorage} + * closes {@code ColumnFamilyOptions} (which closes this filter) before closing + * the DB, while background compaction threads may still reference the filter. + */ +class NativeCqCompactionFilter extends AbstractCompactionFilter { + + NativeCqCompactionFilter(long nativeHandle) { + super(nativeHandle); + disOwnNativeHandle(); + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java index b74cf8c85d5..37eec67d357 100644 --- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java +++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java @@ -41,8 +41,7 @@ public class RocksDBOptionsFactory { - public static ColumnFamilyOptions createCQCFOptions(final MessageStore messageStore, - ConsumeQueueCompactionFilterFactory consumeQueueCompactionFilterFactory) { + public static ColumnFamilyOptions createCQCFOptions(final MessageStore messageStore) { BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig(). setFormatVersion(5). setIndexType(IndexType.kBinarySearch). @@ -93,7 +92,6 @@ public static ColumnFamilyOptions createCQCFOptions(final MessageStore messageSt setTargetFileSizeBase(256 * SizeUnit.MB). setTargetFileSizeMultiplier(2). setMergeOperator(new StringAppendOperator()). - setCompactionFilterFactory(consumeQueueCompactionFilterFactory). setReportBgIoStats(true). setOptimizeFiltersForHits(true); } diff --git a/store/src/main/resources/native/cq_compaction_filter.cpp b/store/src/main/resources/native/cq_compaction_filter.cpp new file mode 100644 index 00000000000..1d0bd84cd90 --- /dev/null +++ b/store/src/main/resources/native/cq_compaction_filter.cpp @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Native compaction filter for ConsumeQueue entries. + * + * Subclass rocksdb::CompactionFilter directly, create instances in C++, + * and pass the raw C++ pointer as a jlong to Java. Java's + * AbstractCompactionFilter(nativeHandle) wraps it seamlessly. + * + * All rocksdb symbols are declared weak so they resolve at runtime to the + * symbols already loaded by the JVM's ClassLoader. + */ + +#include +#include +#include +#include + +#include "rocksdb/compaction_filter.h" +#include "rocksdb/slice.h" + +/* ------------------------------------------------------------------ */ +/* Windows stub implementations */ +/* */ +/* On Linux/macOS, ELF/Mach-O shared libraries export all symbols by */ +/* default, so the shim resolves inherited virtual methods from */ +/* librocksdbjni at link time. On Windows, DLLs only export symbols */ +/* marked __declspec(dllexport) — rocksdbjni only exports JNI entry */ +/* points, not internal C++ class methods. We must provide stub */ +/* implementations for the Configurable/Customizable virtual methods */ +/* that appear in CompactionFilter's vtable. These stubs are never */ +/* called at runtime (RocksDB only invokes Filter() and Name() on */ +/* compaction filters), but the linker needs addresses for them. */ +/* ------------------------------------------------------------------ */ + +#ifdef _WIN32 + +#include "rocksdb/configurable.h" +#include "rocksdb/customizable.h" +#include +#include + +namespace rocksdb { + +struct ConfigOptions; +struct DBOptions; +struct ColumnFamilyOptions; +class OptionTypeInfo; + +// --- Configurable virtual methods (defined in options/configurable.cc) --- + +Status Configurable::GetOption(const ConfigOptions&, const std::string&, + std::string*) const { + return Status(); +} + +bool Configurable::AreEquivalent(const ConfigOptions&, const Configurable*, + std::string*) const { + return true; +} + +Status Configurable::PrepareOptions(const ConfigOptions&) { + return Status(); +} + +Status Configurable::ValidateOptions(const DBOptions&, + const ColumnFamilyOptions&) const { + return Status(); +} + +const void* Configurable::GetOptionsPtr(const std::string&) const { + return nullptr; +} + +Status Configurable::ParseStringOptions(const ConfigOptions&, + const std::string&) { + return Status(); +} + +Status Configurable::ConfigureOptions( + const ConfigOptions&, + const std::unordered_map&, + std::unordered_map*) { + return Status(); +} + +Status Configurable::ParseOption(const ConfigOptions&, const OptionTypeInfo&, + const std::string&, const std::string&, + void*) { + return Status(); +} + +bool Configurable::OptionsAreEqual(const ConfigOptions&, const OptionTypeInfo&, + const std::string&, const void*, + const void*, std::string*) const { + return true; +} + +std::string Configurable::SerializeOptions(const ConfigOptions&, + const std::string&) const { + return ""; +} + +std::string Configurable::GetOptionName(const std::string& name) const { + return name; +} + +// Non-virtual, but referenced by inline code paths +void Configurable::RegisterOptions(const std::string&, void*, + const std::unordered_map*) {} + +Status Configurable::ConfigureFromMap( + const ConfigOptions&, + const std::unordered_map&) { + return Status(); +} + +Status Configurable::ConfigureFromMap( + const ConfigOptions&, + const std::unordered_map&, + std::unordered_map*) { + return Status(); +} + +Status Configurable::ConfigureOption(const ConfigOptions&, const std::string&, + const std::string&) { + return Status(); +} + +Status Configurable::ConfigureFromString(const ConfigOptions&, + const std::string&) { + return Status(); +} + +Status Configurable::GetOptionString(const ConfigOptions&, + std::string*) const { + return Status(); +} + +std::string Configurable::ToString(const ConfigOptions&, + const std::string&) const { + return ""; +} + +Status Configurable::GetOptionNames(const ConfigOptions&, + std::unordered_set*) const { + return Status(); +} + +Status Configurable::GetOptionsMap( + const std::string&, const std::string&, std::string*, + std::unordered_map*) { + return Status(); +} + +// --- Customizable virtual/override methods (defined in options/customizable.cc) --- + +Status Customizable::GetOption(const ConfigOptions&, const std::string&, + std::string*) const { + return Status(); +} + +bool Customizable::AreEquivalent(const ConfigOptions&, const Configurable*, + std::string*) const { + return true; +} + +std::string Customizable::GetOptionName(const std::string& name) const { + return name; +} + +std::string Customizable::SerializeOptions(const ConfigOptions&, + const std::string&) const { + return ""; +} + +std::string Customizable::GenerateIndividualId() const { + return "stub"; +} + +Status Customizable::GetOptionsMap( + const ConfigOptions&, const Customizable*, const std::string&, + std::string*, std::unordered_map*) { + return Status(); +} + +Status Customizable::ConfigureNewObject( + const ConfigOptions&, Customizable*, + const std::unordered_map&) { + return Status(); +} + +// --- Status methods (defined in util/status.cc) --- + +Status::Status(Code _code, SubCode _subcode, const Slice& msg, + const Slice& msg2, Severity sev) + : code_(_code), subcode_(_subcode), sev_(sev), + retryable_(false), data_loss_(false), scope_(0) {} + +std::unique_ptr Status::CopyState(const char* s) { + if (s == nullptr) return nullptr; + const size_t n = std::strlen(s) + 1; + char* result = new char[n]; + std::memcpy(result, s, n); + return std::unique_ptr(result); +} + +std::string Status::ToString() const { + return "OK"; +} + +} // namespace rocksdb + +#endif // _WIN32 + +/* ------------------------------------------------------------------ */ +/* Our concrete compaction filter */ +/* ------------------------------------------------------------------ */ + +class CqCompactionFilter : public rocksdb::CompactionFilter { +public: + const char* Name() const override { + return "ConsumeQueueCompactionFilter"; + } + + bool Filter(int /*level*/, const rocksdb::Slice& /*key*/, + const rocksdb::Slice& existing_value, std::string* /*new_value*/, + bool* /*value_changed*/) const override { + static const int CQ_MIN_SIZE = 28; + if (existing_value.size() < static_cast(CQ_MIN_SIZE)) { + return false; + } + const unsigned char* data = + reinterpret_cast(existing_value.data()); + int64_t phy_offset = + (static_cast(data[0]) << 56) | + (static_cast(data[1]) << 48) | + (static_cast(data[2]) << 40) | + (static_cast(data[3]) << 32) | + (static_cast(data[4]) << 24) | + (static_cast(data[5]) << 16) | + (static_cast(data[6]) << 8) | + (static_cast(data[7])); + + int64_t min_offset = min_phy_offset_.load(std::memory_order_relaxed); + return phy_offset < min_offset; + } + + void SetMinPhyOffset(int64_t offset) { + min_phy_offset_.store(offset, std::memory_order_relaxed); + } + +private: + std::atomic min_phy_offset_{0}; +}; + +/* ------------------------------------------------------------------ */ +/* JNI bindings */ +/* ------------------------------------------------------------------ */ + +#include + +extern "C" { + +JNIEXPORT jlong JNICALL +Java_org_apache_rocketmq_store_rocksdb_CqCompactionFilterJni_createNativeFilter0( + JNIEnv* env, jclass clazz) { + CqCompactionFilter* filter = new CqCompactionFilter(); + return reinterpret_cast(filter); +} + +JNIEXPORT void JNICALL +Java_org_apache_rocketmq_store_rocksdb_CqCompactionFilterJni_setMinPhyOffset0( + JNIEnv* env, jclass clazz, jlong filterPtr, jlong minPhyOffset) { + CqCompactionFilter* filter = reinterpret_cast(filterPtr); + filter->SetMinPhyOffset(minPhyOffset); +} + +} // extern "C" diff --git a/store/src/main/resources/native/cq_compaction_filter.dll b/store/src/main/resources/native/cq_compaction_filter.dll new file mode 100755 index 00000000000..2dc74834f41 Binary files /dev/null and b/store/src/main/resources/native/cq_compaction_filter.dll differ diff --git a/store/src/main/resources/native/libcq_compaction_filter.dylib b/store/src/main/resources/native/libcq_compaction_filter.dylib new file mode 100755 index 00000000000..58d6e6796a7 Binary files /dev/null and b/store/src/main/resources/native/libcq_compaction_filter.dylib differ diff --git a/store/src/main/resources/native/libcq_compaction_filter.so b/store/src/main/resources/native/libcq_compaction_filter.so new file mode 100755 index 00000000000..46dabe1880a Binary files /dev/null and b/store/src/main/resources/native/libcq_compaction_filter.so differ diff --git a/store/src/main/resources/native/libcq_compaction_filter_aarch64.so b/store/src/main/resources/native/libcq_compaction_filter_aarch64.so new file mode 100755 index 00000000000..b77869f063d Binary files /dev/null and b/store/src/main/resources/native/libcq_compaction_filter_aarch64.so differ diff --git a/store/src/test/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJniTest.java b/store/src/test/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJniTest.java new file mode 100644 index 00000000000..eead66b0741 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJniTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.rocksdb; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.UUID; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.WriteBatch; + +public class CqCompactionFilterJniTest { + + private static final int TOPIC_COUNT = 100; + private static final int BATCH_SIZE = 100_000; + private static final int MSG_SIZE = 1000; + + private static final byte CTRL_1 = '\u0001'; + private ConsumeQueueRocksDBStorage storage; + + @Before + public void setUp() throws Exception { + Assume.assumeTrue("CqCompactionFilterJni native library must be loaded", CqCompactionFilterJni.isLoaded()); + String dbPath = Files.createTempDirectory("rocksdb-cq-compaction-" + UUID.randomUUID()).toString(); + MessageStore mockStore = Mockito.mock(MessageStore.class); + Mockito.when(mockStore.getMinPhyOffset()).thenReturn(0L); + Mockito.when(mockStore.getMessageStoreConfig()).thenReturn(new MessageStoreConfig()); + storage = new ConsumeQueueRocksDBStorage(mockStore, dbPath); + } + + @After + public void tearDown() { + if (storage != null) { + storage.shutdown(); + storage.destroy(); + } + } + + @Test + public void testCreateAndSetFilter() { + Assert.assertTrue("Native library should be loaded", CqCompactionFilterJni.isLoaded()); + + long ptr = CqCompactionFilterJni.createNativeFilter0(); + Assert.assertTrue("Native filter pointer should be non-zero", ptr != 0); + + CqCompactionFilterJni.setMinPhyOffset0(ptr, 1000); + CqCompactionFilterJni.setMinPhyOffset0(ptr, Long.MAX_VALUE); + + try (ColumnFamilyOptions options = new ColumnFamilyOptions()) { + CqCompactionFilterJni.setNativeFilter(options, ptr); + } + } + + @Test + public void testCompactionFilter_small() throws Exception { + runCompactionTest(1_000_000); + } + + @Test + public void testCompactionFilter_large() throws Exception { + runCompactionTest(10_000_000); + } + + private void runCompactionTest(int totalEntries) throws Exception { + long start = System.currentTimeMillis(); + boolean result = storage.start(); + if (!result) { + System.err.println("storage.start() returned false. Check ERROR logs above for details."); + } + Assert.assertTrue("ConsumeQueueRocksDBStorage failed to start", result); + log("Startup took %d ms", System.currentTimeMillis() - start); + + // Phase 1: Write entries + start = System.currentTimeMillis(); + writeEntries(totalEntries); + long writeTime = System.currentTimeMillis() - start; + log("Wrote %d entries in %d ms (%.0f entries/sec)", totalEntries, writeTime, totalEntries * 1000.0 / writeTime); + + // Phase 2: Count entries before compaction + start = System.currentTimeMillis(); + long countBefore = storage.countEntries(); + long countTime = System.currentTimeMillis() - start; + log("Count before compaction: %d (took %d ms)", countBefore, countTime); + Assert.assertEquals("Entry count should match total written", totalEntries, countBefore); + + // Flush memtables to SST files so compaction has something to process + start = System.currentTimeMillis(); + storage.flushAll(); + log("Flush took %d ms", System.currentTimeMillis() - start); + + // Phase 3: Set minPhyOffset at midpoint and trigger compaction + long minPhyOffset = (long) (totalEntries / 2.0) * MSG_SIZE; + start = System.currentTimeMillis(); + storage.triggerCompactionSync(minPhyOffset); + long compactTime = System.currentTimeMillis() - start; + log("Compaction with minPhyOffset=%d took %d ms", minPhyOffset, compactTime); + + // Phase 4: Count entries after compaction + start = System.currentTimeMillis(); + long countAfter = storage.countEntries(); + countTime = System.currentTimeMillis() - start; + log("Count after compaction: %d (took %d ms)", countAfter, countTime); + + // Verify: approximately half the entries should remain + long expectedSurvivors = totalEntries - totalEntries / 2; + long tolerance = Math.max(expectedSurvivors / 100, 100); + Assert.assertTrue( + "Expected ~" + expectedSurvivors + " entries after compaction, but got " + countAfter, + countAfter >= expectedSurvivors - tolerance && countAfter <= expectedSurvivors + tolerance + ); + + log("Test passed: %d -> %d entries (expected ~%d)", totalEntries, countAfter, expectedSurvivors); + } + + private void writeEntries(int totalEntries) throws Exception { + int entriesPerTopic = totalEntries / TOPIC_COUNT; + + for (int t = 0; t < TOPIC_COUNT; t++) { + String topic = "test-topic-" + t; + byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8); + int queueId = 0; + + try (WriteBatch batch = new WriteBatch()) { + for (int i = 0; i < entriesPerTopic; i++) { + int globalIndex = t * entriesPerTopic + i; + + // Key: [topic_len:4][CTRL_1][topic][CTRL_1][queue_id:4][CTRL_1][cq_offset:8] + int keyLen = Integer.BYTES + 1 + topicBytes.length + 1 + Integer.BYTES + 1 + Long.BYTES; + ByteBuffer keyBB = ByteBuffer.allocate(keyLen); + keyBB.putInt(topicBytes.length) + .put(CTRL_1) + .put(topicBytes) + .put(CTRL_1) + .putInt(queueId) + .put(CTRL_1) + .putLong(i); + + // Value: [phy_offset:8][msg_size:4][tags_code:8][store_timestamp:8] (28 bytes) + long phyOffset = (long) globalIndex * MSG_SIZE; + ByteBuffer valueBB = ByteBuffer.allocate(28); + valueBB.putLong(phyOffset) + .putInt(MSG_SIZE) + .putLong(0) + .putLong(System.currentTimeMillis()); + + batch.put(storage.getDefaultCFHandle(), keyBB.array(), valueBB.array()); + + if ((i + 1) % BATCH_SIZE == 0) { + storage.batchPut(batch); + } + } + if (entriesPerTopic % BATCH_SIZE != 0) { + storage.batchPut(batch); + } + } + } + } + + private void log(String format, Object... args) { + System.out.printf("[CqCompactionFilterJniTest] " + format + "%n", args); + } +}