diff --git a/CHANGELOG.md b/CHANGELOG.md index a3995c5..b6bc618 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All notable changes to this project will be documented in this file. +## [6.11.0] - 2026-03-25 +### Added +- Ability to specify option for compression when publish data + ## [6.10.0] - 2025-11-21 ### Added - Add on_first_sync callback diff --git a/Gemfile b/Gemfile index 7f4b456..1ef87d6 100644 --- a/Gemfile +++ b/Gemfile @@ -19,3 +19,5 @@ gem "bundler" gem "ostruct" gem "pry" gem "rake" + +gem "rabbit_messaging", git: "https://github.com/umbrellio/rabbit_messaging.git", branch: "compression-for-publisher-and-reciver" diff --git a/Gemfile.lock b/Gemfile.lock index daafac9..efc3dde 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,9 +1,20 @@ +GIT + remote: https://github.com/umbrellio/rabbit_messaging.git + revision: a7c85902a8655745525e3f378e58f816ec4ab9c0 + branch: compression-for-publisher-and-reciver + specs: + rabbit_messaging (1.8.0) + bunny (~> 2.0) + kicks + msgpack + zlib + PATH remote: . specs: - table_sync (6.10.0) + table_sync (6.11.0) memery - rabbit_messaging (>= 1.7.0) + rabbit_messaging (>= 1.8.0) rails self_data @@ -81,9 +92,9 @@ GEM minitest (>= 5.1) securerandom (>= 0.3) tzinfo (~> 2.0, >= 2.0.5) - amq-protocol (2.3.4) + amq-protocol (2.5.1) ast (2.4.3) - base64 (0.2.0) + base64 (0.3.0) benchmark (0.4.0) bigdecimal (3.1.9) builder (3.3.0) @@ -92,7 +103,7 @@ GEM sorted_set (~> 1, >= 1.0.2) cgi (0.5.0) coderay (1.1.3) - concurrent-ruby (1.3.5) + concurrent-ruby (1.3.6) connection_pool (2.5.3) crass (1.0.6) date (3.5.0) @@ -136,6 +147,7 @@ GEM mini_mime (1.1.5) mini_portile2 (2.8.9) minitest (5.25.5) + msgpack (1.8.0) net-imap (0.5.12) date net-protocol @@ -181,9 +193,6 @@ GEM psych (5.2.6) date stringio - rabbit_messaging (1.7.0) - bunny (~> 2.0) - kicks racc (1.8.1) rack (3.1.15) rack-session (2.1.1) @@ -223,7 +232,7 @@ GEM thor (~> 1.0, >= 1.2.2) zeitwerk (~> 2.6) rainbow (3.1.1) - rake (13.2.1) + rake (13.3.1) rbtree (0.4.6) rdoc (6.15.1) erb @@ -298,7 +307,6 @@ GEM base64 (~> 0.1) logger (~> 1.4) sigdump (~> 0.2.2) - set (1.1.2) sigdump (0.2.5) simplecov (0.22.0) docile (~> 1.1) @@ -307,11 +315,10 @@ GEM simplecov-html (0.13.1) simplecov-lcov (0.8.0) simplecov_json_formatter (0.1.4) - sorted_set (1.0.3) + sorted_set (1.1.0) rbtree - set (~> 1.0) stringio (3.1.8) - thor (1.4.0) + thor (1.5.0) timecop (0.9.10) timeout (0.4.3) tsort (0.2.0) @@ -326,6 +333,7 @@ GEM websocket-extensions (>= 0.1.0) websocket-extensions (0.1.5) zeitwerk (2.6.18) + zlib (3.2.3) PLATFORMS aarch64-linux-gnu @@ -345,6 +353,7 @@ DEPENDENCIES ostruct pg pry + rabbit_messaging! rake rspec rubocop-config-umbrellio diff --git a/lib/table_sync.rb b/lib/table_sync.rb index 0ac8aec..52441d1 100644 --- a/lib/table_sync.rb +++ b/lib/table_sync.rb @@ -52,6 +52,7 @@ def sync(object_class, **options) if_condition: options[:if], unless_condition: options[:unless], debounce_time: options[:debounce_time], + compress: options.fetch(:compress, false), ).register_callbacks end diff --git a/lib/table_sync/instrument_adapter/active_support.rb b/lib/table_sync/instrument_adapter/active_support.rb index 8d60a34..5287e21 100644 --- a/lib/table_sync/instrument_adapter/active_support.rb +++ b/lib/table_sync/instrument_adapter/active_support.rb @@ -4,13 +4,21 @@ module TableSync::InstrumentAdapter module ActiveSupport module_function - def notify(table:, schema:, event:, direction:, count: 1) + def notify( # rubocop:disable Metrics/ParameterLists + table:, + schema:, + event:, + direction:, + count: 1, + compress: false + ) ::ActiveSupport::Notifications.instrument "tablesync.#{direction}.#{event}", count:, table: table.to_s, schema: schema.to_s, event:, - direction: + direction:, + compress: end end end diff --git a/lib/table_sync/publishing/batch.rb b/lib/table_sync/publishing/batch.rb index 19801d6..50b5a95 100644 --- a/lib/table_sync/publishing/batch.rb +++ b/lib/table_sync/publishing/batch.rb @@ -6,7 +6,8 @@ class TableSync::Publishing::Batch :custom_version, :routing_key, :headers, - :event + :event, + :compress def initialize(attrs = {}) attrs = attrs.with_indifferent_access @@ -17,6 +18,7 @@ def initialize(attrs = {}) self.routing_key = attrs[:routing_key] self.headers = attrs[:headers] self.event = attrs.fetch(:event, :update).to_sym + self.compress = attrs.fetch(:compress, false) validate_required_attributes! end @@ -55,6 +57,7 @@ def attributes routing_key: routing_key, headers: headers, event: event, + compress: compress, } end diff --git a/lib/table_sync/publishing/message/base.rb b/lib/table_sync/publishing/message/base.rb index e0fb5e6..30bdbae 100644 --- a/lib/table_sync/publishing/message/base.rb +++ b/lib/table_sync/publishing/message/base.rb @@ -5,7 +5,8 @@ class Base attr_accessor :custom_version, :object_class, :original_attributes, - :event + :event, + :compress attr_reader :objects @@ -14,6 +15,7 @@ def initialize(params = {}) self.object_class = params[:object_class] self.original_attributes = params[:original_attributes] self.event = params[:event].to_sym + self.compress = params.fetch(:compress, false) @objects = find_or_init_objects @@ -65,6 +67,7 @@ def notify! table: model_naming.table, schema: model_naming.schema, event:, + compress:, direction: :publish, count: objects.count, ) diff --git a/lib/table_sync/publishing/message/batch.rb b/lib/table_sync/publishing/message/batch.rb index b40a1df..73cb215 100644 --- a/lib/table_sync/publishing/message/batch.rb +++ b/lib/table_sync/publishing/message/batch.rb @@ -13,7 +13,7 @@ def initialize(params = {}) def params TableSync::Publishing::Params::Batch.new( - { object_class:, headers:, routing_key: }.compact, + { object_class:, headers:, routing_key:, compress: }.compact, ).construct end end diff --git a/lib/table_sync/publishing/message/raw.rb b/lib/table_sync/publishing/message/raw.rb index 6704312..1f03cd8 100644 --- a/lib/table_sync/publishing/message/raw.rb +++ b/lib/table_sync/publishing/message/raw.rb @@ -9,7 +9,8 @@ class Raw :routing_key, :headers, :custom_version, - :event + :event, + :compress def initialize(params = {}) self.model_name = params[:model_name] @@ -20,6 +21,7 @@ def initialize(params = {}) self.headers = params[:headers] self.custom_version = params[:custom_version] self.event = params[:event] + self.compress = params.fetch(:compress, false) end def publish @@ -35,6 +37,7 @@ def notify! table: table_name, schema: schema_name, event:, + compress: compress, count: original_attributes.count, direction: :publish, ) @@ -57,7 +60,7 @@ def data def params TableSync::Publishing::Params::Raw.new( - { model_name:, headers:, routing_key: }.compact, + { model_name:, headers:, routing_key:, compress: }.compact, ).construct end end diff --git a/lib/table_sync/publishing/message/single.rb b/lib/table_sync/publishing/message/single.rb index 40ae8ab..4954f94 100644 --- a/lib/table_sync/publishing/message/single.rb +++ b/lib/table_sync/publishing/message/single.rb @@ -7,7 +7,7 @@ def object end def params - TableSync::Publishing::Params::Single.new(object:).construct + TableSync::Publishing::Params::Single.new(object:, compress:).construct end end end diff --git a/lib/table_sync/publishing/params/base.rb b/lib/table_sync/publishing/params/base.rb index a8ca63a..af951ac 100644 --- a/lib/table_sync/publishing/params/base.rb +++ b/lib/table_sync/publishing/params/base.rb @@ -13,9 +13,12 @@ def construct routing_key:, headers:, exchange_name:, + compress: compress, ) end + attr_accessor :compress + private def calculated_routing_key diff --git a/lib/table_sync/publishing/params/batch.rb b/lib/table_sync/publishing/params/batch.rb index 73fd1fe..65db027 100644 --- a/lib/table_sync/publishing/params/batch.rb +++ b/lib/table_sync/publishing/params/batch.rb @@ -10,6 +10,7 @@ def initialize(attrs = {}) @exchange_name = attrs[:exchange_name] @routing_key = attrs[:routing_key] @headers = attrs[:headers] + @compress = attrs.fetch(:compress, false) end def exchange_name diff --git a/lib/table_sync/publishing/params/single.rb b/lib/table_sync/publishing/params/single.rb index 3c5e733..da7bfd0 100644 --- a/lib/table_sync/publishing/params/single.rb +++ b/lib/table_sync/publishing/params/single.rb @@ -4,10 +4,11 @@ module TableSync::Publishing::Params class Single < Base attr_reader :object, :routing_key, :headers - def initialize(object:) + def initialize(object:, compress: false) @object = object @routing_key = calculated_routing_key @headers = calculated_headers + @compress = compress end private diff --git a/lib/table_sync/publishing/raw.rb b/lib/table_sync/publishing/raw.rb index 25cdbd8..24991aa 100644 --- a/lib/table_sync/publishing/raw.rb +++ b/lib/table_sync/publishing/raw.rb @@ -10,7 +10,8 @@ class TableSync::Publishing::Raw :custom_version, :routing_key, :headers, - :event + :event, + :compress def initialize(attributes = {}) attributes = attributes.with_indifferent_access @@ -23,6 +24,7 @@ def initialize(attributes = {}) self.routing_key = attributes[:routing_key] self.headers = attributes[:headers] self.event = attributes.fetch(:event, :update).to_sym + self.compress = attributes.fetch(:compress, false) end require_attributes :model_name, :original_attributes @@ -47,6 +49,7 @@ def attributes routing_key: routing_key, headers: headers, event: event, + compress: compress, } end end diff --git a/lib/table_sync/publishing/single.rb b/lib/table_sync/publishing/single.rb index a55ca6b..3146d4e 100644 --- a/lib/table_sync/publishing/single.rb +++ b/lib/table_sync/publishing/single.rb @@ -7,7 +7,8 @@ class TableSync::Publishing::Single :original_attributes, :debounce_time, :custom_version, - :event + :event, + :compress def initialize(attrs = {}) attrs = attrs.with_indifferent_access @@ -15,6 +16,7 @@ def initialize(attrs = {}) self.object_class = attrs[:object_class] self.original_attributes = attrs[:original_attributes] self.debounce_time = attrs[:debounce_time] + self.compress = attrs.fetch(:compress, false) self.custom_version = attrs[:custom_version] self.event = attrs.fetch(:event, :update) end @@ -56,6 +58,7 @@ def attributes debounce_time: debounce_time, custom_version: custom_version, event: event, + compress: compress, } end diff --git a/lib/table_sync/setup/active_record.rb b/lib/table_sync/setup/active_record.rb index 531f3b7..c8c493b 100644 --- a/lib/table_sync/setup/active_record.rb +++ b/lib/table_sync/setup/active_record.rb @@ -16,6 +16,7 @@ def define_after_commit(event) original_attributes: attributes, event:, debounce_time: options[:debounce_time], + compress: options.fetch(:compress, false), ).publish_later end end diff --git a/lib/table_sync/setup/base.rb b/lib/table_sync/setup/base.rb index 7e0f445..e9fa170 100644 --- a/lib/table_sync/setup/base.rb +++ b/lib/table_sync/setup/base.rb @@ -6,7 +6,7 @@ class Base INVALID_EVENT = Class.new(StandardError) INVALID_CONDITION = Class.new(StandardError) - attr_accessor :object_class, :debounce_time, :on, :if_condition, :unless_condition + attr_accessor :object_class, :debounce_time, :on, :if_condition, :unless_condition, :compress def initialize(attrs = {}) attrs.each do |key, value| @@ -57,6 +57,7 @@ def options_exposed_for_block if: if_condition, unless: unless_condition, debounce_time:, + compress: compress, } end end diff --git a/lib/table_sync/setup/sequel.rb b/lib/table_sync/setup/sequel.rb index c3e61cf..e908a8d 100644 --- a/lib/table_sync/setup/sequel.rb +++ b/lib/table_sync/setup/sequel.rb @@ -15,6 +15,7 @@ def define_after_commit(event) original_attributes: values, event:, debounce_time: options[:debounce_time], + compress: options.fetch(:compress, false), ).publish_later end end diff --git a/lib/table_sync/version.rb b/lib/table_sync/version.rb index bee1ab1..685f4f1 100644 --- a/lib/table_sync/version.rb +++ b/lib/table_sync/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module TableSync - VERSION = "6.10.0" + VERSION = "6.11.0" end diff --git a/spec/publishing/batch_spec.rb b/spec/publishing/batch_spec.rb index de14a8f..27fee46 100644 --- a/spec/publishing/batch_spec.rb +++ b/spec/publishing/batch_spec.rb @@ -11,6 +11,7 @@ let(:routing_key) { object_class.tableize } let(:expected_routing_key) { "a_record_users" } let(:headers) { { klass: object_class } } + let(:compress) { false } let(:attributes) do { @@ -20,6 +21,7 @@ headers:, routing_key:, custom_version: nil, + compress:, } end @@ -29,6 +31,16 @@ TableSync::Publishing::Batch, %i[object_class original_attributes] + context "when compress option has been provided" do + let(:compress) { true } + + it_behaves_like "publisher#publish_now with stubbed message", + TableSync::Publishing::Message::Batch + it_behaves_like "publisher#new without expected fields", + TableSync::Publishing::Batch, + %i[object_class original_attributes] + end + context "real user" do context "sequel" do let(:object_class) { "SequelUser" } diff --git a/spec/publishing/message/batch_spec.rb b/spec/publishing/message/batch_spec.rb index a4906e6..457f008 100644 --- a/spec/publishing/message/batch_spec.rb +++ b/spec/publishing/message/batch_spec.rb @@ -40,6 +40,7 @@ object_class: attributes[:object_class], routing_key: attributes[:routing_key], headers: attributes[:headers], + compress: false, } end @@ -66,6 +67,21 @@ described_class.new(attributes).publish end + + context "when compress option has been specified" do + let(:attributes) { super().merge(compress: true) } + let(:params_attributes) { super().merge(compress: true) } + + it "calls data and params with correct attrs" do + expect(data_class).to receive(:new).with(data_attributes) + expect(params_class).to receive(:new).with(params_attributes) + + expect(data).to receive(:construct) + expect(params).to receive(:construct) + + described_class.new(attributes).publish + end + end end it "calls Rabbit#publish" do diff --git a/spec/publishing/message/raw_spec.rb b/spec/publishing/message/raw_spec.rb index 2bcbd36..0cfe6bd 100644 --- a/spec/publishing/message/raw_spec.rb +++ b/spec/publishing/message/raw_spec.rb @@ -34,6 +34,7 @@ model_name: attributes[:model_name], routing_key: attributes[:routing_key], headers: attributes[:headers], + compress: false, } end @@ -54,6 +55,21 @@ described_class.new(attributes).publish end + + context "when compress option has been specified" do + let(:attributes) { super().merge(compress: true) } + let(:params_attributes) { super().merge(compress: true) } + + it "calls data and params with correct attrs" do + expect(data_class).to receive(:new).with(data_attributes) + expect(params_class).to receive(:new).with(params_attributes) + + expect(data).to receive(:construct) + expect(params).to receive(:construct) + + described_class.new(attributes).publish + end + end end it "calls Rabbit#publish" do diff --git a/spec/publishing/message/single_spec.rb b/spec/publishing/message/single_spec.rb index 58b87d1..d86945f 100644 --- a/spec/publishing/message/single_spec.rb +++ b/spec/publishing/message/single_spec.rb @@ -41,6 +41,7 @@ let(:params_attributes) do { object:, + compress: false, } end @@ -65,6 +66,21 @@ described_class.new(attributes).publish end + + context "when compress option has been specified" do + let(:attributes) { super().merge(compress: true) } + let(:params_attributes) { super().merge(compress: true) } + + it "calls data and params with correct attrs" do + expect(data_class).to receive(:new).with(data_attributes) + expect(params_class).to receive(:new).with(params_attributes) + + expect(data).to receive(:construct) + expect(params).to receive(:construct) + + described_class.new(attributes).publish + end + end end it "calls Rabbit#publish" do diff --git a/spec/publishing/params/batch_spec.rb b/spec/publishing/params/batch_spec.rb index 5a7afb6..e137f0f 100644 --- a/spec/publishing/params/batch_spec.rb +++ b/spec/publishing/params/batch_spec.rb @@ -10,6 +10,7 @@ { confirm_select: true, realtime: true, + compress: false, event: :table_sync, } end @@ -27,6 +28,13 @@ end describe "#construct" do + context "when compress option has been provided" do + let(:attributes) { super().merge(compress: true) } + let(:expected_values) { default_expected_values.merge(compress: true) } + + it_behaves_like "constructs with expected values" + end + context "default params" do let(:expected_values) { default_expected_values } diff --git a/spec/publishing/params/raw_spec.rb b/spec/publishing/params/raw_spec.rb index f5584bc..37d9505 100644 --- a/spec/publishing/params/raw_spec.rb +++ b/spec/publishing/params/raw_spec.rb @@ -11,6 +11,7 @@ confirm_select: true, realtime: true, event: :table_sync, + compress: false, } end @@ -27,6 +28,13 @@ end describe "#construct" do + context "when compress option has been provided" do + let(:attributes) { super().merge(compress: true) } + let(:expected_values) { default_expected_values.merge(compress: true) } + + it_behaves_like "constructs with expected values" + end + context "default params" do let(:expected_values) { default_expected_values } diff --git a/spec/publishing/params/single_spec.rb b/spec/publishing/params/single_spec.rb index 8a69070..6f8ae35 100644 --- a/spec/publishing/params/single_spec.rb +++ b/spec/publishing/params/single_spec.rb @@ -17,6 +17,7 @@ confirm_select: true, realtime: true, event: :table_sync, + compress: false, } end @@ -39,6 +40,13 @@ end describe "#construct" do + context "when compress option has been provided" do + let(:attributes) { super().merge(compress: true) } + let(:expected_values) { default_expected_values.merge(compress: true) } + + it_behaves_like "constructs with expected values" + end + context "default params" do let(:expected_values) { default_expected_values } diff --git a/spec/publishing/raw_spec.rb b/spec/publishing/raw_spec.rb index c52b0f3..018605d 100644 --- a/spec/publishing/raw_spec.rb +++ b/spec/publishing/raw_spec.rb @@ -10,6 +10,7 @@ let(:original_attributes) { [{ id: 1, name: "purum" }] } let(:table_name) { "sequel_users" } let(:schema_name) { "public" } + let(:compress) { false } let(:attributes) do { @@ -21,6 +22,7 @@ table_name:, schema_name:, custom_version: nil, + compress:, } end @@ -32,6 +34,16 @@ it_behaves_like "publisher#publish_now without stubbed message", TableSync::Publishing::Message::Raw + context "when compress option has been provided" do + let(:compress) { true } + + it_behaves_like "publisher#publish_now with stubbed message", + TableSync::Publishing::Message::Raw + + it_behaves_like "publisher#publish_now without stubbed message", + TableSync::Publishing::Message::Raw + end + context "when routing_key is nil" do let(:routing_key) { nil } let(:expected_routing_key) { "sequel_users" } diff --git a/spec/publishing/single_spec.rb b/spec/publishing/single_spec.rb index d674979..4000498 100644 --- a/spec/publishing/single_spec.rb +++ b/spec/publishing/single_spec.rb @@ -11,6 +11,7 @@ let(:expected_routing_key) { "a_record_users" } let(:headers) { { klass: object_class } } let(:debounce_time) { 30 } + let(:compress) { false } let(:attributes) do { @@ -19,10 +20,18 @@ event:, debounce_time:, custom_version: nil, + compress:, } end describe "#publish_now" do + context "when compress option has been provided" do + let(:compress) { true } + + it_behaves_like "publisher#publish_now with stubbed message", + TableSync::Publishing::Message::Single + end + it_behaves_like "publisher#publish_now with stubbed message", TableSync::Publishing::Message::Single diff --git a/table_sync.gemspec b/table_sync.gemspec index dc01905..6f1a8c7 100644 --- a/table_sync.gemspec +++ b/table_sync.gemspec @@ -27,7 +27,7 @@ Gem::Specification.new do |spec| end spec.add_dependency "memery" - spec.add_dependency "rabbit_messaging", ">= 1.7.0" + spec.add_dependency "rabbit_messaging", ">= 1.8.0" spec.add_dependency "rails" spec.add_dependency "self_data" end