From 2be0f6c7e32c2c23d6363dfe013b5bb89b44a7dc Mon Sep 17 00:00:00 2001 From: Suhaib Date: Wed, 14 May 2025 21:48:07 -0700 Subject: [PATCH 1/5] Python client --- .vscode/tasks.json | 18 +- proto/smq.pb.go | 167 ++++--- proto/smq_grpc.pb.go | 2 +- pysmq/README.md | 234 ++++++++++ pysmq/examples/publish_example.py | 56 +++ pysmq/examples/secure_connection_example.py | 105 +++++ pysmq/examples/subscribe_example.py | 122 +++++ pysmq/pyproject.toml | 55 +++ pysmq/pysmq/__init__.py | 37 ++ pysmq/pysmq/client.py | 429 +++++++++++++++++ pysmq/pysmq/config.py | 41 ++ pysmq/pysmq/exceptions.py | 84 ++++ pysmq/pysmq/proto/__init__.py | 8 + pysmq/pysmq/proto/smq_pb2.py | 71 +++ pysmq/pysmq/proto/smq_pb2_grpc.py | 486 ++++++++++++++++++++ pysmq/tests/__init__.py | 0 pysmq/tests/test_client.py | 149 ++++++ 17 files changed, 2006 insertions(+), 58 deletions(-) create mode 100644 pysmq/README.md create mode 100644 pysmq/examples/publish_example.py create mode 100644 pysmq/examples/secure_connection_example.py create mode 100644 pysmq/examples/subscribe_example.py create mode 100644 pysmq/pyproject.toml create mode 100644 pysmq/pysmq/__init__.py create mode 100644 pysmq/pysmq/client.py create mode 100644 pysmq/pysmq/config.py create mode 100644 pysmq/pysmq/exceptions.py create mode 100644 pysmq/pysmq/proto/__init__.py create mode 100644 pysmq/pysmq/proto/smq_pb2.py create mode 100644 pysmq/pysmq/proto/smq_pb2_grpc.py create mode 100644 pysmq/tests/__init__.py create mode 100644 pysmq/tests/test_client.py diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 1e9b369..0c6f9a7 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -2,7 +2,7 @@ "version": "2.0.0", "tasks": [ { - "label": "Generate Protobuf", + "label": "Generate Go Protobuf", "type": "shell", "command": "protoc --proto_path=. --go_out=./proto/ --go_opt=paths=source_relative --go-grpc_out=./proto/ --go-grpc_opt=paths=source_relative smq.proto", "group": { @@ -10,6 +10,22 @@ "isDefault": true }, "detail": "Generates Go files from smq.proto using protoc. Assumes protoc, protoc-gen-go, and protoc-gen-go-grpc are installed and in PATH." + }, + { + "label": "Generate Python Protobuf", + "type": "shell", + "command": "python3 -m grpc_tools.protoc --proto_path=. --python_out=./pysmq/pysmq/proto --grpc_python_out=./pysmq/pysmq/proto smq.proto && python3 -c \"import os, re; f='./pysmq/pysmq/proto/smq_pb2_grpc.py'; content=open(f).read(); open(f, 'w').write(re.sub(r'import smq_pb2', 'from . import smq_pb2', content))\"", + "group": "build", + "detail": "Generates Python files from smq.proto using grpc_tools.protoc. Requires grpcio-tools to be installed in the Python environment." + }, + { + "label": "Generate All Protobuf", + "dependsOn": ["Generate Go Protobuf", "Generate Python Protobuf"], + "group": { + "kind": "build", + "isDefault": true + }, + "detail": "Generates both Go and Python files from smq.proto." } ] } diff --git a/proto/smq.pb.go b/proto/smq.pb.go index 67ee22e..b310737 100644 --- a/proto/smq.pb.go +++ b/proto/smq.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.6 -// protoc v6.30.2 +// protoc-gen-go v1.36.5 +// protoc v5.29.3 // source: smq.proto package proto @@ -811,60 +811,115 @@ func (*DeleteUntilOffsetResponse) Descriptor() ([]byte, []int) { var File_smq_proto protoreflect.FileDescriptor -const file_smq_proto_rawDesc = "" + - "\n" + - "\tsmq.proto\x12\x03smq\"D\n" + - "\x10RetrievedMessage\x12\x18\n" + - "\amessage\x18\x01 \x01(\fR\amessage\x12\x16\n" + - "\x06offset\x18\x02 \x01(\x03R\x06offset\"d\n" + - "\x13BulkRetrieveRequest\x12\x14\n" + - "\x05topic\x18\x01 \x01(\tR\x05topic\x12!\n" + - "\fstart_offset\x18\x02 \x01(\x03R\vstartOffset\x12\x14\n" + - "\x05limit\x18\x03 \x01(\x05R\x05limit\"\x80\x01\n" + - "\x14BulkRetrieveResponse\x121\n" + - "\bmessages\x18\x01 \x03(\v2\x15.smq.RetrievedMessageR\bmessages\x12\x14\n" + - "\x05count\x18\x02 \x01(\x05R\x05count\x12\x1f\n" + - "\vnext_offset\x18\x03 \x01(\x03R\n" + - "nextOffset\"\x10\n" + - "\x0eConnectRequest\"\x11\n" + - "\x0fConnectResponse\".\n" + - "\x16GetLatestOffsetRequest\x12\x14\n" + - "\x05topic\x18\x01 \x01(\tR\x05topic\"1\n" + - "\x17GetLatestOffsetResponse\x12\x16\n" + - "\x06offset\x18\x01 \x01(\x03R\x06offset\"0\n" + - "\x18GetEarliestOffsetRequest\x12\x14\n" + - "\x05topic\x18\x01 \x01(\tR\x05topic\"3\n" + - "\x19GetEarliestOffsetResponse\x12\x16\n" + - "\x06offset\x18\x01 \x01(\x03R\x06offset\"*\n" + - "\x12CreateTopicRequest\x12\x14\n" + - "\x05topic\x18\x01 \x01(\tR\x05topic\"\x15\n" + - "\x13CreateTopicResponse\"@\n" + - "\x0eProduceRequest\x12\x14\n" + - "\x05topic\x18\x01 \x01(\tR\x05topic\x12\x18\n" + - "\amessage\x18\x02 \x01(\fR\amessage\")\n" + - "\x0fProduceResponse\x12\x16\n" + - "\x06offset\x18\x01 \x01(\x03R\x06offset\">\n" + - "\x0eConsumeRequest\x12\x14\n" + - "\x05topic\x18\x01 \x01(\tR\x05topic\x12\x16\n" + - "\x06offset\x18\x02 \x01(\x03R\x06offset\"C\n" + - "\x0fConsumeResponse\x12\x18\n" + - "\amessage\x18\x01 \x01(\fR\amessage\x12\x16\n" + - "\x06offset\x18\x02 \x01(\x03R\x06offset\"H\n" + - "\x18DeleteUntilOffsetRequest\x12\x14\n" + - "\x05topic\x18\x01 \x01(\tR\x05topic\x12\x16\n" + - "\x06offset\x18\x02 \x01(\x03R\x06offset\"\x1b\n" + - "\x19DeleteUntilOffsetResponse2\xc3\x05\n" + - "\x12SuhaibMessageQueue\x126\n" + - "\aConnect\x12\x13.smq.ConnectRequest\x1a\x14.smq.ConnectResponse\"\x00\x12N\n" + - "\x0fGetLatestOffset\x12\x1b.smq.GetLatestOffsetRequest\x1a\x1c.smq.GetLatestOffsetResponse\"\x00\x12T\n" + - "\x11GetEarliestOffset\x12\x1d.smq.GetEarliestOffsetRequest\x1a\x1e.smq.GetEarliestOffsetResponse\"\x00\x12B\n" + - "\vCreateTopic\x12\x17.smq.CreateTopicRequest\x1a\x18.smq.CreateTopicResponse\"\x00\x126\n" + - "\aProduce\x12\x13.smq.ProduceRequest\x1a\x14.smq.ProduceResponse\"\x00\x126\n" + - "\aConsume\x12\x13.smq.ConsumeRequest\x1a\x14.smq.ConsumeResponse\"\x00\x12>\n" + - "\rStreamProduce\x12\x13.smq.ProduceRequest\x1a\x14.smq.ProduceResponse\"\x00(\x01\x12>\n" + - "\rStreamConsume\x12\x13.smq.ConsumeRequest\x1a\x14.smq.ConsumeResponse\"\x000\x01\x12T\n" + - "\x11DeleteUntilOffset\x12\x1d.smq.DeleteUntilOffsetRequest\x1a\x1e.smq.DeleteUntilOffsetResponse\"\x00\x12E\n" + - "\fBulkRetrieve\x12\x18.smq.BulkRetrieveRequest\x1a\x19.smq.BulkRetrieveResponse\"\x00B\tZ\a./protob\x06proto3" +var file_smq_proto_rawDesc = string([]byte{ + 0x0a, 0x09, 0x73, 0x6d, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x73, 0x6d, 0x71, + 0x22, 0x44, 0x0a, 0x10, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x64, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x16, + 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, + 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x22, 0x64, 0x0a, 0x13, 0x42, 0x75, 0x6c, 0x6b, 0x52, 0x65, + 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, + 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, + 0x70, 0x69, 0x63, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x6f, 0x66, 0x66, + 0x73, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x73, 0x74, 0x61, 0x72, 0x74, + 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x22, 0x80, 0x01, 0x0a, + 0x14, 0x42, 0x75, 0x6c, 0x6b, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x31, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x73, 0x6d, 0x71, 0x2e, 0x52, 0x65, + 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, + 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x1f, + 0x0a, 0x0b, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x0a, 0x6e, 0x65, 0x78, 0x74, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x22, + 0x10, 0x0a, 0x0e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x22, 0x11, 0x0a, 0x0f, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x2e, 0x0a, 0x16, 0x47, 0x65, 0x74, 0x4c, 0x61, 0x74, 0x65, 0x73, + 0x74, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, + 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, + 0x6f, 0x70, 0x69, 0x63, 0x22, 0x31, 0x0a, 0x17, 0x47, 0x65, 0x74, 0x4c, 0x61, 0x74, 0x65, 0x73, + 0x74, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x22, 0x30, 0x0a, 0x18, 0x47, 0x65, 0x74, 0x45, 0x61, + 0x72, 0x6c, 0x69, 0x65, 0x73, 0x74, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x22, 0x33, 0x0a, 0x19, 0x47, 0x65, 0x74, + 0x45, 0x61, 0x72, 0x6c, 0x69, 0x65, 0x73, 0x74, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x22, 0x2a, + 0x0a, 0x12, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x22, 0x15, 0x0a, 0x13, 0x43, 0x72, + 0x65, 0x61, 0x74, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x40, 0x0a, 0x0e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x22, 0x29, 0x0a, 0x0f, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x22, 0x3e, + 0x0a, 0x0e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x22, 0x43, + 0x0a, 0x0f, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6f, + 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6f, 0x66, 0x66, + 0x73, 0x65, 0x74, 0x22, 0x48, 0x0a, 0x18, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x55, 0x6e, 0x74, + 0x69, 0x6c, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x22, 0x1b, 0x0a, + 0x19, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x55, 0x6e, 0x74, 0x69, 0x6c, 0x4f, 0x66, 0x66, 0x73, + 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xc3, 0x05, 0x0a, 0x12, 0x53, + 0x75, 0x68, 0x61, 0x69, 0x62, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x51, 0x75, 0x65, 0x75, + 0x65, 0x12, 0x36, 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x13, 0x2e, 0x73, + 0x6d, 0x71, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x14, 0x2e, 0x73, 0x6d, 0x71, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4e, 0x0a, 0x0f, 0x47, 0x65, 0x74, + 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x1b, 0x2e, 0x73, + 0x6d, 0x71, 0x2e, 0x47, 0x65, 0x74, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x66, 0x66, 0x73, + 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x73, 0x6d, 0x71, 0x2e, + 0x47, 0x65, 0x74, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x54, 0x0a, 0x11, 0x47, 0x65, 0x74, + 0x45, 0x61, 0x72, 0x6c, 0x69, 0x65, 0x73, 0x74, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x1d, + 0x2e, 0x73, 0x6d, 0x71, 0x2e, 0x47, 0x65, 0x74, 0x45, 0x61, 0x72, 0x6c, 0x69, 0x65, 0x73, 0x74, + 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, + 0x73, 0x6d, 0x71, 0x2e, 0x47, 0x65, 0x74, 0x45, 0x61, 0x72, 0x6c, 0x69, 0x65, 0x73, 0x74, 0x4f, + 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, + 0x42, 0x0a, 0x0b, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x17, + 0x2e, 0x73, 0x6d, 0x71, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x73, 0x6d, 0x71, 0x2e, 0x43, 0x72, + 0x65, 0x61, 0x74, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x07, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x12, 0x13, + 0x2e, 0x73, 0x6d, 0x71, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x73, 0x6d, 0x71, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, + 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x07, 0x43, + 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x12, 0x13, 0x2e, 0x73, 0x6d, 0x71, 0x2e, 0x43, 0x6f, 0x6e, + 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x73, 0x6d, + 0x71, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x0d, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x72, 0x6f, + 0x64, 0x75, 0x63, 0x65, 0x12, 0x13, 0x2e, 0x73, 0x6d, 0x71, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, + 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x73, 0x6d, 0x71, 0x2e, + 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x28, 0x01, 0x12, 0x3e, 0x0a, 0x0d, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6f, 0x6e, + 0x73, 0x75, 0x6d, 0x65, 0x12, 0x13, 0x2e, 0x73, 0x6d, 0x71, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, + 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x73, 0x6d, 0x71, 0x2e, + 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x30, 0x01, 0x12, 0x54, 0x0a, 0x11, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x55, 0x6e, 0x74, + 0x69, 0x6c, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x1d, 0x2e, 0x73, 0x6d, 0x71, 0x2e, 0x44, + 0x65, 0x6c, 0x65, 0x74, 0x65, 0x55, 0x6e, 0x74, 0x69, 0x6c, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x73, 0x6d, 0x71, 0x2e, 0x44, 0x65, + 0x6c, 0x65, 0x74, 0x65, 0x55, 0x6e, 0x74, 0x69, 0x6c, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x0c, 0x42, 0x75, 0x6c, + 0x6b, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x12, 0x18, 0x2e, 0x73, 0x6d, 0x71, 0x2e, + 0x42, 0x75, 0x6c, 0x6b, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x73, 0x6d, 0x71, 0x2e, 0x42, 0x75, 0x6c, 0x6b, 0x52, 0x65, + 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, +}) var ( file_smq_proto_rawDescOnce sync.Once diff --git a/proto/smq_grpc.pb.go b/proto/smq_grpc.pb.go index 27c4147..4239338 100644 --- a/proto/smq_grpc.pb.go +++ b/proto/smq_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.5.1 -// - protoc v6.30.2 +// - protoc v5.29.3 // source: smq.proto package proto diff --git a/pysmq/README.md b/pysmq/README.md new file mode 100644 index 0000000..38a3e71 --- /dev/null +++ b/pysmq/README.md @@ -0,0 +1,234 @@ +# PySMQ: Python Client for SuhaibMessageQueue + +This package provides a Python client for interacting with [SuhaibMessageQueue](https://github.com/Suhaibinator/SuhaibMessageQueue), a simple and efficient message queue service. + +## Installation + +```bash +# From PyPI (once published) +pip install pysmq + +# From source +git clone https://github.com/Suhaibinator/SuhaibMessageQueue.git +cd SuhaibMessageQueue +pip install -e ./pysmq +``` + +## Dependencies + +- Python 3.7+ +- grpcio +- protobuf + +## Quick Start + +### Creating a client and publishing messages + +```python +from pysmq.client import Client + +# Create a client +with Client(host="localhost", port=8097) as client: + # Create a topic + client.create_topic("my-topic") + + # Publish a message + offset = client.produce("my-topic", b"Hello, SMQ!") + print(f"Message published at offset {offset}") +``` + +### Consuming messages + +```python +from pysmq.client import Client + +# Create a client +with Client(host="localhost", port=8097) as client: + # Consume the latest message + message, offset = client.consume_latest("my-topic") + print(f"Latest message: {message.decode('utf-8')}, offset: {offset}") + + # Consume the earliest message + message, offset = client.consume_earliest("my-topic") + print(f"Earliest message: {message.decode('utf-8')}, offset: {offset}") + + # Consume a specific message + message, offset = client.consume("my-topic", 5) # offset 5 + print(f"Message at offset 5: {message.decode('utf-8')}") +``` + +### Streaming messages + +```python +from pysmq.client import Client + +# Create a client +with Client(host="localhost", port=8097) as client: + # Stream produce messages + messages = [b"Message 1", b"Message 2", b"Message 3"] + last_offset = client.stream_produce("my-topic", messages) + print(f"Last message published at offset {last_offset}") + + # Stream consume messages (continuous) + for message, offset in client.stream_consume("my-topic", 0): + print(f"Received message: {message.decode('utf-8')}, offset: {offset}") + # Break the loop when needed + if offset >= 10: + break +``` + +### Secure Connection (mTLS) + +```python +from pysmq.client import Client +from pysmq.config import ClientTLSConfig + +# Create TLS configuration +tls_config = ClientTLSConfig( + cert_file="/path/to/client.crt", + key_file="/path/to/client.key", + ca_file="/path/to/ca.crt" +) + +# Create a secure client +with Client(host="localhost", port=8097, tls_config=tls_config) as client: + # Use the client as normal + client.create_topic("secure-topic") + offset = client.produce("secure-topic", b"Secure message") + print(f"Secure message published at offset {offset}") +``` + +## API Overview + +### Client + +The `Client` class provides methods for interacting with the SMQ server. + +```python +# Constructor +client = Client( + host="localhost", + port=8097, + tls_config=None, # Optional: ClientTLSConfig for secure connections + max_send_message_size_mb=1024, # Optional: Maximum size of messages to send + max_receive_message_size_mb=1024 # Optional: Maximum size of messages to receive +) +``` + +### Topic Management + +```python +# Create a topic +client.create_topic(topic) +``` + +### Message Production + +```python +# Produce a single message +offset = client.produce(topic, message_bytes) + +# Produce a stream of messages +last_offset = client.stream_produce(topic, messages_iterable) +``` + +### Message Consumption + +```python +# Consume a single message at a specific offset +message, offset = client.consume(topic, offset) + +# Consume the earliest available message +message, offset = client.consume_earliest(topic) + +# Consume the latest message +message, offset = client.consume_latest(topic) + +# Stream consume messages (returns an iterator) +for message, offset in client.stream_consume(topic, start_offset): + # Process each message + pass +``` + +### Offset Management + +```python +# Get the latest offset for a topic +latest_offset = client.get_latest_offset(topic) + +# Get the earliest available offset for a topic +earliest_offset = client.get_earliest_offset(topic) + +# Delete all messages up to and including the specified offset +client.delete_until_offset(topic, offset) +``` + +### Bulk Operations + +```python +# Retrieve a batch of messages +response = client.bulk_retrieve(topic, start_offset, limit) +for message in response.messages: + # Process each message + message_bytes = message.message + message_offset = message.offset +``` + +### Connection Management + +```python +# Connect to the server (optional, happens automatically on first operation) +client.connect() + +# Close the connection (or use 'with' statement) +client.close() +``` + +## Exception Handling + +The client throws specific exceptions for different types of errors: + +```python +from pysmq.exceptions import ( + SMQException, # Base exception for all SMQ errors + SMQConnectionError, # Connection issues + SMQAuthenticationError, # TLS/authentication issues + SMQProduceError, # Issues with producing messages + SMQConsumeError, # Issues with consuming messages + SMQTopicCreationError, # Issues with creating topics + SMQOffsetError, # Issues with offsets + SMQTimeoutError, # Timeout errors +) + +try: + client.produce("my-topic", b"Hello") +except SMQProduceError as e: + print(f"Failed to produce message: {e}") + # Access the original error if needed + original_error = e.original_error +``` + +## Configuration + +The client supports various configuration options: + +```python +from pysmq.config import ClientTLSConfig, ClientConfig + +# Default timeout values can be accessed or modified +ClientConfig.DEFAULT_TIMEOUT = 60.0 # Default timeout for most operations +ClientConfig.DEFAULT_TOPIC_TIMEOUT = 2.0 # Default timeout for topic creation +ClientConfig.DEFAULT_DELETE_TIMEOUT = 2.0 # Default timeout for deletion operations +``` + +## Examples + +For more detailed examples, see the [examples](examples/) directory: + +- [Basic Publishing](examples/publish_example.py) +- [Message Consumption](examples/subscribe_example.py) +- [Secure Connection](examples/secure_connection_example.py) + +## License + +This project is licensed under the MIT License - see the LICENSE file for details. diff --git a/pysmq/examples/publish_example.py b/pysmq/examples/publish_example.py new file mode 100644 index 0000000..34e42b1 --- /dev/null +++ b/pysmq/examples/publish_example.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +""" +Example showing how to publish messages to SMQ using the Python client. +""" +import time +import json + +from pysmq.client import Client +from pysmq.config import ClientTLSConfig + + +def main(): + # Replace with actual server details + host = "localhost" + port = 8097 + + # Create a client with default settings (insecure connection) + with Client(host=host, port=port) as client: + # Connect to the server (this is optional but a good test) + print("Connecting to SMQ server...") + client.connect() + print("Connected!") + + # Create a topic if it doesn't exist + topic = "example-topic" + try: + print(f"Creating topic '{topic}'...") + client.create_topic(topic) + print(f"Topic '{topic}' created successfully!") + except Exception as e: + # Topic may already exist, which is okay + print(f"Note: {e}") + + # Publish a few messages + for i in range(5): + # Create a sample message (convert to bytes) + message = { + "id": i, + "timestamp": time.time(), + "content": f"Message #{i}", + } + message_bytes = json.dumps(message).encode('utf-8') + + # Publish the message + print(f"Publishing message {i}...") + offset = client.produce(topic, message_bytes) + print(f"Message published at offset {offset}") + + # Small delay between messages for demonstration + time.sleep(0.5) + + print("All messages published successfully!") + + +if __name__ == "__main__": + main() diff --git a/pysmq/examples/secure_connection_example.py b/pysmq/examples/secure_connection_example.py new file mode 100644 index 0000000..0f06dc2 --- /dev/null +++ b/pysmq/examples/secure_connection_example.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 +""" +Example showing how to establish a secure connection to the SMQ server using mTLS. +""" +import json +import time +import os + +from pysmq.client import Client +from pysmq.config import ClientTLSConfig +from pysmq.exceptions import SMQAuthenticationError, SMQConnectionError + + +def main(): + # Replace with actual server details + host = "localhost" + port = 8097 + + # Replace with actual paths to your TLS certificate files + # These paths should point to your client certificate, client key, + # and CA certificate that signed the server's certificate + cert_file = os.environ.get("CLIENT_CERT", "/path/to/client.crt") + key_file = os.environ.get("CLIENT_KEY", "/path/to/client.key") + ca_file = os.environ.get("CA_CERT", "/path/to/ca.crt") + + # Check if the certificate files exist + for file_path, file_name in [ + (cert_file, "Client certificate"), + (key_file, "Client key"), + (ca_file, "CA certificate") + ]: + if not os.path.exists(file_path): + print(f"Warning: {file_name} file not found at {file_path}") + print("This example requires valid TLS certificates to work.") + print("You can set the paths via environment variables:") + print(" CLIENT_CERT: Path to client certificate file") + print(" CLIENT_KEY: Path to client key file") + print(" CA_CERT: Path to CA certificate file") + return + + # Create the TLS configuration + tls_config = ClientTLSConfig( + cert_file=cert_file, + key_file=key_file, + ca_file=ca_file, + ) + + try: + print("Creating secure client with mTLS...") + # Create a client with mTLS configuration + with Client(host=host, port=port, tls_config=tls_config) as client: + # Connect to the server + print("Connecting to SMQ server securely...") + client.connect() + print("Connected securely!") + + # Example: Create a topic + topic = "secure-example-topic" + try: + print(f"Creating topic '{topic}'...") + client.create_topic(topic) + print(f"Topic '{topic}' created successfully!") + except Exception as e: + # Topic may already exist, which is okay + print(f"Note: {e}") + + # Example: Publish a message + message = { + "id": 1, + "timestamp": time.time(), + "content": "Secure message", + "secure": True, + } + message_bytes = json.dumps(message).encode('utf-8') + + print("Publishing a secure message...") + offset = client.produce(topic, message_bytes) + print(f"Message published securely at offset {offset}") + + # Example: Consume the message + print("Consuming the message...") + received_bytes, received_offset = client.consume(topic, offset) + + # Decode and print the message + received_message = json.loads(received_bytes.decode('utf-8')) + print(f"Received secure message at offset {received_offset}:") + print(f" ID: {received_message.get('id')}") + print(f" Timestamp: {received_message.get('timestamp')}") + print(f" Content: {received_message.get('content')}") + print(f" Secure: {received_message.get('secure')}") + + print("Secure connection example completed successfully!") + + except SMQAuthenticationError as e: + print(f"TLS Authentication Error: {e}") + print("Make sure your certificates are correctly configured and valid.") + except SMQConnectionError as e: + print(f"Connection Error: {e}") + print("Make sure the server is running and configured for mTLS.") + except Exception as e: + print(f"Unexpected Error: {e}") + + +if __name__ == "__main__": + main() diff --git a/pysmq/examples/subscribe_example.py b/pysmq/examples/subscribe_example.py new file mode 100644 index 0000000..0ab4c72 --- /dev/null +++ b/pysmq/examples/subscribe_example.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +""" +Example showing how to consume messages from SMQ using the Python client. +""" +import json +import time +from typing import Tuple + +from pysmq.client import Client +from pysmq.config import ClientTLSConfig + + +def process_message(message_bytes: bytes, offset: int) -> None: + """ + Process a received message. + + Args: + message_bytes: The message content as bytes. + offset: The offset of the message. + """ + try: + # Decode the message from bytes to a Python dictionary + message = json.loads(message_bytes.decode('utf-8')) + print(f"Received message at offset {offset}:") + print(f" ID: {message.get('id')}") + print(f" Timestamp: {message.get('timestamp')}") + print(f" Content: {message.get('content')}") + print("-" * 40) + except json.JSONDecodeError: + print(f"Received non-JSON message at offset {offset}: {message_bytes}") + except Exception as e: + print(f"Error processing message at offset {offset}: {e}") + + +def main(): + # Replace with actual server details + host = "localhost" + port = 8097 + + # Create a client with default settings (insecure connection) + with Client(host=host, port=port) as client: + # Connect to the server (this is optional but a good test) + print("Connecting to SMQ server...") + client.connect() + print("Connected!") + + # Specify the topic to consume from + topic = "example-topic" + + # Demonstration 1: Consume the latest message + print("\n=== Consuming latest message ===") + try: + message_bytes, offset = client.consume_latest(topic) + process_message(message_bytes, offset) + except Exception as e: + print(f"Error consuming latest message: {e}") + + # Demonstration 2: Consume the earliest message + print("\n=== Consuming earliest message ===") + try: + message_bytes, offset = client.consume_earliest(topic) + process_message(message_bytes, offset) + except Exception as e: + print(f"Error consuming earliest message: {e}") + + # Demonstration 3: Bulk retrieve messages + print("\n=== Bulk retrieving messages ===") + try: + # Get the earliest offset + start_offset = client.get_earliest_offset(topic) + # Retrieve up to 10 messages + response = client.bulk_retrieve(topic, start_offset, 10) + + print(f"Retrieved {response.count} messages, next offset: {response.next_offset}") + for message in response.messages: + process_message(message.message, message.offset) + except Exception as e: + print(f"Error bulk retrieving messages: {e}") + + # Demonstration 4: Stream consumption (continuous) + print("\n=== Starting stream consumption (will run for 30 seconds) ===") + try: + # Get the latest offset to start consuming from + start_offset = client.get_latest_offset(topic) + print(f"Starting consumption from offset {start_offset}") + + # Set an end time for the demo (30 seconds from now) + end_time = time.time() + 30 + + def stream_consumer(): + """Generator that yields True until the end time is reached.""" + while time.time() < end_time: + yield True + return + + # Start consuming messages + print("Waiting for new messages... (Ctrl+C to stop)") + consumer = stream_consumer() + + for message_bytes, offset in client.stream_consume(topic, start_offset): + process_message(message_bytes, offset) + + # Check if we should stop + try: + next(consumer) + except StopIteration: + print("Time limit reached, stopping consumption") + break + + # If you want to stop based on some condition, + # you can add a break statement here + + except KeyboardInterrupt: + print("Interrupted by user, stopping consumption") + except Exception as e: + print(f"Error in stream consumption: {e}") + + print("\nSubscribe example completed!") + + +if __name__ == "__main__": + main() diff --git a/pysmq/pyproject.toml b/pysmq/pyproject.toml new file mode 100644 index 0000000..7d78d24 --- /dev/null +++ b/pysmq/pyproject.toml @@ -0,0 +1,55 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel", "setuptools-scm"] +build-backend = "setuptools.build_meta" + +[project] +name = "pysmq" +dynamic = ["version"] +description = "Python client for SuhaibMessageQueue" +readme = "README.md" +authors = [ + {name = "Suhaib MessageQueue", email = "example@example.com"}, +] +license = {text = "MIT"} +classifiers = [ + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Intended Audience :: Developers", + "Topic :: Software Development :: Libraries", + "Topic :: System :: Networking", +] +requires-python = ">=3.7" +dependencies = [ + "grpcio>=1.50.0", + "protobuf>=4.21.0", +] + +[project.optional-dependencies] +dev = [ + "grpcio-tools>=1.50.0", + "pytest>=7.0.0", + "black>=22.0.0", + "isort>=5.10.0", + "flake8>=5.0.0", +] + +[tool.setuptools_scm] +write_to = "pysmq/_version.py" + +[tool.black] +line-length = 88 +target-version = ["py37", "py38", "py39", "py310", "py311"] + +[tool.isort] +profile = "black" +line_length = 88 + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = "test_*.py" diff --git a/pysmq/pysmq/__init__.py b/pysmq/pysmq/__init__.py new file mode 100644 index 0000000..7263384 --- /dev/null +++ b/pysmq/pysmq/__init__.py @@ -0,0 +1,37 @@ +""" +Python client for SuhaibMessageQueue. + +This package provides a Python client for interacting with SuhaibMessageQueue, +a simple and efficient message queue service. +""" + +from .client import Client +from .config import ClientTLSConfig, ClientConfig, SMQ_SPECIAL_OFFSET_HEARTBEAT +from .exceptions import ( + SMQException, + SMQConnectionError, + SMQAuthenticationError, + SMQProduceError, + SMQConsumeError, + SMQTopicCreationError, + SMQOffsetError, + SMQTimeoutError, +) + +# Package information +__version__ = "0.1.0" # Will be overwritten by setuptools_scm +__author__ = "Suhaib" +__all__ = [ + "Client", + "ClientTLSConfig", + "ClientConfig", + "SMQ_SPECIAL_OFFSET_HEARTBEAT", + "SMQException", + "SMQConnectionError", + "SMQAuthenticationError", + "SMQProduceError", + "SMQConsumeError", + "SMQTopicCreationError", + "SMQOffsetError", + "SMQTimeoutError", +] diff --git a/pysmq/pysmq/client.py b/pysmq/pysmq/client.py new file mode 100644 index 0000000..61464c5 --- /dev/null +++ b/pysmq/pysmq/client.py @@ -0,0 +1,429 @@ +""" +Core client implementation for the SMQ Python client. +""" +import os +import ssl +from typing import Iterator, Iterable, Tuple, Callable, Optional, Any, Union + +import grpc + +from .proto import smq_pb2, smq_pb2_grpc +from .config import ClientTLSConfig, ClientConfig, SMQ_SPECIAL_OFFSET_HEARTBEAT +from .exceptions import ( + SMQException, + SMQConnectionError, + SMQAuthenticationError, + SMQProduceError, + SMQConsumeError, + SMQTimeoutError, + SMQTopicCreationError, + map_grpc_error, +) + + +class Client: + """ + Client for SuhaibMessageQueue. + + This client handles communication with the SMQ server, including connection + management, message production, consumption, topic management, and offset + management. + """ + + def __init__( + self, + host: str, + port: int, + tls_config: Optional[ClientTLSConfig] = None, + max_send_message_size_mb: int = ClientConfig.DEFAULT_MAX_MESSAGE_SIZE_MB, + max_receive_message_size_mb: int = ClientConfig.DEFAULT_MAX_MESSAGE_SIZE_MB, + ): + """ + Initialize a new SMQ client. + + Args: + host: The hostname or IP address of the SMQ server. + port: The port number of the SMQ server. + tls_config: Optional TLS configuration for secure connections. + If provided, establishes a secure connection using mTLS. + If None, establishes an insecure connection. + max_send_message_size_mb: Maximum size of messages to send, in MB. + max_receive_message_size_mb: Maximum size of messages to receive, in MB. + + Raises: + SMQConnectionError: If there's an issue establishing the connection. + SMQAuthenticationError: If there's an issue with TLS authentication. + """ + self._address = f"{host}:{port}" + self._channel = None + self._stub = None + + # Convert message sizes from MB to bytes + max_send_size = max_send_message_size_mb * 1024 * 1024 + max_receive_size = max_receive_message_size_mb * 1024 * 1024 + + # Set up channel options + options = [ + ('grpc.max_send_message_length', max_send_size), + ('grpc.max_receive_message_length', max_receive_size), + ] + + try: + if tls_config: + # Load client certificate and key for mTLS + with open(tls_config.cert_file, 'rb') as f: + client_cert = f.read() + + with open(tls_config.key_file, 'rb') as f: + client_key = f.read() + + # Load CA certificate for server verification + with open(tls_config.ca_file, 'rb') as f: + ca_cert = f.read() + + # Create SSL credentials + credentials = grpc.ssl_channel_credentials( + root_certificates=ca_cert, + private_key=client_key, + certificate_chain=client_cert, + ) + + # Create secure channel + self._channel = grpc.secure_channel( + self._address, + credentials, + options=options, + ) + else: + # Create insecure channel + self._channel = grpc.insecure_channel( + self._address, + options=options, + ) + + # Create the gRPC stub + self._stub = smq_pb2_grpc.SuhaibMessageQueueStub(self._channel) + + except (OSError, IOError) as e: + # Handle file I/O errors (e.g., certificate files not found) + raise SMQAuthenticationError(f"Error loading TLS certificates: {str(e)}", e) + except grpc.RpcError as e: + # Handle gRPC connection errors + raise map_grpc_error(e, SMQConnectionError) + + def __enter__(self): + """Enable the use of 'with' statement for better resource management.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Close the client when exiting the 'with' block.""" + self.close() + + def close(self): + """ + Close the connection to the SMQ server. + + This method should be called when the client is no longer needed to + release resources. It's automatically called when using the client + with a 'with' statement. + """ + if self._channel: + self._channel.close() + self._channel = None + self._stub = None + + def connect(self, timeout: Optional[float] = None) -> None: + """ + Establish a connection to the SMQ server. + + This is a lightweight method that simply verifies the connection. + The actual connection is established when the client is created. + + Args: + timeout: Optional timeout in seconds. If None, uses the default timeout. + + Raises: + SMQConnectionError: If there's an issue with the connection. + """ + try: + self._stub.Connect( + smq_pb2.ConnectRequest(), + timeout=timeout, + ) + except grpc.RpcError as e: + raise map_grpc_error(e, SMQConnectionError) + + def create_topic(self, topic: str, timeout: Optional[float] = ClientConfig.DEFAULT_TOPIC_TIMEOUT) -> None: + """ + Create a new topic on the SMQ server. + + Args: + topic: The name of the topic to create. + timeout: Optional timeout in seconds. + + Raises: + SMQTopicCreationError: If there's an issue creating the topic. + """ + try: + self._stub.CreateTopic( + smq_pb2.CreateTopicRequest(topic=topic), + timeout=timeout, + ) + except grpc.RpcError as e: + raise map_grpc_error(e, SMQTopicCreationError) + + def get_latest_offset(self, topic: str, timeout: Optional[float] = ClientConfig.DEFAULT_TIMEOUT) -> int: + """ + Get the latest (highest) offset for a topic. + + Args: + topic: The name of the topic. + timeout: Optional timeout in seconds. + + Returns: + The latest offset as an integer. + + Raises: + SMQOffsetError: If there's an issue getting the offset. + """ + try: + response = self._stub.GetLatestOffset( + smq_pb2.GetLatestOffsetRequest(topic=topic), + timeout=timeout, + ) + return response.offset + except grpc.RpcError as e: + raise map_grpc_error(e) + + def get_earliest_offset(self, topic: str, timeout: Optional[float] = ClientConfig.DEFAULT_TIMEOUT) -> int: + """ + Get the earliest (lowest) available offset for a topic. + + Args: + topic: The name of the topic. + timeout: Optional timeout in seconds. + + Returns: + The earliest offset as an integer. + + Raises: + SMQOffsetError: If there's an issue getting the offset. + """ + try: + response = self._stub.GetEarliestOffset( + smq_pb2.GetEarliestOffsetRequest(topic=topic), + timeout=timeout, + ) + return response.offset + except grpc.RpcError as e: + raise map_grpc_error(e) + + def produce(self, topic: str, message: bytes, timeout: Optional[float] = ClientConfig.DEFAULT_TIMEOUT) -> int: + """ + Produce a single message to a topic. + + Args: + topic: The name of the topic. + message: The message content as bytes. + timeout: Optional timeout in seconds. + + Returns: + The offset of the produced message. + + Raises: + SMQProduceError: If there's an issue producing the message. + """ + try: + response = self._stub.Produce( + smq_pb2.ProduceRequest(topic=topic, message=message), + timeout=timeout, + ) + return response.offset + except grpc.RpcError as e: + raise map_grpc_error(e, SMQProduceError) + + def stream_produce(self, topic: str, messages: Iterable[bytes]) -> int: + """ + Produce a stream of messages to a topic. + + Args: + topic: The name of the topic. + messages: An iterable of messages, each as bytes. + + Returns: + The offset of the last produced message. + + Raises: + SMQProduceError: If there's an issue producing messages. + """ + try: + # Start a streaming RPC + stream = self._stub.StreamProduce() + + # Send messages + for message in messages: + stream.write(smq_pb2.ProduceRequest(topic=topic, message=message)) + + # Close the stream and get the response + stream.done_writing() + response = stream.recv() + return response.offset + except grpc.RpcError as e: + raise map_grpc_error(e, SMQProduceError) + + def consume(self, topic: str, offset: int, timeout: Optional[float] = ClientConfig.DEFAULT_TIMEOUT) -> Tuple[bytes, int]: + """ + Consume a single message from a topic at a specific offset. + + Args: + topic: The name of the topic. + offset: The offset to consume from. + timeout: Optional timeout in seconds. + + Returns: + A tuple of (message_bytes, offset). + + Raises: + SMQConsumeError: If there's an issue consuming the message. + """ + try: + response = self._stub.Consume( + smq_pb2.ConsumeRequest(topic=topic, offset=offset), + timeout=timeout, + ) + return response.message, response.offset + except grpc.RpcError as e: + raise map_grpc_error(e, SMQConsumeError) + + def consume_earliest(self, topic: str, timeout: Optional[float] = ClientConfig.DEFAULT_TIMEOUT) -> Tuple[bytes, int]: + """ + Consume the earliest available message from a topic. + + Args: + topic: The name of the topic. + timeout: Optional timeout in seconds. + + Returns: + A tuple of (message_bytes, offset). + + Raises: + SMQConsumeError: If there's an issue consuming the message. + """ + try: + # Get the earliest offset + earliest_offset = self.get_earliest_offset(topic, timeout) + + # Consume the message at that offset + return self.consume(topic, earliest_offset, timeout) + except SMQException as e: + raise SMQConsumeError(f"Failed to consume earliest message: {str(e)}", e.original_error) + + def consume_latest(self, topic: str, timeout: Optional[float] = ClientConfig.DEFAULT_TIMEOUT) -> Tuple[bytes, int]: + """ + Consume the latest message from a topic. + + Args: + topic: The name of the topic. + timeout: Optional timeout in seconds. + + Returns: + A tuple of (message_bytes, offset). + + Raises: + SMQConsumeError: If there's an issue consuming the message. + """ + try: + # Get the latest offset + latest_offset = self.get_latest_offset(topic, timeout) + + # Consume the message at that offset + return self.consume(topic, latest_offset, timeout) + except SMQException as e: + raise SMQConsumeError(f"Failed to consume latest message: {str(e)}", e.original_error) + + def stream_consume(self, topic: str, start_offset: int) -> Iterator[Tuple[bytes, int]]: + """ + Consume a stream of messages from a topic starting at a specific offset. + + This method returns an iterator that yields message bytes and offsets + as they arrive. It handles heartbeat messages automatically. + + Args: + topic: The name of the topic. + start_offset: The offset to start consuming from. + + Returns: + An iterator of (message_bytes, offset) tuples. + + Raises: + SMQConsumeError: If there's an issue consuming messages. + """ + try: + # Start a streaming RPC + stream_request = smq_pb2.ConsumeRequest(topic=topic, offset=start_offset) + stream = self._stub.StreamConsume(stream_request) + + # Iterate over the stream, skipping heartbeats + for response in stream: + # Skip heartbeat messages + if response.offset == SMQ_SPECIAL_OFFSET_HEARTBEAT: + continue + + yield response.message, response.offset + except grpc.RpcError as e: + raise map_grpc_error(e, SMQConsumeError) + + def delete_until_offset(self, topic: str, offset: int, timeout: Optional[float] = ClientConfig.DEFAULT_DELETE_TIMEOUT) -> None: + """ + Delete all messages in a topic up to and including the specified offset. + + Args: + topic: The name of the topic. + offset: The offset up to which messages should be deleted. + timeout: Optional timeout in seconds. + + Raises: + SMQException: If there's an issue deleting messages. + """ + try: + self._stub.DeleteUntilOffset( + smq_pb2.DeleteUntilOffsetRequest(topic=topic, offset=offset), + timeout=timeout, + ) + except grpc.RpcError as e: + raise map_grpc_error(e) + + def bulk_retrieve( + self, + topic: str, + start_offset: int, + limit: int, + timeout: Optional[float] = ClientConfig.DEFAULT_TIMEOUT + ) -> smq_pb2.BulkRetrieveResponse: + """ + Retrieve a batch of messages from a topic starting at a specific offset. + + Args: + topic: The name of the topic. + start_offset: The offset to start retrieving from. + limit: The maximum number of messages to retrieve. + timeout: Optional timeout in seconds. + + Returns: + A BulkRetrieveResponse containing messages, count, and next_offset. + + Raises: + SMQConsumeError: If there's an issue retrieving messages. + """ + try: + response = self._stub.BulkRetrieve( + smq_pb2.BulkRetrieveRequest( + topic=topic, + start_offset=start_offset, + limit=limit, + ), + timeout=timeout, + ) + return response + except grpc.RpcError as e: + raise map_grpc_error(e, SMQConsumeError) diff --git a/pysmq/pysmq/config.py b/pysmq/pysmq/config.py new file mode 100644 index 0000000..44c939a --- /dev/null +++ b/pysmq/pysmq/config.py @@ -0,0 +1,41 @@ +""" +Configuration and constants for the SMQ Python client. +""" +from dataclasses import dataclass +from typing import Optional + + +# Special offset value for heartbeat messages +SMQ_SPECIAL_OFFSET_HEARTBEAT = -1 + + +@dataclass +class ClientTLSConfig: + """ + Configuration for mTLS (mutual TLS) authentication. + + Attributes: + cert_file: Path to client's certificate file (PEM format) + key_file: Path to client's private key file (PEM format) + ca_file: Path to CA's certificate file (PEM format) to verify the server + """ + cert_file: str + key_file: str + ca_file: str + + +class ClientConfig: + """ + Default configuration values for the SMQ client. + """ + # Default timeout for unary RPC calls (in seconds) + DEFAULT_TIMEOUT = 100.0 + + # Default timeout for topic creation (in seconds) + DEFAULT_TOPIC_TIMEOUT = 1.0 + + # Default timeout for deletion operations (in seconds) + DEFAULT_DELETE_TIMEOUT = 1.0 + + # Default max message size in MB + DEFAULT_MAX_MESSAGE_SIZE_MB = 1024 diff --git a/pysmq/pysmq/exceptions.py b/pysmq/pysmq/exceptions.py new file mode 100644 index 0000000..12ab614 --- /dev/null +++ b/pysmq/pysmq/exceptions.py @@ -0,0 +1,84 @@ +""" +Custom exceptions for the SMQ Python client. +""" +import grpc +from typing import Optional + + +class SMQException(Exception): + """Base exception class for all SMQ-related exceptions.""" + + def __init__(self, message: str, original_error: Optional[Exception] = None): + super().__init__(message) + self.original_error = original_error + + +class SMQConnectionError(SMQException): + """Exception raised when there's an issue with the connection to the SMQ server.""" + pass + + +class SMQAuthenticationError(SMQConnectionError): + """Exception raised when there's an authentication issue (e.g., TLS-related).""" + pass + + +class SMQProduceError(SMQException): + """Exception raised when there's an issue with producing messages.""" + pass + + +class SMQConsumeError(SMQException): + """Exception raised when there's an issue with consuming messages.""" + pass + + +class SMQTopicCreationError(SMQException): + """Exception raised when there's an issue with creating a topic.""" + pass + + +class SMQOffsetError(SMQException): + """Exception raised when there's an issue with offsets.""" + pass + + +class SMQTimeoutError(SMQException): + """Exception raised when an operation times out.""" + pass + + +def map_grpc_error(error: grpc.RpcError, default_exception=SMQException) -> SMQException: + """ + Map a gRPC error to an appropriate SMQ exception. + + Args: + error: The gRPC error to map + default_exception: The default exception class to use if no specific mapping is found + + Returns: + An instance of the appropriate SMQ exception + """ + code = error.code() + details = error.details() if hasattr(error, 'details') else str(error) + + if code == grpc.StatusCode.DEADLINE_EXCEEDED: + return SMQTimeoutError(f"Operation timed out: {details}", error) + elif code == grpc.StatusCode.UNAVAILABLE: + return SMQConnectionError(f"Server unavailable: {details}", error) + elif code == grpc.StatusCode.UNAUTHENTICATED: + return SMQAuthenticationError(f"Authentication failed: {details}", error) + elif code == grpc.StatusCode.INVALID_ARGUMENT: + # Attempt to categorize based on error message + msg = str(error).lower() + if 'topic' in msg and ('create' in msg or 'new' in msg): + return SMQTopicCreationError(f"Failed to create topic: {details}", error) + elif 'offset' in msg: + return SMQOffsetError(f"Invalid offset: {details}", error) + elif 'produce' in msg or 'message' in msg: + return SMQProduceError(f"Failed to produce message: {details}", error) + elif 'consume' in msg: + return SMQConsumeError(f"Failed to consume message: {details}", error) + + # Default case + return default_exception(f"SMQ operation failed: {details}", error) diff --git a/pysmq/pysmq/proto/__init__.py b/pysmq/pysmq/proto/__init__.py new file mode 100644 index 0000000..3785d3f --- /dev/null +++ b/pysmq/pysmq/proto/__init__.py @@ -0,0 +1,8 @@ +""" +Generated gRPC code for the SMQ protocol. +""" +from . import smq_pb2 +from . import smq_pb2_grpc + +# Export key classes and interfaces +SuhaibMessageQueueStub = smq_pb2_grpc.SuhaibMessageQueueStub diff --git a/pysmq/pysmq/proto/smq_pb2.py b/pysmq/pysmq/proto/smq_pb2.py new file mode 100644 index 0000000..8ace130 --- /dev/null +++ b/pysmq/pysmq/proto/smq_pb2.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: smq.proto +# Protobuf Python Version: 5.29.0 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 29, + 0, + '', + 'smq.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\tsmq.proto\x12\x03smq\"3\n\x10RetrievedMessage\x12\x0f\n\x07message\x18\x01 \x01(\x0c\x12\x0e\n\x06offset\x18\x02 \x01(\x03\"I\n\x13\x42ulkRetrieveRequest\x12\r\n\x05topic\x18\x01 \x01(\t\x12\x14\n\x0cstart_offset\x18\x02 \x01(\x03\x12\r\n\x05limit\x18\x03 \x01(\x05\"c\n\x14\x42ulkRetrieveResponse\x12\'\n\x08messages\x18\x01 \x03(\x0b\x32\x15.smq.RetrievedMessage\x12\r\n\x05\x63ount\x18\x02 \x01(\x05\x12\x13\n\x0bnext_offset\x18\x03 \x01(\x03\"\x10\n\x0e\x43onnectRequest\"\x11\n\x0f\x43onnectResponse\"\'\n\x16GetLatestOffsetRequest\x12\r\n\x05topic\x18\x01 \x01(\t\")\n\x17GetLatestOffsetResponse\x12\x0e\n\x06offset\x18\x01 \x01(\x03\")\n\x18GetEarliestOffsetRequest\x12\r\n\x05topic\x18\x01 \x01(\t\"+\n\x19GetEarliestOffsetResponse\x12\x0e\n\x06offset\x18\x01 \x01(\x03\"#\n\x12\x43reateTopicRequest\x12\r\n\x05topic\x18\x01 \x01(\t\"\x15\n\x13\x43reateTopicResponse\"0\n\x0eProduceRequest\x12\r\n\x05topic\x18\x01 \x01(\t\x12\x0f\n\x07message\x18\x02 \x01(\x0c\"!\n\x0fProduceResponse\x12\x0e\n\x06offset\x18\x01 \x01(\x03\"/\n\x0e\x43onsumeRequest\x12\r\n\x05topic\x18\x01 \x01(\t\x12\x0e\n\x06offset\x18\x02 \x01(\x03\"2\n\x0f\x43onsumeResponse\x12\x0f\n\x07message\x18\x01 \x01(\x0c\x12\x0e\n\x06offset\x18\x02 \x01(\x03\"9\n\x18\x44\x65leteUntilOffsetRequest\x12\r\n\x05topic\x18\x01 \x01(\t\x12\x0e\n\x06offset\x18\x02 \x01(\x03\"\x1b\n\x19\x44\x65leteUntilOffsetResponse2\xc3\x05\n\x12SuhaibMessageQueue\x12\x36\n\x07\x43onnect\x12\x13.smq.ConnectRequest\x1a\x14.smq.ConnectResponse\"\x00\x12N\n\x0fGetLatestOffset\x12\x1b.smq.GetLatestOffsetRequest\x1a\x1c.smq.GetLatestOffsetResponse\"\x00\x12T\n\x11GetEarliestOffset\x12\x1d.smq.GetEarliestOffsetRequest\x1a\x1e.smq.GetEarliestOffsetResponse\"\x00\x12\x42\n\x0b\x43reateTopic\x12\x17.smq.CreateTopicRequest\x1a\x18.smq.CreateTopicResponse\"\x00\x12\x36\n\x07Produce\x12\x13.smq.ProduceRequest\x1a\x14.smq.ProduceResponse\"\x00\x12\x36\n\x07\x43onsume\x12\x13.smq.ConsumeRequest\x1a\x14.smq.ConsumeResponse\"\x00\x12>\n\rStreamProduce\x12\x13.smq.ProduceRequest\x1a\x14.smq.ProduceResponse\"\x00(\x01\x12>\n\rStreamConsume\x12\x13.smq.ConsumeRequest\x1a\x14.smq.ConsumeResponse\"\x00\x30\x01\x12T\n\x11\x44\x65leteUntilOffset\x12\x1d.smq.DeleteUntilOffsetRequest\x1a\x1e.smq.DeleteUntilOffsetResponse\"\x00\x12\x45\n\x0c\x42ulkRetrieve\x12\x18.smq.BulkRetrieveRequest\x1a\x19.smq.BulkRetrieveResponse\"\x00\x42\tZ\x07./protob\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'smq_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + _globals['DESCRIPTOR']._loaded_options = None + _globals['DESCRIPTOR']._serialized_options = b'Z\007./proto' + _globals['_RETRIEVEDMESSAGE']._serialized_start=18 + _globals['_RETRIEVEDMESSAGE']._serialized_end=69 + _globals['_BULKRETRIEVEREQUEST']._serialized_start=71 + _globals['_BULKRETRIEVEREQUEST']._serialized_end=144 + _globals['_BULKRETRIEVERESPONSE']._serialized_start=146 + _globals['_BULKRETRIEVERESPONSE']._serialized_end=245 + _globals['_CONNECTREQUEST']._serialized_start=247 + _globals['_CONNECTREQUEST']._serialized_end=263 + _globals['_CONNECTRESPONSE']._serialized_start=265 + _globals['_CONNECTRESPONSE']._serialized_end=282 + _globals['_GETLATESTOFFSETREQUEST']._serialized_start=284 + _globals['_GETLATESTOFFSETREQUEST']._serialized_end=323 + _globals['_GETLATESTOFFSETRESPONSE']._serialized_start=325 + _globals['_GETLATESTOFFSETRESPONSE']._serialized_end=366 + _globals['_GETEARLIESTOFFSETREQUEST']._serialized_start=368 + _globals['_GETEARLIESTOFFSETREQUEST']._serialized_end=409 + _globals['_GETEARLIESTOFFSETRESPONSE']._serialized_start=411 + _globals['_GETEARLIESTOFFSETRESPONSE']._serialized_end=454 + _globals['_CREATETOPICREQUEST']._serialized_start=456 + _globals['_CREATETOPICREQUEST']._serialized_end=491 + _globals['_CREATETOPICRESPONSE']._serialized_start=493 + _globals['_CREATETOPICRESPONSE']._serialized_end=514 + _globals['_PRODUCEREQUEST']._serialized_start=516 + _globals['_PRODUCEREQUEST']._serialized_end=564 + _globals['_PRODUCERESPONSE']._serialized_start=566 + _globals['_PRODUCERESPONSE']._serialized_end=599 + _globals['_CONSUMEREQUEST']._serialized_start=601 + _globals['_CONSUMEREQUEST']._serialized_end=648 + _globals['_CONSUMERESPONSE']._serialized_start=650 + _globals['_CONSUMERESPONSE']._serialized_end=700 + _globals['_DELETEUNTILOFFSETREQUEST']._serialized_start=702 + _globals['_DELETEUNTILOFFSETREQUEST']._serialized_end=759 + _globals['_DELETEUNTILOFFSETRESPONSE']._serialized_start=761 + _globals['_DELETEUNTILOFFSETRESPONSE']._serialized_end=788 + _globals['_SUHAIBMESSAGEQUEUE']._serialized_start=791 + _globals['_SUHAIBMESSAGEQUEUE']._serialized_end=1498 +# @@protoc_insertion_point(module_scope) diff --git a/pysmq/pysmq/proto/smq_pb2_grpc.py b/pysmq/pysmq/proto/smq_pb2_grpc.py new file mode 100644 index 0000000..5b91fcb --- /dev/null +++ b/pysmq/pysmq/proto/smq_pb2_grpc.py @@ -0,0 +1,486 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +from . import smq_pb2 as smq__pb2 + +GRPC_GENERATED_VERSION = '1.71.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in smq_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + + +class SuhaibMessageQueueStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.Connect = channel.unary_unary( + '/smq.SuhaibMessageQueue/Connect', + request_serializer=smq__pb2.ConnectRequest.SerializeToString, + response_deserializer=smq__pb2.ConnectResponse.FromString, + _registered_method=True) + self.GetLatestOffset = channel.unary_unary( + '/smq.SuhaibMessageQueue/GetLatestOffset', + request_serializer=smq__pb2.GetLatestOffsetRequest.SerializeToString, + response_deserializer=smq__pb2.GetLatestOffsetResponse.FromString, + _registered_method=True) + self.GetEarliestOffset = channel.unary_unary( + '/smq.SuhaibMessageQueue/GetEarliestOffset', + request_serializer=smq__pb2.GetEarliestOffsetRequest.SerializeToString, + response_deserializer=smq__pb2.GetEarliestOffsetResponse.FromString, + _registered_method=True) + self.CreateTopic = channel.unary_unary( + '/smq.SuhaibMessageQueue/CreateTopic', + request_serializer=smq__pb2.CreateTopicRequest.SerializeToString, + response_deserializer=smq__pb2.CreateTopicResponse.FromString, + _registered_method=True) + self.Produce = channel.unary_unary( + '/smq.SuhaibMessageQueue/Produce', + request_serializer=smq__pb2.ProduceRequest.SerializeToString, + response_deserializer=smq__pb2.ProduceResponse.FromString, + _registered_method=True) + self.Consume = channel.unary_unary( + '/smq.SuhaibMessageQueue/Consume', + request_serializer=smq__pb2.ConsumeRequest.SerializeToString, + response_deserializer=smq__pb2.ConsumeResponse.FromString, + _registered_method=True) + self.StreamProduce = channel.stream_unary( + '/smq.SuhaibMessageQueue/StreamProduce', + request_serializer=smq__pb2.ProduceRequest.SerializeToString, + response_deserializer=smq__pb2.ProduceResponse.FromString, + _registered_method=True) + self.StreamConsume = channel.unary_stream( + '/smq.SuhaibMessageQueue/StreamConsume', + request_serializer=smq__pb2.ConsumeRequest.SerializeToString, + response_deserializer=smq__pb2.ConsumeResponse.FromString, + _registered_method=True) + self.DeleteUntilOffset = channel.unary_unary( + '/smq.SuhaibMessageQueue/DeleteUntilOffset', + request_serializer=smq__pb2.DeleteUntilOffsetRequest.SerializeToString, + response_deserializer=smq__pb2.DeleteUntilOffsetResponse.FromString, + _registered_method=True) + self.BulkRetrieve = channel.unary_unary( + '/smq.SuhaibMessageQueue/BulkRetrieve', + request_serializer=smq__pb2.BulkRetrieveRequest.SerializeToString, + response_deserializer=smq__pb2.BulkRetrieveResponse.FromString, + _registered_method=True) + + +class SuhaibMessageQueueServicer(object): + """Missing associated documentation comment in .proto file.""" + + def Connect(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def GetLatestOffset(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def GetEarliestOffset(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def CreateTopic(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Produce(self, request, context): + """Single message versions + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Consume(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def StreamProduce(self, request_iterator, context): + """Stream versions + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def StreamConsume(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def DeleteUntilOffset(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def BulkRetrieve(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_SuhaibMessageQueueServicer_to_server(servicer, server): + rpc_method_handlers = { + 'Connect': grpc.unary_unary_rpc_method_handler( + servicer.Connect, + request_deserializer=smq__pb2.ConnectRequest.FromString, + response_serializer=smq__pb2.ConnectResponse.SerializeToString, + ), + 'GetLatestOffset': grpc.unary_unary_rpc_method_handler( + servicer.GetLatestOffset, + request_deserializer=smq__pb2.GetLatestOffsetRequest.FromString, + response_serializer=smq__pb2.GetLatestOffsetResponse.SerializeToString, + ), + 'GetEarliestOffset': grpc.unary_unary_rpc_method_handler( + servicer.GetEarliestOffset, + request_deserializer=smq__pb2.GetEarliestOffsetRequest.FromString, + response_serializer=smq__pb2.GetEarliestOffsetResponse.SerializeToString, + ), + 'CreateTopic': grpc.unary_unary_rpc_method_handler( + servicer.CreateTopic, + request_deserializer=smq__pb2.CreateTopicRequest.FromString, + response_serializer=smq__pb2.CreateTopicResponse.SerializeToString, + ), + 'Produce': grpc.unary_unary_rpc_method_handler( + servicer.Produce, + request_deserializer=smq__pb2.ProduceRequest.FromString, + response_serializer=smq__pb2.ProduceResponse.SerializeToString, + ), + 'Consume': grpc.unary_unary_rpc_method_handler( + servicer.Consume, + request_deserializer=smq__pb2.ConsumeRequest.FromString, + response_serializer=smq__pb2.ConsumeResponse.SerializeToString, + ), + 'StreamProduce': grpc.stream_unary_rpc_method_handler( + servicer.StreamProduce, + request_deserializer=smq__pb2.ProduceRequest.FromString, + response_serializer=smq__pb2.ProduceResponse.SerializeToString, + ), + 'StreamConsume': grpc.unary_stream_rpc_method_handler( + servicer.StreamConsume, + request_deserializer=smq__pb2.ConsumeRequest.FromString, + response_serializer=smq__pb2.ConsumeResponse.SerializeToString, + ), + 'DeleteUntilOffset': grpc.unary_unary_rpc_method_handler( + servicer.DeleteUntilOffset, + request_deserializer=smq__pb2.DeleteUntilOffsetRequest.FromString, + response_serializer=smq__pb2.DeleteUntilOffsetResponse.SerializeToString, + ), + 'BulkRetrieve': grpc.unary_unary_rpc_method_handler( + servicer.BulkRetrieve, + request_deserializer=smq__pb2.BulkRetrieveRequest.FromString, + response_serializer=smq__pb2.BulkRetrieveResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'smq.SuhaibMessageQueue', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('smq.SuhaibMessageQueue', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class SuhaibMessageQueue(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def Connect(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/smq.SuhaibMessageQueue/Connect', + smq__pb2.ConnectRequest.SerializeToString, + smq__pb2.ConnectResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def GetLatestOffset(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/smq.SuhaibMessageQueue/GetLatestOffset', + smq__pb2.GetLatestOffsetRequest.SerializeToString, + smq__pb2.GetLatestOffsetResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def GetEarliestOffset(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/smq.SuhaibMessageQueue/GetEarliestOffset', + smq__pb2.GetEarliestOffsetRequest.SerializeToString, + smq__pb2.GetEarliestOffsetResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def CreateTopic(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/smq.SuhaibMessageQueue/CreateTopic', + smq__pb2.CreateTopicRequest.SerializeToString, + smq__pb2.CreateTopicResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def Produce(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/smq.SuhaibMessageQueue/Produce', + smq__pb2.ProduceRequest.SerializeToString, + smq__pb2.ProduceResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def Consume(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/smq.SuhaibMessageQueue/Consume', + smq__pb2.ConsumeRequest.SerializeToString, + smq__pb2.ConsumeResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def StreamProduce(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_unary( + request_iterator, + target, + '/smq.SuhaibMessageQueue/StreamProduce', + smq__pb2.ProduceRequest.SerializeToString, + smq__pb2.ProduceResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def StreamConsume(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream( + request, + target, + '/smq.SuhaibMessageQueue/StreamConsume', + smq__pb2.ConsumeRequest.SerializeToString, + smq__pb2.ConsumeResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def DeleteUntilOffset(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/smq.SuhaibMessageQueue/DeleteUntilOffset', + smq__pb2.DeleteUntilOffsetRequest.SerializeToString, + smq__pb2.DeleteUntilOffsetResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def BulkRetrieve(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/smq.SuhaibMessageQueue/BulkRetrieve', + smq__pb2.BulkRetrieveRequest.SerializeToString, + smq__pb2.BulkRetrieveResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/pysmq/tests/__init__.py b/pysmq/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pysmq/tests/test_client.py b/pysmq/tests/test_client.py new file mode 100644 index 0000000..e390686 --- /dev/null +++ b/pysmq/tests/test_client.py @@ -0,0 +1,149 @@ +""" +Unit tests for the SMQ Python client. +""" +import unittest +from unittest.mock import Mock, patch, MagicMock +import grpc + +from pysmq.client import Client +from pysmq.config import ClientTLSConfig +from pysmq.exceptions import SMQConnectionError, SMQProduceError, SMQConsumeError + + +class TestClient(unittest.TestCase): + """Test cases for the SMQ Client.""" + + def setUp(self): + """Set up test fixtures.""" + # Create a mock for the gRPC stub + self.mock_stub = Mock() + + # Patch the gRPC channel creation to return a mock + self.channel_patcher = patch('grpc.insecure_channel') + self.mock_channel = self.channel_patcher.start() + self.mock_channel.return_value = Mock() + + # Patch the stub class to return our mock stub + self.stub_patcher = patch('pysmq.proto.smq_pb2_grpc.SuhaibMessageQueueStub') + self.mock_stub_class = self.stub_patcher.start() + self.mock_stub_class.return_value = self.mock_stub + + # Create the client with the patched grpc channel + self.client = Client(host="localhost", port=8097) + + def tearDown(self): + """Tear down test fixtures.""" + self.channel_patcher.stop() + self.stub_patcher.stop() + + def test_connect(self): + """Test the connect method.""" + # Set up mock response + self.mock_stub.Connect.return_value = Mock() + + # Call the method + self.client.connect() + + # Assert the stub was called correctly + self.mock_stub.Connect.assert_called_once() + + def test_connect_error(self): + """Test the connect method with an error.""" + # Set up mock to raise an error + error = grpc.RpcError() + error.code = lambda: grpc.StatusCode.UNAVAILABLE + error.details = lambda: "Connection refused" + self.mock_stub.Connect.side_effect = error + + # Call the method and assert it raises the expected exception + with self.assertRaises(SMQConnectionError): + self.client.connect() + + def test_create_topic(self): + """Test the create_topic method.""" + # Set up mock response + self.mock_stub.CreateTopic.return_value = Mock() + + # Call the method + self.client.create_topic("test-topic") + + # Assert the stub was called correctly + self.mock_stub.CreateTopic.assert_called_once() + args, kwargs = self.mock_stub.CreateTopic.call_args + self.assertEqual(args[0].topic, "test-topic") + + def test_produce(self): + """Test the produce method.""" + # Set up mock response + mock_response = Mock() + mock_response.offset = 42 + self.mock_stub.Produce.return_value = mock_response + + # Call the method + offset = self.client.produce("test-topic", b"test-message") + + # Assert the stub was called correctly + self.mock_stub.Produce.assert_called_once() + args, kwargs = self.mock_stub.Produce.call_args + self.assertEqual(args[0].topic, "test-topic") + self.assertEqual(args[0].message, b"test-message") + + # Assert the return value + self.assertEqual(offset, 42) + + def test_produce_error(self): + """Test the produce method with an error.""" + # Set up mock to raise an error + error = grpc.RpcError() + error.code = lambda: grpc.StatusCode.INVALID_ARGUMENT + error.details = lambda: "Invalid topic" + self.mock_stub.Produce.side_effect = error + + # Call the method and assert it raises the expected exception + with self.assertRaises(SMQProduceError): + self.client.produce("test-topic", b"test-message") + + def test_consume(self): + """Test the consume method.""" + # Set up mock response + mock_response = Mock() + mock_response.message = b"test-message" + mock_response.offset = 42 + self.mock_stub.Consume.return_value = mock_response + + # Call the method + message, offset = self.client.consume("test-topic", 42) + + # Assert the stub was called correctly + self.mock_stub.Consume.assert_called_once() + args, kwargs = self.mock_stub.Consume.call_args + self.assertEqual(args[0].topic, "test-topic") + self.assertEqual(args[0].offset, 42) + + # Assert the return values + self.assertEqual(message, b"test-message") + self.assertEqual(offset, 42) + + def test_consume_error(self): + """Test the consume method with an error.""" + # Set up mock to raise an error + error = grpc.RpcError() + error.code = lambda: grpc.StatusCode.NOT_FOUND + error.details = lambda: "Offset not found" + self.mock_stub.Consume.side_effect = error + + # Call the method and assert it raises the expected exception + with self.assertRaises(SMQConsumeError): + self.client.consume("test-topic", 42) + + def test_close(self): + """Test the close method.""" + # Call the method + self.client.close() + + # Assert the channel was closed + self.client._channel.close.assert_called_once() + + +if __name__ == "__main__": + unittest.main() From 7eb5f86ea72aaf4438474ab43e58021bb488b9f4 Mon Sep 17 00:00:00 2001 From: Suhaib Date: Wed, 14 May 2025 21:58:23 -0700 Subject: [PATCH 2/5] Publish PyPI github action --- .github/workflows/publish-to-pypi.yml | 45 +++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 .github/workflows/publish-to-pypi.yml diff --git a/.github/workflows/publish-to-pypi.yml b/.github/workflows/publish-to-pypi.yml new file mode 100644 index 0000000..e36731b --- /dev/null +++ b/.github/workflows/publish-to-pypi.yml @@ -0,0 +1,45 @@ +name: Publish Python Package to PyPI + +on: + push: + tags: + - 'v*' # Trigger on tags like v0.1.0, v1.0.0, etc. + +jobs: + build-and-publish: + name: Build and publish Python package to PyPI + runs-on: ubuntu-latest + # NOTE: You should create a "pypi-publish" environment in GitHub repo settings + # and configure it to match what you set up in PyPI trusted publishing + # Uncomment the following line once your environment is set up in GitHub: + # environment: pypi-publish + permissions: + id-token: write # Required for trusted publishing (OIDC) + contents: read # To checkout the repository + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 # Fetches all history for setuptools_scm + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.9' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install build twine + + - name: Build package + working-directory: ./pysmq + run: python -m build + + - name: Publish package to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + packages-dir: ./pysmq/dist/ + skip-existing: true # Optional: Don't fail if version already exists + print-hash: true # Print hashes of uploaded files for verification From 737e5d5311a473a47473ddc0f63a1acb88443169 Mon Sep 17 00:00:00 2001 From: Suhaib Date: Wed, 14 May 2025 22:04:58 -0700 Subject: [PATCH 3/5] update --- .github/workflows/publish-to-pypi.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/publish-to-pypi.yml b/.github/workflows/publish-to-pypi.yml index e36731b..5de120b 100644 --- a/.github/workflows/publish-to-pypi.yml +++ b/.github/workflows/publish-to-pypi.yml @@ -33,8 +33,14 @@ jobs: python -m pip install --upgrade pip python -m pip install build twine + - name: Extract version from tag + id: get_version + run: echo "VERSION=${GITHUB_REF#refs/tags/v}" >> $GITHUB_OUTPUT + - name: Build package working-directory: ./pysmq + env: + SETUPTOOLS_SCM_PRETEND_VERSION: ${{ steps.get_version.outputs.VERSION }} run: python -m build - name: Publish package to PyPI From 8037734dab153102d1dbc5e67d3fce71282e5008 Mon Sep 17 00:00:00 2001 From: Suhaib Date: Wed, 14 May 2025 22:15:11 -0700 Subject: [PATCH 4/5] updates --- .github/workflows/publish-to-pypi.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/publish-to-pypi.yml b/.github/workflows/publish-to-pypi.yml index 5de120b..755ce49 100644 --- a/.github/workflows/publish-to-pypi.yml +++ b/.github/workflows/publish-to-pypi.yml @@ -12,7 +12,7 @@ jobs: # NOTE: You should create a "pypi-publish" environment in GitHub repo settings # and configure it to match what you set up in PyPI trusted publishing # Uncomment the following line once your environment is set up in GitHub: - # environment: pypi-publish + environment: pypi permissions: id-token: write # Required for trusted publishing (OIDC) contents: read # To checkout the repository From dd6d6e29be984ba3200843137972921f6a14b330 Mon Sep 17 00:00:00 2001 From: Suhaib Date: Wed, 14 May 2025 22:19:14 -0700 Subject: [PATCH 5/5] updates --- .github/workflows/publish-to-pypi.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/publish-to-pypi.yml b/.github/workflows/publish-to-pypi.yml index 755ce49..b10a6c6 100644 --- a/.github/workflows/publish-to-pypi.yml +++ b/.github/workflows/publish-to-pypi.yml @@ -9,7 +9,7 @@ jobs: build-and-publish: name: Build and publish Python package to PyPI runs-on: ubuntu-latest - # NOTE: You should create a "pypi-publish" environment in GitHub repo settings + # NOTE: You should create an environment in GitHub repo settings # and configure it to match what you set up in PyPI trusted publishing # Uncomment the following line once your environment is set up in GitHub: environment: pypi @@ -40,7 +40,7 @@ jobs: - name: Build package working-directory: ./pysmq env: - SETUPTOOLS_SCM_PRETEND_VERSION: ${{ steps.get_version.outputs.VERSION }} + SETUPTOOLS_SCM_PRETEND_VERSION_FOR_PYSMQ: ${{ steps.get_version.outputs.VERSION }} run: python -m build - name: Publish package to PyPI