satellite/repair/repairer: fix temporary file handling
Change-Id: Ice1a467510737b3375c018ae37b16431c7dffe9e
This commit is contained in:
parent
e1a443b04a
commit
480ea1e4b5
1
go.mod
1
go.mod
@ -6,6 +6,7 @@ require (
|
||||
github.com/alessio/shellescape v0.0.0-20190409004728-b115ca0f9053
|
||||
github.com/alicebob/miniredis/v2 v2.11.1
|
||||
github.com/btcsuite/btcutil v1.0.1
|
||||
github.com/calebcase/tmpfile v1.0.1
|
||||
github.com/cheggaaa/pb/v3 v3.0.1
|
||||
github.com/fatih/color v1.7.0
|
||||
github.com/go-redis/redis v6.14.1+incompatible
|
||||
|
@ -8,18 +8,15 @@ import (
|
||||
"context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/calebcase/tmpfile"
|
||||
"github.com/vivint/infectious"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sys/unix"
|
||||
|
||||
"storj.io/common/errs2"
|
||||
"storj.io/common/pb"
|
||||
@ -41,17 +38,17 @@ type ECRepairer struct {
|
||||
dialer rpc.Dialer
|
||||
satelliteSignee signing.Signee
|
||||
downloadTimeout time.Duration
|
||||
inMemoryRepair bool
|
||||
inmemory bool
|
||||
}
|
||||
|
||||
// NewECRepairer creates a new repairer for interfacing with storagenodes.
|
||||
func NewECRepairer(log *zap.Logger, dialer rpc.Dialer, satelliteSignee signing.Signee, downloadTimeout time.Duration, inMemoryRepair bool) *ECRepairer {
|
||||
func NewECRepairer(log *zap.Logger, dialer rpc.Dialer, satelliteSignee signing.Signee, downloadTimeout time.Duration, inmemory bool) *ECRepairer {
|
||||
return &ECRepairer{
|
||||
log: log,
|
||||
dialer: dialer,
|
||||
satelliteSignee: satelliteSignee,
|
||||
downloadTimeout: downloadTimeout,
|
||||
inMemoryRepair: inMemoryRepair,
|
||||
inmemory: inmemory,
|
||||
}
|
||||
}
|
||||
|
||||
@ -170,62 +167,6 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
|
||||
return decodeReader, failedPieces, nil
|
||||
}
|
||||
|
||||
// tempFileReadWriteCloser wraps a temporary file
|
||||
// and deletes the file when Close() is called.
|
||||
type tempFileReadWriteCloser struct {
|
||||
file *os.File
|
||||
}
|
||||
|
||||
func newTempFileReadWriteCloser() (*tempFileReadWriteCloser, error) {
|
||||
tempDir := os.TempDir()
|
||||
repairDir := filepath.Join(tempDir, "repair")
|
||||
err := os.MkdirAll(repairDir, os.ModePerm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// on linux, create an unlinked tempfile that will automatically be removed when we close
|
||||
// on non-linux systems, create a temp file with ioutil that we will delete inside Close()
|
||||
var newFile *os.File
|
||||
if runtime.GOOS == "linux" {
|
||||
newFile, err = os.OpenFile(repairDir, os.O_RDWR|os.O_EXCL|unix.O_TMPFILE, 0600)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
newFile, err = ioutil.TempFile(repairDir, "storj-satellite-repair-*")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return &tempFileReadWriteCloser{
|
||||
file: newFile,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (f *tempFileReadWriteCloser) SeekToFront() (err error) {
|
||||
_, err = f.file.Seek(0, 0)
|
||||
return err
|
||||
}
|
||||
|
||||
func (f *tempFileReadWriteCloser) Read(p []byte) (n int, err error) {
|
||||
return f.file.Read(p)
|
||||
}
|
||||
|
||||
func (f *tempFileReadWriteCloser) Write(b []byte) (n int, err error) {
|
||||
return f.file.Write(b)
|
||||
}
|
||||
|
||||
func (f *tempFileReadWriteCloser) Close() error {
|
||||
closeErr := f.file.Close()
|
||||
// we only need to explicitly remove file on non-linux systems
|
||||
if runtime.GOOS != "linux" {
|
||||
filename := f.file.Name()
|
||||
removeErr := os.Remove(filename)
|
||||
return errs.Combine(closeErr, removeErr)
|
||||
}
|
||||
return closeErr
|
||||
}
|
||||
|
||||
// downloadAndVerifyPiece downloads a piece from a storagenode,
|
||||
// expects the original order limit to have the correct piece public key,
|
||||
// and expects the hash of the data to match the signed hash provided by the storagenode.
|
||||
@ -253,7 +194,7 @@ func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.Addr
|
||||
downloadReader := io.TeeReader(downloader, hashWriter)
|
||||
var downloadedPieceSize int64
|
||||
|
||||
if ec.inMemoryRepair {
|
||||
if ec.inmemory {
|
||||
pieceBytes, err := ioutil.ReadAll(downloadReader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -261,26 +202,28 @@ func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.Addr
|
||||
downloadedPieceSize = int64(len(pieceBytes))
|
||||
pieceReadCloser = ioutil.NopCloser(bytes.NewReader(pieceBytes))
|
||||
} else {
|
||||
pieceReadWriteCloser, err := newTempFileReadWriteCloser()
|
||||
tempfile, err := tmpfile.New("", "satellite-repair-*")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
// close and remove file if there is some error
|
||||
if err != nil {
|
||||
err = errs.Combine(err, pieceReadWriteCloser.Close())
|
||||
err = errs.Combine(err, tempfile.Close())
|
||||
}
|
||||
}()
|
||||
downloadedPieceSize, err = io.Copy(pieceReadWriteCloser, downloadReader)
|
||||
|
||||
downloadedPieceSize, err = io.Copy(tempfile, downloadReader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// seek to beginning of file so the repair job starts at the beginning of the piece
|
||||
err = pieceReadWriteCloser.SeekToFront()
|
||||
_, err = tempfile.Seek(0, io.SeekStart)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pieceReadCloser = pieceReadWriteCloser
|
||||
pieceReadCloser = tempfile
|
||||
}
|
||||
|
||||
if downloadedPieceSize != pieceSize {
|
||||
|
Loading…
Reference in New Issue
Block a user