Check context cancellation more nicely (#1752)

This commit is contained in:
Egon Elbre 2019-04-17 13:09:44 +03:00 committed by GitHub
parent ca2055abaf
commit 5b3c146d8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 62 additions and 60 deletions

View File

@ -6,15 +6,14 @@ package bootstrap
import ( import (
"context" "context"
"net" "net"
"net/http"
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"storj.io/storj/bootstrap/bootstrapweb" "storj.io/storj/bootstrap/bootstrapweb"
"storj.io/storj/bootstrap/bootstrapweb/bootstrapserver" "storj.io/storj/bootstrap/bootstrapweb/bootstrapserver"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/version" "storj.io/storj/internal/version"
"storj.io/storj/pkg/identity" "storj.io/storj/pkg/identity"
"storj.io/storj/pkg/kademlia" "storj.io/storj/pkg/kademlia"
@ -195,13 +194,13 @@ func (peer *Peer) Run(ctx context.Context) error {
group, ctx := errgroup.WithContext(ctx) group, ctx := errgroup.WithContext(ctx)
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Version.Run(ctx)) return errs2.IgnoreCanceled(peer.Version.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Kademlia.Service.Bootstrap(ctx)) return errs2.IgnoreCanceled(peer.Kademlia.Service.Bootstrap(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Kademlia.Service.Run(ctx)) return errs2.IgnoreCanceled(peer.Kademlia.Service.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
// TODO: move the message into Server instead // TODO: move the message into Server instead
@ -209,22 +208,15 @@ func (peer *Peer) Run(ctx context.Context) error {
peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID) peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID)
peer.Log.Sugar().Infof("Public server started on %s", peer.Addr()) peer.Log.Sugar().Infof("Public server started on %s", peer.Addr())
peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr()) peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr())
return ignoreCancel(peer.Server.Run(ctx)) return errs2.IgnoreCanceled(peer.Server.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Web.Endpoint.Run(ctx)) return errs2.IgnoreCanceled(peer.Web.Endpoint.Run(ctx))
}) })
return group.Wait() return group.Wait()
} }
func ignoreCancel(err error) error {
if err == context.Canceled || err == grpc.ErrServerStopped || err == http.ErrServerClosed {
return nil
}
return err
}
// Close closes all the resources. // Close closes all the resources.
func (peer *Peer) Close() error { func (peer *Peer) Close() error {
var errlist errs.Group var errlist errs.Group

31
internal/errs2/ignore.go Normal file
View File

@ -0,0 +1,31 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package errs2
import (
"context"
"net/http"
"github.com/zeebo/errs"
"google.golang.org/grpc"
)
// IgnoreCanceled returns nil, when the operation was about canceling.
func IgnoreCanceled(originalError error) error {
err := originalError
for err != nil {
if err == context.Canceled ||
err == grpc.ErrServerStopped ||
err == http.ErrServerClosed {
return nil
}
unwrapped := errs.Unwrap(err)
if unwrapped == err {
return originalError
}
err = unwrapped
}
return originalError
}

View File

@ -31,7 +31,7 @@ func TestLimiterLimiting(t *testing.T) {
limiter.Wait() limiter.Wait()
} }
func TestLimiterCancelling(t *testing.T) { func TestLimiterCanceling(t *testing.T) {
t.Parallel() t.Parallel()
const N, Limit = 1000, 10 const N, Limit = 1000, 10

View File

@ -7,7 +7,6 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"net/http"
"net/mail" "net/mail"
"net/smtp" "net/smtp"
"os" "os"
@ -17,8 +16,8 @@ import (
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/post" "storj.io/storj/internal/post"
"storj.io/storj/internal/post/oauth2" "storj.io/storj/internal/post/oauth2"
"storj.io/storj/internal/version" "storj.io/storj/internal/version"
@ -513,43 +512,36 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve
return peer, nil return peer, nil
} }
func ignoreCancel(err error) error {
if err == context.Canceled || err == grpc.ErrServerStopped || err == http.ErrServerClosed {
return nil
}
return err
}
// Run runs storage node until it's either closed or it errors. // Run runs storage node until it's either closed or it errors.
func (peer *Peer) Run(ctx context.Context) error { func (peer *Peer) Run(ctx context.Context) error {
group, ctx := errgroup.WithContext(ctx) group, ctx := errgroup.WithContext(ctx)
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Version.Run(ctx)) return errs2.IgnoreCanceled(peer.Version.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Kademlia.Service.Bootstrap(ctx)) return errs2.IgnoreCanceled(peer.Kademlia.Service.Bootstrap(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Kademlia.Service.Run(ctx)) return errs2.IgnoreCanceled(peer.Kademlia.Service.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Discovery.Service.Run(ctx)) return errs2.IgnoreCanceled(peer.Discovery.Service.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Repair.Checker.Run(ctx)) return errs2.IgnoreCanceled(peer.Repair.Checker.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Repair.Repairer.Run(ctx)) return errs2.IgnoreCanceled(peer.Repair.Repairer.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Accounting.Tally.Run(ctx)) return errs2.IgnoreCanceled(peer.Accounting.Tally.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Accounting.Rollup.Run(ctx)) return errs2.IgnoreCanceled(peer.Accounting.Rollup.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Audit.Service.Run(ctx)) return errs2.IgnoreCanceled(peer.Audit.Service.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
// TODO: move the message into Server instead // TODO: move the message into Server instead
@ -557,10 +549,10 @@ func (peer *Peer) Run(ctx context.Context) error {
peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID) peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID)
peer.Log.Sugar().Infof("Public server started on %s", peer.Addr()) peer.Log.Sugar().Infof("Public server started on %s", peer.Addr())
peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr()) peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr())
return ignoreCancel(peer.Server.Run(ctx)) return errs2.IgnoreCanceled(peer.Server.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Console.Endpoint.Run(ctx)) return errs2.IgnoreCanceled(peer.Console.Endpoint.Run(ctx))
}) })
return group.Wait() return group.Wait()

View File

@ -9,8 +9,8 @@ import (
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/version" "storj.io/storj/internal/version"
"storj.io/storj/pkg/auth/signing" "storj.io/storj/pkg/auth/signing"
"storj.io/storj/pkg/identity" "storj.io/storj/pkg/identity"
@ -260,22 +260,22 @@ func (peer *Peer) Run(ctx context.Context) error {
group, ctx := errgroup.WithContext(ctx) group, ctx := errgroup.WithContext(ctx)
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Version.Run(ctx)) return errs2.IgnoreCanceled(peer.Version.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Kademlia.Service.Bootstrap(ctx)) return errs2.IgnoreCanceled(peer.Kademlia.Service.Bootstrap(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Kademlia.Service.Run(ctx)) return errs2.IgnoreCanceled(peer.Kademlia.Service.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Agreements.Sender.Run(ctx)) return errs2.IgnoreCanceled(peer.Agreements.Sender.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Storage2.Sender.Run(ctx)) return errs2.IgnoreCanceled(peer.Storage2.Sender.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
return ignoreCancel(peer.Storage2.Monitor.Run(ctx)) return errs2.IgnoreCanceled(peer.Storage2.Monitor.Run(ctx))
}) })
group.Go(func() error { group.Go(func() error {
// TODO: move the message into Server instead // TODO: move the message into Server instead
@ -283,19 +283,12 @@ func (peer *Peer) Run(ctx context.Context) error {
peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID) peer.Log.Sugar().Infof("Node %s started", peer.Identity.ID)
peer.Log.Sugar().Infof("Public server started on %s", peer.Addr()) peer.Log.Sugar().Infof("Public server started on %s", peer.Addr())
peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr()) peer.Log.Sugar().Infof("Private server started on %s", peer.PrivateAddr())
return ignoreCancel(peer.Server.Run(ctx)) return errs2.IgnoreCanceled(peer.Server.Run(ctx))
}) })
return group.Wait() return group.Wait()
} }
func ignoreCancel(err error) error {
if err == context.Canceled || err == grpc.ErrServerStopped {
return nil
}
return err
}
// Close closes all the resources. // Close closes all the resources.
func (peer *Peer) Close() error { func (peer *Peer) Close() error {
var errlist errs.Group var errlist errs.Group

View File

@ -161,7 +161,7 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error)
defer func() { defer func() {
// cancel error if it hasn't been committed // cancel error if it hasn't been committed
if cancelErr := pieceWriter.Cancel(); cancelErr != nil { if cancelErr := pieceWriter.Cancel(); cancelErr != nil {
endpoint.log.Error("error during cancelling a piece write", zap.Error(cancelErr)) endpoint.log.Error("error during canceling a piece write", zap.Error(cancelErr))
} }
}() }()

View File

@ -14,6 +14,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/version" "storj.io/storj/internal/version"
) )
@ -48,13 +49,6 @@ type Peer struct {
response []byte response []byte
} }
func ignoreCancel(err error) error {
if err == context.Canceled || err == http.ErrServerClosed {
return nil
}
return err
}
// HandleGet contains the request handler for the version control web server // HandleGet contains the request handler for the version control web server
func (peer *Peer) HandleGet(w http.ResponseWriter, r *http.Request) { func (peer *Peer) HandleGet(w http.ResponseWriter, r *http.Request) {
// Only handle GET Requests // Only handle GET Requests
@ -127,12 +121,12 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
group.Go(func() error { group.Go(func() error {
<-ctx.Done() <-ctx.Done()
return ignoreCancel(peer.Server.Endpoint.Shutdown(ctx)) return errs2.IgnoreCanceled(peer.Server.Endpoint.Shutdown(ctx))
}) })
group.Go(func() error { group.Go(func() error {
defer cancel() defer cancel()
peer.Log.Sugar().Infof("Versioning server started on %s", peer.Addr()) peer.Log.Sugar().Infof("Versioning server started on %s", peer.Addr())
return ignoreCancel(peer.Server.Endpoint.Serve(peer.Server.Listener)) return errs2.IgnoreCanceled(peer.Server.Endpoint.Serve(peer.Server.Listener))
}) })
return group.Wait() return group.Wait()
} }