Remove utils.LogClose (#1169)

Egon Elbre 2019-01-29 22:42:27 +02:00 committed by GitHub
parent 8d7944bcf8
commit 6132ce86b7
8 changed files with 39 additions and 48 deletions
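
The hunks below all apply the same change: instead of deferring utils.LogClose, which only logged a failed Close, each function gains a named err return and defers errs.Combine(err, x.Close()) so that Close failures propagate to the caller. A minimal sketch of the pattern, using a hypothetical readConfig helper that is not part of this commit:

package main

import (
	"fmt"
	"io/ioutil"
	"os"

	"github.com/zeebo/errs"
)

// readConfig returns the file contents; the named return value err lets the
// deferred closure fold a Close error into whatever error is already being
// returned. (Hypothetical helper for illustration only.)
func readConfig(path string) (data []byte, err error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	// Before this commit: defer utils.LogClose(file) — the error was only logged.
	defer func() { err = errs.Combine(err, file.Close()) }()

	return ioutil.ReadAll(file)
}

func main() {
	data, err := readConfig("config.yaml")
	fmt.Println(len(data), err)
}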

View File

@@ -13,6 +13,7 @@ import (
progressbar "github.com/cheggaaa/pb"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"storj.io/storj/internal/fpath"
"storj.io/storj/pkg/process"
@@ -36,7 +37,7 @@ func init() {
}
// upload transfers src from local machine to s3 compatible object dst
func upload(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgress bool) error {
func upload(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgress bool) (err error) {
if !src.IsLocal() {
return fmt.Errorf("source must be local path: %s", src)
}
@@ -51,7 +52,6 @@ func upload(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgress
}
var file *os.File
var err error
if src.Base() == "-" {
file = os.Stdin
} else {
@@ -59,7 +59,7 @@ func upload(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgress
if err != nil {
return err
}
defer utils.LogClose(file)
defer func() { err = errs.Combine(err, file.Close()) }()
}
fileInfo, err := file.Stat()
@@ -121,7 +121,7 @@ func uploadStream(ctx context.Context, streams streams.Store, mutableObject stor
}
// download transfers s3 compatible object src to dst on local machine
func download(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgress bool) error {
func download(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgress bool) (err error) {
if src.IsLocal() {
return fmt.Errorf("source must be Storj URL: %s", src)
}
@@ -141,7 +141,7 @@ func download(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgres
}
download := stream.NewDownload(ctx, readOnlyStream, streams)
defer utils.LogClose(download)
defer func() { err = errs.Combine(err, download.Close()) }()
var bar *progressbar.ProgressBar
var reader io.Reader
@@ -165,7 +165,7 @@ func download(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgres
if err != nil {
return err
}
defer utils.LogClose(file)
defer func() { err = errs.Combine(err, file.Close()) }()
}
_, err = io.Copy(file, reader)
@@ -185,7 +185,7 @@ func download(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgres
}
// copy copies s3 compatible object src to s3 compatible object dst
func copy(ctx context.Context, src fpath.FPath, dst fpath.FPath) error {
func copy(ctx context.Context, src fpath.FPath, dst fpath.FPath) (err error) {
if src.IsLocal() {
return fmt.Errorf("source must be Storj URL: %s", src)
}
@@ -205,7 +205,7 @@ func copy(ctx context.Context, src fpath.FPath, dst fpath.FPath) error {
}
download := stream.NewDownload(ctx, readOnlyStream, streams)
defer utils.LogClose(download)
defer func() { err = errs.Combine(err, download.Close()) }()
var bar *progressbar.ProgressBar
var reader io.Reader

View File

@@ -25,7 +25,6 @@ import (
"storj.io/storj/pkg/storage/streams"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/stream"
"storj.io/storj/pkg/utils"
)
func init() {
@@ -212,7 +211,11 @@ func (sf *storjFS) Mkdir(name string, mode uint32, context *fuse.Context) fuse.S
}
upload := stream.NewUpload(sf.ctx, mutableStream, sf.streams)
defer utils.LogClose(upload)
defer func() {
if err := upload.Close(); err != nil {
zap.S().Errorf("Failed to close file: %s", err)
}
}()
_, err = upload.Write(nil)
if err != nil {
@@ -448,14 +451,21 @@ func (f *storjFile) Flush() fuse.Status {
func (f *storjFile) closeReader() {
if f.reader != nil {
utils.LogClose(f.reader)
closeErr := f.reader.Close()
if closeErr != nil {
zap.S().Errorf("error closing reader: %v", closeErr)
}
f.reader = nil
}
}
func (f *storjFile) closeWriter() {
if f.writer != nil {
utils.LogClose(f.writer)
closeErr := f.writer.Close()
if closeErr != nil {
zap.S().Errorf("error closing writer: %v", closeErr)
}
f.FS.removeCreatedFile(f.name)
err := f.mutableObject.Commit(f.ctx)
if err != nil {
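
The FUSE callbacks in this file return fuse.Status rather than error, so a Close error cannot be folded into a named return; the commit instead inlines a close-and-log step using zap. A rough sketch of what those defers now do (closeAndLog is an illustrative name, not part of the diff):

package main

import (
	"io"
	"io/ioutil"
	"strings"

	"go.uber.org/zap"
)

// closeAndLog closes c and logs any failure; this mirrors the inlined defers
// above, where the surrounding function has no error result to attach the
// Close error to. (Illustrative helper, not part of the commit.)
func closeAndLog(c io.Closer) {
	if err := c.Close(); err != nil {
		zap.S().Errorf("Failed to close: %v", err)
	}
}

func main() {
	closeAndLog(ioutil.NopCloser(strings.NewReader("example")))
}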

View File

@@ -45,7 +45,7 @@ func (mr *multiReadCloser) Read(p []byte) (n int, err error) {
}
n, err = mr.readers[0].Read(p)
if err == io.EOF {
utils.LogClose(mr.readers[0])
err = mr.readers[0].Close()
// Use eofReader instead of nil to avoid nil panic
// after performing flatten (Issue 18232).
mr.readers[0] = eofReadCloser{} // permit earlier GC

View File

@@ -9,6 +9,7 @@ import (
"io"
"github.com/vivint/infectious"
"github.com/zeebo/errs"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/overlay"
@@ -17,7 +18,6 @@ import (
"storj.io/storj/pkg/provider"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
"storj.io/storj/pkg/utils"
)
var mon = monkit.Package()
@@ -89,8 +89,7 @@ func (d *defaultDownloader) getShare(ctx context.Context, stripeIndex, shareSize
if err != nil {
return s, err
}
// TODO: proper logging
defer utils.LogClose(rc)
defer func() { err = errs.Combine(err, rc.Close()) }()
buf := make([]byte, shareSize)
_, err = io.ReadFull(rc, buf)

View File

@@ -124,7 +124,7 @@ func (layer *gatewayLayer) GetObject(ctx context.Context, bucket, object string,
}
download := stream.NewDownload(ctx, readOnlyStream, layer.gateway.streams)
defer utils.LogClose(download)
defer func() { err = errs.Combine(err, download.Close()) }()
_, err = download.Seek(startOffset, io.SeekStart)
if err != nil {
@@ -347,7 +347,7 @@ func (layer *gatewayLayer) CopyObject(ctx context.Context, srcBucket, srcObject,
}
download := stream.NewDownload(ctx, readOnlyStream, layer.gateway.streams)
defer utils.LogClose(download)
defer func() { err = errs.Combine(err, download.Close()) }()
info := readOnlyStream.Info()
createInfo := storj.CreateObject{

View File

@@ -10,6 +10,7 @@ import (
"sort"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
@@ -114,7 +115,7 @@ func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.Redun
err = ps.Put(ctx, derivedPieceID, readers[i], expiration, pba, authorization)
// normally the bellow call should be deferred, but doing so fails
// randomly the unit tests
utils.LogClose(ps)
err = errs.Combine(err, ps.Close())
// io.ErrUnexpectedEOF means the piece upload was interrupted due to slow connection.
// No error logging for this case.
if err != nil && err != io.ErrUnexpectedEOF {
@@ -231,7 +232,7 @@ func (ec *ecClient) Get(ctx context.Context, nodes []*pb.Node, es eestream.Erasu
func (ec *ecClient) Delete(ctx context.Context, nodes []*pb.Node, pieceID psclient.PieceID, authorization *pb.SignedMessage) (err error) {
defer mon.Task()(&ctx)(&err)
errs := make(chan error, len(nodes))
errch := make(chan error, len(nodes))
for _, v := range nodes {
if v != nil {
v.Type.DPanicOnInvalid("ec client delete")
@@ -239,7 +240,7 @@ func (ec *ecClient) Delete(ctx context.Context, nodes []*pb.Node, pieceID psclie
}
for _, n := range nodes {
if n == nil {
errs <- nil
errch <- nil
continue
}
@@ -247,29 +248,29 @@ func (ec *ecClient) Delete(ctx context.Context, nodes []*pb.Node, pieceID psclie
derivedPieceID, err := pieceID.Derive(n.Id.Bytes())
if err != nil {
zap.S().Errorf("Failed deriving piece id for %s: %v", pieceID, err)
errs <- err
errch <- err
return
}
ps, err := ec.newPSClient(ctx, n)
if err != nil {
zap.S().Errorf("Failed dialing for deleting piece %s -> %s from node %s: %v",
pieceID, derivedPieceID, n.Id, err)
errs <- err
errch <- err
return
}
err = ps.Delete(ctx, derivedPieceID, authorization)
// normally the bellow call should be deferred, but doing so fails
// randomly the unit tests
utils.LogClose(ps)
err = errs.Combine(err, ps.Close())
if err != nil {
zap.S().Errorf("Failed deleting piece %s -> %s from node %s: %v",
pieceID, derivedPieceID, n.Id, err)
}
errs <- err
errch <- err
}(n)
}
allerrs := collectErrors(errs, len(nodes))
allerrs := collectErrors(errch, len(nodes))
for _, v := range nodes {
if v != nil {
v.Type.DPanicOnInvalid("ec client delete 2")
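
Two things change in the ecClient hunks above: the local error channel is renamed from errs to errch so that it no longer shadows the newly imported errs package, and each worker combines its own error with ps.Close() inline instead of only logging the Close failure. A small sketch of that worker shape, with a hypothetical statAll helper standing in for ecClient.Delete:

package main

import (
	"fmt"
	"os"

	"github.com/zeebo/errs"
)

// statAll fans work out to goroutines and collects one error per path on
// errch; naming the channel errch keeps the errs package usable inside the
// closures. (Hypothetical helper, not part of the commit.)
func statAll(paths []string) error {
	errch := make(chan error, len(paths))
	for _, p := range paths {
		go func(p string) {
			f, err := os.Open(p)
			if err != nil {
				errch <- err
				return
			}
			_, err = f.Stat() // stand-in for the real work (ps.Delete in the diff)
			// As in ecClient: Close is called inline (not deferred) and its
			// error is combined with the operation's error.
			err = errs.Combine(err, f.Close())
			errch <- err
		}(p)
	}
	var all []error
	for range paths {
		all = append(all, <-errch)
	}
	return errs.Combine(all...)
}

func main() {
	fmt.Println(statAll([]string{os.DevNull}))
}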

View File

@@ -6,13 +6,14 @@ package segments
import (
"context"
"github.com/zeebo/errs"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/piecestore/psclient"
"storj.io/storj/pkg/pointerdb/pdbclient"
ecclient "storj.io/storj/pkg/storage/ec"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/utils"
)
// Repairer for segments
@@ -132,7 +133,7 @@ func (s *Repairer) Repair(ctx context.Context, path storj.Path, lostPieces []int
if err != nil {
return Error.Wrap(err)
}
defer utils.LogClose(r)
defer func() { err = errs.Combine(err, r.Close()) }()
pbaPut, err := s.pdb.PayerBandwidthAllocation(ctx, pb.BandwidthAction_PUT_REPAIR)
if err != nil {

View File

@@ -3,13 +3,6 @@
package utils
import (
"io"
"os"
"go.uber.org/zap"
)
// ReaderSource takes a src func and turns it into an io.Reader
type ReaderSource struct {
src func() ([]byte, error)
@@ -35,16 +28,3 @@ func (rs *ReaderSource) Read(p []byte) (n int, err error) {
rs.buf = rs.buf[n:]
return n, rs.err
}
// LogClose closes an io.Closer, logging the error if there is one that isn't
// os.ErrClosed
func LogClose(fh io.Closer) {
err := fh.Close()
if err == nil || err == os.ErrClosed {
return
}
if perr, ok := err.(*os.PathError); ok && perr.Err == os.ErrClosed {
return
}
zap.S().Errorf("Failed to close file: %s", err)
}