Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/rscache/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ func (o *fileCache) retryingGet(ctx context.Context, dir, address string, get fu
// Preemptive get attempt
if flushingGet() {
if flushed == 0 {
slog.Log(ctx, LevelTrace, fmt.Sprintf("Found cached item at address '%s' immediately", address))
slog.Log(ctx, LevelTrace, "Found cached item immediately", "address", address)
} else {
slog.Log(ctx, LevelTrace, fmt.Sprintf("Found cached item at address '%s' after one flush", address))
slog.Log(ctx, LevelTrace, "Found cached item after one flush", "address", address)
}
return true
}
Expand Down Expand Up @@ -184,7 +184,7 @@ func (o *fileCache) Head(ctx context.Context, resolver ResolverSpec) (size int64
err = o.queue.AddressedPush(ctx, resolver.Priority, resolver.GroupId, resolver.Address(), resolver.Work)
if o.duplicateMatcher.IsDuplicate(err) {
// Do nothing, since someone else has already inserted the work we need.
slog.Debug(fmt.Sprintf("FileCache: duplicate address push for '%s'", resolver.Address()))
slog.Debug("FileCache: duplicate address push", "address", resolver.Address())
} else if err != nil {
return
}
Expand All @@ -205,7 +205,7 @@ func (o *fileCache) Head(ctx context.Context, resolver ResolverSpec) (size int64
if o.retryingGet(ctx, resolver.Dir(), resolver.Address(), head) {
return
} else {
slog.Debug(fmt.Sprintf("error: FileCache reported address '%s' complete, but item was not found in cache", resolver.Address()))
slog.Debug("error: FileCache reported address complete, but item was not found in cache", "address", resolver.Address())
err = fmt.Errorf("error: FileCache reported address '%s' complete, but item was not found in cache", resolver.Address())
return
}
Expand Down Expand Up @@ -274,7 +274,7 @@ func (o *fileCache) Get(ctx context.Context, resolver ResolverSpec) (value *Cach
err = o.queue.AddressedPush(ctx, resolver.Priority, resolver.GroupId, address, resolver.Work)
if o.duplicateMatcher.IsDuplicate(err) {
// Do nothing, since someone else has already inserted the work we need.
slog.Debug(fmt.Sprintf("FileCache: duplicate address push for '%s'", address))
slog.Debug("FileCache: duplicate address push", "address", address)
} else if err != nil {
value = &CacheReturn{
Err: err,
Expand Down
4 changes: 2 additions & 2 deletions pkg/rscache/internal/integration_test/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"flag"
"fmt"
"io"
"log"
"log/slog"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -217,7 +217,7 @@ func (s *MemoryCacheIntegrationSuite) TestInMemoryCaching(c *check.C) {

// Create an in-memory cache that is just large enough to hold 64 entries
maxCost := entrySize * 64
log.Printf("Creating cache with MaxCost=%d", maxCost)
slog.Info("Creating cache", "maxCost", maxCost)
rc, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1000,
MaxCost: maxCost,
Expand Down
5 changes: 2 additions & 3 deletions pkg/rscache/memory_backed_file_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"encoding/gob"
"errors"
"fmt"
"io"
"log/slog"
"time"
Expand Down Expand Up @@ -92,7 +91,7 @@ func (mbfc *MemoryBackedFileCache) Get(ctx context.Context, resolver ResolverSpe
if resolver.CacheInMemory && mbfc.mc != nil && mbfc.mc.Enabled() && ptr.Err == nil && ptr.GetSize() < mbfc.maxMemoryPerObject {
err = mbfc.mc.Put(address, ptr)
if err != nil {
slog.Debug(fmt.Sprintf("error caching to memory: %s", err.Error()))
slog.Debug("error caching to memory", "error", err)
}
}
return *ptr
Expand Down Expand Up @@ -131,7 +130,7 @@ func (mbfc *MemoryBackedFileCache) GetObject(ctx context.Context, resolver Resol
if resolver.CacheInMemory && mbfc.mc != nil && mbfc.mc.Enabled() && ptr.GetSize() < mbfc.maxMemoryPerObject {
err = mbfc.mc.Put(address, ptr)
if err != nil {
slog.Debug(fmt.Sprintf("error caching to memory: %s", err.Error()))
slog.Debug("error caching to memory", "error", err)
}
}

Expand Down
12 changes: 5 additions & 7 deletions pkg/rselection/impls/pgx/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ package pgxelection
import (
"context"
"encoding/json"
"fmt"
"log"
"log/slog"
"time"

Expand Down Expand Up @@ -105,7 +103,7 @@ func (p *PgxFollower) Follow(ctx context.Context) (result FollowResult) {
case <-timeout.C:
// Follower has received no pings for the timeout duration. It is time to
// ask for a new leader.
slog.Debug(fmt.Sprintf("Follower '%s' ping receipt timeout. Requesting a new leader", p.address))
slog.Debug("Follower ping receipt timeout. Requesting a new leader", "address", p.address)
go p.requestLeader(ctx)
}
return
Expand All @@ -122,13 +120,13 @@ func (p *PgxFollower) handleNotify(ctx context.Context, cn *electiontypes.Cluste
resp := electiontypes.NewClusterPingResponse(p.address, cn.SrcAddr, p.awb.IP())
b, err := json.Marshal(resp)
if err != nil {
log.Printf("Error marshaling notification to JSON: %s", err)
slog.Error("Error marshaling notification to JSON", "error", err)
return
}
slog.Log(ctx, LevelTrace, fmt.Sprintf("Follower %s responding to ping from leader %s", p.address, cn.SrcAddr))
slog.Log(ctx, LevelTrace, "Follower responding to ping from leader", "follower", p.address, "leader", cn.SrcAddr)
err = p.notify.Notify(ctx, p.chLeader, b)
if err != nil {
log.Printf("Follower error responding to leader ping: %s", err)
slog.Error("Follower error responding to leader ping", "error", err)
}
}

Expand All @@ -138,7 +136,7 @@ func (p *PgxFollower) requestLeader(ctx context.Context) {
now := time.Now()
// Limit how often this message logs to avoid too much spam
if p.lastRequestLeaderErr.IsZero() || p.lastRequestLeaderErr.Add(5*time.Minute).Before(now) {
log.Printf("Error pushing leader assumption work to queue: %s", err)
slog.Error("Error pushing leader assumption work to queue", "error", err)
p.lastRequestLeaderErr = now
}
}
Expand Down
23 changes: 13 additions & 10 deletions pkg/rselection/impls/pgx/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"log/slog"
"sort"
"strings"
Expand Down Expand Up @@ -218,7 +217,11 @@ func (p *PgxLeader) verify(vCh chan bool) {
// Upon exit, notify channel with `true` or `false` depending upon error status
defer func(err *error) {
if *err != nil {
log.Printf("Error verifying cluster integrity: %s", *err)
// TODO this should be worded better because sometimes this error is normal and expected,
// e.g. when restarting a cluster node and the node list is temporarily out of sync:
// Error verifying cluster integrity: node list length differs. Store node count 3 does not match leader count 2
// It would be great to be able to distinguish between expected/unexpected errors here and log accordingly.
slog.Error("Error verifying cluster integrity", "error", *err)
vCh <- false
} else {
vCh <- true
Expand Down Expand Up @@ -262,7 +265,7 @@ func (p *PgxLeader) pingNodes(ctx context.Context) {
}
b, err := json.Marshal(req)
if err != nil {
log.Printf("Error marshaling notification to JSON: %s", err)
slog.Error("Error marshaling notification to JSON", "error", err)
return
}

Expand All @@ -274,7 +277,7 @@ func (p *PgxLeader) pingNodes(ctx context.Context) {
err = p.notify.Notify(ctx, p.chFollower, b)
if err != nil {
p.pingSuccess = false
log.Printf("Leader error pinging followers: %s", err)
slog.Error("Leader error pinging followers", "error", err)
return
}

Expand All @@ -284,7 +287,7 @@ func (p *PgxLeader) pingNodes(ctx context.Context) {
err = p.notify.Notify(ctx, p.chLeader, b)
if err != nil {
p.pingSuccess = false
log.Printf("Leader error pinging leaders: %s", err)
slog.Error("Leader error pinging leaders", "error", err)
return
}
}
Expand All @@ -308,22 +311,22 @@ func (p *PgxLeader) handleNodesRequest(ctx context.Context, cn *electiontypes.Cl
resp := electiontypes.NewClusterNodesNotification(nodes, cn.Guid())
b, err := json.Marshal(resp)
if err != nil {
log.Printf("Error marshaling notification to JSON: %s", err)
slog.Error("Error marshaling notification to JSON", "error", err)
return
}

// Broadcast the response on the generic channel
err = p.notify.Notify(ctx, p.chMessages, b)
if err != nil {
log.Printf("Leader error notifying of cluster nodes: %s", err)
slog.Error("Leader error notifying of cluster nodes", "error", err)
return
}
}

func (p *PgxLeader) handleLeaderPing(cn *electiontypes.ClusterPingRequest) {
// If we received a ping from another leader, then stop leading
if cn.SrcAddr != p.address {
slog.Debug(fmt.Sprintf("Leader received ping from another leader. Stopping and moving back to the follower loop."))
slog.Debug("Leader received ping from another leader. Stopping and moving back to the follower loop.")
p.stop <- true
} else {
resp := electiontypes.NewClusterPingResponse(p.address, cn.SrcAddr, p.awb.IP())
Expand Down Expand Up @@ -367,7 +370,7 @@ func (p *PgxLeader) sweepNodes() {
defer p.mutex.Unlock()

if p.unsuccessfulPing() {
slog.Debug(fmt.Sprintf("Skipping cluster sweep due to unsuccessful pings"))
slog.Debug("Skipping cluster sweep due to unsuccessful pings")
return
}

Expand All @@ -378,7 +381,7 @@ func (p *PgxLeader) sweepNodes() {
}

if node.Ping.Before(time.Now().Add(-p.maxPingAge)) {
slog.Debug(fmt.Sprintf("Leader sweep removing cluster node %s", key))
slog.Debug("Leader sweep removing cluster node", "node", key)
delete(p.nodes, key)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/rsnotify/broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package broadcaster
// Copyright (C) 2022 by RStudio, PBC.

import (
"log"
"log/slog"

"github.com/rstudio/platform-lib/v3/pkg/rsnotify/listener"
)
Expand Down Expand Up @@ -96,7 +96,7 @@ func (b *NotificationBroadcaster) broadcast() {
}
case err, more := <-b.errs:
if more {
log.Printf("Received error on queue addressed work notification channel: %s", err)
slog.Error("Received error on queue addressed work notification channel", "error", err)
}
case sink := <-b.subscribe:
sinks = append(sinks, sink)
Expand Down
38 changes: 18 additions & 20 deletions pkg/rsnotify/listeners/local/locallistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ package local

import (
"errors"
"fmt"
"log"
"log/slog"
"sync"
"time"
Expand Down Expand Up @@ -82,10 +80,10 @@ func (l *Listener) Listen() (chan listener.Notification, chan error, error) {

func (l *Listener) listen(msgs chan listener.Notification, errs chan error) {
defer func() {
slog.Debug(fmt.Sprintf("Stopping listener..."))
slog.Debug("Stopping listener...")
l.items = nil
close(l.stop)
slog.Debug(fmt.Sprintf("Stopped."))
slog.Debug("Stopped.")
}()

l.wait(msgs, errs, l.stop)
Expand All @@ -97,27 +95,27 @@ func (l *Listener) wait(msgs chan listener.Notification, errs chan error, stop c
// channel is signaled or closed.
//
// This path is currently run only in the unit tests, so I included some
// `log.Printf` usages are for the benefit of verbose testing output.
// `slog.Debug` usages are for the benefit of verbose testing output.
if l.deferredStart != nil {
// First, wait for the test to notify that it's time to start listening. This
// gives us a chance to set up a deadlock condition in the test.
log.Printf("Waiting for test notification to start")
slog.Debug("Waiting for test notification to start")
<-l.deferredStart
// Next, simulate an unexpected stop by waiting for a stop signal after
// which we return without ever receiving from the `l.items` channel.
log.Printf("Proceeding with wait by waiting for stop signal")
slog.Debug("Proceeding with wait by waiting for stop signal")
<-stop
log.Printf("Stopped. Returning without receiving from l.items.")
slog.Debug("Stopped. Returning without receiving from l.items.")
return
}
for {
select {
case <-stop:
slog.Debug(fmt.Sprintf("Stopping wait"))
slog.Debug("Stopping wait")
return
case i := <-l.items:
if msg, ok := i.(listener.Notification); ok {
slog.Debug(fmt.Sprintf("Received message: %s. Sending to buffered channel with current size %d", msg.Guid(), len(msgs)))
slog.Debug("Received message. Sending to buffered channel", "guid", msg.Guid(), "channelSize", len(msgs))
msgs <- msg
} else {
errs <- ErrNotNotificationType
Expand All @@ -129,13 +127,13 @@ func (l *Listener) wait(msgs chan listener.Notification, errs chan error, stop c
func (l *Listener) Stop() {
if l.stop != nil {
// Signal to stop
slog.Debug(fmt.Sprintf("Signaling stop channel to stop..."))
slog.Debug("Signaling stop channel to stop...")
l.stop <- true

// Wait for stop
slog.Debug(fmt.Sprintf("Waiting for stop channel to close..."))
slog.Debug("Waiting for stop channel to close...")
<-l.stop
slog.Debug(fmt.Sprintf("Stop channel for %s closed.", l.guid))
slog.Debug("Stop channel closed.", "guid", l.guid)
l.stop = nil

// Remove from provider
Expand Down Expand Up @@ -169,7 +167,7 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems
notifyTxt = "renotify"
}

slog.Debug(fmt.Sprintf("Notify called with type=%s on %d listeners", notifyTxt, len(l.listeners)))
slog.Debug("Notify called", "type", notifyTxt, "listenerCount", len(l.listeners))
for _, ll := range l.listeners {
var needNotify bool
if prevMissedItems == nil {
Expand All @@ -184,7 +182,7 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems
// There's a chance that `ll.items` could be non-nil, but not receiving. Timeout
// to prevent deadlock, but keep trying until we're sure that the listener is
// closed.
slog.Debug(fmt.Sprintf("Ready to %s internal items with guid %s for channel %s %s: %+v", notifyTxt, notifyGuid, ll.guid, channel, n))
slog.Debug("Ready to notify internal items", "type", notifyTxt, "notifyGuid", notifyGuid, "listenerGuid", ll.guid, "channel", channel, "notification", n)
func() {
// It's important to create a ticker and stop it so it doesn't leak. This has the potential to be called
// thousands of times in a relatively short period on a busy server, so timer leaks can result in
Expand All @@ -193,9 +191,9 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems
defer timeout.Stop()
select {
case ll.items <- n:
slog.Debug(fmt.Sprintf("Done with %s in local listener with guid %s for channel %s: %+v", notifyTxt, notifyGuid, channel, n))
slog.Debug("Done with notify in local listener", "type", notifyTxt, "notifyGuid", notifyGuid, "channel", channel, "notification", n)
case <-timeout.C:
slog.Debug(fmt.Sprintf("Timeout during %s for listener %s/%s with guid %s for channel %s: %+v", notifyTxt, ll.guid, ll.name, notifyGuid, channel, n))
slog.Debug("Timeout during notify", "type", notifyTxt, "listenerGuid", ll.guid, "listenerName", ll.name, "notifyGuid", notifyGuid, "channel", channel, "notification", n)
// Record the timed-out notification in the `missed` map so we can retry it
missed[ll.guid] = notifyGuid
}
Expand All @@ -212,7 +210,7 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems
// provider after the mutex is again locked by the recursive call to `notify` will be
// attempted again as needed.
if len(missed) > 0 {
slog.Debug(fmt.Sprintf("calling l.notify for %+v with guid %s for %d missed items on channel %s", n, notifyGuid, len(missed), channel))
slog.Debug("calling l.notify for missed items", "notification", n, "notifyGuid", notifyGuid, "missedCount", len(missed), "channel", channel)
stopCh := make(chan struct{})
defer close(stopCh)
// If logging is enabled, periodically record notifications that are still waiting
Expand All @@ -222,13 +220,13 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems
for {
select {
case <-tick.C:
slog.Debug(fmt.Sprintf("still waiting on l.notify for +%v with guid %s on channel %s", n, notifyGuid, channel))
slog.Debug("still waiting on l.notify", "notification", n, "notifyGuid", notifyGuid, "channel", channel)
case <-stop:
return
}
}
}(stopCh)
l.notify(channel, n, missed)
slog.Debug(fmt.Sprintf("completed calling l.notify for %d missed items with guid %s on channel %s", len(missed), notifyGuid, channel))
slog.Debug("completed calling l.notify for missed items", "missedCount", len(missed), "notifyGuid", notifyGuid, "channel", channel)
}
}
Loading
Loading