remove utils.CombineErrors and utils.ErrorGroup (#1603)
This commit is contained in:
parent
0ce6d4ab81
commit
de15a4fdcf
@ -11,7 +11,6 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/bootstrap/bootstrapweb/bootstrapserver/bootstrapql"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
// JSON request from graphql clients
|
||||
@ -40,10 +39,10 @@ func queryPOST(req *http.Request) (query graphqlJSON, err error) {
|
||||
case applicationGraphql:
|
||||
body, err := ioutil.ReadAll(req.Body)
|
||||
query.Query = string(body)
|
||||
return query, utils.CombineErrors(err, req.Body.Close())
|
||||
return query, errs.Combine(err, req.Body.Close())
|
||||
case applicationJSON:
|
||||
err := json.NewDecoder(req.Body).Decode(&query)
|
||||
return query, utils.CombineErrors(err, req.Body.Close())
|
||||
return query, errs.Combine(err, req.Body.Close())
|
||||
default:
|
||||
return query, errs.New("can't parse request body of type %s", typ)
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/certificates"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -86,13 +85,13 @@ func cmdCreateAuth(cmd *cobra.Command, args []string) error {
|
||||
}
|
||||
}
|
||||
|
||||
var incErrs utils.ErrorGroup
|
||||
var incErrs errs.Group
|
||||
for _, email := range emails {
|
||||
if _, err := authDB.Create(email, count); err != nil {
|
||||
incErrs.Add(err)
|
||||
}
|
||||
}
|
||||
return incErrs.Finish()
|
||||
return incErrs.Err()
|
||||
}
|
||||
|
||||
func cmdInfoAuth(cmd *cobra.Command, args []string) error {
|
||||
@ -121,7 +120,7 @@ func cmdInfoAuth(cmd *cobra.Command, args []string) error {
|
||||
}
|
||||
}
|
||||
|
||||
var emailErrs, printErrs utils.ErrorGroup
|
||||
var emailErrs, printErrs errs.Group
|
||||
w := tabwriter.NewWriter(os.Stdout, 0, 2, 2, ' ', 0)
|
||||
if _, err := fmt.Fprintln(w, "Email\tClaimed\tAvail.\t"); err != nil {
|
||||
return err
|
||||
@ -137,7 +136,7 @@ func cmdInfoAuth(cmd *cobra.Command, args []string) error {
|
||||
if err := w.Flush(); err != nil {
|
||||
return errs.Wrap(err)
|
||||
}
|
||||
return utils.CombineErrors(emailErrs.Finish(), printErrs.Finish())
|
||||
return errs.Combine(emailErrs.Err(), printErrs.Err())
|
||||
}
|
||||
|
||||
func writeAuthInfo(authDB *certificates.AuthorizationDB, email string, w io.Writer) error {
|
||||
@ -216,7 +215,7 @@ func cmdExportAuth(cmd *cobra.Command, args []string) error {
|
||||
}
|
||||
|
||||
var (
|
||||
emailErrs, csvErrs utils.ErrorGroup
|
||||
emailErrs, csvErrs errs.Group
|
||||
output io.Writer
|
||||
)
|
||||
switch config.Out {
|
||||
@ -240,7 +239,7 @@ func cmdExportAuth(cmd *cobra.Command, args []string) error {
|
||||
}
|
||||
|
||||
csvWriter.Flush()
|
||||
return utils.CombineErrors(emailErrs.Finish(), csvErrs.Finish())
|
||||
return errs.Combine(emailErrs.Err(), csvErrs.Err())
|
||||
}
|
||||
|
||||
func writeAuthExport(authDB *certificates.AuthorizationDB, email string, w *csv.Writer) error {
|
||||
@ -252,11 +251,11 @@ func writeAuthExport(authDB *certificates.AuthorizationDB, email string, w *csv.
|
||||
return nil
|
||||
}
|
||||
|
||||
var authErrs utils.ErrorGroup
|
||||
var authErrs errs.Group
|
||||
for _, auth := range auths {
|
||||
if err := w.Write([]string{email, auth.Token.String()}); err != nil {
|
||||
authErrs.Add(err)
|
||||
}
|
||||
}
|
||||
return authErrs.Finish()
|
||||
return authErrs.Err()
|
||||
}
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/stream"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -132,7 +131,7 @@ func uploadStream(ctx context.Context, streams streams.Store, mutableObject stor
|
||||
|
||||
_, err = io.Copy(upload, reader)
|
||||
|
||||
return utils.CombineErrors(err, upload.Close())
|
||||
return errs.Combine(err, upload.Close())
|
||||
}
|
||||
|
||||
// download transfers s3 compatible object src to dst on local machine
|
||||
|
@ -12,7 +12,7 @@ import (
|
||||
"runtime"
|
||||
"strings"
|
||||
|
||||
"storj.io/storj/pkg/utils"
|
||||
"github.com/zeebo/errs"
|
||||
)
|
||||
|
||||
// IsRoot returns whether path is the root directory
|
||||
@ -77,7 +77,7 @@ func IsValidSetupDir(name string) (ok bool, err error) {
|
||||
return false, err
|
||||
}
|
||||
defer func() {
|
||||
err = utils.CombineErrors(err, f.Close())
|
||||
err = errs.Combine(err, f.Close())
|
||||
}()
|
||||
|
||||
for {
|
||||
|
@ -7,8 +7,6 @@ import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
// DB is the minimal implementation that is needed by migration.
|
||||
@ -32,7 +30,7 @@ func Create(identifier string, db DB) error {
|
||||
|
||||
_, err = tx.Exec(db.Rebind(`CREATE TABLE IF NOT EXISTS table_schemas (id text, schemaText text);`))
|
||||
if err != nil {
|
||||
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
row := tx.QueryRow(db.Rebind(`SELECT schemaText FROM table_schemas WHERE id = ?;`), identifier)
|
||||
@ -44,23 +42,23 @@ func Create(identifier string, db DB) error {
|
||||
if err == sql.ErrNoRows {
|
||||
_, err := tx.Exec(schema)
|
||||
if err != nil {
|
||||
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
_, err = tx.Exec(db.Rebind(`INSERT INTO table_schemas(id, schemaText) VALUES (?, ?);`), identifier, schema)
|
||||
if err != nil {
|
||||
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
return Error.Wrap(tx.Commit())
|
||||
}
|
||||
if err != nil {
|
||||
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
if schema != previousSchema {
|
||||
err := Error.New("schema mismatch:\nold %v\nnew %v", previousSchema, schema)
|
||||
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
return Error.Wrap(tx.Rollback())
|
||||
|
@ -6,7 +6,7 @@ package readcloser
|
||||
import (
|
||||
"io"
|
||||
|
||||
"storj.io/storj/pkg/utils"
|
||||
"github.com/zeebo/errs"
|
||||
)
|
||||
|
||||
type eofReadCloser struct{}
|
||||
@ -63,9 +63,9 @@ func (mr *multiReadCloser) Read(p []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
func (mr *multiReadCloser) Close() error {
|
||||
errs := make([]error, len(mr.readers))
|
||||
errlist := make([]error, len(mr.readers))
|
||||
for i, r := range mr.readers {
|
||||
errs[i] = r.Close()
|
||||
errlist[i] = r.Close()
|
||||
}
|
||||
return utils.CombineErrors(errs...)
|
||||
return errs.Combine(errlist...)
|
||||
}
|
||||
|
@ -10,9 +10,10 @@ import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/stream"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
// Encryption holds the cipher, path, key, and enc. scheme for each bucket since they
|
||||
@ -94,7 +95,7 @@ func (b *Bucket) Upload(ctx context.Context, path storj.Path, data []byte, opts
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
return utils.CombineErrors(err, upload.Close())
|
||||
return errs.Combine(err, upload.Close())
|
||||
}
|
||||
|
||||
// Download downloads an object from a bucket
|
||||
@ -111,7 +112,7 @@ func (b *Bucket) Download(ctx context.Context, path storj.Path) ([]byte, error)
|
||||
|
||||
stream := stream.NewDownload(ctx, readStream, streams)
|
||||
|
||||
defer func() { err = utils.CombineErrors(err, stream.Close()) }()
|
||||
defer func() { err = errs.Combine(err, stream.Close()) }()
|
||||
|
||||
data, err := ioutil.ReadAll(stream)
|
||||
if err != nil {
|
||||
|
@ -26,7 +26,6 @@ import (
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/transport"
|
||||
"storj.io/storj/pkg/utils"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
@ -259,7 +258,7 @@ func (authDB *AuthorizationDB) Create(userID string, count int) (Authorizations,
|
||||
|
||||
var (
|
||||
newAuths Authorizations
|
||||
authErrs utils.ErrorGroup
|
||||
authErrs errs.Group
|
||||
)
|
||||
for i := 0; i < count; i++ {
|
||||
auth, err := NewAuthorization(userID)
|
||||
@ -269,7 +268,7 @@ func (authDB *AuthorizationDB) Create(userID string, count int) (Authorizations,
|
||||
}
|
||||
newAuths = append(newAuths, auth)
|
||||
}
|
||||
if err := authErrs.Finish(); err != nil {
|
||||
if err := authErrs.Err(); err != nil {
|
||||
return nil, ErrAuthorizationDB.Wrap(err)
|
||||
}
|
||||
|
||||
|
@ -30,7 +30,6 @@ import (
|
||||
"storj.io/storj/pkg/pkcrypto"
|
||||
"storj.io/storj/pkg/server"
|
||||
"storj.io/storj/pkg/transport"
|
||||
"storj.io/storj/pkg/utils"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
@ -473,15 +472,14 @@ func TestAuthorizationDB_Emails(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(authDB.Close)
|
||||
|
||||
var authErrs utils.ErrorGroup
|
||||
var authErrs errs.Group
|
||||
for i := 0; i < 5; i++ {
|
||||
_, err := authDB.Create(fmt.Sprintf("user%d@example.com", i), 1)
|
||||
if err != nil {
|
||||
authErrs.Add(err)
|
||||
}
|
||||
}
|
||||
err = authErrs.Finish()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, authErrs.Err())
|
||||
|
||||
userIDs, err := authDB.UserIDs()
|
||||
assert.NoError(t, err)
|
||||
|
@ -9,10 +9,11 @@ import (
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/internal/readcloser"
|
||||
"storj.io/storj/pkg/encryption"
|
||||
"storj.io/storj/pkg/ranger"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
type decodedReader struct {
|
||||
@ -98,20 +99,14 @@ func (dr *decodedReader) Close() error {
|
||||
dr.cancel()
|
||||
// avoid double close of readers
|
||||
dr.close.Do(func() {
|
||||
var errs []error
|
||||
var errlist errs.Group
|
||||
// close the readers
|
||||
for _, r := range dr.readers {
|
||||
err := r.Close()
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
errlist.Add(r.Close())
|
||||
}
|
||||
// close the stripe reader
|
||||
err := dr.stripeReader.Close()
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
dr.closeErr = utils.CombineErrors(errs...)
|
||||
errlist.Add(dr.stripeReader.Close())
|
||||
dr.closeErr = errlist.Err()
|
||||
})
|
||||
return dr.closeErr
|
||||
}
|
||||
|
@ -23,7 +23,6 @@ import (
|
||||
"storj.io/storj/pkg/peertls"
|
||||
"storj.io/storj/pkg/pkcrypto"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
// PeerIdentity represents another peer on the network.
|
||||
@ -335,12 +334,12 @@ func (ic Config) Save(fi *FullIdentity) error {
|
||||
writeKeyDataErr = writeKeyData(ic.KeyPath, keyData.Bytes())
|
||||
}
|
||||
|
||||
writeErr := utils.CombineErrors(writeChainErr, writeKeyErr)
|
||||
writeErr := errs.Combine(writeChainErr, writeKeyErr)
|
||||
if writeErr != nil {
|
||||
return writeErr
|
||||
}
|
||||
|
||||
return utils.CombineErrors(
|
||||
return errs.Combine(
|
||||
writeChainDataErr,
|
||||
writeKeyDataErr,
|
||||
)
|
||||
@ -390,12 +389,12 @@ func (ic PeerConfig) Save(fi *PeerIdentity) error {
|
||||
writeChainDataErr = writeChainData(ic.CertPath, certData.Bytes())
|
||||
}
|
||||
|
||||
writeErr := utils.CombineErrors(writeChainErr)
|
||||
writeErr := errs.Combine(writeChainErr)
|
||||
if writeErr != nil {
|
||||
return writeErr
|
||||
}
|
||||
|
||||
return utils.CombineErrors(
|
||||
return errs.Combine(
|
||||
writeChainDataErr,
|
||||
)
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ import (
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/utils"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
@ -153,23 +152,23 @@ func (rt *RoutingTable) GetBucketIds() (storage.Keys, error) {
|
||||
// DumpNodes iterates through all nodes in the nodeBucketDB and marshals them to &pb.Nodes, then returns them
|
||||
func (rt *RoutingTable) DumpNodes() ([]*pb.Node, error) {
|
||||
var nodes []*pb.Node
|
||||
var errors utils.ErrorGroup
|
||||
var nodeErrors errs.Group
|
||||
|
||||
err := rt.iterateNodes(storj.NodeID{}, func(newID storj.NodeID, protoNode []byte) error {
|
||||
newNode := pb.Node{}
|
||||
err := proto.Unmarshal(protoNode, &newNode)
|
||||
if err != nil {
|
||||
errors.Add(err)
|
||||
nodeErrors.Add(err)
|
||||
}
|
||||
nodes = append(nodes, &newNode)
|
||||
return nil
|
||||
}, false)
|
||||
|
||||
if err != nil {
|
||||
errors.Add(err)
|
||||
nodeErrors.Add(err)
|
||||
}
|
||||
|
||||
return nodes, errors.Finish()
|
||||
return nodes, nodeErrors.Err()
|
||||
}
|
||||
|
||||
// FindNear returns the node corresponding to the provided nodeID
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/stream"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -402,7 +401,7 @@ func upload(ctx context.Context, streams streams.Store, mutableObject storj.Muta
|
||||
|
||||
_, err = io.Copy(upload, reader)
|
||||
|
||||
return utils.CombineErrors(err, upload.Close())
|
||||
return errs.Combine(err, upload.Close())
|
||||
}
|
||||
|
||||
func (layer *gatewayLayer) PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo minio.ObjectInfo, err error) {
|
||||
|
@ -10,7 +10,6 @@ import (
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -55,7 +54,7 @@ func (c LookupConfig) ParseIDs() (ids storj.NodeIDList, err error) {
|
||||
}
|
||||
ids = append(ids, id)
|
||||
}
|
||||
if err := utils.CombineErrors(idErrs...); err != nil {
|
||||
if err := errs.Combine(idErrs...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ids, nil
|
||||
|
@ -14,7 +14,6 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/pkcrypto"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -110,7 +109,7 @@ func WriteChain(w io.Writer, chain ...*x509.Certificate) error {
|
||||
return errs.New("expected at least one certificate for writing")
|
||||
}
|
||||
|
||||
var extErrs utils.ErrorGroup
|
||||
var extErrs errs.Group
|
||||
for _, c := range chain {
|
||||
if err := pkcrypto.WriteCertPEM(w, c); err != nil {
|
||||
return errs.Wrap(err)
|
||||
@ -122,7 +121,7 @@ func WriteChain(w io.Writer, chain ...*x509.Certificate) error {
|
||||
}
|
||||
}
|
||||
}
|
||||
return extErrs.Finish()
|
||||
return extErrs.Err()
|
||||
}
|
||||
|
||||
// ChainBytes returns bytes of the certificate chain (leaf-first) to the writer, PEM-encoded.
|
||||
|
@ -14,7 +14,6 @@ import (
|
||||
|
||||
"storj.io/storj/pkg/auth"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
// OK - Success!
|
||||
@ -68,7 +67,7 @@ func (s *Server) Store(reqStream pb.PieceStoreRoutes_StoreServer) (err error) {
|
||||
|
||||
if err = s.DB.AddTTL(id, pd.GetExpirationUnixSec(), total); err != nil {
|
||||
deleteErr := s.deleteByID(id)
|
||||
return StoreError.New("failed to write piece meta data to database: %v", utils.CombineErrors(err, deleteErr))
|
||||
return StoreError.New("failed to write piece meta data to database: %v", errs.Combine(err, deleteErr))
|
||||
}
|
||||
|
||||
signedHash := &pb.SignedHash{Hash: hash}
|
||||
|
@ -13,8 +13,6 @@ import (
|
||||
"math/big"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
// WritePublicKeyPEM writes the public key, in a PEM-enveloped
|
||||
@ -175,7 +173,7 @@ func CertsFromDER(rawCerts [][]byte) ([]*x509.Certificate, error) {
|
||||
func CertsFromPEM(pemBytes []byte) ([]*x509.Certificate, error) {
|
||||
var (
|
||||
encChain encodedChain
|
||||
blockErrs utils.ErrorGroup
|
||||
blockErrs errs.Group
|
||||
)
|
||||
for {
|
||||
var pemBlock *pem.Block
|
||||
@ -187,12 +185,11 @@ func CertsFromPEM(pemBytes []byte) ([]*x509.Certificate, error) {
|
||||
case BlockLabelCertificate:
|
||||
encChain.AddCert(pemBlock.Bytes)
|
||||
case BlockLabelExtension:
|
||||
if err := encChain.AddExtension(pemBlock.Bytes); err != nil {
|
||||
blockErrs.Add(err)
|
||||
}
|
||||
err := encChain.AddExtension(pemBlock.Bytes)
|
||||
blockErrs.Add(err)
|
||||
}
|
||||
}
|
||||
if err := blockErrs.Finish(); err != nil {
|
||||
if err := blockErrs.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -226,17 +223,15 @@ func (e *encodedChain) Parse() ([]*x509.Certificate, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var extErrs utils.ErrorGroup
|
||||
var extErrs errs.Group
|
||||
for i, cert := range chain {
|
||||
for _, ee := range e.extensions[i] {
|
||||
ext, err := PKIXExtensionFromASN1(ee)
|
||||
if err != nil {
|
||||
extErrs.Add(err)
|
||||
}
|
||||
extErrs.Add(err) // TODO: is this correct?
|
||||
cert.ExtraExtensions = append(cert.ExtraExtensions, *ext)
|
||||
}
|
||||
}
|
||||
if err := extErrs.Finish(); err != nil {
|
||||
if err := extErrs.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -6,12 +6,12 @@ package server
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
// Config holds server specific configuration parameters
|
||||
@ -29,7 +29,7 @@ func (sc Config) Run(ctx context.Context, identity *identity.FullIdentity, inter
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { err = utils.CombineErrors(err, opts.RevDB.Close()) }()
|
||||
defer func() { err = errs.Combine(err, opts.RevDB.Close()) }()
|
||||
|
||||
server, err := New(opts, sc.Address, sc.PrivateAddress, interceptor, services...)
|
||||
if err != nil {
|
||||
|
@ -10,8 +10,6 @@ import (
|
||||
|
||||
"github.com/btcsuite/btcutil/base58"
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
// IDVersion is the default version used in the base58check node ID encoding
|
||||
@ -51,7 +49,7 @@ func NodeIDsFromBytes(b [][]byte) (ids NodeIDList, err error) {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
|
||||
if err = utils.CombineErrors(idErrs...); err != nil {
|
||||
if err = errs.Combine(idErrs...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ids, nil
|
||||
|
@ -8,12 +8,12 @@ import (
|
||||
"io"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/zeebo/errs"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
// Upload implements Writer and Closer for writing to stream.
|
||||
@ -46,12 +46,12 @@ func NewUpload(ctx context.Context, stream storj.MutableStream, streams streams.
|
||||
}
|
||||
metadata, err := proto.Marshal(&serMetaInfo)
|
||||
if err != nil {
|
||||
return utils.CombineErrors(err, reader.CloseWithError(err))
|
||||
return errs.Combine(err, reader.CloseWithError(err))
|
||||
}
|
||||
|
||||
_, err = streams.Put(ctx, storj.JoinPaths(obj.Bucket.Name, obj.Path), obj.Bucket.PathCipher, reader, metadata, obj.Expires)
|
||||
if err != nil {
|
||||
return utils.CombineErrors(err, reader.CloseWithError(err))
|
||||
return errs.Combine(err, reader.CloseWithError(err))
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -82,5 +82,5 @@ func (upload *Upload) Close() error {
|
||||
err := upload.writer.Close()
|
||||
|
||||
// Wait for streams.Put to commit the upload to the PointerDB
|
||||
return utils.CombineErrors(err, upload.errgroup.Wait())
|
||||
return errs.Combine(err, upload.errgroup.Wait())
|
||||
}
|
||||
|
@ -9,63 +9,17 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
)
|
||||
|
||||
// CombineErrors combines multiple errors to a single error
|
||||
func CombineErrors(errs ...error) error {
|
||||
var errlist ErrorGroup
|
||||
errlist.Add(errs...)
|
||||
return errlist.Finish()
|
||||
}
|
||||
|
||||
type combinedError []error
|
||||
|
||||
func (errs combinedError) Cause() error {
|
||||
if len(errs) > 0 {
|
||||
return errs[0]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (errs combinedError) Error() string {
|
||||
if len(errs) > 0 {
|
||||
limit := 5
|
||||
if len(errs) < limit {
|
||||
limit = len(errs)
|
||||
}
|
||||
allErrors := errs[0].Error()
|
||||
for _, err := range errs[1:limit] {
|
||||
allErrors += "\n" + err.Error()
|
||||
}
|
||||
return allErrors
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// ErrorGroup contains a set of non-nil errors
|
||||
type ErrorGroup errs.Group
|
||||
|
||||
// Add adds an error to the ErrorGroup if it is non-nil
|
||||
func (e *ErrorGroup) Add(errrs ...error) {
|
||||
(*errs.Group)(e).Add(errrs...)
|
||||
}
|
||||
|
||||
// Finish returns nil if there were no non-nil errors, the first error if there
|
||||
// was only one non-nil error, or the result of CombineErrors if there was more
|
||||
// than one non-nil error.
|
||||
func (e *ErrorGroup) Finish() error {
|
||||
return (*errs.Group)(e).Err()
|
||||
}
|
||||
|
||||
// CollectErrors returns first error from channel and all errors that happen within duration
|
||||
func CollectErrors(errch chan error, duration time.Duration) error {
|
||||
errch = discardNil(errch)
|
||||
errs := []error{<-errch}
|
||||
errlist := []error{<-errch}
|
||||
timeout := time.After(duration)
|
||||
for {
|
||||
select {
|
||||
case err := <-errch:
|
||||
errs = append(errs, err)
|
||||
errlist = append(errlist, err)
|
||||
case <-timeout:
|
||||
return CombineErrors(errs...)
|
||||
return errs.Combine(errlist...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -40,21 +40,3 @@ func TestCollecMultipleError(t *testing.T) {
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, err.Error(), "error1; error2; error3")
|
||||
}
|
||||
|
||||
func TestErrorGroup(t *testing.T) {
|
||||
var errlist utils.ErrorGroup
|
||||
errlist.Add(nil, nil, nil)
|
||||
assert.NoError(t, errlist.Finish())
|
||||
assert.Equal(t, len(errlist), 0)
|
||||
e1 := errs.New("err1")
|
||||
errlist.Add(nil, nil, e1, nil)
|
||||
assert.Equal(t, errlist.Finish(), e1)
|
||||
assert.Equal(t, len(errlist), 1)
|
||||
e2, e3 := errs.New("err2"), errs.New("err3")
|
||||
errlist.Add(e2, e3)
|
||||
assert.Error(t, errlist.Finish())
|
||||
assert.Equal(t, len(errlist), 3)
|
||||
assert.Equal(t, errlist[0], e1)
|
||||
assert.Equal(t, errlist[1], e2)
|
||||
assert.Equal(t, errlist[2], e3)
|
||||
}
|
||||
|
@ -11,7 +11,6 @@ import (
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/utils"
|
||||
"storj.io/storj/satellite/console/consoleweb/consoleql"
|
||||
)
|
||||
|
||||
@ -55,10 +54,10 @@ func queryPOST(req *http.Request) (query graphqlJSON, err error) {
|
||||
case applicationGraphql:
|
||||
body, err := ioutil.ReadAll(req.Body)
|
||||
query.Query = string(body)
|
||||
return query, utils.CombineErrors(err, req.Body.Close())
|
||||
return query, errs.Combine(err, req.Body.Close())
|
||||
case applicationJSON:
|
||||
err := json.NewDecoder(req.Body).Decode(&query)
|
||||
return query, utils.CombineErrors(err, req.Body.Close())
|
||||
return query, errs.Combine(err, req.Body.Close())
|
||||
default:
|
||||
return query, errs.New("can't parse request body of type %s", typ)
|
||||
}
|
||||
|
@ -8,8 +8,6 @@ import (
|
||||
"unicode"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -36,7 +34,7 @@ func (validation *validationErrors) AddWrap(err error) {
|
||||
|
||||
// Combine returns combined validation errors
|
||||
func (validation *validationErrors) Combine() error {
|
||||
return utils.CombineErrors(*validation...)
|
||||
return errs.Combine(*validation...)
|
||||
}
|
||||
|
||||
// countNumerics returns total number of digits in string
|
||||
|
@ -7,9 +7,10 @@ import (
|
||||
"context"
|
||||
"crypto"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/pkcrypto"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/utils"
|
||||
dbx "storj.io/storj/satellite/satellitedb/dbx"
|
||||
)
|
||||
|
||||
@ -28,18 +29,18 @@ func (b *certDB) SavePublicKey(ctx context.Context, nodeID storj.NodeID, publicK
|
||||
// no rows err, so create/insert an entry
|
||||
pubbytes, err := pkcrypto.PublicKeyToPKIX(publicKey)
|
||||
if err != nil {
|
||||
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
_, err = tx.Create_CertRecord(ctx,
|
||||
dbx.CertRecord_Publickey(pubbytes),
|
||||
dbx.CertRecord_Id(nodeID.Bytes()),
|
||||
)
|
||||
if err != nil {
|
||||
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
} else {
|
||||
// nodeID entry already exists, just return
|
||||
|
@ -7,9 +7,9 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/utils"
|
||||
dbx "storj.io/storj/satellite/satellitedb/dbx"
|
||||
)
|
||||
|
||||
@ -41,7 +41,7 @@ func (db *irreparableDB) IncrementRepairAttempts(ctx context.Context, segmentInf
|
||||
dbx.Irreparabledb_RepairAttemptCount(segmentInfo.RepairAttemptCount),
|
||||
)
|
||||
if err != nil {
|
||||
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
} else {
|
||||
// row exits increment the attempt counter
|
||||
@ -55,7 +55,7 @@ func (db *irreparableDB) IncrementRepairAttempts(ctx context.Context, segmentInf
|
||||
updateFields,
|
||||
)
|
||||
if err != nil {
|
||||
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,6 @@ import (
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/utils"
|
||||
dbx "storj.io/storj/satellite/satellitedb/dbx"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
@ -305,18 +304,18 @@ func (cache *overlaycache) CreateStats(ctx context.Context, nodeID storj.NodeID,
|
||||
}
|
||||
dbNode, err := tx.Get_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()))
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return nil, Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
if startingStats != nil {
|
||||
auditSuccessRatio, err := checkRatioVars(startingStats.AuditSuccessCount, startingStats.AuditCount)
|
||||
if err != nil {
|
||||
return nil, errAuditSuccess.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return nil, errAuditSuccess.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
uptimeRatio, err := checkRatioVars(startingStats.UptimeSuccessCount, startingStats.UptimeCount)
|
||||
if err != nil {
|
||||
return nil, errUptime.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return nil, errUptime.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
updateFields := dbx.Node_Update_Fields{
|
||||
@ -330,7 +329,7 @@ func (cache *overlaycache) CreateStats(ctx context.Context, nodeID storj.NodeID,
|
||||
|
||||
dbNode, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return nil, Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
}
|
||||
|
||||
@ -368,7 +367,7 @@ func (cache *overlaycache) FindInvalidNodes(ctx context.Context, nodeIDs storj.N
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
err = utils.CombineErrors(err, rows.Close())
|
||||
err = errs.Combine(err, rows.Close())
|
||||
}()
|
||||
|
||||
for rows.Next() {
|
||||
@ -421,7 +420,7 @@ func (cache *overlaycache) UpdateStats(ctx context.Context, updateReq *overlay.U
|
||||
}
|
||||
dbNode, err := tx.Get_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()))
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return nil, Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
auditSuccessCount := dbNode.AuditSuccessCount
|
||||
@ -460,7 +459,7 @@ func (cache *overlaycache) UpdateStats(ctx context.Context, updateReq *overlay.U
|
||||
|
||||
dbNode, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return nil, Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
nodeStats := getNodeStats(nodeID, dbNode)
|
||||
@ -488,7 +487,7 @@ func (cache *overlaycache) UpdateOperator(ctx context.Context, nodeID storj.Node
|
||||
|
||||
updated := getNodeStats(nodeID, updatedDBNode)
|
||||
|
||||
return updated, utils.CombineErrors(err, tx.Commit())
|
||||
return updated, errs.Combine(err, tx.Commit())
|
||||
}
|
||||
|
||||
// UpdateUptime updates a single storagenode's uptime stats in the db
|
||||
@ -501,7 +500,7 @@ func (cache *overlaycache) UpdateUptime(ctx context.Context, nodeID storj.NodeID
|
||||
}
|
||||
dbNode, err := tx.Get_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()))
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return nil, Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
uptimeSuccessCount := dbNode.UptimeSuccessCount
|
||||
@ -528,7 +527,7 @@ func (cache *overlaycache) UpdateUptime(ctx context.Context, nodeID storj.NodeID
|
||||
|
||||
dbNode, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
|
||||
return nil, Error.Wrap(errs.Combine(err, tx.Rollback()))
|
||||
}
|
||||
|
||||
nodeStats := getNodeStats(nodeID, dbNode)
|
||||
@ -555,7 +554,7 @@ func (cache *overlaycache) UpdateBatch(ctx context.Context, updateReqList []*ove
|
||||
}
|
||||
|
||||
if len(allErrors) > 0 {
|
||||
return nodeStatsList, failedUpdateReqs, Error.Wrap(utils.CombineErrors(allErrors...))
|
||||
return nodeStatsList, failedUpdateReqs, Error.Wrap(errs.Combine(allErrors...))
|
||||
}
|
||||
return nodeStatsList, nil, nil
|
||||
}
|
||||
|
@ -10,7 +10,6 @@ import (
|
||||
"github.com/skyrings/skyring-common/tools/uuid"
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/utils"
|
||||
"storj.io/storj/satellite/console"
|
||||
dbx "storj.io/storj/satellite/satellitedb/dbx"
|
||||
)
|
||||
@ -170,5 +169,5 @@ func projectMembersFromDbxSlice(projectMembersDbx []*dbx.ProjectMember) ([]conso
|
||||
projectMembers = append(projectMembers, *projectMember)
|
||||
}
|
||||
|
||||
return projectMembers, utils.CombineErrors(errors...)
|
||||
return projectMembers, errs.Combine(errors...)
|
||||
}
|
||||
|
@ -9,7 +9,6 @@ import (
|
||||
"github.com/skyrings/skyring-common/tools/uuid"
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/utils"
|
||||
"storj.io/storj/satellite/console"
|
||||
dbx "storj.io/storj/satellite/satellitedb/dbx"
|
||||
)
|
||||
@ -125,5 +124,5 @@ func projectsFromDbxSlice(projectsDbx []*dbx.Project) ([]console.Project, error)
|
||||
projects = append(projects, *project)
|
||||
}
|
||||
|
||||
return projects, utils.CombineErrors(errors...)
|
||||
return projects, errs.Combine(errors...)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user