1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -296,6 +296,7 @@ jobs:
-v ${{ github.workspace }}:/workspace -w /workspace \
python:${{ matrix.python-version }}-slim \
bash -c "
apt-get update -qq && apt-get install -y -qq --no-install-recommends curl ca-certificates && \
curl -sSf https://just.systems/install.sh | bash -s -- --to /usr/local/bin && \
just ci-setup-debian && \
just ci-test
14 changes: 7 additions & 7 deletions .pre-commit-config.yaml
@@ -23,13 +23,13 @@ repos:
- id: mixed-line-ending
- id: trailing-whitespace

# Spell check
- repo: https://github.com/codespell-project/codespell
rev: v2.3.0
hooks:
- id: codespell
additional_dependencies: [tomli]
args: [--skip, "*.lock,target/*"]
# # Spell check
# - repo: https://github.com/codespell-project/codespell
# rev: v2.3.0
# hooks:
# - id: codespell
# additional_dependencies: [tomli]
# args: [--skip, "*.lock,target/*"]

# Rust formatting (local hook)
- repo: local
4 changes: 4 additions & 0 deletions Justfile
@@ -56,6 +56,10 @@ test-python:
test-chaos:
pytest tests/python/test_chaos.py

# Run Queue & Topic chaos tests (concurrent, join/leave, mixed workload)
test-queue-topic-chaos:
pytest tests/python/test_queue_topic_chaos.py -v -s

# Format all code (Rust + Python)
fmt:
cargo fmt
329 changes: 329 additions & 0 deletions benchmarks/baseline_throughput.py
@@ -0,0 +1,329 @@
#!/usr/bin/env python3
"""
Queue & Topic baseline throughput benchmark (single node)

Measures baseline Queue and Topic throughput and latency within a single process, for easy regression comparison.

Usage:
python benchmarks/baseline_throughput.py
python benchmarks/baseline_throughput.py --duration 15 --output results.json
python benchmarks/baseline_throughput.py --queue-only
python benchmarks/baseline_throughput.py --topic-only --topic-subscribers 3
"""

from __future__ import annotations

import argparse
import asyncio
import json
import shutil
import tempfile
import time

import pulsing as pul
from pulsing.queue import read_queue, write_queue
from pulsing.topic import PublishMode, read_topic, write_topic


def _percentile(sorted_data: list[float], p: float) -> float:
if not sorted_data:
return 0.0
idx = min(int(len(sorted_data) * p / 100), len(sorted_data) - 1)
return sorted_data[idx]


# =============================================================================
# Queue baseline
# =============================================================================


async def run_queue_baseline(
system,
storage_path: str,
duration: float,
num_buckets: int,
record_size: int,
) -> dict:
"""单 writer + 单 reader,固定时长,统计写/读吞吐与延迟."""
topic = "baseline_queue"
write_latencies_ms: list[float] = []
read_latencies_ms: list[float] = []
records_written = 0
records_read = 0

writer = await write_queue(
system,
topic=topic,
bucket_column="id",
num_buckets=num_buckets,
storage_path=storage_path,
)
reader = await read_queue(
system,
topic=topic,
num_buckets=num_buckets,
storage_path=storage_path,
)

end_time = time.monotonic() + duration

async def produce():
nonlocal records_written
i = 0
while time.monotonic() < end_time:
t0 = time.perf_counter()
try:
rec = {"id": f"r{i}", "payload": "x" * record_size}
await writer.put(rec)
write_latencies_ms.append((time.perf_counter() - t0) * 1000)
records_written += 1
i += 1
except Exception:

Check notice

Code scanning / CodeQL

Empty except (Note, test)

'except' clause does nothing but pass and there is no explanatory comment.

Copilot Autofix

AI 12 days ago

In general, empty except blocks should either handle the exception meaningfully (e.g., log, update metrics, adjust control flow) or be removed/limited to the specific exception types you truly expect. Swallowing all exceptions with a bare except Exception: and pass hides bugs and operational issues.

For this case, the lowest-impact fix that preserves functionality is to keep the loop running (so the benchmark still proceeds) but log any unexpected exception. That way, errors are visible for debugging without altering the throughput/latency calculations or terminating the coroutine. We could also narrow the exception type if we knew which ones to expect, but since we do not have more context about writer.put, the safest change within the shown snippet is to add logging rather than change control flow.

Concretely:

  • In benchmarks/baseline_throughput.py, inside run_queue_baseline.produce(), replace
            except Exception:
                pass

with something that records the exception, e.g.:

            except Exception as exc:
                # Keep benchmark running but record unexpected producer errors.
                print(f"[baseline_throughput] Producer error: {exc!r}")

This uses print to avoid adding imports or dependencies; printing to stderr would be slightly nicer but would require importing sys, which is beyond the scope of this fix. The change keeps the producer loop alive while no longer silencing failures. No new methods or definitions are required.
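For reference, a stderr variant would only need the sys import; a minimal sketch (the dummy try/except is illustrative, not part of the suggested change):

    import sys

    try:
        ...  # e.g. the awaited writer.put(rec) call
    except Exception as exc:
        # Same message as above, but routed to stderr instead of stdout.
        print(f"[baseline_throughput] Producer error: {exc!r}", file=sys.stderr)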

Suggested changeset 1
benchmarks/baseline_throughput.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/benchmarks/baseline_throughput.py b/benchmarks/baseline_throughput.py
--- a/benchmarks/baseline_throughput.py
+++ b/benchmarks/baseline_throughput.py
@@ -78,8 +78,9 @@
                 write_latencies_ms.append((time.perf_counter() - t0) * 1000)
                 records_written += 1
                 i += 1
-            except Exception:
-                pass
+            except Exception as exc:
+                # Keep benchmark running but do not silently swallow producer errors.
+                print(f"[baseline_throughput] Producer error: {exc!r}")
 
     async def consume():
         nonlocal records_read
EOF
pass

async def consume():
nonlocal records_read
while time.monotonic() < end_time:
t0 = time.perf_counter()
try:
batch = await reader.get(limit=50, wait=True, timeout=1.0)
if batch:
read_latencies_ms.append((time.perf_counter() - t0) * 1000)
records_read += len(batch)
except asyncio.TimeoutError:

Check notice

Code scanning / CodeQL

Empty except (Note, test)

'except' clause does nothing but pass and there is no explanatory comment.

Copilot Autofix

AI 12 days ago

To fix the problem while preserving existing functionality, we should keep ignoring asyncio.TimeoutError but explicitly document that this is intentional. This satisfies the linter’s requirement that an empty except block must either do something meaningful or be clearly documented as intentionally empty.

Detailed plan:

  • In benchmarks/baseline_throughput.py, inside run_queue_baseline’s inner consume coroutine, locate the except asyncio.TimeoutError: block.
  • Replace the bare pass with a commented pass explaining that timeouts are expected when no messages arrive within the timeout window and are intentionally ignored for the benchmark.
  • Leave the except Exception: block as‑is, since the CodeQL finding is specifically about line 93. (If desired later, the project could improve that too by logging, but that’s beyond this specific finding.)

No new imports, methods, or definitions are needed; we only add a comment within the existing file.

Suggested changeset 1
benchmarks/baseline_throughput.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/benchmarks/baseline_throughput.py b/benchmarks/baseline_throughput.py
--- a/benchmarks/baseline_throughput.py
+++ b/benchmarks/baseline_throughput.py
@@ -91,6 +91,7 @@
                     read_latencies_ms.append((time.perf_counter() - t0) * 1000)
                     records_read += len(batch)
             except asyncio.TimeoutError:
+                # No messages arrived within the timeout; this is expected in the benchmark loop.
                 pass
             except Exception:
                 pass
EOF
pass
except Exception:

Check notice

Code scanning / CodeQL

Empty except (Note, test)

'except' clause does nothing but pass and there is no explanatory comment.

Copilot Autofix

AI 12 days ago

In general, to fix empty except blocks you either (1) remove the try/except if it’s unnecessary, (2) narrow the exception type and handle it meaningfully (e.g., retry, adjust state), or (3) at minimum log the exception so that failures are not silently ignored.

Here, we want to preserve the existing behavior of keeping the loop running until end_time but stop completely hiding errors. The best minimally invasive fix is:

  • For asyncio.TimeoutError, keep ignoring it (it’s a normal condition when using a timeout) but add a short comment to document that this is intentional.
  • For the generic Exception, keep the loop running but log the exception via the standard library logging module. This doesn’t change functionality (the loop still continues), but makes failures visible in logs.

Concretely in benchmarks/baseline_throughput.py:

  • At the top of the file, add import logging.

  • In consume(), change:

    except asyncio.TimeoutError:
        pass
    except Exception:
        pass

    to:

    except asyncio.TimeoutError:
        # Timeout is expected when no messages are available; continue polling.
        pass
    except Exception:
        logging.exception("Unexpected error while reading from queue baseline consumer")

No new helper methods are required; logging.exception automatically logs the stack trace for the active exception.
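As a standalone illustration of that last point (not part of the patch), logging.exception called inside an except block emits an ERROR record together with the active traceback:

    import logging

    logging.basicConfig(level=logging.INFO)

    try:
        1 / 0
    except Exception:
        # Logs the message at ERROR level, followed by the ZeroDivisionError traceback.
        logging.exception("Unexpected error while reading from queue baseline consumer")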

Suggested changeset 1
benchmarks/baseline_throughput.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/benchmarks/baseline_throughput.py b/benchmarks/baseline_throughput.py
--- a/benchmarks/baseline_throughput.py
+++ b/benchmarks/baseline_throughput.py
@@ -19,6 +19,7 @@
 import shutil
 import tempfile
 import time
+import logging
 
 import pulsing as pul
 from pulsing.queue import read_queue, write_queue
@@ -91,9 +92,13 @@
                     read_latencies_ms.append((time.perf_counter() - t0) * 1000)
                     records_read += len(batch)
             except asyncio.TimeoutError:
+                # Timeout is expected when there are no messages within the given period.
+                # Simply continue polling until the benchmark duration elapses.
                 pass
             except Exception:
-                pass
+                logging.exception(
+                    "Unexpected error while reading from queue baseline consumer"
+                )
 
     await asyncio.gather(produce(), consume())
 
EOF
pass

await asyncio.gather(produce(), consume())

write_latencies_ms.sort()
read_latencies_ms.sort()

return {
"duration_s": duration,
"records_written": records_written,
"records_read": records_read,
"write_throughput_rec_s": records_written / duration if duration > 0 else 0,
"read_throughput_rec_s": records_read / duration if duration > 0 else 0,
"write_latency_ms": {
"avg": sum(write_latencies_ms) / len(write_latencies_ms)
if write_latencies_ms
else 0,
"p50": _percentile(write_latencies_ms, 50),
"p95": _percentile(write_latencies_ms, 95),
"p99": _percentile(write_latencies_ms, 99),
},
"read_latency_ms": {
"avg": sum(read_latencies_ms) / len(read_latencies_ms)
if read_latencies_ms
else 0,
"p50": _percentile(read_latencies_ms, 50),
"p95": _percentile(read_latencies_ms, 95),
"p99": _percentile(read_latencies_ms, 99),
},
}


# =============================================================================
# Topic baseline
# =============================================================================


async def run_topic_baseline(
system,
duration: float,
num_subscribers: int,
payload_size: int,
) -> dict:
"""单 publisher + N subscribers,fire_and_forget,统计发布与交付吞吐."""
topic_name = "baseline_topic"
messages_published = 0
delivered_per_sub: list[int] = [0] * num_subscribers
publish_latencies_ms: list[float] = []

writer = await write_topic(system, topic_name)
readers = []
locks = [asyncio.Lock() for _ in range(num_subscribers)]

for i in range(num_subscribers):
reader = await read_topic(system, topic_name, reader_id=f"sub_{i}")

def make_cb(idx):
async def cb(msg):
async with locks[idx]:
delivered_per_sub[idx] += 1

return cb

reader.add_callback(make_cb(i))
await reader.start()
readers.append(reader)

end_time = time.monotonic() + duration
seq = 0

while time.monotonic() < end_time:
t0 = time.perf_counter()
try:
await writer.publish(
{"seq": seq, "payload": "x" * payload_size},
mode=PublishMode.FIRE_AND_FORGET,
)
publish_latencies_ms.append((time.perf_counter() - t0) * 1000)
messages_published += 1
seq += 1
except Exception:

Check notice

Code scanning / CodeQL

Empty except (Note, test)

'except' clause does nothing but pass and there is no explanatory comment.

Copilot Autofix

AI 12 days ago

In general, the correct fix is to avoid silently swallowing exceptions: either narrow the exception type and handle it appropriately (e.g., retry, count as an error, or abort), or at minimum log the exception (possibly at low verbosity) so failures are visible during debugging.

For this specific loop, the best minimal fix that preserves existing behavior (keep the benchmark running even if some publishes fail) is:

  • Keep catching Exception to avoid aborting the loop.
  • Add logging of the exception (including stack trace) so that failures are discoverable.
  • Optionally, increment a “failed publishes” counter so the result dictionary can expose how many publishes failed. However, adding new result fields might be considered a functional change, so to stay conservative, we will only log the exception and keep the result shape unchanged.

Concretely, in benchmarks/baseline_throughput.py:

  • Add an import logging near the other imports.
  • In the except Exception: block, replace pass with a logging.exception(...) call that clearly indicates a publish error occurred during the topic benchmark loop. This preserves control flow but surfaces failures in logs.

No new external packages are required; Python’s standard logging module is sufficient.

Suggested changeset 1
benchmarks/baseline_throughput.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/benchmarks/baseline_throughput.py b/benchmarks/baseline_throughput.py
--- a/benchmarks/baseline_throughput.py
+++ b/benchmarks/baseline_throughput.py
@@ -19,6 +19,7 @@
 import shutil
 import tempfile
 import time
+import logging
 
 import pulsing as pul
 from pulsing.queue import read_queue, write_queue
@@ -174,7 +175,7 @@
             messages_published += 1
             seq += 1
         except Exception:
-            pass
+            logging.exception("Error while publishing message in topic benchmark loop")
 
     await asyncio.sleep(0.2)
 
EOF
pass

await asyncio.sleep(0.2)

for r in readers:
await r.stop()

publish_latencies_ms.sort()
total_delivered = sum(delivered_per_sub)

return {
"duration_s": duration,
"num_subscribers": num_subscribers,
"messages_published": messages_published,
"total_delivered": total_delivered,
"publish_throughput_msg_s": messages_published / duration
if duration > 0
else 0,
"delivered_throughput_msg_s": total_delivered / duration if duration > 0 else 0,
"publish_latency_ms": {
"avg": sum(publish_latencies_ms) / len(publish_latencies_ms)
if publish_latencies_ms
else 0,
"p50": _percentile(publish_latencies_ms, 50),
"p95": _percentile(publish_latencies_ms, 95),
"p99": _percentile(publish_latencies_ms, 99),
},
}


# =============================================================================
# Main
# =============================================================================


async def main():
parser = argparse.ArgumentParser(
description="Queue & Topic 基线吞吐 Benchmark(单节点)"
)
parser.add_argument(
"--duration",
type=float,
default=10.0,
help="每类基准运行时长(秒)",
)
parser.add_argument(
"--queue-only",
action="store_true",
help="仅跑 Queue 基线",
)
parser.add_argument(
"--topic-only",
action="store_true",
help="仅跑 Topic 基线",
)
parser.add_argument(
"--num-buckets",
type=int,
default=4,
help="Queue 桶数",
)
parser.add_argument(
"--topic-subscribers",
type=int,
default=1,
help="Topic 订阅者数量",
)
parser.add_argument(
"--record-size",
type=int,
default=100,
help="单条记录 payload 字节数",
)
parser.add_argument(
"--output",
type=str,
default=None,
help="结果写入 JSON 文件路径",
)
args = parser.parse_args()

system = await pul.actor_system()
storage_path = tempfile.mkdtemp(prefix="baseline_queue_")
results: dict = {"queue": None, "topic": None}

try:
if not args.topic_only:
print("Running Queue baseline...")
results["queue"] = await run_queue_baseline(
system,
storage_path=storage_path,
duration=args.duration,
num_buckets=args.num_buckets,
record_size=args.record_size,
)

if not args.queue_only:
print("Running Topic baseline...")
results["topic"] = await run_topic_baseline(
system,
duration=args.duration,
num_subscribers=args.topic_subscribers,
payload_size=args.record_size,
)
finally:
await system.shutdown()
shutil.rmtree(storage_path, ignore_errors=True)

# Print summary
print()
print("=" * 60)
print("Baseline Throughput Results")
print("=" * 60)

if results["queue"]:
q = results["queue"]
print("\n--- Queue ---")
print(f" Duration: {q['duration_s']:.1f}s")
print(f" Write throughput: {q['write_throughput_rec_s']:.0f} rec/s")
print(f" Read throughput: {q['read_throughput_rec_s']:.0f} rec/s")
print(
f" Write latency: avg={q['write_latency_ms']['avg']:.2f}ms "
f"p50={q['write_latency_ms']['p50']:.2f}ms p99={q['write_latency_ms']['p99']:.2f}ms"
)
print(
f" Read latency: avg={q['read_latency_ms']['avg']:.2f}ms "
f"p50={q['read_latency_ms']['p50']:.2f}ms p99={q['read_latency_ms']['p99']:.2f}ms"
)

if results["topic"]:
t = results["topic"]
print("\n--- Topic ---")
print(f" Duration: {t['duration_s']:.1f}s")
print(f" Subscribers: {t['num_subscribers']}")
print(f" Publish throughput: {t['publish_throughput_msg_s']:.0f} msg/s")
print(
f" Delivered total: {t['total_delivered']} ({t['delivered_throughput_msg_s']:.0f} msg/s)"
)
print(
f" Publish latency: avg={t['publish_latency_ms']['avg']:.2f}ms "
f"p50={t['publish_latency_ms']['p50']:.2f}ms p99={t['publish_latency_ms']['p99']:.2f}ms"
)

print("\n" + "=" * 60)

if args.output:
with open(args.output, "w") as f:
json.dump(results, f, indent=2)
print(f"Results written to {args.output}")


if __name__ == "__main__":
asyncio.run(main())
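For the regression comparison mentioned in the module docstring, the --output JSON mirrors the results dict built above (for example queue.write_throughput_rec_s and topic.publish_throughput_msg_s). A minimal comparison sketch; the file names and the 10% threshold are illustrative assumptions, not something defined by this PR:

    import json

    # Hypothetical files, e.g. from two runs of:
    #   python benchmarks/baseline_throughput.py --output results.json
    with open("baseline.json") as f:
        base = json.load(f)
    with open("results.json") as f:
        cur = json.load(f)

    checks = [
        ("queue write", "queue", "write_throughput_rec_s"),
        ("queue read", "queue", "read_throughput_rec_s"),
        ("topic publish", "topic", "publish_throughput_msg_s"),
    ]
    for label, section, key in checks:
        # A section is None when the corresponding baseline was skipped.
        if base.get(section) and cur.get(section):
            old, new = base[section][key], cur[section][key]
            if old > 0 and new < 0.9 * old:
                print(f"REGRESSION: {label}: {old:.0f} -> {new:.0f}")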