Native Python implementation of the ZeroBuffer protocol for high-performance zero-copy inter-process communication.
- True Zero-Copy: Uses memoryview and buffer protocol to avoid data copies
- Cross-Platform: Supports Linux, Windows, and macOS
- Protocol Compatible: Binary compatible with C++ and C# implementations
- Pythonic API: Clean, idiomatic Python interface
- Type Safe: Full type hints for better IDE support
- Thread Safe: Built-in synchronization for multi-threaded applications
- Python 3.8 or later
- posix-ipc (Linux/macOS only): pip install posix-ipc
- pywin32 (Windows only): pip install pywin32
pip install -e .
from zerobuffer import Reader, BufferConfig
# Create a buffer
config = BufferConfig(metadata_size=1024, payload_size=1024*1024)
with Reader("my-buffer", config) as reader:
# Wait for writer to connect
print("Waiting for writer...")
# Read frames
while True:
frame = reader.read_frame(timeout=5.0)
if frame:
# Option 1: Manual frame management
data = frame.data # This is a memoryview (zero-copy)
print(f"Received frame {frame.sequence}: {len(data)} bytes")
# Process the frame...
reader.release_frame(frame) # Must release to free buffer space
# Option 2 (use instead of Option 1, not in addition to it): Using context manager (RAII pattern - automatic disposal)
with frame:
data = frame.data # Automatically released when exiting 'with' block
print(f"Received frame {frame.sequence}: {len(data)} bytes")
# Process the frame...
# Frame is automatically released here
from zerobuffer import Writer
# Connect to existing buffer
with Writer("my-buffer") as writer:
# Write some metadata
metadata = b"{'format': 'raw', 'version': 1}"
writer.set_metadata(metadata)
# Write frames
for i in range(100):
data = f"Frame {i}".encode()
writer.write_frame(data)
print(f"Sent frame {i}")
# True zero-copy writing with memoryview
import numpy as np
# Create numpy array
arr = np.arange(1000, dtype=np.float32)
# Get memoryview of array (no copy)
view = memoryview(arr)
# Write with zero-copy (memoryview is handled efficiently)
writer.write_frame(view)
# Or use direct buffer access for maximum performance
buffer = writer.get_frame_buffer(size=4000)
# Write directly into shared memory
buffer[:] = arr.tobytes()
writer.commit_frame()
Reader(name: str, config: Optional[BufferConfig] = None)
Creates a new buffer and prepares for reading.
Methods:
- get_metadata() -> Optional[memoryview]: Get metadata as zero-copy memoryview
- read_frame(timeout: Optional[float] = 5.0) -> Optional[Frame]: Read next frame
- release_frame(frame: Frame) -> None: Release frame and free buffer space
- is_writer_connected() -> bool: Check if writer is connected
- close() -> None: Close reader and clean up resources
Writer(name: str)
Connects to an existing buffer for writing.
Methods:
- set_metadata(data: Union[bytes, bytearray, memoryview]) -> None: Write metadata (once only)
- write_frame(data: Union[bytes, bytearray, memoryview]) -> None: Write a frame (zero-copy for memoryview)
- get_frame_buffer(size: int) -> memoryview: Get buffer for direct writing
- commit_frame() -> None: Commit frame after direct writing
- is_reader_connected() -> bool: Check if reader is connected
- close() -> None: Close writer
Represents a zero-copy reference to frame data with RAII support.
Properties:
- data -> memoryview: Zero-copy view of frame data
- size -> int: Size of frame data
- sequence -> int: Sequence number
- is_valid -> bool: Check if frame has valid data
RAII Support: The Frame class implements the context manager protocol for automatic resource management:
# Manual management
frame = reader.read_frame()
if frame:
process(frame.data)
reader.release_frame(frame) # Must release manually
# Automatic management with context manager (RAII)
frame = reader.read_frame()
if frame:
with frame: # Enters context, frame is valid
process(frame.data)
# Frame is automatically disposed/released here
Using the context manager ensures frames are always properly released, even if an exception occurs during processing.
BufferConfig(metadata_size: int = 1024, payload_size: int = 1024*1024)
Configuration for creating a buffer.
The Python implementation provides true zero-copy access through:
- memoryview objects: No data copying when accessing frame data
- Buffer protocol: Direct memory access for compatible objects
- Shared memory: Direct mapping of shared memory into process space
- Converting memoryview to bytes: bytes(frame.data)
- Using non-buffer protocol objects with write_frame()
- String encoding: "text".encode()
- Use memoryview objects whenever possible
- Pass memoryview directly to write_frame() for zero-copy operation
- Use numpy arrays or other buffer protocol objects
- Access frame data directly via the frame.data memoryview
- Use get_frame_buffer() and commit_frame() for direct memory access
- Pre-allocate buffers: Reuse buffers instead of creating new ones
- Batch operations: Process multiple frames before releasing
- Use appropriate buffer sizes: See capacity planning in main README
- Monitor buffer utilization: Avoid buffer full conditions
The ZeroBuffer library uses Python's standard logging module. By default, it uses a NullHandler (no output) following Python library best practices.
Set the ZEROBUFFER_LOG_LEVEL environment variable:
export ZEROBUFFER_LOG_LEVEL=DEBUG
python your_app.py
Or in Python:
import os
os.environ['ZEROBUFFER_LOG_LEVEL'] = 'DEBUG'
import zerobuffer # Import after setting env var
import logging
import zerobuffer
# Option 1: Basic configuration for debugging
logging.basicConfig(level=logging.DEBUG)
# Option 2: Configure only zerobuffer logging
logging.getLogger('zerobuffer').setLevel(logging.DEBUG)
# Add a handler to see the output
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
logging.getLogger('zerobuffer').addHandler(handler)
# Option 3: Configure specific modules
logging.getLogger('zerobuffer.reader').setLevel(logging.DEBUG) # Only reader logs
logging.getLogger('zerobuffer.writer').setLevel(logging.DEBUG) # Only writer logs
logging.getLogger('zerobuffer.duplex.server').setLevel(logging.DEBUG) # Only server logs
- DEBUG: Detailed information for diagnosing issues
- Buffer creation/initialization
- Frame read/write operations with sizes and sequences
- Wrap-around detection and handling
- Semaphore operations
- OIEB state changes
- INFO: General operational information
- Buffer connection/disconnection
- Metadata operations
- Major state changes
- WARNING: Important warnings that don't prevent operation
- Missing metadata
- Retry operations
- ERROR: Error conditions
- Connection failures
- Processing errors
- Resource cleanup failures
All operations may raise exceptions from zerobuffer.exceptions:
- WriterDeadException: Writer process died (accepts optional message)
- ReaderDeadException: Reader process died (accepts optional message)
- BufferFullException: Buffer is full
- FrameTooLargeException: Frame exceeds buffer capacity
- SequenceError: Frame sequence validation failed
- MetadataAlreadyWrittenException: Metadata can only be written once
- ZeroBufferException: Base exception for all ZeroBuffer errors
The Reader and Writer classes are thread-safe. Multiple threads can:
- Call methods on the same Reader/Writer instance
- Read frames concurrently (with proper frame release)
- Write frames concurrently
However, only one Reader and one Writer can connect to a buffer at a time.
Linux:
- Uses POSIX shared memory (/dev/shm)
- Requires the posix-ipc package
- File locks in /tmp/zerobuffer/
Windows:
- Uses Windows named shared memory
- Requires the pywin32 package
- File locks in temp directory
macOS:
- Similar to Linux with BSD-specific handling
- Requires the posix-ipc package
The Python implementation includes testing utilities for cross-platform compatibility with C# and C++ implementations:
Ensures unique buffer names across test runs to prevent conflicts:
from zerobuffer_serve.services import BufferNamingService
# Creates unique buffer names for test isolation
naming_service = BufferNamingService(logger)
actual_name = naming_service.get_buffer_name("test-buffer")
# Returns: "test-buffer_<pid>_<timestamp>" or uses Harmony environment variables
Provides consistent test data generation across all language implementations:
from zerobuffer_serve.test_data_patterns import TestDataPatterns
# Generate deterministic frame data
data = TestDataPatterns.generate_frame_data(size=1024, sequence=1)
# Generate simple pattern data
simple_data = TestDataPatterns.generate_simple_frame_data(size=1024)
# Verify data matches pattern
is_valid = TestDataPatterns.verify_simple_frame_data(data)
# Generate test metadata
metadata = TestDataPatterns.generate_metadata(size=256)
These utilities ensure that Python, C#, and C++ implementations can exchange data correctly in cross-platform tests.
The Python implementation includes full support for duplex channels (bidirectional request-response communication):
from zerobuffer.duplex import DuplexChannelFactory, ImmutableDuplexServer, ProcessingMode
from zerobuffer import BufferConfig, Frame, Writer
# Server side
factory = DuplexChannelFactory()
config = BufferConfig(metadata_size=4096, payload_size=10*1024*1024)
server = factory.create_immutable_server("my-service", config)
def handle_request(frame: Frame, response_writer: Writer) -> None:
"""Process request and send response"""
with frame: # RAII - frame is automatically disposed
# Copy the request data out of the frame (bytes() makes a copy; use frame.data directly for zero-copy access)
request_data = bytes(frame.data)
# Process the request
response_data = process_data(request_data)
# Send response
response_writer.write_frame(response_data)
# Start server with single-thread processing (default)
server.start(handle_request)
# Or specify processing mode explicitly
# server.start(handle_request, mode=ProcessingMode.SINGLE_THREAD)
# Client side
client = factory.create_client("my-service")
# Send request
request_data = b"Hello, server!"
client.send_request(request_data)
# Receive response with timeout
response = client.receive_response(timeout=5.0)
if response:
with response: # RAII - automatically disposed
print(f"Response: {bytes(response.data)}")
print(f"Sequence: {response.sequence}")
from zerobuffer.error_event_args import ErrorEventArgs
def handle_error(args: ErrorEventArgs) -> None:
"""Handle server errors"""
print(f"Server error: {args.exception}")
# Add error handler
server.add_error_handler(handle_error)
# Start server with metadata initialization
def on_init(metadata: memoryview) -> None:
"""Process client metadata on connection"""
client_info = bytes(metadata).decode()
print(f"Client connected with metadata: {client_info}")
server.start(handle_request, on_init=on_init)
See DUPLEX_CHANNEL.md for detailed documentation.