Skip to content

Commit da120e5

Browse files
authored
Fix ActiveRecord JSONB args (#60)
* Fix ActiveRecord inserting job args as JSONB strings instead of objects The ActiveRecord driver passed the JSON-encoded args string directly to the insert. ActiveRecord's JSONB type serializer JSON-encodes all values on write, so a string like '{"job_num":1}' gets double-encoded into a JSONB string literal rather than stored as a JSONB object. Go workers then fail to unmarshal the double-encoded value: json: cannot unmarshal string into Go value of type ...Args Parse the encoded args to a Hash before insert so ActiveRecord serializes it as a proper JSONB object. Add a deserialize_json helper on read paths for backwards compatibility with existing fixture data. * Expect JSONB object values on Ruby read paths, same as Go Remove the deserialize_json helper that tolerated double-encoded string values. ActiveRecord natively deserializes JSONB objects to Hashes, so no parsing is needed. Update test fixtures to use Hash args matching what ActiveRecord returns from properly stored JSONB objects.
1 parent 37b3cd2 commit da120e5

3 files changed

Lines changed: 39 additions & 9 deletions

File tree

driver/riverqueue-activerecord/lib/driver.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def transaction(&)
7171

7272
private def insert_params_to_hash(insert_params)
7373
{
74-
args: insert_params.encoded_args,
74+
args: JSON.parse(insert_params.encoded_args),
7575
kind: insert_params.kind,
7676
max_attempts: insert_params.max_attempts,
7777
priority: insert_params.priority,
@@ -91,7 +91,7 @@ def transaction(&)
9191

9292
River::JobRow.new(
9393
id: river_job.id,
94-
args: JSON.parse(river_job.args),
94+
args: river_job.args,
9595
attempt: river_job.attempt,
9696
attempted_at: river_job.attempted_at&.getutc,
9797
attempted_by: river_job.attempted_by,
@@ -153,7 +153,7 @@ def transaction(&)
153153
[
154154
River::JobRow.new(
155155
id: river_job["id"],
156-
args: JSON.parse(river_job["args"]),
156+
args: river_job["args"],
157157
attempt: river_job["attempt"],
158158
attempted_at: river_job["attempted_at"]&.getutc,
159159
attempted_by: river_job["attempted_by"],

driver/riverqueue-activerecord/spec/driver_spec.rb

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,26 @@
1515

1616
it_behaves_like "driver shared examples"
1717

18+
describe "client inserts" do
19+
it "persists args as a JSON object rather than a JSON string" do
20+
insert_res = client.insert(SimpleArgs.new(job_num: 1))
21+
22+
row = ActiveRecord::Base.connection.exec_query(<<~SQL).first
23+
SELECT args, jsonb_typeof(args) AS args_type
24+
FROM river_job
25+
WHERE id = #{insert_res.job.id}
26+
SQL
27+
28+
expect(row["args_type"]).to eq("object")
29+
expect(JSON.parse(row["args"])).to eq({"job_num" => 1})
30+
end
31+
end
32+
1833
describe "#to_job_row_from_model" do
1934
it "converts a database record to `River::JobRow` with minimal properties" do
2035
river_job = River::Driver::ActiveRecord::RiverJob.create(
2136
id: 1,
22-
args: %({"job_num":1}),
37+
args: {"job_num" => 1},
2338
kind: "simple",
2439
max_attempts: River::MAX_ATTEMPTS_DEFAULT,
2540
priority: River::PRIORITY_DEFAULT,
@@ -56,7 +71,7 @@
5671
attempted_at: now,
5772
attempted_by: ["client1"],
5873
created_at: now,
59-
args: %({"job_num":1}),
74+
args: {"job_num" => 1},
6075
finalized_at: now,
6176
kind: "simple",
6277
max_attempts: River::MAX_ATTEMPTS_DEFAULT,
@@ -93,7 +108,7 @@
93108
it "with errors" do
94109
now = Time.now.utc
95110
river_job = River::Driver::ActiveRecord::RiverJob.create(
96-
args: %({"job_num":1}),
111+
args: {"job_num" => 1},
97112
errors: [JSON.dump(
98113
{
99114
at: now,
@@ -124,7 +139,7 @@
124139
it "converts a database record to `River::JobRow` with minimal properties" do
125140
res = River::Driver::ActiveRecord::RiverJob.insert({
126141
id: 1,
127-
args: %({"job_num":1}),
142+
args: {"job_num" => 1},
128143
kind: "simple",
129144
max_attempts: River::MAX_ATTEMPTS_DEFAULT
130145
}, returning: Arel.sql("*, false AS unique_skipped_as_duplicate"))
@@ -159,7 +174,7 @@
159174
attempted_at: now,
160175
attempted_by: ["client1"],
161176
created_at: now,
162-
args: %({"job_num":1}),
177+
args: {"job_num" => 1},
163178
finalized_at: now,
164179
kind: "simple",
165180
max_attempts: River::MAX_ATTEMPTS_DEFAULT,
@@ -197,7 +212,7 @@
197212
it "with errors" do
198213
now = Time.now.utc
199214
res = River::Driver::ActiveRecord::RiverJob.insert({
200-
args: %({"job_num":1}),
215+
args: {"job_num" => 1},
201216
errors: [JSON.dump(
202217
{
203218
at: now,

driver/riverqueue-sequel/spec/driver_spec.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,21 @@
99

1010
it_behaves_like "driver shared examples"
1111

12+
describe "client inserts" do
13+
it "persists args as a JSON object rather than a JSON string" do
14+
insert_res = client.insert(SimpleArgs.new(job_num: 1))
15+
16+
row = DB.fetch(<<~SQL, insert_res.job.id).first
17+
SELECT args, jsonb_typeof(args) AS args_type
18+
FROM river_job
19+
WHERE id = ?
20+
SQL
21+
22+
expect(row[:args_type]).to eq("object")
23+
expect(row[:args].to_h).to eq({"job_num" => 1})
24+
end
25+
end
26+
1227
describe "#to_job_row" do
1328
it "converts a database record to `River::JobRow` with minimal properties" do
1429
river_job = DB[:river_job].returning.insert_select({

0 commit comments

Comments
 (0)