Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/etl/db/sql/migrations/0030_social_upsert_indexes.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DROP INDEX IF EXISTS reposts_current_uniq_idx;
DROP INDEX IF EXISTS saves_current_uniq_idx;
DROP INDEX IF EXISTS follows_current_uniq_idx;
DROP INDEX IF EXISTS subscriptions_current_uniq_idx;
17 changes: 17 additions & 0 deletions pkg/etl/db/sql/migrations/0030_social_upsert_indexes.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Upsert support for social writes (reposts/saves/follows/subscriptions).
-- The entity_manager now upserts the single is_current row in place instead of
-- demote-then-insert, so it needs a unique arbiter on the entity identity.
-- Partial on is_current=true: only one current row per identity is an existing
-- invariant, so historical demoted rows are ignored and no dedup/backfill runs.

CREATE UNIQUE INDEX IF NOT EXISTS reposts_current_uniq_idx
ON reposts (user_id, repost_item_id, repost_type) WHERE is_current = true;

CREATE UNIQUE INDEX IF NOT EXISTS saves_current_uniq_idx
ON saves (user_id, save_item_id, save_type) WHERE is_current = true;

CREATE UNIQUE INDEX IF NOT EXISTS follows_current_uniq_idx
ON follows (follower_user_id, followee_user_id) WHERE is_current = true;

CREATE UNIQUE INDEX IF NOT EXISTS subscriptions_current_uniq_idx
ON subscriptions (subscriber_id, user_id) WHERE is_current = true;
37 changes: 17 additions & 20 deletions pkg/etl/processors/entity_manager/social_follow.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,41 +77,38 @@ func validateUnfollow(ctx context.Context, params *Params) error {
// --- shared ---

func insertFollow(ctx context.Context, params *Params, isDelete bool) error {
_, err := params.DBTX.Exec(ctx,
"UPDATE follows SET is_current = false WHERE follower_user_id = $1 AND followee_user_id = $2 AND is_current = true",
params.UserID, params.EntityID)
if err != nil {
return err
}

// PK is (follower_user_id, followee_user_id, txhash); re-delivery of the
// same chain tx would otherwise 23505 on the PK. Same txhash means
// identical row content, so DO NOTHING is correct.
_, err = params.DBTX.Exec(ctx, `
// Upsert the single current row in place (arbiter: follows_current_uniq_idx).
// Replaces demote-then-insert: avoids unbounded is_current=false history and
// gives the aggregate triggers an O(1) is_delete transition to track.
_, err := params.DBTX.Exec(ctx, `
INSERT INTO follows (
follower_user_id, followee_user_id, is_current, is_delete,
created_at, txhash, blocknumber
) VALUES ($1, $2, true, $3, $4, $5, $6)
ON CONFLICT (follower_user_id, followee_user_id, txhash) DO NOTHING
ON CONFLICT (follower_user_id, followee_user_id) WHERE is_current = true
DO UPDATE SET
is_delete = EXCLUDED.is_delete,
created_at = EXCLUDED.created_at,
txhash = EXCLUDED.txhash,
blocknumber = EXCLUDED.blocknumber
`, params.UserID, params.EntityID, isDelete, params.BlockTime, params.TxHash, params.BlockNumber)
if err != nil {
return err
}

// Follow/Unfollow also creates/deletes a Subscription record
_, err = params.DBTX.Exec(ctx,
"UPDATE subscriptions SET is_current = false WHERE subscriber_id = $1 AND user_id = $2 AND is_current = true",
params.UserID, params.EntityID)
if err != nil {
return err
}
// PK is (subscriber_id, user_id, txhash); same DO NOTHING rationale.
// (arbiter: subscriptions_current_uniq_idx).
_, err = params.DBTX.Exec(ctx, `
INSERT INTO subscriptions (
subscriber_id, user_id, is_current, is_delete,
created_at, txhash, blocknumber
) VALUES ($1, $2, true, $3, $4, $5, $6)
ON CONFLICT (subscriber_id, user_id, txhash) DO NOTHING
ON CONFLICT (subscriber_id, user_id) WHERE is_current = true
DO UPDATE SET
is_delete = EXCLUDED.is_delete,
created_at = EXCLUDED.created_at,
txhash = EXCLUDED.txhash,
blocknumber = EXCLUDED.blocknumber
`, params.UserID, params.EntityID, isDelete, params.BlockTime, params.TxHash, params.BlockNumber)
return err
}
Expand Down
25 changes: 11 additions & 14 deletions pkg/etl/processors/entity_manager/social_repost.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,21 @@ func insertRepost(ctx context.Context, params *Params, isDelete bool) error {
repostType := resolveRepostType(ctx, params)
isRepostOfRepost := params.MetadataBoolOr("is_repost_of_repost", false)

// Mark existing repost rows as not current
_, err := params.DBTX.Exec(ctx,
"UPDATE reposts SET is_current = false WHERE user_id = $1 AND repost_item_id = $2 AND repost_type = $3::reposttype AND is_current = true",
params.UserID, params.EntityID, repostType)
if err != nil {
return err
}

// PK is (user_id, repost_item_id, repost_type, txhash); re-delivery of
// the same chain tx would otherwise 23505 on the PK. Same txhash means
// identical row content by construction, so DO NOTHING is the correct
// dedup.
_, err = params.DBTX.Exec(ctx, `
// Upsert the single current row in place (arbiter: reposts_current_uniq_idx).
// Replaces demote-then-insert: avoids unbounded is_current=false history and
// gives the aggregate triggers an O(1) is_delete transition to track.
_, err := params.DBTX.Exec(ctx, `
INSERT INTO reposts (
user_id, repost_item_id, repost_type, is_current, is_delete, is_repost_of_repost,
created_at, txhash, blocknumber
) VALUES ($1, $2, $3::reposttype, true, $4, $5, $6, $7, $8)
ON CONFLICT (user_id, repost_item_id, repost_type, txhash) DO NOTHING
ON CONFLICT (user_id, repost_item_id, repost_type) WHERE is_current = true
DO UPDATE SET
is_delete = EXCLUDED.is_delete,
is_repost_of_repost = EXCLUDED.is_repost_of_repost,
created_at = EXCLUDED.created_at,
txhash = EXCLUDED.txhash,
blocknumber = EXCLUDED.blocknumber
`, params.UserID, params.EntityID, repostType, isDelete, isRepostOfRepost, params.BlockTime, params.TxHash, params.BlockNumber)
return err
}
Expand Down
25 changes: 11 additions & 14 deletions pkg/etl/processors/entity_manager/social_save.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,21 @@ func insertSave(ctx context.Context, params *Params, isDelete bool) error {
saveType := resolveSaveType(ctx, params)
isSaveOfRepost := params.MetadataBoolOr("is_save_of_repost", false)

// Mark existing save rows as not current
_, err := params.DBTX.Exec(ctx,
"UPDATE saves SET is_current = false WHERE user_id = $1 AND save_item_id = $2 AND save_type = $3::savetype AND is_current = true",
params.UserID, params.EntityID, saveType)
if err != nil {
return err
}

// PK is (user_id, save_item_id, save_type, txhash); re-delivery of the
// same chain tx (prefetcher anomaly) would otherwise 23505 on the PK.
// Same txhash means identical row content by construction, so DO NOTHING
// is the correct dedup.
_, err = params.DBTX.Exec(ctx, `
// Upsert the single current row in place (arbiter: saves_current_uniq_idx).
// Replaces demote-then-insert: avoids unbounded is_current=false history and
// gives the aggregate triggers an O(1) is_delete transition to track.
_, err := params.DBTX.Exec(ctx, `
INSERT INTO saves (
user_id, save_item_id, save_type, is_current, is_delete, is_save_of_repost,
created_at, txhash, blocknumber
) VALUES ($1, $2, $3::savetype, true, $4, $5, $6, $7, $8)
ON CONFLICT (user_id, save_item_id, save_type, txhash) DO NOTHING
ON CONFLICT (user_id, save_item_id, save_type) WHERE is_current = true
DO UPDATE SET
is_delete = EXCLUDED.is_delete,
is_save_of_repost = EXCLUDED.is_save_of_repost,
created_at = EXCLUDED.created_at,
txhash = EXCLUDED.txhash,
blocknumber = EXCLUDED.blocknumber
`, params.UserID, params.EntityID, saveType, isDelete, isSaveOfRepost, params.BlockTime, params.TxHash, params.BlockNumber)
return err
}
Expand Down
Loading