Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Changelog
All notable changes to this project will be documented in this file.

## [6.11.0] - 2026-03-25
### Added
- Ability to specify option for compression when publish data

## [6.10.0] - 2025-11-21
### Added
- Add on_first_sync callback
Expand Down
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ gem "bundler"
gem "ostruct"
gem "pry"
gem "rake"

gem "rabbit_messaging", git: "https://github.com/umbrellio/rabbit_messaging.git", branch: "compression-for-publisher-and-reciver"
21 changes: 16 additions & 5 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
GIT
remote: https://github.com/umbrellio/rabbit_messaging.git
revision: 2a31c6878f012fd3a31f538a9d1ac2d0670a2e50
branch: compression-for-publisher-and-reciver
specs:
rabbit_messaging (1.8.0)
bunny (~> 2.0)
kicks
msgpack
zlib

PATH
remote: .
specs:
table_sync (6.10.0)
table_sync (6.11.0)
memery
rabbit_messaging (>= 1.7.0)
rabbit_messaging (>= 1.8.0)
rails
self_data

Expand Down Expand Up @@ -136,6 +147,7 @@ GEM
mini_mime (1.1.5)
mini_portile2 (2.8.9)
minitest (5.25.5)
msgpack (1.8.0)
net-imap (0.5.12)
date
net-protocol
Expand Down Expand Up @@ -181,9 +193,6 @@ GEM
psych (5.2.6)
date
stringio
rabbit_messaging (1.7.0)
bunny (~> 2.0)
kicks
racc (1.8.1)
rack (3.1.15)
rack-session (2.1.1)
Expand Down Expand Up @@ -326,6 +335,7 @@ GEM
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.5)
zeitwerk (2.6.18)
zlib (3.2.3)

PLATFORMS
aarch64-linux-gnu
Expand All @@ -345,6 +355,7 @@ DEPENDENCIES
ostruct
pg
pry
rabbit_messaging!
rake
rspec
rubocop-config-umbrellio
Expand Down
1 change: 1 addition & 0 deletions lib/table_sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def sync(object_class, **options)
if_condition: options[:if],
unless_condition: options[:unless],
debounce_time: options[:debounce_time],
compress: options.fetch(:compress, false),
).register_callbacks
end

Expand Down
12 changes: 10 additions & 2 deletions lib/table_sync/instrument_adapter/active_support.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,21 @@ module TableSync::InstrumentAdapter
module ActiveSupport
module_function

def notify(table:, schema:, event:, direction:, count: 1)
def notify( # rubocop:disable Metrics/ParameterLists
table:,
schema:,
event:,
direction:,
count: 1,
compress: false
)
::ActiveSupport::Notifications.instrument "tablesync.#{direction}.#{event}",
count:,
table: table.to_s,
schema: schema.to_s,
event:,
direction:
direction:,
compress:
end
end
end
5 changes: 4 additions & 1 deletion lib/table_sync/publishing/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ class TableSync::Publishing::Batch
:custom_version,
:routing_key,
:headers,
:event
:event,
:compress

def initialize(attrs = {})
attrs = attrs.with_indifferent_access
Expand All @@ -17,6 +18,7 @@ def initialize(attrs = {})
self.routing_key = attrs[:routing_key]
self.headers = attrs[:headers]
self.event = attrs.fetch(:event, :update).to_sym
self.compress = attrs.fetch(:compress, false)

validate_required_attributes!
end
Expand Down Expand Up @@ -55,6 +57,7 @@ def attributes
routing_key: routing_key,
headers: headers,
event: event,
compress: compress,
}
end

Expand Down
5 changes: 4 additions & 1 deletion lib/table_sync/publishing/message/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ class Base
attr_accessor :custom_version,
:object_class,
:original_attributes,
:event
:event,
:compress

attr_reader :objects

Expand All @@ -14,6 +15,7 @@ def initialize(params = {})
self.object_class = params[:object_class]
self.original_attributes = params[:original_attributes]
self.event = params[:event].to_sym
self.compress = params.fetch(:compress, false)

@objects = find_or_init_objects

Expand Down Expand Up @@ -65,6 +67,7 @@ def notify!
table: model_naming.table,
schema: model_naming.schema,
event:,
compress:,
direction: :publish,
count: objects.count,
)
Expand Down
2 changes: 1 addition & 1 deletion lib/table_sync/publishing/message/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def initialize(params = {})

def params
TableSync::Publishing::Params::Batch.new(
{ object_class:, headers:, routing_key: }.compact,
{ object_class:, headers:, routing_key:, compress: }.compact,
).construct
end
end
Expand Down
7 changes: 5 additions & 2 deletions lib/table_sync/publishing/message/raw.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ class Raw
:routing_key,
:headers,
:custom_version,
:event
:event,
:compress

def initialize(params = {})
self.model_name = params[:model_name]
Expand All @@ -20,6 +21,7 @@ def initialize(params = {})
self.headers = params[:headers]
self.custom_version = params[:custom_version]
self.event = params[:event]
self.compress = params.fetch(:compress, false)
end

def publish
Expand All @@ -35,6 +37,7 @@ def notify!
table: table_name,
schema: schema_name,
event:,
compress: compress,
count: original_attributes.count,
direction: :publish,
)
Expand All @@ -57,7 +60,7 @@ def data

def params
TableSync::Publishing::Params::Raw.new(
{ model_name:, headers:, routing_key: }.compact,
{ model_name:, headers:, routing_key:, compress: }.compact,
).construct
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/table_sync/publishing/message/single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def object
end

def params
TableSync::Publishing::Params::Single.new(object:).construct
TableSync::Publishing::Params::Single.new(object:, compress:).construct
end
end
end
3 changes: 3 additions & 0 deletions lib/table_sync/publishing/params/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ def construct
routing_key:,
headers:,
exchange_name:,
compress: compress,
)
end

attr_accessor :compress

private

def calculated_routing_key
Expand Down
1 change: 1 addition & 0 deletions lib/table_sync/publishing/params/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def initialize(attrs = {})
@exchange_name = attrs[:exchange_name]
@routing_key = attrs[:routing_key]
@headers = attrs[:headers]
@compress = attrs.fetch(:compress, false)
end

def exchange_name
Expand Down
3 changes: 2 additions & 1 deletion lib/table_sync/publishing/params/single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ module TableSync::Publishing::Params
class Single < Base
attr_reader :object, :routing_key, :headers

def initialize(object:)
def initialize(object:, compress: false)
@object = object
@routing_key = calculated_routing_key
@headers = calculated_headers
@compress = compress
end

private
Expand Down
5 changes: 4 additions & 1 deletion lib/table_sync/publishing/raw.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ class TableSync::Publishing::Raw
:custom_version,
:routing_key,
:headers,
:event
:event,
:compress

def initialize(attributes = {})
attributes = attributes.with_indifferent_access
Expand All @@ -23,6 +24,7 @@ def initialize(attributes = {})
self.routing_key = attributes[:routing_key]
self.headers = attributes[:headers]
self.event = attributes.fetch(:event, :update).to_sym
self.compress = attributes.fetch(:compress, false)
end

require_attributes :model_name, :original_attributes
Expand All @@ -47,6 +49,7 @@ def attributes
routing_key: routing_key,
headers: headers,
event: event,
compress: compress,
}
end
end
5 changes: 4 additions & 1 deletion lib/table_sync/publishing/single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ class TableSync::Publishing::Single
:original_attributes,
:debounce_time,
:custom_version,
:event
:event,
:compress

def initialize(attrs = {})
attrs = attrs.with_indifferent_access

self.object_class = attrs[:object_class]
self.original_attributes = attrs[:original_attributes]
self.debounce_time = attrs[:debounce_time]
self.compress = attrs.fetch(:compress, false)
self.custom_version = attrs[:custom_version]
self.event = attrs.fetch(:event, :update)
end
Expand Down Expand Up @@ -56,6 +58,7 @@ def attributes
debounce_time: debounce_time,
custom_version: custom_version,
event: event,
compress: compress,
}
end

Expand Down
1 change: 1 addition & 0 deletions lib/table_sync/setup/active_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def define_after_commit(event)
original_attributes: attributes,
event:,
debounce_time: options[:debounce_time],
compress: options.fetch(:compress, false),
).publish_later
end
end
Expand Down
3 changes: 2 additions & 1 deletion lib/table_sync/setup/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class Base
INVALID_EVENT = Class.new(StandardError)
INVALID_CONDITION = Class.new(StandardError)

attr_accessor :object_class, :debounce_time, :on, :if_condition, :unless_condition
attr_accessor :object_class, :debounce_time, :on, :if_condition, :unless_condition, :compress

def initialize(attrs = {})
attrs.each do |key, value|
Expand Down Expand Up @@ -57,6 +57,7 @@ def options_exposed_for_block
if: if_condition,
unless: unless_condition,
debounce_time:,
compress: compress,
}
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/table_sync/setup/sequel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def define_after_commit(event)
original_attributes: values,
event:,
debounce_time: options[:debounce_time],
compress: options.fetch(:compress, false),
).publish_later
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/table_sync/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module TableSync
VERSION = "6.10.0"
VERSION = "6.11.0"
end
12 changes: 12 additions & 0 deletions spec/publishing/batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
let(:routing_key) { object_class.tableize }
let(:expected_routing_key) { "a_record_users" }
let(:headers) { { klass: object_class } }
let(:compress) { false }

let(:attributes) do
{
Expand All @@ -20,6 +21,7 @@
headers:,
routing_key:,
custom_version: nil,
compress:,
}
end

Expand All @@ -29,6 +31,16 @@
TableSync::Publishing::Batch,
%i[object_class original_attributes]

context "when compress option has been provided" do
let(:compress) { true }

it_behaves_like "publisher#publish_now with stubbed message",
TableSync::Publishing::Message::Batch
it_behaves_like "publisher#new without expected fields",
TableSync::Publishing::Batch,
%i[object_class original_attributes]
end

context "real user" do
context "sequel" do
let(:object_class) { "SequelUser" }
Expand Down
16 changes: 16 additions & 0 deletions spec/publishing/message/batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
object_class: attributes[:object_class],
routing_key: attributes[:routing_key],
headers: attributes[:headers],
compress: false,
}
end

Expand All @@ -66,6 +67,21 @@

described_class.new(attributes).publish
end

context "when compress option has been specified" do
let(:attributes) { super().merge(compress: true) }
let(:params_attributes) { super().merge(compress: true) }

it "calls data and params with correct attrs" do
expect(data_class).to receive(:new).with(data_attributes)
expect(params_class).to receive(:new).with(params_attributes)

expect(data).to receive(:construct)
expect(params).to receive(:construct)

described_class.new(attributes).publish
end
end
end

it "calls Rabbit#publish" do
Expand Down
Loading
Loading