Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 1 addition & 12 deletions pkg/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,19 +278,8 @@ func (lc *LineClient) Connect(ctx context.Context) {
}

// Fetch initial blocked contacts list before starting sync loops.
blockedMIDs, err := func() ([]string, error) {
client := line.NewClient(lc.AccessToken)
return client.GetBlockedContactIds()
}()
if err != nil {
if err := lc.refreshBlockedContacts(ctx); err != nil {
lc.UserLogin.Bridge.Log.Warn().Err(err).Msg("Failed to fetch blocked contacts, continuing without block list")
} else {
lc.cacheMu.Lock()
for _, mid := range blockedMIDs {
lc.blockedUsers[mid] = true
}
lc.cacheMu.Unlock()
lc.UserLogin.Bridge.Log.Info().Int("count", len(blockedMIDs)).Msg("Fetched blocked contacts")
}

lc.wg.Add(4)
Expand Down
125 changes: 92 additions & 33 deletions pkg/connector/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"strconv"
"strings"
"sync"
"time"

"go.mau.fi/util/ptr"
Expand All @@ -23,6 +24,34 @@ import (
"github.com/highesttt/matrix-line-messenger/pkg/line"
)

const prefetchMessagesConcurrency = 4

func (lc *LineClient) refreshBlockedContacts(ctx context.Context) error {
client := line.NewClient(lc.AccessToken)
blockedMIDs, err := client.GetBlockedContactIds()
if err != nil && (lc.isRefreshRequired(err) || lc.isLoggedOut(err)) {
if errRecover := lc.recoverToken(ctx); errRecover == nil {
client = line.NewClient(lc.AccessToken)
blockedMIDs, err = client.GetBlockedContactIds()
}
}
if err != nil {
return err
}

blockedUsers := make(map[string]bool, len(blockedMIDs))
for _, mid := range blockedMIDs {
blockedUsers[mid] = true
}

lc.cacheMu.Lock()
lc.blockedUsers = blockedUsers
lc.cacheMu.Unlock()

lc.UserLogin.Bridge.Log.Info().Int("count", len(blockedMIDs)).Msg("Refreshed blocked contacts")
return nil
}

func (lc *LineClient) syncDMChats(ctx context.Context) {
defer lc.wg.Done()

Expand Down Expand Up @@ -59,18 +88,26 @@ func (lc *LineClient) syncDMChats(ctx context.Context) {
continue
}

lc.queueDMChatResync(ctx, mid, false)
lc.queueDMChatResync(ctx, mid, false, false)
}
}

// queueDMChatResync emits a ChatResync event with full DM ChatInfo.
// If createPortal is true, the framework will create the portal when it
// doesn't already exist (e.g. after the DM was deleted on block).
func (lc *LineClient) queueDMChatResync(ctx context.Context, mid string, createPortal bool) {
// If forceBackfill is true, an existing portal will run a forward backfill, and
// a newly-created portal will backfill immediately after Matrix room creation.
func (lc *LineClient) queueDMChatResync(ctx context.Context, mid string, createPortal, forceBackfill bool) {
contact := lc.getContact(ctx, mid)
dmType := database.RoomTypeDM
chatName := contact.EffectiveDisplayName()
portalKey := networkid.PortalKey{ID: makePortalID(mid), Receiver: lc.UserLogin.ID}
var checkNeedsBackfill func(ctx context.Context, latestMessage *database.Message) (bool, error)
if forceBackfill {
checkNeedsBackfill = func(ctx context.Context, latestMessage *database.Message) (bool, error) {
return true, nil
}
}
lc.UserLogin.Bridge.QueueRemoteEvent(lc.UserLogin, &simplevent.ChatResync{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventChatResync,
Expand All @@ -79,9 +116,10 @@ func (lc *LineClient) queueDMChatResync(ctx context.Context, mid string, createP
CreatePortal: createPortal,
},
ChatInfo: &bridgev2.ChatInfo{
Type: &dmType,
Name: &chatName,
Avatar: lc.avatarFromPicturePath(contact.PicturePath),
Type: &dmType,
Name: &chatName,
Avatar: lc.avatarFromPicturePath(contact.PicturePath),
CanBackfill: forceBackfill,
Members: &bridgev2.ChatMemberList{
IsFull: true,
ExcludeChangesFromTimeline: true,
Expand All @@ -105,32 +143,13 @@ func (lc *LineClient) queueDMChatResync(ctx context.Context, mid string, createP
},
ExcludeChangesFromTimeline: true,
},
})
}

// queueDMBackfill asks the framework to backfill a DM portal's recent history.
// It must run after the portal already exists (e.g. right after queueDMChatResync
// recreated it on unblock), because the framework skips the backfill check on the
// resync that creates a portal. CheckNeedsBackfillFunc forces a forward backfill,
// which goes through FetchMessages and is batch-sent silently — no per-message
// notifications.
func (lc *LineClient) queueDMBackfill(mid string) {
portalKey := networkid.PortalKey{ID: makePortalID(mid), Receiver: lc.UserLogin.ID}
lc.UserLogin.Bridge.QueueRemoteEvent(lc.UserLogin, &simplevent.ChatResync{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventChatResync,
PortalKey: portalKey,
Timestamp: time.Now(),
},
CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) {
return true, nil
},
CheckNeedsBackfillFunc: checkNeedsBackfill,
})
}

// FetchMessages implements bridgev2.BackfillingNetworkAPI. It powers silent,
// batch-sent history backfill. It is currently triggered when a DM portal is
// recreated after the contact is unblocked (see queueDMBackfill), repopulating
// recreated after the contact is unblocked (see queueDMChatResync), repopulating
// the restored chat's recent history without notifying for every old message.
// Only the newest params.Count messages are returned; there is no older-history
// pagination, so backward fetches return an empty, final batch.
Expand Down Expand Up @@ -232,9 +251,48 @@ func (lc *LineClient) prefetchMessages(ctx context.Context) {
return
}

workerCount := prefetchMessagesConcurrency
if len(res.MessageBoxes) < workerCount {
workerCount = len(res.MessageBoxes)
}
if workerCount == 0 {
return
}

lc.UserLogin.Bridge.Log.Info().
Int("chat_count", len(res.MessageBoxes)).
Int("concurrency", workerCount).
Msg("Prefetching recent messages")

jobs := make(chan string)
var workers sync.WaitGroup
workers.Add(workerCount)
for i := 0; i < workerCount; i++ {
go func() {
defer workers.Done()
for chatMID := range jobs {
if ctx.Err() != nil {
return
}
lc.backfillRecentMessages(ctx, chatMID, 50)
}
}()
}

for _, box := range res.MessageBoxes {
lc.backfillRecentMessages(ctx, box.ID, 50)
if lc.isUserBlocked(box.ID) {
continue
}
select {
case <-ctx.Done():
close(jobs)
workers.Wait()
return
case jobs <- box.ID:
}
}
close(jobs)
workers.Wait()
}

// backfillRecentMessages fetches up to limit recent messages for a single
Expand Down Expand Up @@ -703,6 +761,9 @@ func (lc *LineClient) pollLoop(ctx context.Context) {

}
}
if err := lc.refreshBlockedContacts(ctx); err != nil {
lc.UserLogin.Bridge.Log.Warn().Err(err).Msg("Failed to refresh blocked contacts during fullSync")
}
lc.wg.Add(3)
go lc.syncChats(ctx)
go lc.syncDMChats(ctx)
Expand Down Expand Up @@ -809,18 +870,16 @@ func (lc *LineClient) handleOperation(ctx context.Context, op line.Operation) {
delete(lc.blockedUsers, mid)
lc.cacheMu.Unlock()
lc.UserLogin.Bridge.Log.Info().Str("mid", mid).Msg("Contact unblocked")
// Reattach the DM portal: emit a ChatResync with CreatePortal so the
// framework recreates the portal that was deleted on block, then ask it
// to backfill recent history. The backfill is batch-sent silently (see
// FetchMessages), so the restored chat repopulates without firing a
// Reattach the DM portal and request an immediate backfill in the same
// resync. The backfill is batch-sent silently (see FetchMessages), so the
// restored chat repopulates without firing a
// notification for every old message — a blocked contact can't have sent
// anything new, so notifying on unblock is never useful.
lowerMid := strings.ToLower(mid)
if strings.HasPrefix(lowerMid, "c") || strings.HasPrefix(lowerMid, "r") {
return
}
lc.queueDMChatResync(ctx, mid, true)
lc.queueDMBackfill(mid)
lc.queueDMChatResync(ctx, mid, true, true)

case OpContactUpdate:
mid := op.Param1
Expand Down
Loading