don't use global loggers (#2675)

This commit is contained in:
Egon Elbre 2019-07-31 17:38:44 +03:00 committed by GitHub
parent 3cd477454f
commit 4f0d39cc64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 115 additions and 85 deletions

View File

@ -99,7 +99,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver
peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v",
versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
}
peer.Version = version.NewService(config.Version, versionInfo, "Bootstrap")
peer.Version = version.NewService(log.Named("version"), config.Version, versionInfo, "Bootstrap")
}
{ // setup listener and server

View File

@ -99,7 +99,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
return err
}
if err := process.InitMetricsWithCertPath(ctx, nil, runCfg.Identity.CertPath); err != nil {
if err := process.InitMetricsWithCertPath(ctx, log, nil, runCfg.Identity.CertPath); err != nil {
zap.S().Error("Failed to initialize telemetry batcher: ", err)
}

View File

@ -149,11 +149,11 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
ctx := process.Ctx(cmd)
if err := process.InitMetrics(ctx, nil, ""); err != nil {
if err := process.InitMetrics(ctx, zap.L(), nil, ""); err != nil {
zap.S().Error("Failed to initialize telemetry batcher: ", err)
}
err = version.CheckProcessVersion(ctx, runCfg.Version, version.Build, "Gateway")
err = version.CheckProcessVersion(ctx, zap.L(), runCfg.Version, version.Build, "Gateway")
if err != nil {
return err
}

View File

@ -12,6 +12,7 @@ import (
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/internal/fpath"
"storj.io/storj/internal/version"
@ -86,7 +87,7 @@ func serviceDirectory(serviceName string) string {
func cmdNewService(cmd *cobra.Command, args []string) error {
ctx := process.Ctx(cmd)
err := version.CheckProcessVersion(ctx, config.Version, version.Build, "Identity")
err := version.CheckProcessVersion(ctx, zap.L(), config.Version, version.Build, "Identity")
if err != nil {
return err
}
@ -148,7 +149,7 @@ func cmdNewService(cmd *cobra.Command, args []string) error {
func cmdAuthorize(cmd *cobra.Command, args []string) error {
ctx := process.Ctx(cmd)
err := version.CheckProcessVersion(ctx, config.Version, version.Build, "Identity")
err := version.CheckProcessVersion(ctx, zap.L(), config.Version, version.Build, "Identity")
if err != nil {
return err
}

View File

@ -142,7 +142,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
return err
}
if err := process.InitMetricsWithCertPath(ctx, nil, runCfg.Identity.CertPath); err != nil {
if err := process.InitMetricsWithCertPath(ctx, log, nil, runCfg.Identity.CertPath); err != nil {
zap.S().Error("Failed to initialize telemetry batcher: ", err)
}

View File

@ -152,7 +152,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
return err
}
if err := process.InitMetricsWithCertPath(ctx, nil, runCfg.Identity.CertPath); err != nil {
if err := process.InitMetricsWithCertPath(ctx, log, nil, runCfg.Identity.CertPath); err != nil {
zap.S().Error("Failed to initialize telemetry batcher: ", err)
}

View File

@ -98,7 +98,7 @@ func (cliCfg *UplinkFlags) NewUplink(ctx context.Context) (*libuplink.Uplink, er
// GetProject returns a *libuplink.Project for interacting with a specific project
func (cliCfg *UplinkFlags) GetProject(ctx context.Context) (*libuplink.Project, error) {
err := version.CheckProcessVersion(ctx, cliCfg.Version, version.Build, "Uplink")
err := version.CheckProcessVersion(ctx, zap.L(), cliCfg.Version, version.Build, "Uplink")
if err != nil {
return nil, err
}

View File

@ -13,6 +13,7 @@ import (
"time"
"github.com/vivint/infectious"
"go.uber.org/zap"
"storj.io/storj/pkg/encryption"
"storj.io/storj/pkg/ranger"
@ -74,7 +75,7 @@ func Main() error {
}
rrs[res.i] = res.rr
}
rc, err := eestream.Decode(rrs, es, 4*1024*1024, false)
rc, err := eestream.Decode(zap.L(), rrs, es, 4*1024*1024, false)
if err != nil {
return err
}

View File

@ -17,6 +17,7 @@ import (
"time"
"github.com/vivint/infectious"
"go.uber.org/zap"
"storj.io/storj/pkg/encryption"
"storj.io/storj/pkg/ranger"
@ -74,7 +75,7 @@ func Main() error {
}
rrs[piecenum] = r
}
rc, err := eestream.Decode(rrs, es, 4*1024*1024, false)
rc, err := eestream.Decode(zap.L(), rrs, es, 4*1024*1024, false)
if err != nil {
return err
}

View File

@ -13,6 +13,7 @@ import (
"path/filepath"
"github.com/vivint/infectious"
"go.uber.org/zap"
"storj.io/storj/pkg/encryption"
"storj.io/storj/pkg/storj"
@ -60,7 +61,7 @@ func Main() error {
if err != nil {
return err
}
readers, err := eestream.EncodeReader(context.Background(),
readers, err := eestream.EncodeReader(context.Background(), zap.L(),
encryption.TransformReader(eestream.PadReader(os.Stdin,
encrypter.InBlockSize()), encrypter, 0), rs)
if err != nil {

View File

@ -26,6 +26,7 @@ type Config struct {
// Service contains the information and variables to ensure the Software is up to date
type Service struct {
log *zap.Logger
config Config
info Info
service string
@ -38,8 +39,9 @@ type Service struct {
}
// NewService creates a Version Check Client with default configuration
func NewService(config Config, info Info, service string) (client *Service) {
func NewService(log *zap.Logger, config Config, info Info, service string) (client *Service) {
return &Service{
log: log,
config: config,
info: info,
service: service,
@ -59,9 +61,9 @@ func (srv *Service) CheckVersion(ctx context.Context) (err error) {
// CheckProcessVersion is not meant to be used for peers but is meant to be
// used for other utilities
func CheckProcessVersion(ctx context.Context, config Config, info Info, service string) (err error) {
func CheckProcessVersion(ctx context.Context, log *zap.Logger, config Config, info Info, service string) (err error) {
defer mon.Task()(&ctx)(&err)
return NewService(config, info, service).CheckVersion(ctx)
return NewService(log, config, info, service).CheckVersion(ctx)
}
// Run logs the current version information
@ -105,22 +107,22 @@ func (srv *Service) checkVersion(ctx context.Context) (allowed bool) {
accepted, err := srv.queryVersionFromControlServer(ctx)
if err != nil {
// Log about the error, but dont crash the service and allow further operation
zap.S().Errorf("Failed to do periodic version check: %s", err.Error())
srv.log.Sugar().Errorf("Failed to do periodic version check: %s", err.Error())
return true
}
minimum := getFieldString(&accepted, srv.service)
zap.S().Debugf("allowed minimum version from control server is: %s", minimum.String())
srv.log.Sugar().Debugf("allowed minimum version from control server is: %s", minimum.String())
if minimum.String() == "" {
zap.S().Errorf("no version from control server, accepting to run")
srv.log.Sugar().Errorf("no version from control server, accepting to run")
return true
}
if isAcceptedVersion(srv.info.Version, minimum) {
zap.S().Infof("running on version %s", srv.info.Version.String())
srv.log.Sugar().Infof("running on version %s", srv.info.Version.String())
return true
}
zap.S().Errorf("running on not allowed/outdated version %s", srv.info.Version.String())
srv.log.Sugar().Errorf("running on not allowed/outdated version %s", srv.info.Version.String())
return false
}
@ -151,8 +153,18 @@ func (srv *Service) queryVersionFromControlServer(ctx context.Context) (ver Allo
return ver, err
}
// DebugHandler returns a json representation of the current version information for the binary
func DebugHandler(w http.ResponseWriter, r *http.Request) {
// DebugHandler implements version info endpoint.
type DebugHandler struct {
log *zap.Logger
}
// NewDebugHandler returns new debug handler.
func NewDebugHandler(log *zap.Logger) *DebugHandler {
return &DebugHandler{log}
}
// ServeHTTP returns a json representation of the current version information for the binary.
func (server *DebugHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
j, err := Build.Marshal()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
@ -164,7 +176,7 @@ func DebugHandler(w http.ResponseWriter, r *http.Request) {
_, err = w.Write(append(j, '\n'))
if err != nil {
zap.S().Errorf("error writing data to client %v", err)
server.log.Sugar().Errorf("error writing data to client %v", err)
}
}

View File

@ -111,7 +111,7 @@ func (c CertServerConfig) Run(ctx context.Context, srv *server.Server) (err erro
}
certSrv := NewServer(
zap.L(),
zap.L(), // TODO: pass this in from somewhere
signer,
authDB,
uint16(c.MinDifficulty),

View File

@ -36,7 +36,7 @@ func initDebug(logger *zap.Logger, r *monkit.Registry) (err error) {
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
mux.Handle("/version/", http.StripPrefix("/version", http.HandlerFunc(version.DebugHandler)))
mux.Handle("/version/", http.StripPrefix("/version", version.NewDebugHandler(logger.Named("version"))))
mux.Handle("/mon/", http.StripPrefix("/mon", present.HTTP(r)))
mux.HandleFunc("/metrics", prometheus)
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {

View File

@ -42,7 +42,7 @@ func flagDefault(dev, release string) string {
// InitMetrics initializes telemetry reporting. Makes a telemetry.Client and calls
// its Run() method in a goroutine.
func InitMetrics(ctx context.Context, r *monkit.Registry, instanceID string) (err error) {
func InitMetrics(ctx context.Context, log *zap.Logger, r *monkit.Registry, instanceID string) (err error) {
if *metricCollector == "" || *metricInterval == 0 {
return Error.New("telemetry disabled")
}
@ -56,7 +56,7 @@ func InitMetrics(ctx context.Context, r *monkit.Registry, instanceID string) (er
if len(instanceID) > maxInstanceLength {
instanceID = instanceID[:maxInstanceLength]
}
c, err := telemetry.NewClient(*metricCollector, telemetry.ClientOpts{
c, err := telemetry.NewClient(log, *metricCollector, telemetry.ClientOpts{
Interval: *metricInterval,
Application: *metricApp + *metricAppSuffix,
Instance: instanceID,
@ -75,14 +75,14 @@ func InitMetrics(ctx context.Context, r *monkit.Registry, instanceID string) (er
// InitMetricsWithCertPath initializes telemetry reporting, using the node ID
// corresponding to the given certificate as the telemetry instance ID.
func InitMetricsWithCertPath(ctx context.Context, r *monkit.Registry, certPath string) error {
func InitMetricsWithCertPath(ctx context.Context, log *zap.Logger, r *monkit.Registry, certPath string) error {
var metricsID string
nodeID, err := identity.NodeIDFromCertPath(certPath)
if err != nil {
zap.S().Errorf("Could not read identity for telemetry setup: %v", err)
log.Sugar().Errorf("Could not read identity for telemetry setup: %v", err)
metricsID = "" // InitMetrics() will fill in a default value
} else {
metricsID = nodeID.String()
}
return InitMetrics(ctx, r, metricsID)
return InitMetrics(ctx, log, r, metricsID)
}

View File

@ -40,10 +40,10 @@ func (sc Config) Run(ctx context.Context, log *zap.Logger, identity *identity.Fu
go func() {
<-ctx.Done()
if closeErr := server.Close(); closeErr != nil {
zap.S().Errorf("Failed to close server: %s", closeErr)
log.Sugar().Errorf("Failed to close server: %s", closeErr)
}
}()
zap.S().Infof("Node %s started on %s", server.Identity().ID, sc.Address)
log.Sugar().Infof("Node %s started on %s", server.Identity().ID, sc.Address)
return server.Run(ctx)
}

View File

@ -58,6 +58,7 @@ type ClientOpts struct {
// Client is a telemetry client for sending UDP packets at a regular interval
// from a monkit.Registry
type Client struct {
log *zap.Logger
interval time.Duration
opts admmonkit.Options
send func(context.Context, admmonkit.Options) error
@ -65,7 +66,7 @@ type Client struct {
// NewClient constructs a telemetry client that sends packets to remoteAddr
// over UDP.
func NewClient(remoteAddr string, opts ClientOpts) (rv *Client, err error) {
func NewClient(log *zap.Logger, remoteAddr string, opts ClientOpts) (rv *Client, err error) {
if opts.Interval == 0 {
opts.Interval = DefaultInterval
}
@ -88,6 +89,7 @@ func NewClient(remoteAddr string, opts ClientOpts) (rv *Client, err error) {
}
return &Client{
log: log,
interval: opts.Interval,
send: admmonkit.Send,
opts: admmonkit.Options{
@ -103,7 +105,7 @@ func NewClient(remoteAddr string, opts ClientOpts) (rv *Client, err error) {
// Run calls Report roughly every Interval
func (c *Client) Run(ctx context.Context) {
zap.S().Debugf("Initialized telemetry batcher with id = %q", c.opts.InstanceId)
c.log.Sugar().Debugf("Initialized telemetry batcher with id = %q", c.opts.InstanceId)
for {
time.Sleep(jitter(c.interval))
if ctx.Err() != nil {
@ -112,7 +114,7 @@ func (c *Client) Run(ctx context.Context) {
err := c.Report(ctx)
if err != nil {
zap.S().Errorf("failed sending telemetry report: %v", err)
c.log.Sugar().Errorf("failed sending telemetry report: %v", err)
}
}
}

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/zeebo/admission/admmonkit"
"go.uber.org/zap/zaptest"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
)
@ -18,7 +19,7 @@ func TestNewClient_IntervalIsZero(t *testing.T) {
assert.NoError(t, err)
defer func() { assert.NoError(t, s.Close()) }()
client, err := NewClient(s.Addr(), ClientOpts{
client, err := NewClient(zaptest.NewLogger(t), s.Addr(), ClientOpts{
Application: "testapp",
Instance: "testinst",
Interval: 0,
@ -41,7 +42,7 @@ func TestNewClient_ApplicationAndArgsAreEmpty(t *testing.T) {
os.Args = nil
client, err := NewClient(s.Addr(), ClientOpts{
client, err := NewClient(zaptest.NewLogger(t), s.Addr(), ClientOpts{
Application: "",
Instance: "testinst",
Interval: 0,
@ -57,7 +58,7 @@ func TestNewClient_ApplicationIsEmpty(t *testing.T) {
assert.NoError(t, err)
defer func() { assert.NoError(t, s.Close()) }()
client, err := NewClient(s.Addr(), ClientOpts{
client, err := NewClient(zaptest.NewLogger(t), s.Addr(), ClientOpts{
Application: "",
Instance: "testinst",
Interval: 0,
@ -73,7 +74,7 @@ func TestNewClient_InstanceIsEmpty(t *testing.T) {
assert.NoError(t, err)
defer func() { assert.NoError(t, s.Close()) }()
client, err := NewClient(s.Addr(), ClientOpts{
client, err := NewClient(zaptest.NewLogger(t), s.Addr(), ClientOpts{
Application: "qwe",
Instance: "",
Interval: 0,
@ -92,7 +93,7 @@ func TestNewClient_RegistryIsNil(t *testing.T) {
assert.NoError(t, err)
defer func() { assert.NoError(t, s.Close()) }()
client, err := NewClient(s.Addr(), ClientOpts{
client, err := NewClient(zaptest.NewLogger(t), s.Addr(), ClientOpts{
Application: "qwe",
Instance: "",
Interval: 0,
@ -111,7 +112,7 @@ func TestNewClient_PacketSizeIsZero(t *testing.T) {
assert.NoError(t, err)
defer func() { assert.NoError(t, s.Close()) }()
client, err := NewClient(s.Addr(), ClientOpts{
client, err := NewClient(zaptest.NewLogger(t), s.Addr(), ClientOpts{
Application: "qwe",
Instance: "",
Interval: 0,
@ -129,7 +130,7 @@ func TestNewClient_PacketSizeIsZero(t *testing.T) {
}
func TestRun_ReportNoCalled(t *testing.T) {
client, err := NewClient("127.0.0.1:0", ClientOpts{
client, err := NewClient(zaptest.NewLogger(t), "127.0.0.1:0", ClientOpts{
Application: "qwe",
Instance: "",
Interval: time.Millisecond,

View File

@ -11,6 +11,7 @@ import (
"github.com/zeebo/admission/admproto"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
@ -45,7 +46,7 @@ func Example() {
// sender
group.Go(func() error {
client, err := telemetry.NewClient(receiver.Addr(), telemetry.ClientOpts{
client, err := telemetry.NewClient(zap.L(), receiver.Addr(), telemetry.ClientOpts{
Interval: time.Second,
Application: "example",
Instance: telemetry.DefaultInstanceID(),

View File

@ -9,6 +9,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
)
@ -26,7 +27,7 @@ func TestMetrics(t *testing.T) {
assert.NoError(t, err)
defer func() { _ = s.Close() }()
c, err := NewClient(s.Addr(), ClientOpts{
c, err := NewClient(zaptest.NewLogger(t), s.Addr(), ClientOpts{
Application: "testapp",
Instance: "testinst",
})

View File

@ -245,7 +245,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve
peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v",
versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
}
peer.Version = version.NewService(config.Version, versionInfo, "Satellite")
peer.Version = version.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
}
{ // setup listener and server

View File

@ -153,7 +153,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver
peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v",
versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
}
peer.Version = version.NewService(config.Version, versionInfo, "Storagenode")
peer.Version = version.NewService(log.Named("version"), config.Version, versionInfo, "Storagenode")
}
{ // setup listener and server

View File

@ -83,7 +83,7 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, p
rs.ErasureShareSize(), rs.StripeSize(), rs.RepairThreshold(), rs.OptimalThreshold())
padded := eestream.PadReader(ioutil.NopCloser(data), rs.StripeSize())
readers, err := eestream.EncodeReader(ctx, padded, rs)
readers, err := eestream.EncodeReader(ctx, ec.log, padded, rs)
if err != nil {
return nil, nil, err
}
@ -171,7 +171,7 @@ func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit
}
padded := eestream.PadReader(ioutil.NopCloser(data), rs.StripeSize())
readers, err := eestream.EncodeReader(ctx, padded, rs)
readers, err := eestream.EncodeReader(ctx, ec.log, padded, rs)
if err != nil {
return nil, nil, err
}
@ -355,7 +355,7 @@ func (ec *ecClient) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, p
}
}
rr, err = eestream.Decode(rrs, es, ec.memoryLimit, ec.forceErrorDetection)
rr, err = eestream.Decode(ec.log, rrs, es, ec.memoryLimit, ec.forceErrorDetection)
if err != nil {
return nil, Error.Wrap(err)
}

View File

@ -19,6 +19,7 @@ import (
)
type decodedReader struct {
log *zap.Logger
ctx context.Context
cancel context.CancelFunc
readers map[int]io.ReadCloser
@ -41,7 +42,7 @@ type decodedReader struct {
// set to 0, the minimum possible memory will be used.
// if forceErrorDetection is set to true then k+1 pieces will be always
// required for decoding, so corrupted pieces can be detected.
func DecodeReaders(ctx context.Context, rs map[int]io.ReadCloser, es ErasureScheme, expectedSize int64, mbm int, forceErrorDetection bool) io.ReadCloser {
func DecodeReaders(ctx context.Context, log *zap.Logger, rs map[int]io.ReadCloser, es ErasureScheme, expectedSize int64, mbm int, forceErrorDetection bool) io.ReadCloser {
defer mon.Task()(&ctx)(nil)
if expectedSize < 0 {
return readcloser.FatalReadCloser(Error.New("negative expected size"))
@ -55,9 +56,10 @@ func DecodeReaders(ctx context.Context, rs map[int]io.ReadCloser, es ErasureSche
return readcloser.FatalReadCloser(err)
}
dr := &decodedReader{
log: log,
readers: rs,
scheme: es,
stripeReader: NewStripeReader(rs, es, mbm, forceErrorDetection),
stripeReader: NewStripeReader(log, rs, es, mbm, forceErrorDetection),
outbuf: make([]byte, 0, es.StripeSize()),
expectedStripes: expectedSize / int64(es.StripeSize()),
}
@ -127,12 +129,13 @@ func (dr *decodedReader) Close() (err error) {
return dr.closeErr
}
if dr.closeErr != nil {
zap.L().Debug("decode close non fatal error: ", zap.Error(dr.closeErr))
dr.log.Debug("decode close non fatal error: ", zap.Error(dr.closeErr))
}
return nil
}
type decodedRanger struct {
log *zap.Logger
es ErasureScheme
rrs map[int]ranger.Ranger
inSize int64
@ -148,7 +151,7 @@ type decodedRanger struct {
// set to 0, the minimum possible memory will be used.
// if forceErrorDetection is set to true then k+1 pieces will be always
// required for decoding, so corrupted pieces can be detected.
func Decode(rrs map[int]ranger.Ranger, es ErasureScheme, mbm int, forceErrorDetection bool) (ranger.Ranger, error) {
func Decode(log *zap.Logger, rrs map[int]ranger.Ranger, es ErasureScheme, mbm int, forceErrorDetection bool) (ranger.Ranger, error) {
if err := checkMBM(mbm); err != nil {
return nil, err
}
@ -173,6 +176,7 @@ func Decode(rrs map[int]ranger.Ranger, es ErasureScheme, mbm int, forceErrorDete
size, es.ErasureShareSize())
}
return &decodedRanger{
log: log,
es: es,
rrs: rrs,
inSize: size,
@ -218,10 +222,9 @@ func (dr *decodedRanger) Range(ctx context.Context, offset, length int64) (_ io.
}
}
// decode from all those ranges
r := DecodeReaders(ctx, readers, dr.es, blockCount*int64(dr.es.StripeSize()), dr.mbm, dr.forceErrorDetection)
r := DecodeReaders(ctx, dr.log, readers, dr.es, blockCount*int64(dr.es.StripeSize()), dr.mbm, dr.forceErrorDetection)
// offset might start a few bytes in, potentially discard the initial bytes
_, err = io.CopyN(ioutil.Discard, r,
offset-firstBlock*int64(dr.es.StripeSize()))
_, err = io.CopyN(ioutil.Discard, r, offset-firstBlock*int64(dr.es.StripeSize()))
if err != nil {
return nil, Error.Wrap(err)
}

View File

@ -119,6 +119,7 @@ func (rs *RedundancyStrategy) OptimalThreshold() int {
}
type encodedReader struct {
log *zap.Logger
ctx context.Context
rs RedundancyStrategy
pieces map[int]*encodedPiece
@ -126,10 +127,11 @@ type encodedReader struct {
// EncodeReader takes a Reader and a RedundancyStrategy and returns a slice of
// io.ReadClosers.
func EncodeReader(ctx context.Context, r io.Reader, rs RedundancyStrategy) (_ []io.ReadCloser, err error) {
func EncodeReader(ctx context.Context, log *zap.Logger, r io.Reader, rs RedundancyStrategy) (_ []io.ReadCloser, err error) {
defer mon.Task()(&ctx)(&err)
er := &encodedReader{
log: log,
ctx: ctx,
rs: rs,
pieces: make(map[int]*encodedPiece, rs.TotalCount()),
@ -175,7 +177,7 @@ func (er *encodedReader) fillBuffer(ctx context.Context, r io.Reader, w sync2.Pi
_, err = sync2.Copy(ctx, w, r)
err = w.CloseWithError(err)
if err != nil {
zap.S().Error(err)
er.log.Sugar().Error(err)
}
}
@ -231,20 +233,22 @@ func (ep *encodedPiece) Close() (err error) {
// multiple Ranged sub-Readers. EncodedRanger does not match the normal Ranger
// interface.
type EncodedRanger struct {
rr ranger.Ranger
rs RedundancyStrategy
log *zap.Logger
rr ranger.Ranger
rs RedundancyStrategy
}
// NewEncodedRanger from the given Ranger and RedundancyStrategy. See the
// comments for EncodeReader about the repair and success thresholds.
func NewEncodedRanger(rr ranger.Ranger, rs RedundancyStrategy) (*EncodedRanger, error) {
func NewEncodedRanger(log *zap.Logger, rr ranger.Ranger, rs RedundancyStrategy) (*EncodedRanger, error) {
if rr.Size()%int64(rs.StripeSize()) != 0 {
return nil, Error.New("invalid erasure encoder and range reader combo. " +
"range reader size must be a multiple of erasure encoder block size")
}
return &EncodedRanger{
rs: rs,
rr: rr,
log: log,
rs: rs,
rr: rr,
}, nil
}
@ -269,7 +273,7 @@ func (er *EncodedRanger) Range(ctx context.Context, offset, length int64) (_ []i
if err != nil {
return nil, err
}
readers, err := EncodeReader(ctx, r, er.rs)
readers, err := EncodeReader(ctx, er.log, r, er.rs)
if err != nil {
return nil, err
}

View File

@ -12,6 +12,7 @@ import (
// PieceBuffer is a synchronized buffer for storing erasure shares for a piece.
type PieceBuffer struct {
log *zap.Logger
buf []byte
shareSize int
cond *sync.Cond
@ -27,8 +28,9 @@ type PieceBuffer struct {
// NewPieceBuffer creates and initializes a new PieceBuffer using buf as its
// internal content. If new data is written to the buffer, newDataCond will be
// notified.
func NewPieceBuffer(buf []byte, shareSize int, newDataCond *sync.Cond) *PieceBuffer {
func NewPieceBuffer(log *zap.Logger, buf []byte, shareSize int, newDataCond *sync.Cond) *PieceBuffer {
return &PieceBuffer{
log: log,
buf: buf,
shareSize: shareSize,
cond: sync.NewCond(&sync.Mutex{}),
@ -230,8 +232,7 @@ func (b *PieceBuffer) buffered() int {
func (b *PieceBuffer) HasShare(num int64) bool {
if num < b.currentShare {
// we should never get here!
zap.S().Fatalf("Checking for erasure share %d while the current erasure share is %d.",
num, b.currentShare)
b.log.Sugar().Fatalf("Checking for erasure share %d while the current erasure share is %d.", num, b.currentShare)
}
if b.getError() != nil {
@ -257,8 +258,7 @@ func (b *PieceBuffer) HasShare(num int64) bool {
func (b *PieceBuffer) ReadShare(num int64, p []byte) error {
if num < b.currentShare {
// we should never get here!
zap.S().Fatalf("Trying to read erasure share %d while the current erasure share is already %d.",
num, b.currentShare)
b.log.Sugar().Fatalf("Trying to read erasure share %d while the current erasure share is already %d.", num, b.currentShare)
}
err := b.discardUntil(num)

View File

@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/vivint/infectious"
"github.com/zeebo/errs"
"go.uber.org/zap/zaptest"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/readcloser"
@ -41,7 +42,7 @@ func TestRS(t *testing.T) {
if err != nil {
t.Fatal(err)
}
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs)
readers, err := EncodeReader(ctx, zaptest.NewLogger(t), bytes.NewReader(data), rs)
if err != nil {
t.Fatal(err)
}
@ -49,7 +50,7 @@ func TestRS(t *testing.T) {
for i, reader := range readers {
readerMap[i] = reader
}
decoder := DecodeReaders(ctx, readerMap, rs, 32*1024, 0, false)
decoder := DecodeReaders(ctx, zaptest.NewLogger(t), readerMap, rs, 32*1024, 0, false)
defer func() { assert.NoError(t, decoder.Close()) }()
data2, err := ioutil.ReadAll(decoder)
if err != nil {
@ -72,7 +73,7 @@ func TestRSUnexpectedEOF(t *testing.T) {
if err != nil {
t.Fatal(err)
}
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs)
readers, err := EncodeReader(ctx, zaptest.NewLogger(t), bytes.NewReader(data), rs)
if err != nil {
t.Fatal(err)
}
@ -80,7 +81,7 @@ func TestRSUnexpectedEOF(t *testing.T) {
for i, reader := range readers {
readerMap[i] = reader
}
decoder := DecodeReaders(ctx, readerMap, rs, 32*1024, 0, false)
decoder := DecodeReaders(ctx, zaptest.NewLogger(t), readerMap, rs, 32*1024, 0, false)
defer func() { assert.NoError(t, decoder.Close()) }()
// Try ReadFull more data from DecodeReaders than available
data2 := make([]byte, len(data)+1024)
@ -108,7 +109,7 @@ func TestRSRanger(t *testing.T) {
if err != nil {
t.Fatal(err)
}
readers, err := EncodeReader(ctx, encryption.TransformReader(PadReader(ioutil.NopCloser(
readers, err := EncodeReader(ctx, zaptest.NewLogger(t), encryption.TransformReader(PadReader(ioutil.NopCloser(
bytes.NewReader(data)), encrypter.InBlockSize()), encrypter, 0), rs)
if err != nil {
t.Fatal(err)
@ -125,7 +126,7 @@ func TestRSRanger(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rc, err := Decode(rrs, rs, 0, false)
rc, err := Decode(zaptest.NewLogger(t), rrs, rs, 0, false)
if err != nil {
t.Fatal(err)
}
@ -386,7 +387,7 @@ func testRSProblematic(t *testing.T, tt testCase, i int, fn problematicReadClose
if !assert.NoError(t, err, errTag) {
return
}
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs)
readers, err := EncodeReader(ctx, zaptest.NewLogger(t), bytes.NewReader(data), rs)
if !assert.NoError(t, err, errTag) {
return
}
@ -405,7 +406,7 @@ func testRSProblematic(t *testing.T, tt testCase, i int, fn problematicReadClose
for i := tt.problematic; i < tt.total; i++ {
readerMap[i] = ioutil.NopCloser(bytes.NewReader(pieces[i]))
}
decoder := DecodeReaders(ctx, readerMap, rs, int64(tt.dataSize), 3*1024, false)
decoder := DecodeReaders(ctx, zaptest.NewLogger(t), readerMap, rs, int64(tt.dataSize), 3*1024, false)
defer func() { assert.NoError(t, decoder.Close()) }()
data2, err := ioutil.ReadAll(decoder)
if tt.fail {
@ -462,7 +463,7 @@ func TestEncoderStalledReaders(t *testing.T) {
if err != nil {
t.Fatal(err)
}
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs)
readers, err := EncodeReader(ctx, zaptest.NewLogger(t), bytes.NewReader(data), rs)
if err != nil {
t.Fatal(err)
}
@ -508,7 +509,7 @@ func TestDecoderErrorWithStalledReaders(t *testing.T) {
if err != nil {
t.Fatal(err)
}
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs)
readers, err := EncodeReader(ctx, zaptest.NewLogger(t), bytes.NewReader(data), rs)
if err != nil {
t.Fatal(err)
}
@ -531,7 +532,7 @@ func TestDecoderErrorWithStalledReaders(t *testing.T) {
for i := 7; i < 20; i++ {
readerMap[i] = readcloser.FatalReadCloser(errors.New("I am an error piece"))
}
decoder := DecodeReaders(ctx, readerMap, rs, int64(10*1024), 0, false)
decoder := DecodeReaders(ctx, zaptest.NewLogger(t), readerMap, rs, int64(10*1024), 0, false)
defer func() { assert.NoError(t, decoder.Close()) }()
// record the time for reading the data from the decoder
start := time.Now()
@ -663,7 +664,7 @@ func TestCalcPieceSize(t *testing.T) {
calculatedSize := CalcPieceSize(dataSize, es)
randReader := ioutil.NopCloser(io.LimitReader(testrand.Reader(), dataSize))
readers, err := EncodeReader(ctx, PadReader(randReader, es.StripeSize()), rs)
readers, err := EncodeReader(ctx, zaptest.NewLogger(t), PadReader(randReader, es.StripeSize()), rs)
require.NoError(t, err, errTag)
for _, reader := range readers {

View File

@ -12,6 +12,7 @@ import (
"sync"
"github.com/vivint/infectious"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
)
@ -33,7 +34,7 @@ type StripeReader struct {
// NewStripeReader creates a new StripeReader from the given readers, erasure
// scheme and max buffer memory.
func NewStripeReader(rs map[int]io.ReadCloser, es ErasureScheme, mbm int, forceErrorDetection bool) *StripeReader {
func NewStripeReader(log *zap.Logger, rs map[int]io.ReadCloser, es ErasureScheme, mbm int, forceErrorDetection bool) *StripeReader {
readerCount := len(rs)
r := &StripeReader{
@ -55,7 +56,7 @@ func NewStripeReader(rs map[int]io.ReadCloser, es ErasureScheme, mbm int, forceE
for i := range rs {
r.inbufs[i] = make([]byte, es.ErasureShareSize())
r.bufs[i] = NewPieceBuffer(make([]byte, bufSize), es.ErasureShareSize(), r.cond)
r.bufs[i] = NewPieceBuffer(log, make([]byte, bufSize), es.ErasureShareSize(), r.cond)
// Kick off a goroutine each reader to be copied into a PieceBuffer.
go func(r io.Reader, buf *PieceBuffer) {
_, err := io.Copy(buf, r)

View File

@ -61,12 +61,12 @@ func (peer *Peer) HandleGet(w http.ResponseWriter, r *http.Request) {
if xfor = r.Header.Get("X-Forwarded-For"); xfor == "" {
xfor = r.RemoteAddr
}
zap.S().Debugf("Request from: %s for %s", r.RemoteAddr, xfor)
peer.Log.Sugar().Debugf("Request from: %s for %s", r.RemoteAddr, xfor)
w.Header().Set("Content-Type", "application/json")
_, err := w.Write(peer.response)
if err != nil {
zap.S().Errorf("error writing response to client: %v", err)
peer.Log.Sugar().Errorf("error writing response to client: %v", err)
}
}