diff --git a/pkg/etl/db/sql/migrations/0030_social_upsert_indexes.down.sql b/pkg/etl/db/sql/migrations/0030_social_upsert_indexes.down.sql new file mode 100644 index 00000000..90c40651 --- /dev/null +++ b/pkg/etl/db/sql/migrations/0030_social_upsert_indexes.down.sql @@ -0,0 +1,4 @@ +DROP INDEX IF EXISTS reposts_current_uniq_idx; +DROP INDEX IF EXISTS saves_current_uniq_idx; +DROP INDEX IF EXISTS follows_current_uniq_idx; +DROP INDEX IF EXISTS subscriptions_current_uniq_idx; diff --git a/pkg/etl/db/sql/migrations/0030_social_upsert_indexes.up.sql b/pkg/etl/db/sql/migrations/0030_social_upsert_indexes.up.sql new file mode 100644 index 00000000..04c6a396 --- /dev/null +++ b/pkg/etl/db/sql/migrations/0030_social_upsert_indexes.up.sql @@ -0,0 +1,17 @@ +-- Upsert support for social writes (reposts/saves/follows/subscriptions). +-- The entity_manager now upserts the single is_current row in place instead of +-- demote-then-insert, so it needs a unique arbiter on the entity identity. +-- Partial on is_current=true: only one current row per identity is an existing +-- invariant, so historical demoted rows are ignored and no dedup/backfill runs. + +CREATE UNIQUE INDEX IF NOT EXISTS reposts_current_uniq_idx + ON reposts (user_id, repost_item_id, repost_type) WHERE is_current = true; + +CREATE UNIQUE INDEX IF NOT EXISTS saves_current_uniq_idx + ON saves (user_id, save_item_id, save_type) WHERE is_current = true; + +CREATE UNIQUE INDEX IF NOT EXISTS follows_current_uniq_idx + ON follows (follower_user_id, followee_user_id) WHERE is_current = true; + +CREATE UNIQUE INDEX IF NOT EXISTS subscriptions_current_uniq_idx + ON subscriptions (subscriber_id, user_id) WHERE is_current = true; diff --git a/pkg/etl/processors/entity_manager/social_follow.go b/pkg/etl/processors/entity_manager/social_follow.go index 8f95577c..131e0bba 100644 --- a/pkg/etl/processors/entity_manager/social_follow.go +++ b/pkg/etl/processors/entity_manager/social_follow.go @@ -77,41 +77,38 @@ func validateUnfollow(ctx context.Context, params *Params) error { // --- shared --- func insertFollow(ctx context.Context, params *Params, isDelete bool) error { - _, err := params.DBTX.Exec(ctx, - "UPDATE follows SET is_current = false WHERE follower_user_id = $1 AND followee_user_id = $2 AND is_current = true", - params.UserID, params.EntityID) - if err != nil { - return err - } - - // PK is (follower_user_id, followee_user_id, txhash); re-delivery of the - // same chain tx would otherwise 23505 on the PK. Same txhash means - // identical row content, so DO NOTHING is correct. - _, err = params.DBTX.Exec(ctx, ` + // Upsert the single current row in place (arbiter: follows_current_uniq_idx). + // Replaces demote-then-insert: avoids unbounded is_current=false history and + // gives the aggregate triggers an O(1) is_delete transition to track. + _, err := params.DBTX.Exec(ctx, ` INSERT INTO follows ( follower_user_id, followee_user_id, is_current, is_delete, created_at, txhash, blocknumber ) VALUES ($1, $2, true, $3, $4, $5, $6) - ON CONFLICT (follower_user_id, followee_user_id, txhash) DO NOTHING + ON CONFLICT (follower_user_id, followee_user_id) WHERE is_current = true + DO UPDATE SET + is_delete = EXCLUDED.is_delete, + created_at = EXCLUDED.created_at, + txhash = EXCLUDED.txhash, + blocknumber = EXCLUDED.blocknumber `, params.UserID, params.EntityID, isDelete, params.BlockTime, params.TxHash, params.BlockNumber) if err != nil { return err } // Follow/Unfollow also creates/deletes a Subscription record - _, err = params.DBTX.Exec(ctx, - "UPDATE subscriptions SET is_current = false WHERE subscriber_id = $1 AND user_id = $2 AND is_current = true", - params.UserID, params.EntityID) - if err != nil { - return err - } - // PK is (subscriber_id, user_id, txhash); same DO NOTHING rationale. + // (arbiter: subscriptions_current_uniq_idx). _, err = params.DBTX.Exec(ctx, ` INSERT INTO subscriptions ( subscriber_id, user_id, is_current, is_delete, created_at, txhash, blocknumber ) VALUES ($1, $2, true, $3, $4, $5, $6) - ON CONFLICT (subscriber_id, user_id, txhash) DO NOTHING + ON CONFLICT (subscriber_id, user_id) WHERE is_current = true + DO UPDATE SET + is_delete = EXCLUDED.is_delete, + created_at = EXCLUDED.created_at, + txhash = EXCLUDED.txhash, + blocknumber = EXCLUDED.blocknumber `, params.UserID, params.EntityID, isDelete, params.BlockTime, params.TxHash, params.BlockNumber) return err } diff --git a/pkg/etl/processors/entity_manager/social_repost.go b/pkg/etl/processors/entity_manager/social_repost.go index 68fc1d2f..75390222 100644 --- a/pkg/etl/processors/entity_manager/social_repost.go +++ b/pkg/etl/processors/entity_manager/social_repost.go @@ -82,24 +82,21 @@ func insertRepost(ctx context.Context, params *Params, isDelete bool) error { repostType := resolveRepostType(ctx, params) isRepostOfRepost := params.MetadataBoolOr("is_repost_of_repost", false) - // Mark existing repost rows as not current - _, err := params.DBTX.Exec(ctx, - "UPDATE reposts SET is_current = false WHERE user_id = $1 AND repost_item_id = $2 AND repost_type = $3::reposttype AND is_current = true", - params.UserID, params.EntityID, repostType) - if err != nil { - return err - } - - // PK is (user_id, repost_item_id, repost_type, txhash); re-delivery of - // the same chain tx would otherwise 23505 on the PK. Same txhash means - // identical row content by construction, so DO NOTHING is the correct - // dedup. - _, err = params.DBTX.Exec(ctx, ` + // Upsert the single current row in place (arbiter: reposts_current_uniq_idx). + // Replaces demote-then-insert: avoids unbounded is_current=false history and + // gives the aggregate triggers an O(1) is_delete transition to track. + _, err := params.DBTX.Exec(ctx, ` INSERT INTO reposts ( user_id, repost_item_id, repost_type, is_current, is_delete, is_repost_of_repost, created_at, txhash, blocknumber ) VALUES ($1, $2, $3::reposttype, true, $4, $5, $6, $7, $8) - ON CONFLICT (user_id, repost_item_id, repost_type, txhash) DO NOTHING + ON CONFLICT (user_id, repost_item_id, repost_type) WHERE is_current = true + DO UPDATE SET + is_delete = EXCLUDED.is_delete, + is_repost_of_repost = EXCLUDED.is_repost_of_repost, + created_at = EXCLUDED.created_at, + txhash = EXCLUDED.txhash, + blocknumber = EXCLUDED.blocknumber `, params.UserID, params.EntityID, repostType, isDelete, isRepostOfRepost, params.BlockTime, params.TxHash, params.BlockNumber) return err } diff --git a/pkg/etl/processors/entity_manager/social_save.go b/pkg/etl/processors/entity_manager/social_save.go index e79eaf3e..a6670205 100644 --- a/pkg/etl/processors/entity_manager/social_save.go +++ b/pkg/etl/processors/entity_manager/social_save.go @@ -88,24 +88,21 @@ func insertSave(ctx context.Context, params *Params, isDelete bool) error { saveType := resolveSaveType(ctx, params) isSaveOfRepost := params.MetadataBoolOr("is_save_of_repost", false) - // Mark existing save rows as not current - _, err := params.DBTX.Exec(ctx, - "UPDATE saves SET is_current = false WHERE user_id = $1 AND save_item_id = $2 AND save_type = $3::savetype AND is_current = true", - params.UserID, params.EntityID, saveType) - if err != nil { - return err - } - - // PK is (user_id, save_item_id, save_type, txhash); re-delivery of the - // same chain tx (prefetcher anomaly) would otherwise 23505 on the PK. - // Same txhash means identical row content by construction, so DO NOTHING - // is the correct dedup. - _, err = params.DBTX.Exec(ctx, ` + // Upsert the single current row in place (arbiter: saves_current_uniq_idx). + // Replaces demote-then-insert: avoids unbounded is_current=false history and + // gives the aggregate triggers an O(1) is_delete transition to track. + _, err := params.DBTX.Exec(ctx, ` INSERT INTO saves ( user_id, save_item_id, save_type, is_current, is_delete, is_save_of_repost, created_at, txhash, blocknumber ) VALUES ($1, $2, $3::savetype, true, $4, $5, $6, $7, $8) - ON CONFLICT (user_id, save_item_id, save_type, txhash) DO NOTHING + ON CONFLICT (user_id, save_item_id, save_type) WHERE is_current = true + DO UPDATE SET + is_delete = EXCLUDED.is_delete, + is_save_of_repost = EXCLUDED.is_save_of_repost, + created_at = EXCLUDED.created_at, + txhash = EXCLUDED.txhash, + blocknumber = EXCLUDED.blocknumber `, params.UserID, params.EntityID, saveType, isDelete, isSaveOfRepost, params.BlockTime, params.TxHash, params.BlockNumber) return err }