satellite/repair/repairer/ec.go: add option for downloading pieces onto disk instead of in memory during repair
Add a flag, "InMemoryRepair", to the satellite repairer that lets the satellite decide whether to download the entire segment being repaired into memory (the current behavior) or into temporary files on disk that are read back during the upload phase of repair. This should help handle high repair traffic on satellites that cannot afford to spend 64 MB of memory per repair worker. Tests are updated to cover repair both in memory and to disk.

Change-Id: Iddf591e165621497c98533d45bfea3c28b08a194
parent e8f18a2cfe
commit a933bcc99a
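To make the trade-off behind the flag concrete, here is a minimal, hedged Go sketch of the two download strategies the repairer can choose between: buffering a piece entirely in memory versus spooling it through a temporary file that is deleted on Close. The helper names (bufferPiece, tempFile) and the main function are illustrative only and are not part of the satellite code; the actual implementation is ECRepairer.downloadAndVerifyPiece in the diff below, and ioutil is used here to match the Go version of that code.

package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"strings"
)

// bufferPiece returns a ReadCloser over a downloaded piece. With inMemory set,
// the whole piece is held in RAM (up to a full segment per repair worker);
// otherwise it is copied to a temporary file that is removed on Close.
func bufferPiece(src io.Reader, inMemory bool) (io.ReadCloser, int64, error) {
	if inMemory {
		data, err := ioutil.ReadAll(src)
		if err != nil {
			return nil, 0, err
		}
		return ioutil.NopCloser(bytes.NewReader(data)), int64(len(data)), nil
	}

	f, err := ioutil.TempFile("", "repair-piece-*")
	if err != nil {
		return nil, 0, err
	}
	n, err := io.Copy(f, src)
	if err == nil {
		// rewind so the upload phase reads the piece from the start
		_, err = f.Seek(0, io.SeekStart)
	}
	if err != nil {
		_ = f.Close()
		_ = os.Remove(f.Name())
		return nil, 0, err
	}
	return &tempFile{f}, n, nil
}

// tempFile deletes the underlying file when it is closed.
type tempFile struct{ *os.File }

func (t *tempFile) Close() error {
	closeErr := t.File.Close()
	if removeErr := os.Remove(t.Name()); closeErr == nil {
		return removeErr
	}
	return closeErr
}

func main() {
	rc, n, err := bufferPiece(strings.NewReader("example piece data"), false)
	if err != nil {
		panic(err)
	}
	defer rc.Close()
	data, _ := ioutil.ReadAll(rc)
	fmt.Println(n, string(data))
}

Holding the piece in memory avoids disk I/O but multiplies RAM usage by the number of concurrent repair workers; the temp-file path trades that for local disk throughput, which is why the new config option defaults to false (repair to disk) in the diff below.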
@@ -380,6 +380,7 @@ func (planet *Planet) newSatellites(count int) ([]*Satellite, error) {
 				TotalTimeout: 10 * time.Minute,
 				MaxBufferMem: 4 * memory.MiB,
 				MaxExcessRateOptimalThreshold: 0.05,
+				InMemoryRepair: false,
 			},
 			Audit: audit.Config{
 				MaxRetriesStatDB: 0,
@@ -33,7 +33,14 @@ import (
 // threshold
 // - Downloads the data from those left nodes and check that it's the same than
 // the uploaded one
-func TestDataRepair(t *testing.T) {
+func TestDataRepairInMemory(t *testing.T) {
+	testDataRepair(t, true)
+}
+func TestDataRepairToDisk(t *testing.T) {
+	testDataRepair(t, false)
+}
+
+func testDataRepair(t *testing.T, inMemoryRepair bool) {
 	const (
 		RepairMaxExcessRateOptimalThreshold = 0.05
 		minThreshold = 3
@@ -47,6 +54,7 @@ func TestDataRepair(t *testing.T) {
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
 				config.Repairer.MaxExcessRateOptimalThreshold = RepairMaxExcessRateOptimalThreshold
+				config.Repairer.InMemoryRepair = inMemoryRepair

 				config.Metainfo.RS.MinThreshold = minThreshold
 				config.Metainfo.RS.RepairThreshold = 5
@@ -164,7 +172,14 @@ func TestDataRepair(t *testing.T) {
 // - Triggers data repair, which attempts to repair the data from the remaining nodes to
 // the numbers of nodes determined by the upload repair max threshold
 // - Expects that the repair failed and the pointer was not updated
-func TestCorruptDataRepair_Failed(t *testing.T) {
+func TestCorruptDataRepairInMemory_Failed(t *testing.T) {
+	testCorruptDataRepairFailed(t, true)
+}
+func TestCorruptDataRepairToDisk_Failed(t *testing.T) {
+	testCorruptDataRepairFailed(t, false)
+}
+
+func testCorruptDataRepairFailed(t *testing.T, inMemoryRepair bool) {
 	const RepairMaxExcessRateOptimalThreshold = 0.05

 	testplanet.Run(t, testplanet.Config{
@@ -174,6 +189,7 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
 				config.Repairer.MaxExcessRateOptimalThreshold = RepairMaxExcessRateOptimalThreshold
+				config.Repairer.InMemoryRepair = inMemoryRepair

 				config.Metainfo.RS.MinThreshold = 3
 				config.Metainfo.RS.RepairThreshold = 5
@@ -275,7 +291,14 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
 // - Triggers data repair, which attempts to repair the data from the remaining nodes to
 // the numbers of nodes determined by the upload repair max threshold
 // - Expects that the repair succeed and the pointer should not contain the corrupted piece
-func TestCorruptDataRepair_Succeed(t *testing.T) {
+func TestCorruptDataRepairInMemory_Succeed(t *testing.T) {
+	testCorruptDataRepairSucceed(t, true)
+}
+func TestCorruptDataRepairToDisk_Succeed(t *testing.T) {
+	testCorruptDataRepairSucceed(t, false)
+}
+
+func testCorruptDataRepairSucceed(t *testing.T, inMemoryRepair bool) {
 	const RepairMaxExcessRateOptimalThreshold = 0.05

 	testplanet.Run(t, testplanet.Config{
@@ -285,6 +308,7 @@ func TestCorruptDataRepair_Succeed(t *testing.T) {
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
 				config.Repairer.MaxExcessRateOptimalThreshold = RepairMaxExcessRateOptimalThreshold
+				config.Repairer.InMemoryRepair = inMemoryRepair

 				config.Metainfo.RS.MinThreshold = 3
 				config.Metainfo.RS.RepairThreshold = 5
@@ -669,13 +693,27 @@ func TestIrreparableSegmentNodesOffline(t *testing.T) {
 // - Now we have just the 3 new nodes to which the data was repaired
 // - Downloads the data from these 3 nodes (succeeds because 3 nodes are enough for download)
 // - Expect newly repaired pointer does not contain the disqualified or suspended nodes
-func TestRepairMultipleDisqualifiedAndSuspended(t *testing.T) {
+func TestRepairMultipleDisqualifiedAndSuspendedInMemory(t *testing.T) {
+	testRepairMultipleDisqualifiedAndSuspended(t, true)
+}
+func TestRepairMultipleDisqualifiedAndSuspendedToDisk(t *testing.T) {
+	testRepairMultipleDisqualifiedAndSuspended(t, false)
+}
+
+func testRepairMultipleDisqualifiedAndSuspended(t *testing.T, inMemoryRepair bool) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1,
 		StorageNodeCount: 12,
 		UplinkCount: 1,
 		Reconfigure: testplanet.Reconfigure{
-			Satellite: testplanet.ReconfigureRS(3, 5, 7, 7),
+			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
+				config.Repairer.InMemoryRepair = inMemoryRepair
+
+				config.Metainfo.RS.MinThreshold = 3
+				config.Metainfo.RS.RepairThreshold = 5
+				config.Metainfo.RS.SuccessThreshold = 7
+				config.Metainfo.RS.TotalThreshold = 7
+			},
 		},
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
 		// first, upload some remote data
@@ -773,7 +811,14 @@ func TestRepairMultipleDisqualifiedAndSuspended(t *testing.T) {
 // - Kills nodes to fall to the Repair Override Value of the checker but stays above the original Repair Threshold
 // - Triggers data repair, which attempts to repair the data from the remaining nodes to
 // the numbers of nodes determined by the upload repair max threshold
-func TestDataRepairOverride_HigherLimit(t *testing.T) {
+func TestDataRepairOverride_HigherLimitInMemory(t *testing.T) {
+	testDataRepairOverrideHigherLimit(t, true)
+}
+func TestDataRepairOverride_HigherLimitToDisk(t *testing.T) {
+	testDataRepairOverrideHigherLimit(t, false)
+}
+
+func testDataRepairOverrideHigherLimit(t *testing.T, inMemoryRepair bool) {
 	const repairOverride = 6

 	testplanet.Run(t, testplanet.Config{
@@ -783,6 +828,7 @@ func TestDataRepairOverride_HigherLimit(t *testing.T) {
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
 				config.Checker.RepairOverride = repairOverride
+				config.Repairer.InMemoryRepair = inMemoryRepair

 				config.Metainfo.RS.MinThreshold = 3
 				config.Metainfo.RS.RepairThreshold = 4
@@ -858,7 +904,14 @@ func TestDataRepairOverride_HigherLimit(t *testing.T) {
 // - Kills more nodes to fall to the Override Value to trigger repair
 // - Triggers data repair, which attempts to repair the data from the remaining nodes to
 // the numbers of nodes determined by the upload repair max threshold
-func TestDataRepairOverride_LowerLimit(t *testing.T) {
+func TestDataRepairOverride_LowerLimitInMemory(t *testing.T) {
+	testDataRepairOverrideLowerLimit(t, true)
+}
+func TestDataRepairOverride_LowerLimitToDisk(t *testing.T) {
+	testDataRepairOverrideLowerLimit(t, false)
+}
+
+func testDataRepairOverrideLowerLimit(t *testing.T, inMemoryRepair bool) {
 	const repairOverride = 4

 	testplanet.Run(t, testplanet.Config{
@@ -868,6 +921,7 @@ func TestDataRepairOverride_LowerLimit(t *testing.T) {
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
 				config.Checker.RepairOverride = repairOverride
+				config.Repairer.InMemoryRepair = inMemoryRepair

 				config.Metainfo.RS.MinThreshold = 3
 				config.Metainfo.RS.RepairThreshold = 6
@@ -973,7 +1027,14 @@ func TestDataRepairOverride_LowerLimit(t *testing.T) {
 // - Triggers data repair
 // - Verify that the number of pieces which repaired has uploaded don't overpass
 // the established limit (success threshold + % of excess)
-func TestDataRepairUploadLimit(t *testing.T) {
+func TestDataRepairUploadLimitInMemory(t *testing.T) {
+	testDataRepairUploadLimit(t, true)
+}
+func TestDataRepairUploadLimitToDisk(t *testing.T) {
+	testDataRepairUploadLimit(t, false)
+}
+
+func testDataRepairUploadLimit(t *testing.T, inMemoryRepair bool) {
 	const (
 		RepairMaxExcessRateOptimalThreshold = 0.05
 		repairThreshold = 5
@@ -988,6 +1049,7 @@ func TestDataRepairUploadLimit(t *testing.T) {
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
 				config.Repairer.MaxExcessRateOptimalThreshold = RepairMaxExcessRateOptimalThreshold
+				config.Repairer.InMemoryRepair = inMemoryRepair

 				config.Metainfo.RS.MinThreshold = 3
 				config.Metainfo.RS.RepairThreshold = repairThreshold
@@ -8,6 +8,9 @@ import (
 	"context"
 	"io"
 	"io/ioutil"
+	"os"
+	"path/filepath"
+	"runtime"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -16,6 +19,7 @@ import (
 	"github.com/vivint/infectious"
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
+	"golang.org/x/sys/unix"

 	"storj.io/common/errs2"
 	"storj.io/common/pb"
@@ -37,15 +41,17 @@ type ECRepairer struct {
 	dialer rpc.Dialer
 	satelliteSignee signing.Signee
 	downloadTimeout time.Duration
+	inMemoryRepair bool
 }

 // NewECRepairer creates a new repairer for interfacing with storagenodes.
-func NewECRepairer(log *zap.Logger, dialer rpc.Dialer, satelliteSignee signing.Signee, downloadTimeout time.Duration) *ECRepairer {
+func NewECRepairer(log *zap.Logger, dialer rpc.Dialer, satelliteSignee signing.Signee, downloadTimeout time.Duration, inMemoryRepair bool) *ECRepairer {
 	return &ECRepairer{
 		log: log,
 		dialer: dialer,
 		satelliteSignee: satelliteSignee,
 		downloadTimeout: downloadTimeout,
+		inMemoryRepair: inMemoryRepair,
 	}
 }

@@ -113,7 +119,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 			inProgress++
 			cond.L.Unlock()

-			downloadedPiece, err := ec.downloadAndVerifyPiece(ctx, limit, privateKey, pieceSize)
+			pieceReadCloser, err := ec.downloadAndVerifyPiece(ctx, limit, privateKey, pieceSize)
 			cond.L.Lock()
 			inProgress--
 			if err != nil {
@@ -131,7 +137,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 				return
 			}

-			pieceReaders[currentLimitIndex] = ioutil.NopCloser(bytes.NewReader(downloadedPiece))
+			pieceReaders[currentLimitIndex] = pieceReadCloser
 			successfulPieces++

 			return
@@ -164,10 +170,66 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 	return decodeReader, failedPieces, nil
 }

+// tempFileReadWriteCloser wraps a temporary file
+// and deletes the file when Close() is called.
+type tempFileReadWriteCloser struct {
+	file *os.File
+}
+
+func newTempFileReadWriteCloser() (*tempFileReadWriteCloser, error) {
+	tempDir := os.TempDir()
+	repairDir := filepath.Join(tempDir, "repair")
+	err := os.MkdirAll(repairDir, os.ModePerm)
+	if err != nil {
+		return nil, err
+	}
+	// on linux, create an unlinked tempfile that will automatically be removed when we close
+	// on non-linux systems, create a temp file with ioutil that we will delete inside Close()
+	var newFile *os.File
+	if runtime.GOOS == "linux" {
+		newFile, err = os.OpenFile(repairDir, os.O_RDWR|os.O_EXCL|unix.O_TMPFILE, 0600)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		newFile, err = ioutil.TempFile(repairDir, "storj-satellite-repair-*")
+		if err != nil {
+			return nil, err
+		}
+	}
+	return &tempFileReadWriteCloser{
+		file: newFile,
+	}, nil
+}
+
+func (f *tempFileReadWriteCloser) SeekToFront() (err error) {
+	_, err = f.file.Seek(0, 0)
+	return err
+}
+
+func (f *tempFileReadWriteCloser) Read(p []byte) (n int, err error) {
+	return f.file.Read(p)
+}
+
+func (f *tempFileReadWriteCloser) Write(b []byte) (n int, err error) {
+	return f.file.Write(b)
+}
+
+func (f *tempFileReadWriteCloser) Close() error {
+	closeErr := f.file.Close()
+	// we only need to explicitly remove file on non-linux systems
+	if runtime.GOOS != "linux" {
+		filename := f.file.Name()
+		removeErr := os.Remove(filename)
+		return errs.Combine(closeErr, removeErr)
+	}
+	return closeErr
+}
+
 // downloadAndVerifyPiece downloads a piece from a storagenode,
 // expects the original order limit to have the correct piece public key,
 // and expects the hash of the data to match the signed hash provided by the storagenode.
-func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, pieceSize int64) (data []byte, err error) {
+func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, pieceSize int64) (pieceReadCloser io.ReadCloser, err error) {
 	// contact node
 	downloadCtx, cancel := context.WithTimeout(ctx, ec.downloadTimeout)
 	defer cancel()
@@ -187,13 +249,42 @@ func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.Addr
 	}
 	defer func() { err = errs.Combine(err, downloader.Close()) }()

-	pieceBytes, err := ioutil.ReadAll(downloader)
+	hashWriter := pkcrypto.NewHash()
+	downloadReader := io.TeeReader(downloader, hashWriter)
+	var downloadedPieceSize int64
+
+	if ec.inMemoryRepair {
+		pieceBytes, err := ioutil.ReadAll(downloadReader)
+		if err != nil {
+			return nil, err
+		}
+		downloadedPieceSize = int64(len(pieceBytes))
+		pieceReadCloser = ioutil.NopCloser(bytes.NewReader(pieceBytes))
+	} else {
+		pieceReadWriteCloser, err := newTempFileReadWriteCloser()
+		if err != nil {
+			return nil, err
+		}
+		defer func() {
+			// close and remove file if there is some error
+			if err != nil {
+				err = errs.Combine(err, pieceReadWriteCloser.Close())
+			}
+		}()
+		downloadedPieceSize, err = io.Copy(pieceReadWriteCloser, downloadReader)
+		if err != nil {
+			return nil, err
+		}
+		// seek to beginning of file so the repair job starts at the beginning of the piece
+		err = pieceReadWriteCloser.SeekToFront()
+		if err != nil {
+			return nil, err
+		}
+		pieceReadCloser = pieceReadWriteCloser
+	}

-	if int64(len(pieceBytes)) != pieceSize {
-		return nil, Error.New("didn't download the correct amount of data, want %d, got %d", pieceSize, len(pieceBytes))
+	if downloadedPieceSize != pieceSize {
+		return nil, Error.New("didn't download the correct amount of data, want %d, got %d", pieceSize, downloadedPieceSize)
 	}

 	// get signed piece hash and original order limit
@@ -211,12 +302,12 @@ func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.Addr
 	}

 	// verify the hashes from storage node
-	calculatedHash := pkcrypto.SHA256Hash(pieceBytes)
+	calculatedHash := hashWriter.Sum(nil)
 	if err := verifyPieceHash(ctx, originalLimit, hash, calculatedHash); err != nil {
 		return nil, ErrPieceHashVerifyFailed.Wrap(err)
 	}

-	return pieceBytes, nil
+	return pieceReadCloser, nil
 }

 func verifyPieceHash(ctx context.Context, limit *pb.OrderLimit, hash *pb.PieceHash, expectedHash []byte) (err error) {
@@ -35,6 +35,7 @@ type Config struct {
 	TotalTimeout time.Duration `help:"time limit for an entire repair job, from queue pop to upload completion" default:"45m"`
 	MaxBufferMem memory.Size `help:"maximum buffer memory (in bytes) to be allocated for read buffers" default:"4M"`
 	MaxExcessRateOptimalThreshold float64 `help:"ratio applied to the optimal threshold to calculate the excess of the maximum number of repaired pieces to upload" default:"0.05"`
+	InMemoryRepair bool `help:"whether to download pieces for repair in memory (true) or download to disk (false)" default:"false"`
 }

 // Service contains the information needed to run the repair service
@@ -73,7 +73,7 @@ func NewSegmentRepairer(
 	log *zap.Logger, metainfo *metainfo.Service, orders *orders.Service,
 	overlay *overlay.Service, dialer rpc.Dialer, timeout time.Duration,
 	excessOptimalThreshold float64, repairOverride int,
-	downloadTimeout time.Duration,
+	downloadTimeout time.Duration, inMemoryRepair bool,
 	satelliteSignee signing.Signee,
 ) *SegmentRepairer {

@@ -86,7 +86,7 @@ func NewSegmentRepairer(
 		metainfo: metainfo,
 		orders: orders,
 		overlay: overlay,
-		ec: NewECRepairer(log.Named("ec repairer"), dialer, satelliteSignee, downloadTimeout),
+		ec: NewECRepairer(log.Named("ec repairer"), dialer, satelliteSignee, downloadTimeout, inMemoryRepair),
 		timeout: timeout,
 		multiplierOptimalThreshold: 1 + excessOptimalThreshold,
 		repairOverride: repairOverride,
@@ -174,6 +174,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
 		config.Repairer.MaxExcessRateOptimalThreshold,
 		config.Checker.RepairOverride,
 		config.Repairer.DownloadTimeout,
+		config.Repairer.InMemoryRepair,
 		signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()),
 	)
 	peer.Repairer = repairer.NewService(log.Named("repairer"), repairQueue, &config.Repairer, peer.SegmentRepairer, irrDB)
scripts/testdata/satellite-config.yaml.lock (vendored, Normal file → Executable file, 3 changed lines)
@@ -465,6 +465,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
 # time limit for downloading pieces from a node for repair
 # repairer.download-timeout: 5m0s

+# whether to download pieces for repair in memory (true) or download to disk (false)
+# repairer.in-memory-repair: false
+
 # how frequently repairer should try and repair more data
 # repairer.interval: 5m0s