From 4f0d39cc64987d967e5691e49da8c9160ba50285 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Wed, 31 Jul 2019 17:38:44 +0300 Subject: [PATCH] don't use global loggers (#2675) --- bootstrap/peer.go | 2 +- cmd/bootstrap/main.go | 2 +- cmd/gateway/main.go | 4 +-- cmd/identity/main.go | 5 ++-- cmd/satellite/main.go | 2 +- cmd/storagenode/main.go | 2 +- cmd/uplink/cmd/root.go | 2 +- examples/eestream/serve-collected/main.go | 3 +- examples/eestream/serve/main.go | 3 +- examples/eestream/store/main.go | 3 +- internal/version/service.go | 34 +++++++++++++++-------- pkg/certificates/config.go | 2 +- pkg/process/debug.go | 2 +- pkg/process/metrics.go | 10 +++---- pkg/server/config.go | 4 +-- pkg/telemetry/client.go | 8 ++++-- pkg/telemetry/client_test.go | 15 +++++----- pkg/telemetry/example_test.go | 3 +- pkg/telemetry/tm_test.go | 3 +- satellite/peer.go | 2 +- storagenode/peer.go | 2 +- uplink/ecclient/client.go | 6 ++-- uplink/eestream/decode.go | 17 +++++++----- uplink/eestream/encode.go | 20 +++++++------ uplink/eestream/piecebuf.go | 10 +++---- uplink/eestream/rs_test.go | 25 +++++++++-------- uplink/eestream/stripe.go | 5 ++-- versioncontrol/peer.go | 4 +-- 28 files changed, 115 insertions(+), 85 deletions(-) diff --git a/bootstrap/peer.go b/bootstrap/peer.go index f7a728ab4..44bb5d17c 100644 --- a/bootstrap/peer.go +++ b/bootstrap/peer.go @@ -99,7 +99,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v", versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release) } - peer.Version = version.NewService(config.Version, versionInfo, "Bootstrap") + peer.Version = version.NewService(log.Named("version"), config.Version, versionInfo, "Bootstrap") } { // setup listener and server diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go index 01f6e38a2..0e20b237f 100644 --- a/cmd/bootstrap/main.go +++ b/cmd/bootstrap/main.go @@ -99,7 +99,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { return err } - if err := process.InitMetricsWithCertPath(ctx, nil, runCfg.Identity.CertPath); err != nil { + if err := process.InitMetricsWithCertPath(ctx, log, nil, runCfg.Identity.CertPath); err != nil { zap.S().Error("Failed to initialize telemetry batcher: ", err) } diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go index 9ef0e2f3a..135f1b6e3 100644 --- a/cmd/gateway/main.go +++ b/cmd/gateway/main.go @@ -149,11 +149,11 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { ctx := process.Ctx(cmd) - if err := process.InitMetrics(ctx, nil, ""); err != nil { + if err := process.InitMetrics(ctx, zap.L(), nil, ""); err != nil { zap.S().Error("Failed to initialize telemetry batcher: ", err) } - err = version.CheckProcessVersion(ctx, runCfg.Version, version.Build, "Gateway") + err = version.CheckProcessVersion(ctx, zap.L(), runCfg.Version, version.Build, "Gateway") if err != nil { return err } diff --git a/cmd/identity/main.go b/cmd/identity/main.go index bc495db11..c289f0b22 100644 --- a/cmd/identity/main.go +++ b/cmd/identity/main.go @@ -12,6 +12,7 @@ import ( "github.com/spf13/cobra" "github.com/zeebo/errs" + "go.uber.org/zap" "storj.io/storj/internal/fpath" "storj.io/storj/internal/version" @@ -86,7 +87,7 @@ func serviceDirectory(serviceName string) string { func cmdNewService(cmd *cobra.Command, args []string) error { ctx := process.Ctx(cmd) - err := version.CheckProcessVersion(ctx, config.Version, version.Build, "Identity") + err := version.CheckProcessVersion(ctx, zap.L(), config.Version, version.Build, "Identity") if err != nil { return err } @@ -148,7 +149,7 @@ func cmdNewService(cmd *cobra.Command, args []string) error { func cmdAuthorize(cmd *cobra.Command, args []string) error { ctx := process.Ctx(cmd) - err := version.CheckProcessVersion(ctx, config.Version, version.Build, "Identity") + err := version.CheckProcessVersion(ctx, zap.L(), config.Version, version.Build, "Identity") if err != nil { return err } diff --git a/cmd/satellite/main.go b/cmd/satellite/main.go index 96fc3fbf3..1b98a4b05 100644 --- a/cmd/satellite/main.go +++ b/cmd/satellite/main.go @@ -142,7 +142,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { return err } - if err := process.InitMetricsWithCertPath(ctx, nil, runCfg.Identity.CertPath); err != nil { + if err := process.InitMetricsWithCertPath(ctx, log, nil, runCfg.Identity.CertPath); err != nil { zap.S().Error("Failed to initialize telemetry batcher: ", err) } diff --git a/cmd/storagenode/main.go b/cmd/storagenode/main.go index d10399970..973c654a7 100644 --- a/cmd/storagenode/main.go +++ b/cmd/storagenode/main.go @@ -152,7 +152,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { return err } - if err := process.InitMetricsWithCertPath(ctx, nil, runCfg.Identity.CertPath); err != nil { + if err := process.InitMetricsWithCertPath(ctx, log, nil, runCfg.Identity.CertPath); err != nil { zap.S().Error("Failed to initialize telemetry batcher: ", err) } diff --git a/cmd/uplink/cmd/root.go b/cmd/uplink/cmd/root.go index c0d89c54f..d826422d0 100644 --- a/cmd/uplink/cmd/root.go +++ b/cmd/uplink/cmd/root.go @@ -98,7 +98,7 @@ func (cliCfg *UplinkFlags) NewUplink(ctx context.Context) (*libuplink.Uplink, er // GetProject returns a *libuplink.Project for interacting with a specific project func (cliCfg *UplinkFlags) GetProject(ctx context.Context) (*libuplink.Project, error) { - err := version.CheckProcessVersion(ctx, cliCfg.Version, version.Build, "Uplink") + err := version.CheckProcessVersion(ctx, zap.L(), cliCfg.Version, version.Build, "Uplink") if err != nil { return nil, err } diff --git a/examples/eestream/serve-collected/main.go b/examples/eestream/serve-collected/main.go index 1b1ed52f0..227a5cf5c 100644 --- a/examples/eestream/serve-collected/main.go +++ b/examples/eestream/serve-collected/main.go @@ -13,6 +13,7 @@ import ( "time" "github.com/vivint/infectious" + "go.uber.org/zap" "storj.io/storj/pkg/encryption" "storj.io/storj/pkg/ranger" @@ -74,7 +75,7 @@ func Main() error { } rrs[res.i] = res.rr } - rc, err := eestream.Decode(rrs, es, 4*1024*1024, false) + rc, err := eestream.Decode(zap.L(), rrs, es, 4*1024*1024, false) if err != nil { return err } diff --git a/examples/eestream/serve/main.go b/examples/eestream/serve/main.go index 15618eccc..7b6b6f1e8 100644 --- a/examples/eestream/serve/main.go +++ b/examples/eestream/serve/main.go @@ -17,6 +17,7 @@ import ( "time" "github.com/vivint/infectious" + "go.uber.org/zap" "storj.io/storj/pkg/encryption" "storj.io/storj/pkg/ranger" @@ -74,7 +75,7 @@ func Main() error { } rrs[piecenum] = r } - rc, err := eestream.Decode(rrs, es, 4*1024*1024, false) + rc, err := eestream.Decode(zap.L(), rrs, es, 4*1024*1024, false) if err != nil { return err } diff --git a/examples/eestream/store/main.go b/examples/eestream/store/main.go index ce4150880..3025df103 100644 --- a/examples/eestream/store/main.go +++ b/examples/eestream/store/main.go @@ -13,6 +13,7 @@ import ( "path/filepath" "github.com/vivint/infectious" + "go.uber.org/zap" "storj.io/storj/pkg/encryption" "storj.io/storj/pkg/storj" @@ -60,7 +61,7 @@ func Main() error { if err != nil { return err } - readers, err := eestream.EncodeReader(context.Background(), + readers, err := eestream.EncodeReader(context.Background(), zap.L(), encryption.TransformReader(eestream.PadReader(os.Stdin, encrypter.InBlockSize()), encrypter, 0), rs) if err != nil { diff --git a/internal/version/service.go b/internal/version/service.go index fb59d303a..b94b0b95b 100644 --- a/internal/version/service.go +++ b/internal/version/service.go @@ -26,6 +26,7 @@ type Config struct { // Service contains the information and variables to ensure the Software is up to date type Service struct { + log *zap.Logger config Config info Info service string @@ -38,8 +39,9 @@ type Service struct { } // NewService creates a Version Check Client with default configuration -func NewService(config Config, info Info, service string) (client *Service) { +func NewService(log *zap.Logger, config Config, info Info, service string) (client *Service) { return &Service{ + log: log, config: config, info: info, service: service, @@ -59,9 +61,9 @@ func (srv *Service) CheckVersion(ctx context.Context) (err error) { // CheckProcessVersion is not meant to be used for peers but is meant to be // used for other utilities -func CheckProcessVersion(ctx context.Context, config Config, info Info, service string) (err error) { +func CheckProcessVersion(ctx context.Context, log *zap.Logger, config Config, info Info, service string) (err error) { defer mon.Task()(&ctx)(&err) - return NewService(config, info, service).CheckVersion(ctx) + return NewService(log, config, info, service).CheckVersion(ctx) } // Run logs the current version information @@ -105,22 +107,22 @@ func (srv *Service) checkVersion(ctx context.Context) (allowed bool) { accepted, err := srv.queryVersionFromControlServer(ctx) if err != nil { // Log about the error, but dont crash the service and allow further operation - zap.S().Errorf("Failed to do periodic version check: %s", err.Error()) + srv.log.Sugar().Errorf("Failed to do periodic version check: %s", err.Error()) return true } minimum := getFieldString(&accepted, srv.service) - zap.S().Debugf("allowed minimum version from control server is: %s", minimum.String()) + srv.log.Sugar().Debugf("allowed minimum version from control server is: %s", minimum.String()) if minimum.String() == "" { - zap.S().Errorf("no version from control server, accepting to run") + srv.log.Sugar().Errorf("no version from control server, accepting to run") return true } if isAcceptedVersion(srv.info.Version, minimum) { - zap.S().Infof("running on version %s", srv.info.Version.String()) + srv.log.Sugar().Infof("running on version %s", srv.info.Version.String()) return true } - zap.S().Errorf("running on not allowed/outdated version %s", srv.info.Version.String()) + srv.log.Sugar().Errorf("running on not allowed/outdated version %s", srv.info.Version.String()) return false } @@ -151,8 +153,18 @@ func (srv *Service) queryVersionFromControlServer(ctx context.Context) (ver Allo return ver, err } -// DebugHandler returns a json representation of the current version information for the binary -func DebugHandler(w http.ResponseWriter, r *http.Request) { +// DebugHandler implements version info endpoint. +type DebugHandler struct { + log *zap.Logger +} + +// NewDebugHandler returns new debug handler. +func NewDebugHandler(log *zap.Logger) *DebugHandler { + return &DebugHandler{log} +} + +// ServeHTTP returns a json representation of the current version information for the binary. +func (server *DebugHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { j, err := Build.Marshal() if err != nil { w.WriteHeader(http.StatusInternalServerError) @@ -164,7 +176,7 @@ func DebugHandler(w http.ResponseWriter, r *http.Request) { _, err = w.Write(append(j, '\n')) if err != nil { - zap.S().Errorf("error writing data to client %v", err) + server.log.Sugar().Errorf("error writing data to client %v", err) } } diff --git a/pkg/certificates/config.go b/pkg/certificates/config.go index 277a6da96..f0af21cc6 100644 --- a/pkg/certificates/config.go +++ b/pkg/certificates/config.go @@ -111,7 +111,7 @@ func (c CertServerConfig) Run(ctx context.Context, srv *server.Server) (err erro } certSrv := NewServer( - zap.L(), + zap.L(), // TODO: pass this in from somewhere signer, authDB, uint16(c.MinDifficulty), diff --git a/pkg/process/debug.go b/pkg/process/debug.go index c3922fc20..e1189dd24 100644 --- a/pkg/process/debug.go +++ b/pkg/process/debug.go @@ -36,7 +36,7 @@ func initDebug(logger *zap.Logger, r *monkit.Registry) (err error) { mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) mux.HandleFunc("/debug/pprof/trace", pprof.Trace) - mux.Handle("/version/", http.StripPrefix("/version", http.HandlerFunc(version.DebugHandler))) + mux.Handle("/version/", http.StripPrefix("/version", version.NewDebugHandler(logger.Named("version")))) mux.Handle("/mon/", http.StripPrefix("/mon", present.HTTP(r))) mux.HandleFunc("/metrics", prometheus) mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/process/metrics.go b/pkg/process/metrics.go index 8b3b57dcd..ad8de36fc 100644 --- a/pkg/process/metrics.go +++ b/pkg/process/metrics.go @@ -42,7 +42,7 @@ func flagDefault(dev, release string) string { // InitMetrics initializes telemetry reporting. Makes a telemetry.Client and calls // its Run() method in a goroutine. -func InitMetrics(ctx context.Context, r *monkit.Registry, instanceID string) (err error) { +func InitMetrics(ctx context.Context, log *zap.Logger, r *monkit.Registry, instanceID string) (err error) { if *metricCollector == "" || *metricInterval == 0 { return Error.New("telemetry disabled") } @@ -56,7 +56,7 @@ func InitMetrics(ctx context.Context, r *monkit.Registry, instanceID string) (er if len(instanceID) > maxInstanceLength { instanceID = instanceID[:maxInstanceLength] } - c, err := telemetry.NewClient(*metricCollector, telemetry.ClientOpts{ + c, err := telemetry.NewClient(log, *metricCollector, telemetry.ClientOpts{ Interval: *metricInterval, Application: *metricApp + *metricAppSuffix, Instance: instanceID, @@ -75,14 +75,14 @@ func InitMetrics(ctx context.Context, r *monkit.Registry, instanceID string) (er // InitMetricsWithCertPath initializes telemetry reporting, using the node ID // corresponding to the given certificate as the telemetry instance ID. -func InitMetricsWithCertPath(ctx context.Context, r *monkit.Registry, certPath string) error { +func InitMetricsWithCertPath(ctx context.Context, log *zap.Logger, r *monkit.Registry, certPath string) error { var metricsID string nodeID, err := identity.NodeIDFromCertPath(certPath) if err != nil { - zap.S().Errorf("Could not read identity for telemetry setup: %v", err) + log.Sugar().Errorf("Could not read identity for telemetry setup: %v", err) metricsID = "" // InitMetrics() will fill in a default value } else { metricsID = nodeID.String() } - return InitMetrics(ctx, r, metricsID) + return InitMetrics(ctx, log, r, metricsID) } diff --git a/pkg/server/config.go b/pkg/server/config.go index ce80fb128..e8b4e3e8e 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -40,10 +40,10 @@ func (sc Config) Run(ctx context.Context, log *zap.Logger, identity *identity.Fu go func() { <-ctx.Done() if closeErr := server.Close(); closeErr != nil { - zap.S().Errorf("Failed to close server: %s", closeErr) + log.Sugar().Errorf("Failed to close server: %s", closeErr) } }() - zap.S().Infof("Node %s started on %s", server.Identity().ID, sc.Address) + log.Sugar().Infof("Node %s started on %s", server.Identity().ID, sc.Address) return server.Run(ctx) } diff --git a/pkg/telemetry/client.go b/pkg/telemetry/client.go index 2b56f0d2f..06e774c0a 100644 --- a/pkg/telemetry/client.go +++ b/pkg/telemetry/client.go @@ -58,6 +58,7 @@ type ClientOpts struct { // Client is a telemetry client for sending UDP packets at a regular interval // from a monkit.Registry type Client struct { + log *zap.Logger interval time.Duration opts admmonkit.Options send func(context.Context, admmonkit.Options) error @@ -65,7 +66,7 @@ type Client struct { // NewClient constructs a telemetry client that sends packets to remoteAddr // over UDP. -func NewClient(remoteAddr string, opts ClientOpts) (rv *Client, err error) { +func NewClient(log *zap.Logger, remoteAddr string, opts ClientOpts) (rv *Client, err error) { if opts.Interval == 0 { opts.Interval = DefaultInterval } @@ -88,6 +89,7 @@ func NewClient(remoteAddr string, opts ClientOpts) (rv *Client, err error) { } return &Client{ + log: log, interval: opts.Interval, send: admmonkit.Send, opts: admmonkit.Options{ @@ -103,7 +105,7 @@ func NewClient(remoteAddr string, opts ClientOpts) (rv *Client, err error) { // Run calls Report roughly every Interval func (c *Client) Run(ctx context.Context) { - zap.S().Debugf("Initialized telemetry batcher with id = %q", c.opts.InstanceId) + c.log.Sugar().Debugf("Initialized telemetry batcher with id = %q", c.opts.InstanceId) for { time.Sleep(jitter(c.interval)) if ctx.Err() != nil { @@ -112,7 +114,7 @@ func (c *Client) Run(ctx context.Context) { err := c.Report(ctx) if err != nil { - zap.S().Errorf("failed sending telemetry report: %v", err) + c.log.Sugar().Errorf("failed sending telemetry report: %v", err) } } } diff --git a/pkg/telemetry/client_test.go b/pkg/telemetry/client_test.go index 32a2c6b45..26ba22986 100644 --- a/pkg/telemetry/client_test.go +++ b/pkg/telemetry/client_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/zeebo/admission/admmonkit" + "go.uber.org/zap/zaptest" monkit "gopkg.in/spacemonkeygo/monkit.v2" ) @@ -18,7 +19,7 @@ func TestNewClient_IntervalIsZero(t *testing.T) { assert.NoError(t, err) defer func() { assert.NoError(t, s.Close()) }() - client, err := NewClient(s.Addr(), ClientOpts{ + client, err := NewClient(zaptest.NewLogger(t), s.Addr(), ClientOpts{ Application: "testapp", Instance: "testinst", Interval: 0, @@ -41,7 +42,7 @@ func TestNewClient_ApplicationAndArgsAreEmpty(t *testing.T) { os.Args = nil - client, err := NewClient(s.Addr(), ClientOpts{ + client, err := NewClient(zaptest.NewLogger(t), s.Addr(), ClientOpts{ Application: "", Instance: "testinst", Interval: 0, @@ -57,7 +58,7 @@ func TestNewClient_ApplicationIsEmpty(t *testing.T) { assert.NoError(t, err) defer func() { assert.NoError(t, s.Close()) }() - client, err := NewClient(s.Addr(), ClientOpts{ + client, err := NewClient(zaptest.NewLogger(t), s.Addr(), ClientOpts{ Application: "", Instance: "testinst", Interval: 0, @@ -73,7 +74,7 @@ func TestNewClient_InstanceIsEmpty(t *testing.T) { assert.NoError(t, err) defer func() { assert.NoError(t, s.Close()) }() - client, err := NewClient(s.Addr(), ClientOpts{ + client, err := NewClient(zaptest.NewLogger(t), s.Addr(), ClientOpts{ Application: "qwe", Instance: "", Interval: 0, @@ -92,7 +93,7 @@ func TestNewClient_RegistryIsNil(t *testing.T) { assert.NoError(t, err) defer func() { assert.NoError(t, s.Close()) }() - client, err := NewClient(s.Addr(), ClientOpts{ + client, err := NewClient(zaptest.NewLogger(t), s.Addr(), ClientOpts{ Application: "qwe", Instance: "", Interval: 0, @@ -111,7 +112,7 @@ func TestNewClient_PacketSizeIsZero(t *testing.T) { assert.NoError(t, err) defer func() { assert.NoError(t, s.Close()) }() - client, err := NewClient(s.Addr(), ClientOpts{ + client, err := NewClient(zaptest.NewLogger(t), s.Addr(), ClientOpts{ Application: "qwe", Instance: "", Interval: 0, @@ -129,7 +130,7 @@ func TestNewClient_PacketSizeIsZero(t *testing.T) { } func TestRun_ReportNoCalled(t *testing.T) { - client, err := NewClient("127.0.0.1:0", ClientOpts{ + client, err := NewClient(zaptest.NewLogger(t), "127.0.0.1:0", ClientOpts{ Application: "qwe", Instance: "", Interval: time.Millisecond, diff --git a/pkg/telemetry/example_test.go b/pkg/telemetry/example_test.go index 9792d893e..e2ca91031 100644 --- a/pkg/telemetry/example_test.go +++ b/pkg/telemetry/example_test.go @@ -11,6 +11,7 @@ import ( "github.com/zeebo/admission/admproto" "github.com/zeebo/errs" + "go.uber.org/zap" "golang.org/x/sync/errgroup" monkit "gopkg.in/spacemonkeygo/monkit.v2" @@ -45,7 +46,7 @@ func Example() { // sender group.Go(func() error { - client, err := telemetry.NewClient(receiver.Addr(), telemetry.ClientOpts{ + client, err := telemetry.NewClient(zap.L(), receiver.Addr(), telemetry.ClientOpts{ Interval: time.Second, Application: "example", Instance: telemetry.DefaultInstanceID(), diff --git a/pkg/telemetry/tm_test.go b/pkg/telemetry/tm_test.go index 4bfab0617..45ee8bcf7 100644 --- a/pkg/telemetry/tm_test.go +++ b/pkg/telemetry/tm_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.uber.org/zap/zaptest" monkit "gopkg.in/spacemonkeygo/monkit.v2" ) @@ -26,7 +27,7 @@ func TestMetrics(t *testing.T) { assert.NoError(t, err) defer func() { _ = s.Close() }() - c, err := NewClient(s.Addr(), ClientOpts{ + c, err := NewClient(zaptest.NewLogger(t), s.Addr(), ClientOpts{ Application: "testapp", Instance: "testinst", }) diff --git a/satellite/peer.go b/satellite/peer.go index 763da5501..fec6f5e0b 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -245,7 +245,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v", versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release) } - peer.Version = version.NewService(config.Version, versionInfo, "Satellite") + peer.Version = version.NewService(log.Named("version"), config.Version, versionInfo, "Satellite") } { // setup listener and server diff --git a/storagenode/peer.go b/storagenode/peer.go index 445f2228f..63faeca16 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -153,7 +153,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v", versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release) } - peer.Version = version.NewService(config.Version, versionInfo, "Storagenode") + peer.Version = version.NewService(log.Named("version"), config.Version, versionInfo, "Storagenode") } { // setup listener and server diff --git a/uplink/ecclient/client.go b/uplink/ecclient/client.go index c149a53a5..25d72ab3f 100644 --- a/uplink/ecclient/client.go +++ b/uplink/ecclient/client.go @@ -83,7 +83,7 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, p rs.ErasureShareSize(), rs.StripeSize(), rs.RepairThreshold(), rs.OptimalThreshold()) padded := eestream.PadReader(ioutil.NopCloser(data), rs.StripeSize()) - readers, err := eestream.EncodeReader(ctx, padded, rs) + readers, err := eestream.EncodeReader(ctx, ec.log, padded, rs) if err != nil { return nil, nil, err } @@ -171,7 +171,7 @@ func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit } padded := eestream.PadReader(ioutil.NopCloser(data), rs.StripeSize()) - readers, err := eestream.EncodeReader(ctx, padded, rs) + readers, err := eestream.EncodeReader(ctx, ec.log, padded, rs) if err != nil { return nil, nil, err } @@ -355,7 +355,7 @@ func (ec *ecClient) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, p } } - rr, err = eestream.Decode(rrs, es, ec.memoryLimit, ec.forceErrorDetection) + rr, err = eestream.Decode(ec.log, rrs, es, ec.memoryLimit, ec.forceErrorDetection) if err != nil { return nil, Error.Wrap(err) } diff --git a/uplink/eestream/decode.go b/uplink/eestream/decode.go index 0c2a60a43..8fff8d3a2 100644 --- a/uplink/eestream/decode.go +++ b/uplink/eestream/decode.go @@ -19,6 +19,7 @@ import ( ) type decodedReader struct { + log *zap.Logger ctx context.Context cancel context.CancelFunc readers map[int]io.ReadCloser @@ -41,7 +42,7 @@ type decodedReader struct { // set to 0, the minimum possible memory will be used. // if forceErrorDetection is set to true then k+1 pieces will be always // required for decoding, so corrupted pieces can be detected. -func DecodeReaders(ctx context.Context, rs map[int]io.ReadCloser, es ErasureScheme, expectedSize int64, mbm int, forceErrorDetection bool) io.ReadCloser { +func DecodeReaders(ctx context.Context, log *zap.Logger, rs map[int]io.ReadCloser, es ErasureScheme, expectedSize int64, mbm int, forceErrorDetection bool) io.ReadCloser { defer mon.Task()(&ctx)(nil) if expectedSize < 0 { return readcloser.FatalReadCloser(Error.New("negative expected size")) @@ -55,9 +56,10 @@ func DecodeReaders(ctx context.Context, rs map[int]io.ReadCloser, es ErasureSche return readcloser.FatalReadCloser(err) } dr := &decodedReader{ + log: log, readers: rs, scheme: es, - stripeReader: NewStripeReader(rs, es, mbm, forceErrorDetection), + stripeReader: NewStripeReader(log, rs, es, mbm, forceErrorDetection), outbuf: make([]byte, 0, es.StripeSize()), expectedStripes: expectedSize / int64(es.StripeSize()), } @@ -127,12 +129,13 @@ func (dr *decodedReader) Close() (err error) { return dr.closeErr } if dr.closeErr != nil { - zap.L().Debug("decode close non fatal error: ", zap.Error(dr.closeErr)) + dr.log.Debug("decode close non fatal error: ", zap.Error(dr.closeErr)) } return nil } type decodedRanger struct { + log *zap.Logger es ErasureScheme rrs map[int]ranger.Ranger inSize int64 @@ -148,7 +151,7 @@ type decodedRanger struct { // set to 0, the minimum possible memory will be used. // if forceErrorDetection is set to true then k+1 pieces will be always // required for decoding, so corrupted pieces can be detected. -func Decode(rrs map[int]ranger.Ranger, es ErasureScheme, mbm int, forceErrorDetection bool) (ranger.Ranger, error) { +func Decode(log *zap.Logger, rrs map[int]ranger.Ranger, es ErasureScheme, mbm int, forceErrorDetection bool) (ranger.Ranger, error) { if err := checkMBM(mbm); err != nil { return nil, err } @@ -173,6 +176,7 @@ func Decode(rrs map[int]ranger.Ranger, es ErasureScheme, mbm int, forceErrorDete size, es.ErasureShareSize()) } return &decodedRanger{ + log: log, es: es, rrs: rrs, inSize: size, @@ -218,10 +222,9 @@ func (dr *decodedRanger) Range(ctx context.Context, offset, length int64) (_ io. } } // decode from all those ranges - r := DecodeReaders(ctx, readers, dr.es, blockCount*int64(dr.es.StripeSize()), dr.mbm, dr.forceErrorDetection) + r := DecodeReaders(ctx, dr.log, readers, dr.es, blockCount*int64(dr.es.StripeSize()), dr.mbm, dr.forceErrorDetection) // offset might start a few bytes in, potentially discard the initial bytes - _, err = io.CopyN(ioutil.Discard, r, - offset-firstBlock*int64(dr.es.StripeSize())) + _, err = io.CopyN(ioutil.Discard, r, offset-firstBlock*int64(dr.es.StripeSize())) if err != nil { return nil, Error.Wrap(err) } diff --git a/uplink/eestream/encode.go b/uplink/eestream/encode.go index 5dac05bcc..edfb10412 100644 --- a/uplink/eestream/encode.go +++ b/uplink/eestream/encode.go @@ -119,6 +119,7 @@ func (rs *RedundancyStrategy) OptimalThreshold() int { } type encodedReader struct { + log *zap.Logger ctx context.Context rs RedundancyStrategy pieces map[int]*encodedPiece @@ -126,10 +127,11 @@ type encodedReader struct { // EncodeReader takes a Reader and a RedundancyStrategy and returns a slice of // io.ReadClosers. -func EncodeReader(ctx context.Context, r io.Reader, rs RedundancyStrategy) (_ []io.ReadCloser, err error) { +func EncodeReader(ctx context.Context, log *zap.Logger, r io.Reader, rs RedundancyStrategy) (_ []io.ReadCloser, err error) { defer mon.Task()(&ctx)(&err) er := &encodedReader{ + log: log, ctx: ctx, rs: rs, pieces: make(map[int]*encodedPiece, rs.TotalCount()), @@ -175,7 +177,7 @@ func (er *encodedReader) fillBuffer(ctx context.Context, r io.Reader, w sync2.Pi _, err = sync2.Copy(ctx, w, r) err = w.CloseWithError(err) if err != nil { - zap.S().Error(err) + er.log.Sugar().Error(err) } } @@ -231,20 +233,22 @@ func (ep *encodedPiece) Close() (err error) { // multiple Ranged sub-Readers. EncodedRanger does not match the normal Ranger // interface. type EncodedRanger struct { - rr ranger.Ranger - rs RedundancyStrategy + log *zap.Logger + rr ranger.Ranger + rs RedundancyStrategy } // NewEncodedRanger from the given Ranger and RedundancyStrategy. See the // comments for EncodeReader about the repair and success thresholds. -func NewEncodedRanger(rr ranger.Ranger, rs RedundancyStrategy) (*EncodedRanger, error) { +func NewEncodedRanger(log *zap.Logger, rr ranger.Ranger, rs RedundancyStrategy) (*EncodedRanger, error) { if rr.Size()%int64(rs.StripeSize()) != 0 { return nil, Error.New("invalid erasure encoder and range reader combo. " + "range reader size must be a multiple of erasure encoder block size") } return &EncodedRanger{ - rs: rs, - rr: rr, + log: log, + rs: rs, + rr: rr, }, nil } @@ -269,7 +273,7 @@ func (er *EncodedRanger) Range(ctx context.Context, offset, length int64) (_ []i if err != nil { return nil, err } - readers, err := EncodeReader(ctx, r, er.rs) + readers, err := EncodeReader(ctx, er.log, r, er.rs) if err != nil { return nil, err } diff --git a/uplink/eestream/piecebuf.go b/uplink/eestream/piecebuf.go index d47ba122f..adcaa10fc 100644 --- a/uplink/eestream/piecebuf.go +++ b/uplink/eestream/piecebuf.go @@ -12,6 +12,7 @@ import ( // PieceBuffer is a synchronized buffer for storing erasure shares for a piece. type PieceBuffer struct { + log *zap.Logger buf []byte shareSize int cond *sync.Cond @@ -27,8 +28,9 @@ type PieceBuffer struct { // NewPieceBuffer creates and initializes a new PieceBuffer using buf as its // internal content. If new data is written to the buffer, newDataCond will be // notified. -func NewPieceBuffer(buf []byte, shareSize int, newDataCond *sync.Cond) *PieceBuffer { +func NewPieceBuffer(log *zap.Logger, buf []byte, shareSize int, newDataCond *sync.Cond) *PieceBuffer { return &PieceBuffer{ + log: log, buf: buf, shareSize: shareSize, cond: sync.NewCond(&sync.Mutex{}), @@ -230,8 +232,7 @@ func (b *PieceBuffer) buffered() int { func (b *PieceBuffer) HasShare(num int64) bool { if num < b.currentShare { // we should never get here! - zap.S().Fatalf("Checking for erasure share %d while the current erasure share is %d.", - num, b.currentShare) + b.log.Sugar().Fatalf("Checking for erasure share %d while the current erasure share is %d.", num, b.currentShare) } if b.getError() != nil { @@ -257,8 +258,7 @@ func (b *PieceBuffer) HasShare(num int64) bool { func (b *PieceBuffer) ReadShare(num int64, p []byte) error { if num < b.currentShare { // we should never get here! - zap.S().Fatalf("Trying to read erasure share %d while the current erasure share is already %d.", - num, b.currentShare) + b.log.Sugar().Fatalf("Trying to read erasure share %d while the current erasure share is already %d.", num, b.currentShare) } err := b.discardUntil(num) diff --git a/uplink/eestream/rs_test.go b/uplink/eestream/rs_test.go index a216f7e6b..974022085 100644 --- a/uplink/eestream/rs_test.go +++ b/uplink/eestream/rs_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" "github.com/vivint/infectious" "github.com/zeebo/errs" + "go.uber.org/zap/zaptest" "storj.io/storj/internal/memory" "storj.io/storj/internal/readcloser" @@ -41,7 +42,7 @@ func TestRS(t *testing.T) { if err != nil { t.Fatal(err) } - readers, err := EncodeReader(ctx, bytes.NewReader(data), rs) + readers, err := EncodeReader(ctx, zaptest.NewLogger(t), bytes.NewReader(data), rs) if err != nil { t.Fatal(err) } @@ -49,7 +50,7 @@ func TestRS(t *testing.T) { for i, reader := range readers { readerMap[i] = reader } - decoder := DecodeReaders(ctx, readerMap, rs, 32*1024, 0, false) + decoder := DecodeReaders(ctx, zaptest.NewLogger(t), readerMap, rs, 32*1024, 0, false) defer func() { assert.NoError(t, decoder.Close()) }() data2, err := ioutil.ReadAll(decoder) if err != nil { @@ -72,7 +73,7 @@ func TestRSUnexpectedEOF(t *testing.T) { if err != nil { t.Fatal(err) } - readers, err := EncodeReader(ctx, bytes.NewReader(data), rs) + readers, err := EncodeReader(ctx, zaptest.NewLogger(t), bytes.NewReader(data), rs) if err != nil { t.Fatal(err) } @@ -80,7 +81,7 @@ func TestRSUnexpectedEOF(t *testing.T) { for i, reader := range readers { readerMap[i] = reader } - decoder := DecodeReaders(ctx, readerMap, rs, 32*1024, 0, false) + decoder := DecodeReaders(ctx, zaptest.NewLogger(t), readerMap, rs, 32*1024, 0, false) defer func() { assert.NoError(t, decoder.Close()) }() // Try ReadFull more data from DecodeReaders than available data2 := make([]byte, len(data)+1024) @@ -108,7 +109,7 @@ func TestRSRanger(t *testing.T) { if err != nil { t.Fatal(err) } - readers, err := EncodeReader(ctx, encryption.TransformReader(PadReader(ioutil.NopCloser( + readers, err := EncodeReader(ctx, zaptest.NewLogger(t), encryption.TransformReader(PadReader(ioutil.NopCloser( bytes.NewReader(data)), encrypter.InBlockSize()), encrypter, 0), rs) if err != nil { t.Fatal(err) @@ -125,7 +126,7 @@ func TestRSRanger(t *testing.T) { if err != nil { t.Fatal(err) } - rc, err := Decode(rrs, rs, 0, false) + rc, err := Decode(zaptest.NewLogger(t), rrs, rs, 0, false) if err != nil { t.Fatal(err) } @@ -386,7 +387,7 @@ func testRSProblematic(t *testing.T, tt testCase, i int, fn problematicReadClose if !assert.NoError(t, err, errTag) { return } - readers, err := EncodeReader(ctx, bytes.NewReader(data), rs) + readers, err := EncodeReader(ctx, zaptest.NewLogger(t), bytes.NewReader(data), rs) if !assert.NoError(t, err, errTag) { return } @@ -405,7 +406,7 @@ func testRSProblematic(t *testing.T, tt testCase, i int, fn problematicReadClose for i := tt.problematic; i < tt.total; i++ { readerMap[i] = ioutil.NopCloser(bytes.NewReader(pieces[i])) } - decoder := DecodeReaders(ctx, readerMap, rs, int64(tt.dataSize), 3*1024, false) + decoder := DecodeReaders(ctx, zaptest.NewLogger(t), readerMap, rs, int64(tt.dataSize), 3*1024, false) defer func() { assert.NoError(t, decoder.Close()) }() data2, err := ioutil.ReadAll(decoder) if tt.fail { @@ -462,7 +463,7 @@ func TestEncoderStalledReaders(t *testing.T) { if err != nil { t.Fatal(err) } - readers, err := EncodeReader(ctx, bytes.NewReader(data), rs) + readers, err := EncodeReader(ctx, zaptest.NewLogger(t), bytes.NewReader(data), rs) if err != nil { t.Fatal(err) } @@ -508,7 +509,7 @@ func TestDecoderErrorWithStalledReaders(t *testing.T) { if err != nil { t.Fatal(err) } - readers, err := EncodeReader(ctx, bytes.NewReader(data), rs) + readers, err := EncodeReader(ctx, zaptest.NewLogger(t), bytes.NewReader(data), rs) if err != nil { t.Fatal(err) } @@ -531,7 +532,7 @@ func TestDecoderErrorWithStalledReaders(t *testing.T) { for i := 7; i < 20; i++ { readerMap[i] = readcloser.FatalReadCloser(errors.New("I am an error piece")) } - decoder := DecodeReaders(ctx, readerMap, rs, int64(10*1024), 0, false) + decoder := DecodeReaders(ctx, zaptest.NewLogger(t), readerMap, rs, int64(10*1024), 0, false) defer func() { assert.NoError(t, decoder.Close()) }() // record the time for reading the data from the decoder start := time.Now() @@ -663,7 +664,7 @@ func TestCalcPieceSize(t *testing.T) { calculatedSize := CalcPieceSize(dataSize, es) randReader := ioutil.NopCloser(io.LimitReader(testrand.Reader(), dataSize)) - readers, err := EncodeReader(ctx, PadReader(randReader, es.StripeSize()), rs) + readers, err := EncodeReader(ctx, zaptest.NewLogger(t), PadReader(randReader, es.StripeSize()), rs) require.NoError(t, err, errTag) for _, reader := range readers { diff --git a/uplink/eestream/stripe.go b/uplink/eestream/stripe.go index e37d82401..58286ff53 100644 --- a/uplink/eestream/stripe.go +++ b/uplink/eestream/stripe.go @@ -12,6 +12,7 @@ import ( "sync" "github.com/vivint/infectious" + "go.uber.org/zap" monkit "gopkg.in/spacemonkeygo/monkit.v2" ) @@ -33,7 +34,7 @@ type StripeReader struct { // NewStripeReader creates a new StripeReader from the given readers, erasure // scheme and max buffer memory. -func NewStripeReader(rs map[int]io.ReadCloser, es ErasureScheme, mbm int, forceErrorDetection bool) *StripeReader { +func NewStripeReader(log *zap.Logger, rs map[int]io.ReadCloser, es ErasureScheme, mbm int, forceErrorDetection bool) *StripeReader { readerCount := len(rs) r := &StripeReader{ @@ -55,7 +56,7 @@ func NewStripeReader(rs map[int]io.ReadCloser, es ErasureScheme, mbm int, forceE for i := range rs { r.inbufs[i] = make([]byte, es.ErasureShareSize()) - r.bufs[i] = NewPieceBuffer(make([]byte, bufSize), es.ErasureShareSize(), r.cond) + r.bufs[i] = NewPieceBuffer(log, make([]byte, bufSize), es.ErasureShareSize(), r.cond) // Kick off a goroutine each reader to be copied into a PieceBuffer. go func(r io.Reader, buf *PieceBuffer) { _, err := io.Copy(buf, r) diff --git a/versioncontrol/peer.go b/versioncontrol/peer.go index d158178a6..059808c66 100644 --- a/versioncontrol/peer.go +++ b/versioncontrol/peer.go @@ -61,12 +61,12 @@ func (peer *Peer) HandleGet(w http.ResponseWriter, r *http.Request) { if xfor = r.Header.Get("X-Forwarded-For"); xfor == "" { xfor = r.RemoteAddr } - zap.S().Debugf("Request from: %s for %s", r.RemoteAddr, xfor) + peer.Log.Sugar().Debugf("Request from: %s for %s", r.RemoteAddr, xfor) w.Header().Set("Content-Type", "application/json") _, err := w.Write(peer.response) if err != nil { - zap.S().Errorf("error writing response to client: %v", err) + peer.Log.Sugar().Errorf("error writing response to client: %v", err) } }