From f9f75c96f728c369a3fd1f04520bbb2de4fb26d7 Mon Sep 17 00:00:00 2001 From: Greg Lin Date: Thu, 5 Feb 2026 18:19:39 -0600 Subject: [PATCH 1/5] Migrate rsnotify to slog --- pkg/rsnotify/broadcaster/broadcaster.go | 5 ++-- pkg/rsnotify/listeners/local/locallistener.go | 9 +++--- .../listeners/local/locallistener_test.go | 28 +++++++++---------- .../listeners/postgrespgx/pgxlistener.go | 9 +++--- .../listeners/postgrespgx/pgxlistener_test.go | 15 +++++----- .../listeners/postgrespq/pqlistener.go | 3 +- .../listeners/postgrespq/pqlistener_test.go | 15 +++++----- 7 files changed, 42 insertions(+), 42 deletions(-) diff --git a/pkg/rsnotify/broadcaster/broadcaster.go b/pkg/rsnotify/broadcaster/broadcaster.go index 1982604..f7c21e2 100644 --- a/pkg/rsnotify/broadcaster/broadcaster.go +++ b/pkg/rsnotify/broadcaster/broadcaster.go @@ -3,7 +3,8 @@ package broadcaster // Copyright (C) 2022 by RStudio, PBC. import ( - "log" + "fmt" + "log/slog" "github.com/rstudio/platform-lib/v3/pkg/rsnotify/listener" ) @@ -96,7 +97,7 @@ func (b *NotificationBroadcaster) broadcast() { } case err, more := <-b.errs: if more { - log.Printf("Received error on queue addressed work notification channel: %s", err) + slog.Error(fmt.Sprintf("Received error on queue addressed work notification channel: %s", err)) } case sink := <-b.subscribe: sinks = append(sinks, sink) diff --git a/pkg/rsnotify/listeners/local/locallistener.go b/pkg/rsnotify/listeners/local/locallistener.go index d772bdf..29b94e1 100644 --- a/pkg/rsnotify/listeners/local/locallistener.go +++ b/pkg/rsnotify/listeners/local/locallistener.go @@ -5,7 +5,6 @@ package local import ( "errors" "fmt" - "log" "log/slog" "sync" "time" @@ -97,17 +96,17 @@ func (l *Listener) wait(msgs chan listener.Notification, errs chan error, stop c // channel is signaled or closed. // // This path is currently run only in the unit tests, so I included some - // `log.Printf` usages are for the benefit of verbose testing output. + // `slog.Debug` usages are for the benefit of verbose testing output. if l.deferredStart != nil { // First, wait for the test to notify that it's time to start listening. This // gives us a chance to set up a deadlock condition in the test. - log.Printf("Waiting for test notification to start") + slog.Debug("Waiting for test notification to start") <-l.deferredStart // Next, simulate an unexpected stop by waiting for a stop signal after // which we return without ever receiving from the `l.items` channel. - log.Printf("Proceeding with wait by waiting for stop signal") + slog.Debug("Proceeding with wait by waiting for stop signal") <-stop - log.Printf("Stopped. Returning without receiving from l.items.") + slog.Debug("Stopped. Returning without receiving from l.items.") return } for { diff --git a/pkg/rsnotify/listeners/local/locallistener_test.go b/pkg/rsnotify/listeners/local/locallistener_test.go index b46b37d..f9c8a8d 100644 --- a/pkg/rsnotify/listeners/local/locallistener_test.go +++ b/pkg/rsnotify/listeners/local/locallistener_test.go @@ -4,7 +4,7 @@ package local import ( "fmt" - "log" + "log/slog" "testing" "time" @@ -62,7 +62,7 @@ func (s *LocalNotifySuite) TestNotifications(c *check.C) { return } case e := <-errs: - log.Printf("error received: %s", e) + slog.Error(fmt.Sprintf("error received: %s", e)) testError = e return } @@ -109,7 +109,7 @@ func (s *LocalNotifySuite) TestNotifications(c *check.C) { c.Assert(i.(*listener.TestNotification).Val, check.Equals, "second-test") return case e := <-errs: - log.Printf("error received: %s", e) + slog.Error(fmt.Sprintf("error received: %s", e)) c.FailNow() } } @@ -125,7 +125,7 @@ func (s *LocalNotifySuite) TestNotifications(c *check.C) { c.Assert(i.(*listener.TestNotification).Val, check.Equals, "second-test-bb") return case e := <-errs2: - log.Printf("error received: %s", e) + slog.Error(fmt.Sprintf("error received: %s", e)) c.FailNow() } } @@ -177,7 +177,7 @@ func (s *LocalNotifySuite) TestNotificationsBlock(c *check.C) { defer close(done) // Block a while before receiving each message <-blocker - log.Printf("Unblocked. Starting to receive.") + slog.Info("Unblocked. Starting to receive.") for { select { case i := <-data: @@ -190,7 +190,7 @@ func (s *LocalNotifySuite) TestNotificationsBlock(c *check.C) { return } case e := <-errs: - log.Printf("error received: %s", e) + slog.Error(fmt.Sprintf("error received: %s", e)) testError = e return } @@ -205,7 +205,7 @@ func (s *LocalNotifySuite) TestNotificationsBlock(c *check.C) { } // Block receiving any notifications until all have been sent - log.Printf("All messages have been sent") + slog.Info("All messages have been sent") c.Assert(f.listeners, check.HasLen, 1) close(blocker) @@ -241,7 +241,7 @@ func (s *LocalNotifySuite) TestNotificationsErrs(c *check.C) { defer close(done) // Block a while before receiving each message <-blocker - log.Printf("Unblocked. Starting to receive.") + slog.Info("Unblocked. Starting to receive.") for { select { case i := <-data: @@ -251,7 +251,7 @@ func (s *LocalNotifySuite) TestNotificationsErrs(c *check.C) { } count++ case e := <-errs: - log.Printf("error received: %s", e) + slog.Error(fmt.Sprintf("error received: %s", e)) if e.Error() != "a notification must be of type listener.Notification" { testError = fmt.Errorf("invalid error received: %s", e.Error()) return @@ -270,7 +270,7 @@ func (s *LocalNotifySuite) TestNotificationsErrs(c *check.C) { f.Notify("test-a", struct{ name string }{"testA"}) // Block receiving any notifications until all have been sent - log.Printf("All messages have been sent") + slog.Info("All messages have been sent") close(blocker) // Wait for test to complete @@ -319,9 +319,9 @@ func (s *LocalNotifySuite) TestNotificationsDeadlockOnClose(c *check.C) { var testError error go func() { defer close(done) - log.Printf("ready to notify") + slog.Info("ready to notify") f.Notify("test-deadlock", &tn) - log.Printf("done notifying") + slog.Info("done notifying") }() // Stop the channel. This will block until we start listening, below. The idea @@ -337,13 +337,13 @@ func (s *LocalNotifySuite) TestNotificationsDeadlockOnClose(c *check.C) { // Since we're stopping the channel, the notification attempt should time out and // be re-attempted. However, the listener is gone by the second attempt and isn't // retried. - log.Printf("notifying start channel that wait can proceed") + slog.Info("notifying start channel that wait can proceed") close(starter) // Make sure we know we stopped <-stopped c.Assert(f.listeners, check.HasLen, 0) - log.Printf("Stop completed. No listeners") + slog.Info("Stop completed. No listeners") // Finally, wait for the notification to complete. The timeout/retry mechanism // ensures that we don't block here forever. diff --git a/pkg/rsnotify/listeners/postgrespgx/pgxlistener.go b/pkg/rsnotify/listeners/postgrespgx/pgxlistener.go index e928ca6..77a2590 100644 --- a/pkg/rsnotify/listeners/postgrespgx/pgxlistener.go +++ b/pkg/rsnotify/listeners/postgrespgx/pgxlistener.go @@ -7,7 +7,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "log/slog" "net" "reflect" @@ -39,16 +38,16 @@ func (p *PgxIPReporter) IP() string { var ipNet net.IPNet err := p.pool.QueryRow(context.Background(), query).Scan(&ipNet) if err != nil { - log.Printf("Unable to determine client IP with inet_client_addr(). %s", err) + slog.Error(fmt.Sprintf("Unable to determine client IP with inet_client_addr(). %s", err)) return ip } if ipNet.IP != nil { ip = ipNet.IP.String() } else { - log.Printf("Unable to determine client IP with inet_client_addr().") + slog.Error("Unable to determine client IP with inet_client_addr().") } } else { - log.Printf("Invalid pool") + slog.Error("Invalid pool") } return ip } @@ -219,7 +218,7 @@ func (l *PgxListener) acquire(ready chan struct{}) (err error) { select { case <-ready: // Already closed. This means that we are reconnecting - log.Printf("successfully reconnected listener %s", l.name) + slog.Info(fmt.Sprintf("successfully reconnected listener %s", l.name)) default: // Close the `ready` channel to signal that `Listen()` can return. close(ready) diff --git a/pkg/rsnotify/listeners/postgrespgx/pgxlistener_test.go b/pkg/rsnotify/listeners/postgrespgx/pgxlistener_test.go index ec8d970..32d95c3 100644 --- a/pkg/rsnotify/listeners/postgrespgx/pgxlistener_test.go +++ b/pkg/rsnotify/listeners/postgrespgx/pgxlistener_test.go @@ -4,7 +4,8 @@ package postgrespgx import ( "context" - "log" + "fmt" + "log/slog" "strings" "testing" "time" @@ -169,7 +170,7 @@ func (s *PgxNotifySuite) TestNotificationsNormal(c *check.C) { return } case e := <-errs: - log.Printf("error received: %s", e) + slog.Error(fmt.Sprintf("error received: %s", e)) c.FailNow() } } @@ -243,7 +244,7 @@ func (s *PgxNotifySuite) TestNotificationsNormal(c *check.C) { c.Assert(i.(*testNotification).Val, check.Equals, "second-test") return case e := <-errs: - log.Printf("error received: %s", e) + slog.Error(fmt.Sprintf("error received: %s", e)) c.FailNow() } } @@ -335,7 +336,7 @@ func (s *PgxNotifySuite) TestNotificationsErrors(c *check.C) { for { select { case <-data: - log.Printf("unexpected good data") + slog.Error("unexpected good data") c.FailNow() case e := <-errs: errStr := e.Error() @@ -413,7 +414,7 @@ func (s *PgxNotifySuite) TestNotificationsBlock(c *check.C) { defer close(done) // Block a while before receiving each message <-blocker - log.Printf("Unblocked. Starting to receive.") + slog.Info("Unblocked. Starting to receive.") for { select { case i := <-data: @@ -423,7 +424,7 @@ func (s *PgxNotifySuite) TestNotificationsBlock(c *check.C) { return } case e := <-errs: - log.Printf("error received: %s", e) + slog.Error(fmt.Sprintf("error received: %s", e)) c.FailNow() } } @@ -441,7 +442,7 @@ func (s *PgxNotifySuite) TestNotificationsBlock(c *check.C) { } // Block receiving any notifications until all have been sent - log.Printf("All messages have been sent") + slog.Info("All messages have been sent") close(blocker) // Wait for test to complete diff --git a/pkg/rsnotify/listeners/postgrespq/pqlistener.go b/pkg/rsnotify/listeners/postgrespq/pqlistener.go index 6a1a695..3795fb2 100644 --- a/pkg/rsnotify/listeners/postgrespq/pqlistener.go +++ b/pkg/rsnotify/listeners/postgrespq/pqlistener.go @@ -6,7 +6,6 @@ import ( "context" "encoding/json" "fmt" - "log" "log/slog" "reflect" "time" @@ -141,7 +140,7 @@ func (l *PqListener) acquire(ready chan struct{}) (err error) { select { case <-ready: // Already closed. This means that we are reconnecting - log.Printf("successfully reconnected listener %s", l.name) + slog.Info(fmt.Sprintf("successfully reconnected listener %s", l.name)) default: // Close the `ready` channel to signal that `Listen()` can return. close(ready) diff --git a/pkg/rsnotify/listeners/postgrespq/pqlistener_test.go b/pkg/rsnotify/listeners/postgrespq/pqlistener_test.go index 622c68a..c60eb92 100644 --- a/pkg/rsnotify/listeners/postgrespq/pqlistener_test.go +++ b/pkg/rsnotify/listeners/postgrespq/pqlistener_test.go @@ -3,7 +3,8 @@ package postgrespq // Copyright (C) 2022 by RStudio, PBC. import ( - "log" + "fmt" + "log/slog" "strings" "testing" @@ -136,7 +137,7 @@ func (s *PqNotifySuite) TestNotificationsNormal(c *check.C) { return } case e := <-errs: - log.Printf("error received: %s", e) + slog.Error(fmt.Sprintf("error received: %s", e)) c.FailNow() } } @@ -188,7 +189,7 @@ func (s *PqNotifySuite) TestNotificationsNormal(c *check.C) { c.Assert(i.(*testNotification).Val, check.Equals, "second-test") return case e := <-errs: - log.Printf("error received: %s", e) + slog.Error(fmt.Sprintf("error received: %s", e)) c.FailNow() } } @@ -253,7 +254,7 @@ func (s *PqNotifySuite) TestNotificationsErrors(c *check.C) { for { select { case <-data: - log.Printf("unexpected good data") + slog.Error("unexpected good data") c.FailNow() case e := <-errs: errStr := e.Error() @@ -318,7 +319,7 @@ func (s *PqNotifySuite) TestNotificationsBlock(c *check.C) { defer close(done) // Block a while before receiving each message <-blocker - log.Printf("Unblocked. Starting to receive.") + slog.Info("Unblocked. Starting to receive.") for { select { case i := <-data: @@ -328,7 +329,7 @@ func (s *PqNotifySuite) TestNotificationsBlock(c *check.C) { return } case e := <-errs: - log.Printf("error received: %s", e) + slog.Error(fmt.Sprintf("error received: %s", e)) c.FailNow() } } @@ -346,7 +347,7 @@ func (s *PqNotifySuite) TestNotificationsBlock(c *check.C) { } // Block receiving any notifications until all have been sent - log.Printf("All messages have been sent") + slog.Info("All messages have been sent") close(blocker) // Wait for test to complete From bdf3f29c92a935129d7f0b95b017a730cf602404 Mon Sep 17 00:00:00 2001 From: Greg Lin Date: Thu, 5 Feb 2026 18:39:15 -0600 Subject: [PATCH 2/5] Finish migrating rselection to slog --- pkg/rselection/impls/pgx/follower.go | 7 +++---- pkg/rselection/impls/pgx/leader.go | 17 ++++++++++------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/pkg/rselection/impls/pgx/follower.go b/pkg/rselection/impls/pgx/follower.go index 944f098..f2eb9a4 100644 --- a/pkg/rselection/impls/pgx/follower.go +++ b/pkg/rselection/impls/pgx/follower.go @@ -6,7 +6,6 @@ import ( "context" "encoding/json" "fmt" - "log" "log/slog" "time" @@ -122,13 +121,13 @@ func (p *PgxFollower) handleNotify(ctx context.Context, cn *electiontypes.Cluste resp := electiontypes.NewClusterPingResponse(p.address, cn.SrcAddr, p.awb.IP()) b, err := json.Marshal(resp) if err != nil { - log.Printf("Error marshaling notification to JSON: %s", err) + slog.Error(fmt.Sprintf("Error marshaling notification to JSON: %s", err)) return } slog.Log(ctx, LevelTrace, fmt.Sprintf("Follower %s responding to ping from leader %s", p.address, cn.SrcAddr)) err = p.notify.Notify(ctx, p.chLeader, b) if err != nil { - log.Printf("Follower error responding to leader ping: %s", err) + slog.Error(fmt.Sprintf("Follower error responding to leader ping: %s", err)) } } @@ -138,7 +137,7 @@ func (p *PgxFollower) requestLeader(ctx context.Context) { now := time.Now() // Limit how often this message logs to avoid too much spam if p.lastRequestLeaderErr.IsZero() || p.lastRequestLeaderErr.Add(5*time.Minute).Before(now) { - log.Printf("Error pushing leader assumption work to queue: %s", err) + slog.Error(fmt.Sprintf("Error pushing leader assumption work to queue: %s", err)) p.lastRequestLeaderErr = now } } diff --git a/pkg/rselection/impls/pgx/leader.go b/pkg/rselection/impls/pgx/leader.go index 7a6ec02..6bafa11 100644 --- a/pkg/rselection/impls/pgx/leader.go +++ b/pkg/rselection/impls/pgx/leader.go @@ -6,7 +6,6 @@ import ( "context" "encoding/json" "fmt" - "log" "log/slog" "sort" "strings" @@ -218,7 +217,11 @@ func (p *PgxLeader) verify(vCh chan bool) { // Upon exit, notify channel with `true` or `false` depending upon error status defer func(err *error) { if *err != nil { - log.Printf("Error verifying cluster integrity: %s", *err) + // TODO this should be worded better because sometimes this error is normal and expected, + // e.g. when restarting a cluster node and the node list is temporarily out of sync: + // Error verifying cluster integrity: node list length differs. Store node count 3 does not match leader count 2 + // It would be great to be able to distinguish between expected/unexpected errors here and log accordingly. + slog.Error(fmt.Sprintf("Error verifying cluster integrity: %s", *err)) vCh <- false } else { vCh <- true @@ -262,7 +265,7 @@ func (p *PgxLeader) pingNodes(ctx context.Context) { } b, err := json.Marshal(req) if err != nil { - log.Printf("Error marshaling notification to JSON: %s", err) + slog.Error(fmt.Sprintf("Error marshaling notification to JSON: %s", err)) return } @@ -274,7 +277,7 @@ func (p *PgxLeader) pingNodes(ctx context.Context) { err = p.notify.Notify(ctx, p.chFollower, b) if err != nil { p.pingSuccess = false - log.Printf("Leader error pinging followers: %s", err) + slog.Error(fmt.Sprintf("Leader error pinging followers: %s", err)) return } @@ -284,7 +287,7 @@ func (p *PgxLeader) pingNodes(ctx context.Context) { err = p.notify.Notify(ctx, p.chLeader, b) if err != nil { p.pingSuccess = false - log.Printf("Leader error pinging leaders: %s", err) + slog.Error(fmt.Sprintf("Leader error pinging leaders: %s", err)) return } } @@ -308,14 +311,14 @@ func (p *PgxLeader) handleNodesRequest(ctx context.Context, cn *electiontypes.Cl resp := electiontypes.NewClusterNodesNotification(nodes, cn.Guid()) b, err := json.Marshal(resp) if err != nil { - log.Printf("Error marshaling notification to JSON: %s", err) + slog.Error(fmt.Sprintf("Error marshaling notification to JSON: %s", err)) return } // Broadcast the response on the generic channel err = p.notify.Notify(ctx, p.chMessages, b) if err != nil { - log.Printf("Leader error notifying of cluster nodes: %s", err) + slog.Error(fmt.Sprintf("Leader error notifying of cluster nodes: %s", err)) return } } From 252f04f2d2240ad66ce1d344a602c3b0350e773e Mon Sep 17 00:00:00 2001 From: Greg Lin Date: Thu, 5 Feb 2026 18:47:13 -0600 Subject: [PATCH 3/5] Finish slog migration in rsstorage package --- pkg/rsstorage/internal/chunks.go | 4 +- .../internal/integration_test/chunks_test.go | 23 ++++++----- .../integration_test/integration_test.go | 40 +++++++++---------- pkg/rsstorage/servers/file/server.go | 5 +-- .../servers/s3server/s3_enumeration.go | 6 +-- 5 files changed, 39 insertions(+), 39 deletions(-) diff --git a/pkg/rsstorage/internal/chunks.go b/pkg/rsstorage/internal/chunks.go index 9e3ec71..dbc907f 100644 --- a/pkg/rsstorage/internal/chunks.go +++ b/pkg/rsstorage/internal/chunks.go @@ -7,7 +7,7 @@ import ( "encoding/json" "fmt" "io" - "log" + "log/slog" "path/filepath" "strings" "time" @@ -108,7 +108,7 @@ func (w *DefaultChunkUtils) WriteChunked( }) if err != nil { // TODO: Update DefaultChunkUtils to acceptable a logger - log.Printf("Error notifying store of chunk completion for address=%s; chunk=%d: %s", address, count, err) + slog.Error(fmt.Sprintf("Error notifying store of chunk completion for address=%s; chunk=%d: %s", address, count, err)) } if count == numChunks { return nil diff --git a/pkg/rsstorage/internal/integration_test/chunks_test.go b/pkg/rsstorage/internal/integration_test/chunks_test.go index 38ed656..2cb7085 100644 --- a/pkg/rsstorage/internal/integration_test/chunks_test.go +++ b/pkg/rsstorage/internal/integration_test/chunks_test.go @@ -8,8 +8,9 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" + "fmt" "io" - "log" + "log/slog" "os" "strings" "testing" @@ -140,9 +141,9 @@ func (s *ChunksIntegrationSuite) TestWriteChunked(c *check.C) { serverSet := s.NewServerSet(c, "chunks", "") for key, server := range serverSet { if testing.Short() && key != "file" { - log.Printf("skipping chunks integration tests for %s because -short was provided", key) + slog.Info(fmt.Sprintf("skipping chunks integration tests for %s because -short was provided", key)) } else { - log.Printf("testing chunks integration tests for %s", key) + slog.Info(fmt.Sprintf("testing chunks integration tests for %s", key)) s.check(c, server) } } @@ -248,7 +249,7 @@ func (s *ChunksPartialReadSuite) TearDownSuite(c *check.C) { func (s *ChunksPartialReadSuite) TestReadPartialOk(c *check.C) { ctx := context.Background() - log.Printf("Starting test %s", c.TestName()) + slog.Info(fmt.Sprintf("Starting test %s", c.TestName())) defer leaktest.Check(c) sz := uint64(len(servertest.TestDESC)) @@ -313,7 +314,7 @@ func (s *ChunksPartialReadSuite) TestReadPartialOk(c *check.C) { resolve := func(writer io.Writer) (dir, address string, err error) { defer pR.Close() n, err := io.Copy(writer, pR) - log.Printf("Done resolving %d bytes", n) + slog.Info(fmt.Sprintf("Done resolving %d bytes", n)) return } @@ -321,12 +322,12 @@ func (s *ChunksPartialReadSuite) TestReadPartialOk(c *check.C) { go func() { err = cw.WriteChunked(ctx, "0a", "test-chunk", sz, resolve) c.Assert(err, check.IsNil) - log.Printf("Done with WriteChunked") + slog.Info("Done with WriteChunked") }() // Wait for 100 chunks to be written <-readStart - log.Printf("Starting ReadChunked after writing 100 chunks") + slog.Info("Starting ReadChunked after writing 100 chunks") // Read chunks and assemble them... r, _, size, mod, err := cw.ReadChunked(ctx, "0a", "test-chunk") @@ -406,7 +407,7 @@ func (s *ChunksPartialReadSuite) TestReadPartialTimeout(c *check.C) { // On chunk 105, hang if index == 105 { - log.Printf("Hanging for 1 minute to simulate timeout") + slog.Info("Hanging for 1 minute to simulate timeout") time.Sleep(time.Minute) } } @@ -416,7 +417,7 @@ func (s *ChunksPartialReadSuite) TestReadPartialTimeout(c *check.C) { resolve := func(writer io.Writer) (dir, address string, err error) { defer pR.Close() n, err := io.Copy(writer, pR) - log.Printf("Done resolving %d bytes", n) + slog.Info(fmt.Sprintf("Done resolving %d bytes", n)) return } @@ -424,12 +425,12 @@ func (s *ChunksPartialReadSuite) TestReadPartialTimeout(c *check.C) { go func() { err = cw.WriteChunked(ctx, "0a", "test-chunk", sz, resolve) c.Assert(err, check.IsNil) - log.Printf("Done with WriteChunked") + slog.Info("Done with WriteChunked") }() // Wait for 100 chunks to be written <-readStart - log.Printf("Starting ReadChunked after writing 100 chunks") + slog.Info("Starting ReadChunked after writing 100 chunks") // Read chunks and assemble them... r, _, size, mod, err := cw.ReadChunked(ctx, "0a", "test-chunk") diff --git a/pkg/rsstorage/internal/integration_test/integration_test.go b/pkg/rsstorage/internal/integration_test/integration_test.go index 9d4d99f..65904b8 100644 --- a/pkg/rsstorage/internal/integration_test/integration_test.go +++ b/pkg/rsstorage/internal/integration_test/integration_test.go @@ -9,7 +9,7 @@ import ( "errors" "fmt" "io" - "log" + "log/slog" "net/url" "os" "strings" @@ -268,7 +268,7 @@ func (s *StorageIntegrationSuite) CheckFile( sz int64, chunked bool, ) { - log.Printf("(%s) Verifying existence of %s on server=%s (with dir=%s)", test, address, class, dir) + slog.Info(fmt.Sprintf("(%s) Verifying existence of %s on server=%s (with dir=%s)", test, address, class, dir)) // Next, get it r, ch, sz, _, ok, err := server.Get(context.Background(), dir, address) @@ -297,7 +297,7 @@ func (s *StorageIntegrationSuite) CheckFile( // Verifies that a given asset does not exist func (s *StorageIntegrationSuite) CheckFileGone(c *check.C, server rsstorage.StorageServer, test, dir, address, classSource string) { - log.Printf("(%s) Verifying removal of %s on server=%s (with dir=%s)", test, address, classSource, dir) + slog.Info(fmt.Sprintf("(%s) Verifying removal of %s on server=%s (with dir=%s)", test, address, classSource, dir)) ok, _, _, _, err := server.Check(context.Background(), "", address) c.Check(err, check.IsNil) @@ -326,7 +326,7 @@ func (s *StorageIntegrationSuite) TestMoving(c *check.C) { // Verify for classSource := range sources { - log.Printf("\nVerify that files were successfully moved from %s to each destination server:", classSource) + slog.Info(fmt.Sprintf("\nVerify that files were successfully moved from %s to each destination server:", classSource)) for classDest, dest := range dests { // Files exist on destination s.CheckFile( @@ -367,7 +367,7 @@ func (s *StorageIntegrationSuite) TestMoving(c *check.C) { // Verify that files do not exist on source for classSource, source := range sources { - log.Printf("\nVerify that moved files were deleted from the %s server:", classSource) + slog.Info(fmt.Sprintf("\nVerify that moved files were deleted from the %s server:", classSource)) for classDest := range dests { s.CheckFileGone(c, source, "MoveSrc", "", fmt.Sprintf("%s--%s", classSource, classDest), classSource) s.CheckFileGone(c, source, "MoveSrc", "dir", fmt.Sprintf("%s--%s", classSource, classDest), classSource) @@ -397,7 +397,7 @@ func (s *StorageIntegrationSuite) TestCopying(c *check.C) { // Verify files have been copied to destination for classSource := range sources { - log.Printf("\nVerify that files were successfully copied from %s to each destination server:", classSource) + slog.Info(fmt.Sprintf("\nVerify that files were successfully copied from %s to each destination server:", classSource)) for classDest, dest := range dests { // Files exist on destination s.CheckFile( @@ -438,7 +438,7 @@ func (s *StorageIntegrationSuite) TestCopying(c *check.C) { // Verify files are still on source for classSource, source := range sources { - log.Printf("\nVerify that original files still remain on the %s server:", classSource) + slog.Info(fmt.Sprintf("\nVerify that original files still remain on the %s server:", classSource)) for classDest := range dests { // Files still exist on source s.CheckFile( @@ -479,14 +479,14 @@ func (s *StorageIntegrationSuite) TestCopying(c *check.C) { // Test Enumeration for classSource, source := range sources { - log.Printf("\nVerify enumeration on the %s source server:", classSource) + slog.Info(fmt.Sprintf("\nVerify enumeration on the %s source server:", classSource)) items, err := source.Enumerate(ctx) c.Assert(err, check.IsNil) // Each source should have three files for each destination c.Assert(items, check.HasLen, len(dests)*3) } for classDest, dest := range dests { - log.Printf("\nVerify enumeration on the %s destination server:", classDest) + slog.Info(fmt.Sprintf("\nVerify enumeration on the %s destination server:", classDest)) items, err := dest.Enumerate(ctx) c.Assert(err, check.IsNil) // Each destination should have three files from each source @@ -495,7 +495,7 @@ func (s *StorageIntegrationSuite) TestCopying(c *check.C) { // Test Removal for classSource, source := range sources { - log.Printf("\nVerify forced removal of assets on the %s source server:", classSource) + slog.Info(fmt.Sprintf("\nVerify forced removal of assets on the %s source server:", classSource)) for classDest := range dests { err := source.Remove(ctx, "", fmt.Sprintf("%s--%s", classSource, classDest)) c.Assert(err, check.IsNil) @@ -610,13 +610,13 @@ func (s *S3IntegrationSuite) TestPopulateServerSetHang(c *check.C) { //w.Write([]byte(fmt.Sprintf(testAssetData, class))) gzw := gzip.NewWriter(w) - log.Printf("resolver: wrote some data, instructing test to continue, but waiting for instruction to err") + slog.Info("resolver: wrote some data, instructing test to continue, but waiting for instruction to err") // Notify that we've started to write some data close(writing) // Wait until the test is ready for us to fail, then fail <-end - log.Printf("resolver: returning error") + slog.Info("resolver: returning error") // Emulates the behavior of returning an error before the deferred // call to close the gzip writer if true { @@ -634,7 +634,7 @@ func (s *S3IntegrationSuite) TestPopulateServerSetHang(c *check.C) { go func() { // Notify that we're done with the Put call defer close(failed) - log.Printf("put: adding an item to S3 in a separate goroutine") + slog.Info("put: adding an item to S3 in a separate goroutine") // Put an item into S3. This will fail _, _, err = s3Server.Put(ctx, resolver("test-failure"), "", itemAddress) c.Assert(err, check.NotNil) @@ -642,11 +642,11 @@ func (s *S3IntegrationSuite) TestPopulateServerSetHang(c *check.C) { }() // Don't attempt anything until we've started attempting to write to S3 - log.Printf("get: waiting for write to start") + slog.Info("get: waiting for write to start") <-writing // Check to see if we can find the item that we're writing - log.Printf("get: attempting to get item that is being written") + slog.Info("get: attempting to get item that is being written") _, _, _, _, ok, err := s3Server.Get(ctx, "", itemAddress) c.Check(err, check.IsNil) c.Check(ok, check.Equals, false) @@ -744,13 +744,13 @@ func (s *S3IntegrationSuite) TestPopulateServerSetHangChunked(c *check.C) { w.Write([]byte(fmt.Sprintf(testAssetData, class))) gzw := gzip.NewWriter(w) - log.Printf("resolver: wrote some data, instructing test to continue, but waiting for instruction to err") + slog.Info("resolver: wrote some data, instructing test to continue, but waiting for instruction to err") // Notify that we've started to write some data close(writing) // Wait until the test is ready for us to fail, then fail //<-end - log.Printf("resolver: returning error") + slog.Info("resolver: returning error") // Emulates the behavior of returning an error before the deferred // call to close the gzip writer if true { @@ -768,19 +768,19 @@ func (s *S3IntegrationSuite) TestPopulateServerSetHangChunked(c *check.C) { go func() { // Notify that we're done with the Put call defer close(failed) - log.Printf("put: adding a chunked item to S3 in a separate goroutine") + slog.Info("put: adding a chunked item to S3 in a separate goroutine") // Put an item into S3. This will fail _, _, err = s3Server.PutChunked(ctx, resolver("test-failure"), "", itemAddress, 100*1024) c.Assert(err, check.ErrorMatches, "failure resolving data") }() // Don't attempt anything until we've started attempting to write to S3 - log.Printf("get: waiting for write to start") + slog.Info("get: waiting for write to start") <-writing // Check to see if we can find the item that we're writing. Since this // is chunked data, it should appear now, even though it is incomplete. - log.Printf("get: attempting to get item that is being written") + slog.Info("get: attempting to get item that is being written") _, _, _, _, ok, err := s3Server.Get(ctx, "", itemAddress) c.Check(err, check.IsNil) c.Check(ok, check.Equals, true) diff --git a/pkg/rsstorage/servers/file/server.go b/pkg/rsstorage/servers/file/server.go index ffdcea9..94e6e59 100644 --- a/pkg/rsstorage/servers/file/server.go +++ b/pkg/rsstorage/servers/file/server.go @@ -9,7 +9,6 @@ import ( "fmt" "io" "io/fs" - "log" "log/slog" "os" "path/filepath" @@ -362,7 +361,7 @@ func (s *StorageServer) Remove(ctx context.Context, dir, address string) error { func (s *StorageServer) Enumerate(ctx context.Context) ([]types.StoredItem, error) { items, err := enumerate(s.dir, s.walkTimeout) if err != nil { - log.Printf("Error enumerating storage: %s", err) + slog.Error(fmt.Sprintf("Error enumerating storage: %s", err)) return nil, err } @@ -384,7 +383,7 @@ func enumerate(dir string, walkTimeout time.Duration) ([]types.StoredItem, error err := filepath.WalkDir(dir, func(path string, info fs.DirEntry, err error) error { if err != nil { - log.Printf("Error enumerating storage for directory %s: %s", dir, err) + slog.Error(fmt.Sprintf("Error enumerating storage for directory %s: %s", dir, err)) return nil } diff --git a/pkg/rsstorage/servers/s3server/s3_enumeration.go b/pkg/rsstorage/servers/s3server/s3_enumeration.go index 0ac5971..ae4cef9 100644 --- a/pkg/rsstorage/servers/s3server/s3_enumeration.go +++ b/pkg/rsstorage/servers/s3server/s3_enumeration.go @@ -5,7 +5,7 @@ package s3server import ( "context" "fmt" - "log" + "log/slog" "regexp" "strings" "sync" @@ -136,7 +136,7 @@ func (a *DefaultAwsOps) BucketObjects( atomic.AddUint64(&ops, uint64(len(bm))) if ops > 1000 { atomic.AddUint64(&total, atomic.LoadUint64(&ops)) - log.Printf("For S3 prefix %s parsed %d files", s3Prefix, atomic.LoadUint64(&total)) + slog.Info(fmt.Sprintf("For S3 prefix %s parsed %d files", s3Prefix, atomic.LoadUint64(&total))) atomic.SwapUint64(&ops, 0) } @@ -249,7 +249,7 @@ func (a *DefaultAwsOps) BucketObjectsETagMap( atomic.AddUint64(&ops, uint64(len(bm))) if ops > 1000 { atomic.AddUint64(&total, atomic.LoadUint64(&ops)) - log.Printf("For S3 prefix %s parsed %d files", s3Prefix, atomic.LoadUint64(&total)) + slog.Info(fmt.Sprintf("For S3 prefix %s parsed %d files", s3Prefix, atomic.LoadUint64(&total))) atomic.SwapUint64(&ops, 0) } From 4666c50b9b00c615d9487b96f98730faf712dba1 Mon Sep 17 00:00:00 2001 From: Greg Lin Date: Thu, 5 Feb 2026 18:47:26 -0600 Subject: [PATCH 4/5] Finish slog migration in rsqueue and rscache --- pkg/rscache/internal/integration_test/memory_test.go | 4 ++-- pkg/rsqueue/runnerfactory/runnerfactory.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/rscache/internal/integration_test/memory_test.go b/pkg/rscache/internal/integration_test/memory_test.go index bf56815..1c40395 100644 --- a/pkg/rscache/internal/integration_test/memory_test.go +++ b/pkg/rscache/internal/integration_test/memory_test.go @@ -9,7 +9,7 @@ import ( "flag" "fmt" "io" - "log" + "log/slog" "os" "path/filepath" "testing" @@ -217,7 +217,7 @@ func (s *MemoryCacheIntegrationSuite) TestInMemoryCaching(c *check.C) { // Create an in-memory cache that is just large enough to hold 64 entries maxCost := entrySize * 64 - log.Printf("Creating cache with MaxCost=%d", maxCost) + slog.Info(fmt.Sprintf("Creating cache with MaxCost=%d", maxCost)) rc, err := ristretto.NewCache(&ristretto.Config{ NumCounters: 1000, MaxCost: maxCost, diff --git a/pkg/rsqueue/runnerfactory/runnerfactory.go b/pkg/rsqueue/runnerfactory/runnerfactory.go index 25dacf2..0a45725 100644 --- a/pkg/rsqueue/runnerfactory/runnerfactory.go +++ b/pkg/rsqueue/runnerfactory/runnerfactory.go @@ -5,7 +5,7 @@ package runnerfactory import ( "context" "fmt" - "log" + "log/slog" "sync" "time" @@ -63,7 +63,7 @@ func (r *RunnerFactory) Stop(timeout time.Duration) error { defer wg.Done() err := runner.Stop(timeout) if err != nil { - log.Printf("Error stopping runner for type %d: %s", key, err) + slog.Error(fmt.Sprintf("Error stopping runner for type %d: %s", key, err)) } r.types.SetEnabled(key, false) }(key, runner) From 2d6f25c06b2f85bc23ba97e909c55091e90603d0 Mon Sep 17 00:00:00 2001 From: Greg Lin Date: Fri, 6 Feb 2026 16:43:04 -0600 Subject: [PATCH 5/5] Update all slog statements to use key-value pairs --- pkg/rscache/file.go | 10 +++--- .../internal/integration_test/memory_test.go | 2 +- pkg/rscache/memory_backed_file_cache.go | 5 ++- pkg/rselection/impls/pgx/follower.go | 11 +++--- pkg/rselection/impls/pgx/leader.go | 18 +++++----- pkg/rsnotify/broadcaster/broadcaster.go | 3 +- pkg/rsnotify/listeners/local/locallistener.go | 29 ++++++++-------- .../listeners/local/locallistener_test.go | 10 +++--- .../listeners/postgrespgx/pgxlistener.go | 10 +++--- .../listeners/postgrespgx/pgxlistener_test.go | 7 ++-- .../listeners/postgrespq/pqlistener.go | 8 ++--- .../listeners/postgrespq/pqlistener_test.go | 7 ++-- pkg/rsqueue/runnerfactory/runnerfactory.go | 2 +- pkg/rsstorage/internal/chunks.go | 2 +- .../internal/integration_test/chunks_test.go | 11 +++--- .../integration_test/integration_test.go | 18 +++++----- pkg/rsstorage/servers/file/server.go | 18 +++++----- pkg/rsstorage/servers/postgres/server.go | 34 +++++++++---------- .../servers/s3server/s3_enumeration.go | 4 +-- 19 files changed, 101 insertions(+), 108 deletions(-) diff --git a/pkg/rscache/file.go b/pkg/rscache/file.go index 156cf6c..b031036 100644 --- a/pkg/rscache/file.go +++ b/pkg/rscache/file.go @@ -102,9 +102,9 @@ func (o *fileCache) retryingGet(ctx context.Context, dir, address string, get fu // Preemptive get attempt if flushingGet() { if flushed == 0 { - slog.Log(ctx, LevelTrace, fmt.Sprintf("Found cached item at address '%s' immediately", address)) + slog.Log(ctx, LevelTrace, "Found cached item immediately", "address", address) } else { - slog.Log(ctx, LevelTrace, fmt.Sprintf("Found cached item at address '%s' after one flush", address)) + slog.Log(ctx, LevelTrace, "Found cached item after one flush", "address", address) } return true } @@ -184,7 +184,7 @@ func (o *fileCache) Head(ctx context.Context, resolver ResolverSpec) (size int64 err = o.queue.AddressedPush(ctx, resolver.Priority, resolver.GroupId, resolver.Address(), resolver.Work) if o.duplicateMatcher.IsDuplicate(err) { // Do nothing since; someone else has already inserted the work we need. - slog.Debug(fmt.Sprintf("FileCache: duplicate address push for '%s'", resolver.Address())) + slog.Debug("FileCache: duplicate address push", "address", resolver.Address()) } else if err != nil { return } @@ -205,7 +205,7 @@ func (o *fileCache) Head(ctx context.Context, resolver ResolverSpec) (size int64 if o.retryingGet(ctx, resolver.Dir(), resolver.Address(), head) { return } else { - slog.Debug(fmt.Sprintf("error: FileCache reported address '%s' complete, but item was not found in cache", resolver.Address())) + slog.Debug("error: FileCache reported address complete, but item was not found in cache", "address", resolver.Address()) err = fmt.Errorf("error: FileCache reported address '%s' complete, but item was not found in cache", resolver.Address()) return } @@ -274,7 +274,7 @@ func (o *fileCache) Get(ctx context.Context, resolver ResolverSpec) (value *Cach err = o.queue.AddressedPush(ctx, resolver.Priority, resolver.GroupId, address, resolver.Work) if o.duplicateMatcher.IsDuplicate(err) { // Do nothing since; someone else has already inserted the work we need. - slog.Debug(fmt.Sprintf("FileCache: duplicate address push for '%s'", address)) + slog.Debug("FileCache: duplicate address push", "address", address) } else if err != nil { value = &CacheReturn{ Err: err, diff --git a/pkg/rscache/internal/integration_test/memory_test.go b/pkg/rscache/internal/integration_test/memory_test.go index 1c40395..7d57a01 100644 --- a/pkg/rscache/internal/integration_test/memory_test.go +++ b/pkg/rscache/internal/integration_test/memory_test.go @@ -217,7 +217,7 @@ func (s *MemoryCacheIntegrationSuite) TestInMemoryCaching(c *check.C) { // Create an in-memory cache that is just large enough to hold 64 entries maxCost := entrySize * 64 - slog.Info(fmt.Sprintf("Creating cache with MaxCost=%d", maxCost)) + slog.Info("Creating cache", "maxCost", maxCost) rc, err := ristretto.NewCache(&ristretto.Config{ NumCounters: 1000, MaxCost: maxCost, diff --git a/pkg/rscache/memory_backed_file_cache.go b/pkg/rscache/memory_backed_file_cache.go index c94499a..276b105 100644 --- a/pkg/rscache/memory_backed_file_cache.go +++ b/pkg/rscache/memory_backed_file_cache.go @@ -8,7 +8,6 @@ import ( "context" "encoding/gob" "errors" - "fmt" "io" "log/slog" "time" @@ -92,7 +91,7 @@ func (mbfc *MemoryBackedFileCache) Get(ctx context.Context, resolver ResolverSpe if resolver.CacheInMemory && mbfc.mc != nil && mbfc.mc.Enabled() && ptr.Err == nil && ptr.GetSize() < mbfc.maxMemoryPerObject { err = mbfc.mc.Put(address, ptr) if err != nil { - slog.Debug(fmt.Sprintf("error caching to memory: %s", err.Error())) + slog.Debug("error caching to memory", "error", err) } } return *ptr @@ -131,7 +130,7 @@ func (mbfc *MemoryBackedFileCache) GetObject(ctx context.Context, resolver Resol if resolver.CacheInMemory && mbfc.mc != nil && mbfc.mc.Enabled() && ptr.GetSize() < mbfc.maxMemoryPerObject { err = mbfc.mc.Put(address, ptr) if err != nil { - slog.Debug(fmt.Sprintf("error caching to memory: %s", err.Error())) + slog.Debug("error caching to memory", "error", err) } } diff --git a/pkg/rselection/impls/pgx/follower.go b/pkg/rselection/impls/pgx/follower.go index f2eb9a4..fe0ccec 100644 --- a/pkg/rselection/impls/pgx/follower.go +++ b/pkg/rselection/impls/pgx/follower.go @@ -5,7 +5,6 @@ package pgxelection import ( "context" "encoding/json" - "fmt" "log/slog" "time" @@ -104,7 +103,7 @@ func (p *PgxFollower) Follow(ctx context.Context) (result FollowResult) { case <-timeout.C: // Follower has received no pings for the timeout duration. It is time to // ask for a new leader. - slog.Debug(fmt.Sprintf("Follower '%s' ping receipt timeout. Requesting a new leader", p.address)) + slog.Debug("Follower ping receipt timeout. Requesting a new leader", "address", p.address) go p.requestLeader(ctx) } return @@ -121,13 +120,13 @@ func (p *PgxFollower) handleNotify(ctx context.Context, cn *electiontypes.Cluste resp := electiontypes.NewClusterPingResponse(p.address, cn.SrcAddr, p.awb.IP()) b, err := json.Marshal(resp) if err != nil { - slog.Error(fmt.Sprintf("Error marshaling notification to JSON: %s", err)) + slog.Error("Error marshaling notification to JSON", "error", err) return } - slog.Log(ctx, LevelTrace, fmt.Sprintf("Follower %s responding to ping from leader %s", p.address, cn.SrcAddr)) + slog.Log(ctx, LevelTrace, "Follower responding to ping from leader", "follower", p.address, "leader", cn.SrcAddr) err = p.notify.Notify(ctx, p.chLeader, b) if err != nil { - slog.Error(fmt.Sprintf("Follower error responding to leader ping: %s", err)) + slog.Error("Follower error responding to leader ping", "error", err) } } @@ -137,7 +136,7 @@ func (p *PgxFollower) requestLeader(ctx context.Context) { now := time.Now() // Limit how often this message logs to avoid too much spam if p.lastRequestLeaderErr.IsZero() || p.lastRequestLeaderErr.Add(5*time.Minute).Before(now) { - slog.Error(fmt.Sprintf("Error pushing leader assumption work to queue: %s", err)) + slog.Error("Error pushing leader assumption work to queue", "error", err) p.lastRequestLeaderErr = now } } diff --git a/pkg/rselection/impls/pgx/leader.go b/pkg/rselection/impls/pgx/leader.go index 6bafa11..85e242b 100644 --- a/pkg/rselection/impls/pgx/leader.go +++ b/pkg/rselection/impls/pgx/leader.go @@ -221,7 +221,7 @@ func (p *PgxLeader) verify(vCh chan bool) { // e.g. when restarting a cluster node and the node list is temporarily out of sync: // Error verifying cluster integrity: node list length differs. Store node count 3 does not match leader count 2 // It would be great to be able to distinguish between expected/unexpected errors here and log accordingly. - slog.Error(fmt.Sprintf("Error verifying cluster integrity: %s", *err)) + slog.Error("Error verifying cluster integrity", "error", *err) vCh <- false } else { vCh <- true @@ -265,7 +265,7 @@ func (p *PgxLeader) pingNodes(ctx context.Context) { } b, err := json.Marshal(req) if err != nil { - slog.Error(fmt.Sprintf("Error marshaling notification to JSON: %s", err)) + slog.Error("Error marshaling notification to JSON", "error", err) return } @@ -277,7 +277,7 @@ func (p *PgxLeader) pingNodes(ctx context.Context) { err = p.notify.Notify(ctx, p.chFollower, b) if err != nil { p.pingSuccess = false - slog.Error(fmt.Sprintf("Leader error pinging followers: %s", err)) + slog.Error("Leader error pinging followers", "error", err) return } @@ -287,7 +287,7 @@ func (p *PgxLeader) pingNodes(ctx context.Context) { err = p.notify.Notify(ctx, p.chLeader, b) if err != nil { p.pingSuccess = false - slog.Error(fmt.Sprintf("Leader error pinging leaders: %s", err)) + slog.Error("Leader error pinging leaders", "error", err) return } } @@ -311,14 +311,14 @@ func (p *PgxLeader) handleNodesRequest(ctx context.Context, cn *electiontypes.Cl resp := electiontypes.NewClusterNodesNotification(nodes, cn.Guid()) b, err := json.Marshal(resp) if err != nil { - slog.Error(fmt.Sprintf("Error marshaling notification to JSON: %s", err)) + slog.Error("Error marshaling notification to JSON", "error", err) return } // Broadcast the response on the generic channel err = p.notify.Notify(ctx, p.chMessages, b) if err != nil { - slog.Error(fmt.Sprintf("Leader error notifying of cluster nodes: %s", err)) + slog.Error("Leader error notifying of cluster nodes", "error", err) return } } @@ -326,7 +326,7 @@ func (p *PgxLeader) handleNodesRequest(ctx context.Context, cn *electiontypes.Cl func (p *PgxLeader) handleLeaderPing(cn *electiontypes.ClusterPingRequest) { // If we received a ping from another leader, then stop leading if cn.SrcAddr != p.address { - slog.Debug(fmt.Sprintf("Leader received ping from another leader. Stopping and moving back to the follower loop.")) + slog.Debug("Leader received ping from another leader. Stopping and moving back to the follower loop.") p.stop <- true } else { resp := electiontypes.NewClusterPingResponse(p.address, cn.SrcAddr, p.awb.IP()) @@ -370,7 +370,7 @@ func (p *PgxLeader) sweepNodes() { defer p.mutex.Unlock() if p.unsuccessfulPing() { - slog.Debug(fmt.Sprintf("Skipping cluster sweep due to unsuccessful pings")) + slog.Debug("Skipping cluster sweep due to unsuccessful pings") return } @@ -381,7 +381,7 @@ func (p *PgxLeader) sweepNodes() { } if node.Ping.Before(time.Now().Add(-p.maxPingAge)) { - slog.Debug(fmt.Sprintf("Leader sweep removing cluster node %s", key)) + slog.Debug("Leader sweep removing cluster node", "node", key) delete(p.nodes, key) } } diff --git a/pkg/rsnotify/broadcaster/broadcaster.go b/pkg/rsnotify/broadcaster/broadcaster.go index f7c21e2..b8808d8 100644 --- a/pkg/rsnotify/broadcaster/broadcaster.go +++ b/pkg/rsnotify/broadcaster/broadcaster.go @@ -3,7 +3,6 @@ package broadcaster // Copyright (C) 2022 by RStudio, PBC. import ( - "fmt" "log/slog" "github.com/rstudio/platform-lib/v3/pkg/rsnotify/listener" @@ -97,7 +96,7 @@ func (b *NotificationBroadcaster) broadcast() { } case err, more := <-b.errs: if more { - slog.Error(fmt.Sprintf("Received error on queue addressed work notification channel: %s", err)) + slog.Error("Received error on queue addressed work notification channel", "error", err) } case sink := <-b.subscribe: sinks = append(sinks, sink) diff --git a/pkg/rsnotify/listeners/local/locallistener.go b/pkg/rsnotify/listeners/local/locallistener.go index 29b94e1..1c715bf 100644 --- a/pkg/rsnotify/listeners/local/locallistener.go +++ b/pkg/rsnotify/listeners/local/locallistener.go @@ -4,7 +4,6 @@ package local import ( "errors" - "fmt" "log/slog" "sync" "time" @@ -81,10 +80,10 @@ func (l *Listener) Listen() (chan listener.Notification, chan error, error) { func (l *Listener) listen(msgs chan listener.Notification, errs chan error) { defer func() { - slog.Debug(fmt.Sprintf("Stopping listener...")) + slog.Debug("Stopping listener...") l.items = nil close(l.stop) - slog.Debug(fmt.Sprintf("Stopped.")) + slog.Debug("Stopped.") }() l.wait(msgs, errs, l.stop) @@ -112,11 +111,11 @@ func (l *Listener) wait(msgs chan listener.Notification, errs chan error, stop c for { select { case <-stop: - slog.Debug(fmt.Sprintf("Stopping wait")) + slog.Debug("Stopping wait") return case i := <-l.items: if msg, ok := i.(listener.Notification); ok { - slog.Debug(fmt.Sprintf("Received message: %s. Sending to buffered channel with current size %d", msg.Guid(), len(msgs))) + slog.Debug("Received message. Sending to buffered channel", "guid", msg.Guid(), "channelSize", len(msgs)) msgs <- msg } else { errs <- ErrNotNotificationType @@ -128,13 +127,13 @@ func (l *Listener) wait(msgs chan listener.Notification, errs chan error, stop c func (l *Listener) Stop() { if l.stop != nil { // Signal to stop - slog.Debug(fmt.Sprintf("Signaling stop channel to stop...")) + slog.Debug("Signaling stop channel to stop...") l.stop <- true // Wait for stop - slog.Debug(fmt.Sprintf("Waiting for stop channel to close...")) + slog.Debug("Waiting for stop channel to close...") <-l.stop - slog.Debug(fmt.Sprintf("Stop channel for %s closed.", l.guid)) + slog.Debug("Stop channel closed.", "guid", l.guid) l.stop = nil // Remove from provider @@ -168,7 +167,7 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems notifyTxt = "renotify" } - slog.Debug(fmt.Sprintf("Notify called with type=%s on %d listeners", notifyTxt, len(l.listeners))) + slog.Debug("Notify called", "type", notifyTxt, "listenerCount", len(l.listeners)) for _, ll := range l.listeners { var needNotify bool if prevMissedItems == nil { @@ -183,7 +182,7 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems // There's a chance that `ll.items` could be non-nil, but not receiving. Timeout // to prevent deadlock, but keep trying until we're sure that the listener is // closed. - slog.Debug(fmt.Sprintf("Ready to %s internal items with guid %s for channel %s %s: %+v", notifyTxt, notifyGuid, ll.guid, channel, n)) + slog.Debug("Ready to notify internal items", "type", notifyTxt, "notifyGuid", notifyGuid, "listenerGuid", ll.guid, "channel", channel, "notification", n) func() { // It's important to create a ticker and stop it so it doesn't leak. This has the potential to be called // thousands of times in a relatively short period on a busy server, so timer leaks can result in @@ -192,9 +191,9 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems defer timeout.Stop() select { case ll.items <- n: - slog.Debug(fmt.Sprintf("Done with %s in local listener with guid %s for channel %s: %+v", notifyTxt, notifyGuid, channel, n)) + slog.Debug("Done with notify in local listener", "type", notifyTxt, "notifyGuid", notifyGuid, "channel", channel, "notification", n) case <-timeout.C: - slog.Debug(fmt.Sprintf("Timeout during %s for listener %s/%s with guid %s for channel %s: %+v", notifyTxt, ll.guid, ll.name, notifyGuid, channel, n)) + slog.Debug("Timeout during notify", "type", notifyTxt, "listenerGuid", ll.guid, "listenerName", ll.name, "notifyGuid", notifyGuid, "channel", channel, "notification", n) // Record the timed-out notification in the `missed` map so we can retry it missed[ll.guid] = notifyGuid } @@ -211,7 +210,7 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems // provider after the mutex is again locked by the recursive call to `notify` will be // attempted again as needed. if len(missed) > 0 { - slog.Debug(fmt.Sprintf("calling l.notify for %+v with guid %s for %d missed items on channel %s", n, notifyGuid, len(missed), channel)) + slog.Debug("calling l.notify for missed items", "notification", n, "notifyGuid", notifyGuid, "missedCount", len(missed), "channel", channel) stopCh := make(chan struct{}) defer close(stopCh) // If logging is enabled, periodically record notifications that are still waiting @@ -221,13 +220,13 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems for { select { case <-tick.C: - slog.Debug(fmt.Sprintf("still waiting on l.notify for +%v with guid %s on channel %s", n, notifyGuid, channel)) + slog.Debug("still waiting on l.notify", "notification", n, "notifyGuid", notifyGuid, "channel", channel) case <-stop: return } } }(stopCh) l.notify(channel, n, missed) - slog.Debug(fmt.Sprintf("completed calling l.notify for %d missed items with guid %s on channel %s", len(missed), notifyGuid, channel)) + slog.Debug("completed calling l.notify for missed items", "missedCount", len(missed), "notifyGuid", notifyGuid, "channel", channel) } } diff --git a/pkg/rsnotify/listeners/local/locallistener_test.go b/pkg/rsnotify/listeners/local/locallistener_test.go index f9c8a8d..84d89f7 100644 --- a/pkg/rsnotify/listeners/local/locallistener_test.go +++ b/pkg/rsnotify/listeners/local/locallistener_test.go @@ -62,7 +62,7 @@ func (s *LocalNotifySuite) TestNotifications(c *check.C) { return } case e := <-errs: - slog.Error(fmt.Sprintf("error received: %s", e)) + slog.Error("error received", "error", e) testError = e return } @@ -109,7 +109,7 @@ func (s *LocalNotifySuite) TestNotifications(c *check.C) { c.Assert(i.(*listener.TestNotification).Val, check.Equals, "second-test") return case e := <-errs: - slog.Error(fmt.Sprintf("error received: %s", e)) + slog.Error("error received", "error", e) c.FailNow() } } @@ -125,7 +125,7 @@ func (s *LocalNotifySuite) TestNotifications(c *check.C) { c.Assert(i.(*listener.TestNotification).Val, check.Equals, "second-test-bb") return case e := <-errs2: - slog.Error(fmt.Sprintf("error received: %s", e)) + slog.Error("error received", "error", e) c.FailNow() } } @@ -190,7 +190,7 @@ func (s *LocalNotifySuite) TestNotificationsBlock(c *check.C) { return } case e := <-errs: - slog.Error(fmt.Sprintf("error received: %s", e)) + slog.Error("error received", "error", e) testError = e return } @@ -251,7 +251,7 @@ func (s *LocalNotifySuite) TestNotificationsErrs(c *check.C) { } count++ case e := <-errs: - slog.Error(fmt.Sprintf("error received: %s", e)) + slog.Error("error received", "error", e) if e.Error() != "a notification must be of type listener.Notification" { testError = fmt.Errorf("invalid error received: %s", e.Error()) return diff --git a/pkg/rsnotify/listeners/postgrespgx/pgxlistener.go b/pkg/rsnotify/listeners/postgrespgx/pgxlistener.go index 77a2590..7dbf7b3 100644 --- a/pkg/rsnotify/listeners/postgrespgx/pgxlistener.go +++ b/pkg/rsnotify/listeners/postgrespgx/pgxlistener.go @@ -38,7 +38,7 @@ func (p *PgxIPReporter) IP() string { var ipNet net.IPNet err := p.pool.QueryRow(context.Background(), query).Scan(&ipNet) if err != nil { - slog.Error(fmt.Sprintf("Unable to determine client IP with inet_client_addr(). %s", err)) + slog.Error("Unable to determine client IP with inet_client_addr()", "error", err) return ip } if ipNet.IP != nil { @@ -218,7 +218,7 @@ func (l *PgxListener) acquire(ready chan struct{}) (err error) { select { case <-ready: // Already closed. This means that we are reconnecting - slog.Info(fmt.Sprintf("successfully reconnected listener %s", l.name)) + slog.Info("successfully reconnected listener", "name", l.name) default: // Close the `ready` channel to signal that `Listen()` can return. close(ready) @@ -326,10 +326,10 @@ func (l *PgxListener) notify(n *pgconn.Notification, errs chan error, items chan } func (l *PgxListener) Stop() { - slog.Debug(fmt.Sprintf("Signaling context to cancel listener %s", l.name)) + slog.Debug("Signaling context to cancel listener", "name", l.name) l.cancel() // Wait for stop - slog.Debug(fmt.Sprintf("Waiting for listener %s to stop...", l.name)) + slog.Debug("Waiting for listener to stop...", "name", l.name) <-l.exit // Clean up connection @@ -339,5 +339,5 @@ func (l *PgxListener) Stop() { l.conn = nil } - slog.Debug(fmt.Sprintf("Listener %s closed.", l.name)) + slog.Debug("Listener closed.", "name", l.name) } diff --git a/pkg/rsnotify/listeners/postgrespgx/pgxlistener_test.go b/pkg/rsnotify/listeners/postgrespgx/pgxlistener_test.go index 32d95c3..189d108 100644 --- a/pkg/rsnotify/listeners/postgrespgx/pgxlistener_test.go +++ b/pkg/rsnotify/listeners/postgrespgx/pgxlistener_test.go @@ -4,7 +4,6 @@ package postgrespgx import ( "context" - "fmt" "log/slog" "strings" "testing" @@ -170,7 +169,7 @@ func (s *PgxNotifySuite) TestNotificationsNormal(c *check.C) { return } case e := <-errs: - slog.Error(fmt.Sprintf("error received: %s", e)) + slog.Error("error received", "error", e) c.FailNow() } } @@ -244,7 +243,7 @@ func (s *PgxNotifySuite) TestNotificationsNormal(c *check.C) { c.Assert(i.(*testNotification).Val, check.Equals, "second-test") return case e := <-errs: - slog.Error(fmt.Sprintf("error received: %s", e)) + slog.Error("error received", "error", e) c.FailNow() } } @@ -424,7 +423,7 @@ func (s *PgxNotifySuite) TestNotificationsBlock(c *check.C) { return } case e := <-errs: - slog.Error(fmt.Sprintf("error received: %s", e)) + slog.Error("error received", "error", e) c.FailNow() } } diff --git a/pkg/rsnotify/listeners/postgrespq/pqlistener.go b/pkg/rsnotify/listeners/postgrespq/pqlistener.go index 3795fb2..f71c6c4 100644 --- a/pkg/rsnotify/listeners/postgrespq/pqlistener.go +++ b/pkg/rsnotify/listeners/postgrespq/pqlistener.go @@ -140,7 +140,7 @@ func (l *PqListener) acquire(ready chan struct{}) (err error) { select { case <-ready: // Already closed. This means that we are reconnecting - slog.Info(fmt.Sprintf("successfully reconnected listener %s", l.name)) + slog.Info("successfully reconnected listener", "name", l.name) default: // Close the `ready` channel to signal that `Listen()` can return. close(ready) @@ -196,10 +196,10 @@ func (l *PqListener) notify(n *pq.Notification, errs chan error, items chan list } func (l *PqListener) Stop() { - slog.Debug(fmt.Sprintf("Signaling context to cancel listener %s", l.name)) + slog.Debug("Signaling context to cancel listener", "name", l.name) l.cancel() // Wait for stop - slog.Debug(fmt.Sprintf("Waiting for listener %s to stop...", l.name)) + slog.Debug("Waiting for listener to stop...", "name", l.name) <-l.exit // Clean up connection @@ -209,5 +209,5 @@ func (l *PqListener) Stop() { l.conn = nil } - slog.Debug(fmt.Sprintf("Listener %s closed.", l.name)) + slog.Debug("Listener closed.", "name", l.name) } diff --git a/pkg/rsnotify/listeners/postgrespq/pqlistener_test.go b/pkg/rsnotify/listeners/postgrespq/pqlistener_test.go index c60eb92..642dfaa 100644 --- a/pkg/rsnotify/listeners/postgrespq/pqlistener_test.go +++ b/pkg/rsnotify/listeners/postgrespq/pqlistener_test.go @@ -3,7 +3,6 @@ package postgrespq // Copyright (C) 2022 by RStudio, PBC. import ( - "fmt" "log/slog" "strings" "testing" @@ -137,7 +136,7 @@ func (s *PqNotifySuite) TestNotificationsNormal(c *check.C) { return } case e := <-errs: - slog.Error(fmt.Sprintf("error received: %s", e)) + slog.Error("error received", "error", e) c.FailNow() } } @@ -189,7 +188,7 @@ func (s *PqNotifySuite) TestNotificationsNormal(c *check.C) { c.Assert(i.(*testNotification).Val, check.Equals, "second-test") return case e := <-errs: - slog.Error(fmt.Sprintf("error received: %s", e)) + slog.Error("error received", "error", e) c.FailNow() } } @@ -329,7 +328,7 @@ func (s *PqNotifySuite) TestNotificationsBlock(c *check.C) { return } case e := <-errs: - slog.Error(fmt.Sprintf("error received: %s", e)) + slog.Error("error received", "error", e) c.FailNow() } } diff --git a/pkg/rsqueue/runnerfactory/runnerfactory.go b/pkg/rsqueue/runnerfactory/runnerfactory.go index 0a45725..916ece2 100644 --- a/pkg/rsqueue/runnerfactory/runnerfactory.go +++ b/pkg/rsqueue/runnerfactory/runnerfactory.go @@ -63,7 +63,7 @@ func (r *RunnerFactory) Stop(timeout time.Duration) error { defer wg.Done() err := runner.Stop(timeout) if err != nil { - slog.Error(fmt.Sprintf("Error stopping runner for type %d: %s", key, err)) + slog.Error("Error stopping runner", "type", key, "error", err) } r.types.SetEnabled(key, false) }(key, runner) diff --git a/pkg/rsstorage/internal/chunks.go b/pkg/rsstorage/internal/chunks.go index dbc907f..2c04165 100644 --- a/pkg/rsstorage/internal/chunks.go +++ b/pkg/rsstorage/internal/chunks.go @@ -108,7 +108,7 @@ func (w *DefaultChunkUtils) WriteChunked( }) if err != nil { // TODO: Update DefaultChunkUtils to acceptable a logger - slog.Error(fmt.Sprintf("Error notifying store of chunk completion for address=%s; chunk=%d: %s", address, count, err)) + slog.Error("Error notifying store of chunk completion", "address", address, "chunk", count, "error", err) } if count == numChunks { return nil diff --git a/pkg/rsstorage/internal/integration_test/chunks_test.go b/pkg/rsstorage/internal/integration_test/chunks_test.go index 2cb7085..2368705 100644 --- a/pkg/rsstorage/internal/integration_test/chunks_test.go +++ b/pkg/rsstorage/internal/integration_test/chunks_test.go @@ -8,7 +8,6 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" - "fmt" "io" "log/slog" "os" @@ -141,9 +140,9 @@ func (s *ChunksIntegrationSuite) TestWriteChunked(c *check.C) { serverSet := s.NewServerSet(c, "chunks", "") for key, server := range serverSet { if testing.Short() && key != "file" { - slog.Info(fmt.Sprintf("skipping chunks integration tests for %s because -short was provided", key)) + slog.Info("skipping chunks integration tests because -short was provided", "server", key) } else { - slog.Info(fmt.Sprintf("testing chunks integration tests for %s", key)) + slog.Info("testing chunks integration tests", "server", key) s.check(c, server) } } @@ -249,7 +248,7 @@ func (s *ChunksPartialReadSuite) TearDownSuite(c *check.C) { func (s *ChunksPartialReadSuite) TestReadPartialOk(c *check.C) { ctx := context.Background() - slog.Info(fmt.Sprintf("Starting test %s", c.TestName())) + slog.Info("Starting test", "name", c.TestName()) defer leaktest.Check(c) sz := uint64(len(servertest.TestDESC)) @@ -314,7 +313,7 @@ func (s *ChunksPartialReadSuite) TestReadPartialOk(c *check.C) { resolve := func(writer io.Writer) (dir, address string, err error) { defer pR.Close() n, err := io.Copy(writer, pR) - slog.Info(fmt.Sprintf("Done resolving %d bytes", n)) + slog.Info("Done resolving", "bytes", n) return } @@ -417,7 +416,7 @@ func (s *ChunksPartialReadSuite) TestReadPartialTimeout(c *check.C) { resolve := func(writer io.Writer) (dir, address string, err error) { defer pR.Close() n, err := io.Copy(writer, pR) - slog.Info(fmt.Sprintf("Done resolving %d bytes", n)) + slog.Info("Done resolving", "bytes", n) return } diff --git a/pkg/rsstorage/internal/integration_test/integration_test.go b/pkg/rsstorage/internal/integration_test/integration_test.go index 65904b8..6ed1b18 100644 --- a/pkg/rsstorage/internal/integration_test/integration_test.go +++ b/pkg/rsstorage/internal/integration_test/integration_test.go @@ -268,7 +268,7 @@ func (s *StorageIntegrationSuite) CheckFile( sz int64, chunked bool, ) { - slog.Info(fmt.Sprintf("(%s) Verifying existence of %s on server=%s (with dir=%s)", test, address, class, dir)) + slog.Info("Verifying existence", "test", test, "address", address, "server", class, "dir", dir) // Next, get it r, ch, sz, _, ok, err := server.Get(context.Background(), dir, address) @@ -297,7 +297,7 @@ func (s *StorageIntegrationSuite) CheckFile( // Verifies that a given asset does not exist func (s *StorageIntegrationSuite) CheckFileGone(c *check.C, server rsstorage.StorageServer, test, dir, address, classSource string) { - slog.Info(fmt.Sprintf("(%s) Verifying removal of %s on server=%s (with dir=%s)", test, address, classSource, dir)) + slog.Info("Verifying removal", "test", test, "address", address, "server", classSource, "dir", dir) ok, _, _, _, err := server.Check(context.Background(), "", address) c.Check(err, check.IsNil) @@ -326,7 +326,7 @@ func (s *StorageIntegrationSuite) TestMoving(c *check.C) { // Verify for classSource := range sources { - slog.Info(fmt.Sprintf("\nVerify that files were successfully moved from %s to each destination server:", classSource)) + slog.Info("Verify that files were successfully moved to each destination server", "source", classSource) for classDest, dest := range dests { // Files exist on destination s.CheckFile( @@ -367,7 +367,7 @@ func (s *StorageIntegrationSuite) TestMoving(c *check.C) { // Verify that files do not exist on source for classSource, source := range sources { - slog.Info(fmt.Sprintf("\nVerify that moved files were deleted from the %s server:", classSource)) + slog.Info("Verify that moved files were deleted from the server", "source", classSource) for classDest := range dests { s.CheckFileGone(c, source, "MoveSrc", "", fmt.Sprintf("%s--%s", classSource, classDest), classSource) s.CheckFileGone(c, source, "MoveSrc", "dir", fmt.Sprintf("%s--%s", classSource, classDest), classSource) @@ -397,7 +397,7 @@ func (s *StorageIntegrationSuite) TestCopying(c *check.C) { // Verify files have been copied to destination for classSource := range sources { - slog.Info(fmt.Sprintf("\nVerify that files were successfully copied from %s to each destination server:", classSource)) + slog.Info("Verify that files were successfully copied to each destination server", "source", classSource) for classDest, dest := range dests { // Files exist on destination s.CheckFile( @@ -438,7 +438,7 @@ func (s *StorageIntegrationSuite) TestCopying(c *check.C) { // Verify files are still on source for classSource, source := range sources { - slog.Info(fmt.Sprintf("\nVerify that original files still remain on the %s server:", classSource)) + slog.Info("Verify that original files still remain on the server", "source", classSource) for classDest := range dests { // Files still exist on source s.CheckFile( @@ -479,14 +479,14 @@ func (s *StorageIntegrationSuite) TestCopying(c *check.C) { // Test Enumeration for classSource, source := range sources { - slog.Info(fmt.Sprintf("\nVerify enumeration on the %s source server:", classSource)) + slog.Info("Verify enumeration on the source server", "source", classSource) items, err := source.Enumerate(ctx) c.Assert(err, check.IsNil) // Each source should have three files for each destination c.Assert(items, check.HasLen, len(dests)*3) } for classDest, dest := range dests { - slog.Info(fmt.Sprintf("\nVerify enumeration on the %s destination server:", classDest)) + slog.Info("Verify enumeration on the destination server", "dest", classDest) items, err := dest.Enumerate(ctx) c.Assert(err, check.IsNil) // Each destination should have three files from each source @@ -495,7 +495,7 @@ func (s *StorageIntegrationSuite) TestCopying(c *check.C) { // Test Removal for classSource, source := range sources { - slog.Info(fmt.Sprintf("\nVerify forced removal of assets on the %s source server:", classSource)) + slog.Info("Verify forced removal of assets on the source server", "source", classSource) for classDest := range dests { err := source.Remove(ctx, "", fmt.Sprintf("%s--%s", classSource, classDest)) c.Assert(err, check.IsNil) diff --git a/pkg/rsstorage/servers/file/server.go b/pkg/rsstorage/servers/file/server.go index 94e6e59..f36fc87 100644 --- a/pkg/rsstorage/servers/file/server.go +++ b/pkg/rsstorage/servers/file/server.go @@ -136,7 +136,7 @@ func (s *StorageServer) CalculateUsage() (types.Usage, error) { timeInfo := time.Now() elapsed := timeInfo.Sub(start) - slog.Debug(fmt.Sprintf("Calculated disk info for %s in %s.\n", s.dir, elapsed)) + slog.Debug("Calculated disk info", "dir", s.dir, "elapsed", elapsed) actual, err := diskUsage(s.dir, s.cacheTimeout, s.walkTimeout) if err != nil { @@ -145,7 +145,7 @@ func (s *StorageServer) CalculateUsage() (types.Usage, error) { timeUsage := time.Now() elapsed = timeUsage.Sub(timeInfo) - slog.Debug(fmt.Sprintf("Calculated disk usage for %s in %s.\n", s.dir, elapsed)) + slog.Debug("Calculated disk usage", "dir", s.dir, "elapsed", elapsed) usage := types.Usage{ SizeBytes: datasize.ByteSize(all), @@ -320,7 +320,7 @@ func (s *StorageServer) write(resolve types.Resolver) (dir, address, staging str defer stagingFile.Close() staging = stagingFile.Name() - slog.Debug(fmt.Sprintf("Opened new staging file for storage: %s.\n", stagingFile.Name())) + slog.Debug("Opened new staging file for storage", "file", stagingFile.Name()) // Resolve/get the data we need dir, address, err = resolve(stagingFile) @@ -336,7 +336,7 @@ func (s *StorageServer) cleanup(staging string) { removeError := s.fileIO.Remove(staging) if removeError != nil && !os.IsNotExist(removeError) { // Warn and discard errors cleaning up - slog.Debug(fmt.Sprintf("file.StorageServer error while cleaning up staged data: %s", removeError)) + slog.Debug("file.StorageServer error while cleaning up staged data", "error", removeError) } } @@ -361,7 +361,7 @@ func (s *StorageServer) Remove(ctx context.Context, dir, address string) error { func (s *StorageServer) Enumerate(ctx context.Context) ([]types.StoredItem, error) { items, err := enumerate(s.dir, s.walkTimeout) if err != nil { - slog.Error(fmt.Sprintf("Error enumerating storage: %s", err)) + slog.Error("Error enumerating storage", "error", err) return nil, err } @@ -383,7 +383,7 @@ func enumerate(dir string, walkTimeout time.Duration) ([]types.StoredItem, error err := filepath.WalkDir(dir, func(path string, info fs.DirEntry, err error) error { if err != nil { - slog.Error(fmt.Sprintf("Error enumerating storage for directory %s: %s", dir, err)) + slog.Error("Error enumerating storage for directory", "dir", dir, "error", err) return nil } @@ -445,10 +445,10 @@ func enumerate(dir string, walkTimeout time.Duration) ([]types.StoredItem, error func (s *StorageServer) move(dir, address string, server rsstorage.StorageServer) error { source := s.Locate(dir, address) dest := server.Locate(dir, address) - slog.Debug(fmt.Sprintf("Renaming %s to %s", source, dest)) + slog.Debug("Renaming file", "source", source, "dest", dest) destDir := filepath.Dir(dest) if destDir != dest { - slog.Debug(fmt.Sprintf("Ensuring directory %s exists", destDir)) + slog.Debug("Ensuring directory exists", "dir", destDir) err := os.MkdirAll(destDir, 0700) if err != nil { return err @@ -456,7 +456,7 @@ func (s *StorageServer) move(dir, address string, server rsstorage.StorageServer } err := os.Rename(source, dest) if err != nil { - slog.Debug(fmt.Sprintf("Error moving with os.Rename: %s", err)) + slog.Debug("Error moving with os.Rename", "error", err) return err } diff --git a/pkg/rsstorage/servers/postgres/server.go b/pkg/rsstorage/servers/postgres/server.go index dfea317..b01e5be 100644 --- a/pkg/rsstorage/servers/postgres/server.go +++ b/pkg/rsstorage/servers/postgres/server.go @@ -60,14 +60,14 @@ func NewStorageServer(args StorageServerArgs) rsstorage.StorageServer { func pgxCommit(tx pgx.Tx, desc string, err *error) { var finErr error if *err == nil { - slog.Debug(fmt.Sprintf("Committing large object on success for operation %s", desc)) + slog.Debug("Committing large object on success", "operation", desc) finErr = tx.Commit(context.Background()) } else { - slog.Debug(fmt.Sprintf("Rolling back large object on error for operation %s: %s", desc, *err)) + slog.Debug("Rolling back large object on error", "operation", desc, "error", *err) finErr = tx.Rollback(context.Background()) } if finErr != nil { - slog.Debug(fmt.Sprintf("Error committing large object: %s", finErr)) + slog.Debug("Error committing large object", "error", finErr) if *err == nil { *err = finErr } @@ -162,14 +162,14 @@ func (s *StorageServer) Check(ctx context.Context, dir, address string) (found b los := tx.LargeObjects() // Open the large object - slog.Debug(fmt.Sprintf("Opening (for check) large object %s with oid %d.", location, dbOid)) + slog.Debug("Opening (for check) large object", "location", location, "oid", dbOid) var lo *pgx.LargeObject if lo, err = los.Open(ctx, dbOid, pgx.LargeObjectModeRead); err != nil { return } defer func(err *error) { var finErr error - slog.Debug(fmt.Sprintf("Closing (after check) large object %s with oid %d.", location, dbOid)) + slog.Debug("Closing (after check) large object", "location", location, "oid", dbOid) finErr = lo.Close() if finErr != nil { if *err == nil { @@ -192,7 +192,7 @@ func (s *StorageServer) Check(ctx context.Context, dir, address string) (found b // TODO: This may be inefficient. Research other ways of getting the correct size // TODO: also return the `ts` (modification time) if sz, err = lo.Seek(0, io.SeekEnd); err != nil { - slog.Debug(fmt.Sprintf("failed during seek: %s", err)) + slog.Debug("failed during seek", "error", err) return } } @@ -265,7 +265,7 @@ func (s *StorageServer) Get( los := tx.LargeObjects() // Open the large object - slog.Debug(fmt.Sprintf("Opening (for read) large object %s with oid %d.", location, dbOid)) + slog.Debug("Opening (for read) large object", "location", location, "oid", dbOid) var lo *pgx.LargeObject if lo, err = los.Open(ctx, dbOid, pgx.LargeObjectModeRead); err != nil { return @@ -347,14 +347,14 @@ func (s *StorageServer) Put(ctx context.Context, resolve types.Resolver, dir, ad // Create a new large object var oid uint32 if oid, err = los.Create(ctx, 0); err != nil { - slog.Debug(fmt.Sprintf("Error creating large object: %s", err)) + slog.Debug("Error creating large object", "error", err) return } // Open the new large object var lo *pgx.LargeObject if lo, err = los.Open(ctx, oid, pgx.LargeObjectModeWrite); err != nil { - slog.Debug(fmt.Sprintf("Error opening large object: %s", err)) + slog.Debug("Error opening large object", "error", err) return } @@ -364,15 +364,15 @@ func (s *StorageServer) Put(ctx context.Context, resolve types.Resolver, dir, ad // Insert the large object's OID in the mapping table insert := `INSERT INTO large_objects (oid, address) VALUES ($1, $2)` if _, err = tx.Exec(ctx, insert, oid, tempLocation); err != nil { - slog.Debug(fmt.Sprintf("Error inserting large object into mapping table: %s", err)) + slog.Debug("Error inserting large object into mapping table", "error", err) return } // Copy the staging file to the large object - slog.Debug(fmt.Sprintf("Copying data to large object")) + slog.Debug("Copying data to large object") wdir, waddress, err := resolve(lo) if err != nil { - slog.Debug(fmt.Sprintf("Error copying/resolving large object to Postgres LO storage: %s", err)) + slog.Debug("Error copying/resolving large object to Postgres LO storage", "error", err) return } @@ -389,19 +389,19 @@ func (s *StorageServer) Put(ctx context.Context, resolve types.Resolver, dir, ad // Remove any conflicting mappings delete := `DELETE FROM large_objects WHERE address = $1` if _, err = tx.Exec(ctx, delete, permanentLocation); err != nil { - slog.Debug(fmt.Sprintf("Error deleting existing large object records from mapping table: %s", err)) + slog.Debug("Error deleting existing large object records from mapping table", "error", err) return } // Rename the location rename := `UPDATE large_objects SET address = $1 WHERE address = $2` if _, err = tx.Exec(ctx, rename, permanentLocation, tempLocation); err != nil { - slog.Debug(fmt.Sprintf("Error setting large object record permanent address in mapping table: %s", err)) + slog.Debug("Error setting large object record permanent address in mapping table", "error", err) return } if err = lo.Close(); err != nil { - slog.Debug(fmt.Sprintf("Error closing large object: %s", err)) + slog.Debug("Error closing large object", "error", err) return } @@ -473,7 +473,7 @@ func (s *StorageServer) rem(ctx context.Context, location string) (err error) { // Remove the mapping delete := `DELETE FROM large_objects WHERE address = $1` if _, err = tx.Exec(ctx, delete, location); err != nil { - slog.Debug(fmt.Sprintf("Error deleting large object record from mapping table: %s", err)) + slog.Debug("Error deleting large object record from mapping table", "error", err) return } @@ -547,7 +547,7 @@ func (s *StorageServer) move(ctx context.Context, dir, address string, server rs destination := server.Locate(part.Dir, part.Address) delete := `UPDATE large_objects SET address = $1 WHERE address = $2` if _, err = tx.Exec(ctx, delete, destination, source); err != nil { - slog.Debug(fmt.Sprintf("Error updating (move) large object record in mapping table: %s", err)) + slog.Debug("Error updating (move) large object record in mapping table", "error", err) return } } diff --git a/pkg/rsstorage/servers/s3server/s3_enumeration.go b/pkg/rsstorage/servers/s3server/s3_enumeration.go index ae4cef9..8a72e41 100644 --- a/pkg/rsstorage/servers/s3server/s3_enumeration.go +++ b/pkg/rsstorage/servers/s3server/s3_enumeration.go @@ -136,7 +136,7 @@ func (a *DefaultAwsOps) BucketObjects( atomic.AddUint64(&ops, uint64(len(bm))) if ops > 1000 { atomic.AddUint64(&total, atomic.LoadUint64(&ops)) - slog.Info(fmt.Sprintf("For S3 prefix %s parsed %d files", s3Prefix, atomic.LoadUint64(&total))) + slog.Info("Parsed S3 files", "prefix", s3Prefix, "fileCount", atomic.LoadUint64(&total)) atomic.SwapUint64(&ops, 0) } @@ -249,7 +249,7 @@ func (a *DefaultAwsOps) BucketObjectsETagMap( atomic.AddUint64(&ops, uint64(len(bm))) if ops > 1000 { atomic.AddUint64(&total, atomic.LoadUint64(&ops)) - slog.Info(fmt.Sprintf("For S3 prefix %s parsed %d files", s3Prefix, atomic.LoadUint64(&total))) + slog.Info("Parsed S3 files", "prefix", s3Prefix, "fileCount", atomic.LoadUint64(&total)) atomic.SwapUint64(&ops, 0) }