From 11fc156b7f1e18f4d6229ba50721acb037757b78 Mon Sep 17 00:00:00 2001 From: highesttt Date: Wed, 10 Jun 2026 23:47:13 -0500 Subject: [PATCH] chore: improved backfill speed, notably when unblocking a user --- pkg/connector/client.go | 13 +---- pkg/connector/sync.go | 125 +++++++++++++++++++++++++++++----------- 2 files changed, 93 insertions(+), 45 deletions(-) diff --git a/pkg/connector/client.go b/pkg/connector/client.go index a72d873..47df04e 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -278,19 +278,8 @@ func (lc *LineClient) Connect(ctx context.Context) { } // Fetch initial blocked contacts list before starting sync loops. - blockedMIDs, err := func() ([]string, error) { - client := line.NewClient(lc.AccessToken) - return client.GetBlockedContactIds() - }() - if err != nil { + if err := lc.refreshBlockedContacts(ctx); err != nil { lc.UserLogin.Bridge.Log.Warn().Err(err).Msg("Failed to fetch blocked contacts, continuing without block list") - } else { - lc.cacheMu.Lock() - for _, mid := range blockedMIDs { - lc.blockedUsers[mid] = true - } - lc.cacheMu.Unlock() - lc.UserLogin.Bridge.Log.Info().Int("count", len(blockedMIDs)).Msg("Fetched blocked contacts") } lc.wg.Add(4) diff --git a/pkg/connector/sync.go b/pkg/connector/sync.go index 6c782d6..03d730b 100644 --- a/pkg/connector/sync.go +++ b/pkg/connector/sync.go @@ -8,6 +8,7 @@ import ( "io" "strconv" "strings" + "sync" "time" "go.mau.fi/util/ptr" @@ -23,6 +24,34 @@ import ( "github.com/highesttt/matrix-line-messenger/pkg/line" ) +const prefetchMessagesConcurrency = 4 + +func (lc *LineClient) refreshBlockedContacts(ctx context.Context) error { + client := line.NewClient(lc.AccessToken) + blockedMIDs, err := client.GetBlockedContactIds() + if err != nil && (lc.isRefreshRequired(err) || lc.isLoggedOut(err)) { + if errRecover := lc.recoverToken(ctx); errRecover == nil { + client = line.NewClient(lc.AccessToken) + blockedMIDs, err = client.GetBlockedContactIds() + } + } + if err != nil { + return err + } + + blockedUsers := make(map[string]bool, len(blockedMIDs)) + for _, mid := range blockedMIDs { + blockedUsers[mid] = true + } + + lc.cacheMu.Lock() + lc.blockedUsers = blockedUsers + lc.cacheMu.Unlock() + + lc.UserLogin.Bridge.Log.Info().Int("count", len(blockedMIDs)).Msg("Refreshed blocked contacts") + return nil +} + func (lc *LineClient) syncDMChats(ctx context.Context) { defer lc.wg.Done() @@ -59,18 +88,26 @@ func (lc *LineClient) syncDMChats(ctx context.Context) { continue } - lc.queueDMChatResync(ctx, mid, false) + lc.queueDMChatResync(ctx, mid, false, false) } } // queueDMChatResync emits a ChatResync event with full DM ChatInfo. // If createPortal is true, the framework will create the portal when it // doesn't already exist (e.g. after the DM was deleted on block). -func (lc *LineClient) queueDMChatResync(ctx context.Context, mid string, createPortal bool) { +// If forceBackfill is true, an existing portal will run a forward backfill, and +// a newly-created portal will backfill immediately after Matrix room creation. +func (lc *LineClient) queueDMChatResync(ctx context.Context, mid string, createPortal, forceBackfill bool) { contact := lc.getContact(ctx, mid) dmType := database.RoomTypeDM chatName := contact.EffectiveDisplayName() portalKey := networkid.PortalKey{ID: makePortalID(mid), Receiver: lc.UserLogin.ID} + var checkNeedsBackfill func(ctx context.Context, latestMessage *database.Message) (bool, error) + if forceBackfill { + checkNeedsBackfill = func(ctx context.Context, latestMessage *database.Message) (bool, error) { + return true, nil + } + } lc.UserLogin.Bridge.QueueRemoteEvent(lc.UserLogin, &simplevent.ChatResync{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventChatResync, @@ -79,9 +116,10 @@ func (lc *LineClient) queueDMChatResync(ctx context.Context, mid string, createP CreatePortal: createPortal, }, ChatInfo: &bridgev2.ChatInfo{ - Type: &dmType, - Name: &chatName, - Avatar: lc.avatarFromPicturePath(contact.PicturePath), + Type: &dmType, + Name: &chatName, + Avatar: lc.avatarFromPicturePath(contact.PicturePath), + CanBackfill: forceBackfill, Members: &bridgev2.ChatMemberList{ IsFull: true, ExcludeChangesFromTimeline: true, @@ -105,32 +143,13 @@ func (lc *LineClient) queueDMChatResync(ctx context.Context, mid string, createP }, ExcludeChangesFromTimeline: true, }, - }) -} - -// queueDMBackfill asks the framework to backfill a DM portal's recent history. -// It must run after the portal already exists (e.g. right after queueDMChatResync -// recreated it on unblock), because the framework skips the backfill check on the -// resync that creates a portal. CheckNeedsBackfillFunc forces a forward backfill, -// which goes through FetchMessages and is batch-sent silently — no per-message -// notifications. -func (lc *LineClient) queueDMBackfill(mid string) { - portalKey := networkid.PortalKey{ID: makePortalID(mid), Receiver: lc.UserLogin.ID} - lc.UserLogin.Bridge.QueueRemoteEvent(lc.UserLogin, &simplevent.ChatResync{ - EventMeta: simplevent.EventMeta{ - Type: bridgev2.RemoteEventChatResync, - PortalKey: portalKey, - Timestamp: time.Now(), - }, - CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) { - return true, nil - }, + CheckNeedsBackfillFunc: checkNeedsBackfill, }) } // FetchMessages implements bridgev2.BackfillingNetworkAPI. It powers silent, // batch-sent history backfill. It is currently triggered when a DM portal is -// recreated after the contact is unblocked (see queueDMBackfill), repopulating +// recreated after the contact is unblocked (see queueDMChatResync), repopulating // the restored chat's recent history without notifying for every old message. // Only the newest params.Count messages are returned; there is no older-history // pagination, so backward fetches return an empty, final batch. @@ -232,9 +251,48 @@ func (lc *LineClient) prefetchMessages(ctx context.Context) { return } + workerCount := prefetchMessagesConcurrency + if len(res.MessageBoxes) < workerCount { + workerCount = len(res.MessageBoxes) + } + if workerCount == 0 { + return + } + + lc.UserLogin.Bridge.Log.Info(). + Int("chat_count", len(res.MessageBoxes)). + Int("concurrency", workerCount). + Msg("Prefetching recent messages") + + jobs := make(chan string) + var workers sync.WaitGroup + workers.Add(workerCount) + for i := 0; i < workerCount; i++ { + go func() { + defer workers.Done() + for chatMID := range jobs { + if ctx.Err() != nil { + return + } + lc.backfillRecentMessages(ctx, chatMID, 50) + } + }() + } + for _, box := range res.MessageBoxes { - lc.backfillRecentMessages(ctx, box.ID, 50) + if lc.isUserBlocked(box.ID) { + continue + } + select { + case <-ctx.Done(): + close(jobs) + workers.Wait() + return + case jobs <- box.ID: + } } + close(jobs) + workers.Wait() } // backfillRecentMessages fetches up to limit recent messages for a single @@ -703,6 +761,9 @@ func (lc *LineClient) pollLoop(ctx context.Context) { } } + if err := lc.refreshBlockedContacts(ctx); err != nil { + lc.UserLogin.Bridge.Log.Warn().Err(err).Msg("Failed to refresh blocked contacts during fullSync") + } lc.wg.Add(3) go lc.syncChats(ctx) go lc.syncDMChats(ctx) @@ -809,18 +870,16 @@ func (lc *LineClient) handleOperation(ctx context.Context, op line.Operation) { delete(lc.blockedUsers, mid) lc.cacheMu.Unlock() lc.UserLogin.Bridge.Log.Info().Str("mid", mid).Msg("Contact unblocked") - // Reattach the DM portal: emit a ChatResync with CreatePortal so the - // framework recreates the portal that was deleted on block, then ask it - // to backfill recent history. The backfill is batch-sent silently (see - // FetchMessages), so the restored chat repopulates without firing a + // Reattach the DM portal and request an immediate backfill in the same + // resync. The backfill is batch-sent silently (see FetchMessages), so the + // restored chat repopulates without firing a // notification for every old message — a blocked contact can't have sent // anything new, so notifying on unblock is never useful. lowerMid := strings.ToLower(mid) if strings.HasPrefix(lowerMid, "c") || strings.HasPrefix(lowerMid, "r") { return } - lc.queueDMChatResync(ctx, mid, true) - lc.queueDMBackfill(mid) + lc.queueDMChatResync(ctx, mid, true, true) case OpContactUpdate: mid := op.Param1