From a933bcc99ad8d02108baa377eb99175444e02134 Mon Sep 17 00:00:00 2001
From: Moby von Briesen
Date: Wed, 18 Mar 2020 19:55:09 -0400
Subject: [PATCH] satellite/repair/repairer/ec.go: add option for downloading
 pieces onto disk instead of in memory during repair

Add a flag to the satellite repairer, "InMemoryRepair", that allows the
satellite to decide whether to download the entire segment being repaired
into memory (which is what the satellite does today), or to download it
into temporary files on disk that are read back during the upload phase
of repair.

This should help with handling high repair traffic on satellites that
cannot afford to spend 64 MB of memory per repair worker.

Updates tests to cover repair both in memory and to disk.

Change-Id: Iddf591e165621497c98533d45bfea3c28b08a194
---
 private/testplanet/satellite.go             |   1 +
 satellite/repair/repair_test.go             |  78 ++++++++++++--
 satellite/repair/repairer/ec.go             | 113 ++++++++++++++++++--
 satellite/repair/repairer/repairer.go       |   1 +
 satellite/repair/repairer/segments.go       |   4 +-
 satellite/repairer.go                       |   1 +
 scripts/testdata/satellite-config.yaml.lock |   3 +
 7 files changed, 180 insertions(+), 21 deletions(-)
 mode change 100644 => 100755 scripts/testdata/satellite-config.yaml.lock

diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go
index eecc061af..1082ae87e 100644
--- a/private/testplanet/satellite.go
+++ b/private/testplanet/satellite.go
@@ -380,6 +380,7 @@ func (planet *Planet) newSatellites(count int) ([]*Satellite, error) {
 				TotalTimeout:                  10 * time.Minute,
 				MaxBufferMem:                  4 * memory.MiB,
 				MaxExcessRateOptimalThreshold: 0.05,
+				InMemoryRepair:                false,
 			},
 			Audit: audit.Config{
 				MaxRetriesStatDB: 0,
diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go
index df42f2f1f..f8cec7bb4 100644
--- a/satellite/repair/repair_test.go
+++ b/satellite/repair/repair_test.go
@@ -33,7 +33,14 @@ import (
 // threshold
 // - Downloads the data from those left nodes and check that it's the same than
 // the uploaded one
-func TestDataRepair(t *testing.T) {
+func TestDataRepairInMemory(t *testing.T) {
+	testDataRepair(t, true)
+}
+func TestDataRepairToDisk(t *testing.T) {
+	testDataRepair(t, false)
+}
+
+func testDataRepair(t *testing.T, inMemoryRepair bool) {
 	const (
 		RepairMaxExcessRateOptimalThreshold = 0.05
 		minThreshold                        = 3
@@ -47,6 +54,7 @@ func TestDataRepair(t *testing.T) {
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
 				config.Repairer.MaxExcessRateOptimalThreshold = RepairMaxExcessRateOptimalThreshold
+				config.Repairer.InMemoryRepair = inMemoryRepair
 
 				config.Metainfo.RS.MinThreshold = minThreshold
 				config.Metainfo.RS.RepairThreshold = 5
@@ -164,7 +172,14 @@ func TestDataRepair(t *testing.T) {
 // - Triggers data repair, which attempts to repair the data from the remaining nodes to
 //   the numbers of nodes determined by the upload repair max threshold
 // - Expects that the repair failed and the pointer was not updated
-func TestCorruptDataRepair_Failed(t *testing.T) {
+func TestCorruptDataRepairInMemory_Failed(t *testing.T) {
+	testCorruptDataRepairFailed(t, true)
+}
+func TestCorruptDataRepairToDisk_Failed(t *testing.T) {
+	testCorruptDataRepairFailed(t, false)
+}
+
+func testCorruptDataRepairFailed(t *testing.T, inMemoryRepair bool) {
 	const RepairMaxExcessRateOptimalThreshold = 0.05
 
 	testplanet.Run(t, testplanet.Config{
@@ -174,6 +189,7 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
 				config.Repairer.MaxExcessRateOptimalThreshold = RepairMaxExcessRateOptimalThreshold
+				config.Repairer.InMemoryRepair = inMemoryRepair
 
 				config.Metainfo.RS.MinThreshold = 3
 				config.Metainfo.RS.RepairThreshold = 5
@@ -275,7 +291,14 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
 // - Triggers data repair, which attempts to repair the data from the remaining nodes to
 //   the numbers of nodes determined by the upload repair max threshold
 // - Expects that the repair succeed and the pointer should not contain the corrupted piece
-func TestCorruptDataRepair_Succeed(t *testing.T) {
+func TestCorruptDataRepairInMemory_Succeed(t *testing.T) {
+	testCorruptDataRepairSucceed(t, true)
+}
+func TestCorruptDataRepairToDisk_Succeed(t *testing.T) {
+	testCorruptDataRepairSucceed(t, false)
+}
+
+func testCorruptDataRepairSucceed(t *testing.T, inMemoryRepair bool) {
 	const RepairMaxExcessRateOptimalThreshold = 0.05
 
 	testplanet.Run(t, testplanet.Config{
@@ -285,6 +308,7 @@ func TestCorruptDataRepair_Succeed(t *testing.T) {
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
 				config.Repairer.MaxExcessRateOptimalThreshold = RepairMaxExcessRateOptimalThreshold
+				config.Repairer.InMemoryRepair = inMemoryRepair
 
 				config.Metainfo.RS.MinThreshold = 3
 				config.Metainfo.RS.RepairThreshold = 5
@@ -669,13 +693,27 @@ func TestIrreparableSegmentNodesOffline(t *testing.T) {
 // - Now we have just the 3 new nodes to which the data was repaired
 // - Downloads the data from these 3 nodes (succeeds because 3 nodes are enough for download)
 // - Expect newly repaired pointer does not contain the disqualified or suspended nodes
-func TestRepairMultipleDisqualifiedAndSuspended(t *testing.T) {
+func TestRepairMultipleDisqualifiedAndSuspendedInMemory(t *testing.T) {
+	testRepairMultipleDisqualifiedAndSuspended(t, true)
+}
+func TestRepairMultipleDisqualifiedAndSuspendedToDisk(t *testing.T) {
+	testRepairMultipleDisqualifiedAndSuspended(t, false)
+}
+
+func testRepairMultipleDisqualifiedAndSuspended(t *testing.T, inMemoryRepair bool) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount:   1,
 		StorageNodeCount: 12,
 		UplinkCount:      1,
 		Reconfigure: testplanet.Reconfigure{
-			Satellite: testplanet.ReconfigureRS(3, 5, 7, 7),
+			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
+				config.Repairer.InMemoryRepair = inMemoryRepair
+
+				config.Metainfo.RS.MinThreshold = 3
+				config.Metainfo.RS.RepairThreshold = 5
+				config.Metainfo.RS.SuccessThreshold = 7
+				config.Metainfo.RS.TotalThreshold = 7
+			},
 		},
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
 		// first, upload some remote data
@@ -773,7 +811,14 @@ func TestRepairMultipleDisqualifiedAndSuspended(t *testing.T) {
 // - Kills nodes to fall to the Repair Override Value of the checker but stays above the original Repair Threshold
 // - Triggers data repair, which attempts to repair the data from the remaining nodes to
 //   the numbers of nodes determined by the upload repair max threshold
-func TestDataRepairOverride_HigherLimit(t *testing.T) {
+func TestDataRepairOverride_HigherLimitInMemory(t *testing.T) {
+	testDataRepairOverrideHigherLimit(t, true)
+}
+func TestDataRepairOverride_HigherLimitToDisk(t *testing.T) {
+	testDataRepairOverrideHigherLimit(t, false)
+}
+
+func testDataRepairOverrideHigherLimit(t *testing.T, inMemoryRepair bool) {
 	const repairOverride = 6
 
 	testplanet.Run(t, testplanet.Config{
@@ -783,6 +828,7 @@ func TestDataRepairOverride_HigherLimit(t *testing.T) {
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
 				config.Checker.RepairOverride = repairOverride
+				config.Repairer.InMemoryRepair = inMemoryRepair
 
 				config.Metainfo.RS.MinThreshold = 3
 				config.Metainfo.RS.RepairThreshold = 4
@@ -858,7 +904,14 @@ func TestDataRepairOverride_HigherLimit(t *testing.T) {
 // - Kills more nodes to fall to the Override Value to trigger repair
 // - Triggers data repair, which attempts to repair the data from the remaining nodes to
 //   the numbers of nodes determined by the upload repair max threshold
-func TestDataRepairOverride_LowerLimit(t *testing.T) {
+func TestDataRepairOverride_LowerLimitInMemory(t *testing.T) {
+	testDataRepairOverrideLowerLimit(t, true)
+}
+func TestDataRepairOverride_LowerLimitToDisk(t *testing.T) {
+	testDataRepairOverrideLowerLimit(t, false)
+}
+
+func testDataRepairOverrideLowerLimit(t *testing.T, inMemoryRepair bool) {
 	const repairOverride = 4
 
 	testplanet.Run(t, testplanet.Config{
@@ -868,6 +921,7 @@ func TestDataRepairOverride_LowerLimit(t *testing.T) {
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
 				config.Checker.RepairOverride = repairOverride
+				config.Repairer.InMemoryRepair = inMemoryRepair
 
 				config.Metainfo.RS.MinThreshold = 3
 				config.Metainfo.RS.RepairThreshold = 6
@@ -973,7 +1027,14 @@ func TestDataRepairOverride_LowerLimit(t *testing.T) {
 // - Triggers data repair
 // - Verify that the number of pieces which repaired has uploaded don't overpass
 //   the established limit (success threshold + % of excess)
-func TestDataRepairUploadLimit(t *testing.T) {
+func TestDataRepairUploadLimitInMemory(t *testing.T) {
+	testDataRepairUploadLimit(t, true)
+}
+func TestDataRepairUploadLimitToDisk(t *testing.T) {
+	testDataRepairUploadLimit(t, false)
+}
+
+func testDataRepairUploadLimit(t *testing.T, inMemoryRepair bool) {
 	const (
 		RepairMaxExcessRateOptimalThreshold = 0.05
 		repairThreshold                     = 5
@@ -988,6 +1049,7 @@ func TestDataRepairUploadLimit(t *testing.T) {
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
 				config.Repairer.MaxExcessRateOptimalThreshold = RepairMaxExcessRateOptimalThreshold
+				config.Repairer.InMemoryRepair = inMemoryRepair
 
 				config.Metainfo.RS.MinThreshold = 3
 				config.Metainfo.RS.RepairThreshold = repairThreshold
diff --git a/satellite/repair/repairer/ec.go b/satellite/repair/repairer/ec.go
index 983cb6f87..148a578fe 100644
--- a/satellite/repair/repairer/ec.go
+++ b/satellite/repair/repairer/ec.go
@@ -8,6 +8,9 @@ import (
 	"context"
 	"io"
 	"io/ioutil"
+	"os"
+	"path/filepath"
+	"runtime"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -16,6 +19,7 @@ import (
 	"github.com/vivint/infectious"
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
+	"golang.org/x/sys/unix"
 
 	"storj.io/common/errs2"
 	"storj.io/common/pb"
@@ -37,15 +41,17 @@ type ECRepairer struct {
 	dialer          rpc.Dialer
 	satelliteSignee signing.Signee
 	downloadTimeout time.Duration
+	inMemoryRepair  bool
 }
 
 // NewECRepairer creates a new repairer for interfacing with storagenodes.
-func NewECRepairer(log *zap.Logger, dialer rpc.Dialer, satelliteSignee signing.Signee, downloadTimeout time.Duration) *ECRepairer {
+func NewECRepairer(log *zap.Logger, dialer rpc.Dialer, satelliteSignee signing.Signee, downloadTimeout time.Duration, inMemoryRepair bool) *ECRepairer {
 	return &ECRepairer{
 		log:             log,
 		dialer:          dialer,
 		satelliteSignee: satelliteSignee,
 		downloadTimeout: downloadTimeout,
+		inMemoryRepair:  inMemoryRepair,
 	}
 }
 
@@ -113,7 +119,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 			inProgress++
 			cond.L.Unlock()
 
-			downloadedPiece, err := ec.downloadAndVerifyPiece(ctx, limit, privateKey, pieceSize)
+			pieceReadCloser, err := ec.downloadAndVerifyPiece(ctx, limit, privateKey, pieceSize)
 			cond.L.Lock()
 			inProgress--
 			if err != nil {
@@ -131,7 +137,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 				return
 			}
 
-			pieceReaders[currentLimitIndex] = ioutil.NopCloser(bytes.NewReader(downloadedPiece))
+			pieceReaders[currentLimitIndex] = pieceReadCloser
 			successfulPieces++
 
 			return
@@ -164,10 +170,66 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
 	return decodeReader, failedPieces, nil
 }
 
+// tempFileReadWriteCloser wraps a temporary file
+// and deletes the file when Close() is called.
+type tempFileReadWriteCloser struct {
+	file *os.File
+}
+
+func newTempFileReadWriteCloser() (*tempFileReadWriteCloser, error) {
+	tempDir := os.TempDir()
+	repairDir := filepath.Join(tempDir, "repair")
+	err := os.MkdirAll(repairDir, os.ModePerm)
+	if err != nil {
+		return nil, err
+	}
+	// on linux, create an unlinked tempfile that will automatically be removed when we close
+	// on non-linux systems, create a temp file with ioutil that we will delete inside Close()
+	var newFile *os.File
+	if runtime.GOOS == "linux" {
+		newFile, err = os.OpenFile(repairDir, os.O_RDWR|os.O_EXCL|unix.O_TMPFILE, 0600)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		newFile, err = ioutil.TempFile(repairDir, "storj-satellite-repair-*")
+		if err != nil {
+			return nil, err
+		}
+	}
+	return &tempFileReadWriteCloser{
+		file: newFile,
+	}, nil
+}
+
+func (f *tempFileReadWriteCloser) SeekToFront() (err error) {
+	_, err = f.file.Seek(0, 0)
+	return err
+}
+
+func (f *tempFileReadWriteCloser) Read(p []byte) (n int, err error) {
+	return f.file.Read(p)
+}
+
+func (f *tempFileReadWriteCloser) Write(b []byte) (n int, err error) {
+	return f.file.Write(b)
+}
+
+func (f *tempFileReadWriteCloser) Close() error {
+	closeErr := f.file.Close()
+	// we only need to explicitly remove file on non-linux systems
+	if runtime.GOOS != "linux" {
+		filename := f.file.Name()
+		removeErr := os.Remove(filename)
+		return errs.Combine(closeErr, removeErr)
+	}
+	return closeErr
+}
+
 // downloadAndVerifyPiece downloads a piece from a storagenode,
 // expects the original order limit to have the correct piece public key,
 // and expects the hash of the data to match the signed hash provided by the storagenode.
-func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, pieceSize int64) (data []byte, err error) {
+func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, pieceSize int64) (pieceReadCloser io.ReadCloser, err error) {
 	// contact node
 	downloadCtx, cancel := context.WithTimeout(ctx, ec.downloadTimeout)
 	defer cancel()
@@ -187,13 +249,42 @@ func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.Addr
 	}
 	defer func() { err = errs.Combine(err, downloader.Close()) }()
 
-	pieceBytes, err := ioutil.ReadAll(downloader)
-	if err != nil {
-		return nil, err
+	hashWriter := pkcrypto.NewHash()
+	downloadReader := io.TeeReader(downloader, hashWriter)
+	var downloadedPieceSize int64
+
+	if ec.inMemoryRepair {
+		pieceBytes, err := ioutil.ReadAll(downloadReader)
+		if err != nil {
+			return nil, err
+		}
+		downloadedPieceSize = int64(len(pieceBytes))
+		pieceReadCloser = ioutil.NopCloser(bytes.NewReader(pieceBytes))
+	} else {
+		pieceReadWriteCloser, err := newTempFileReadWriteCloser()
+		if err != nil {
+			return nil, err
+		}
+		defer func() {
+			// close and remove file if there is some error
+			if err != nil {
+				err = errs.Combine(err, pieceReadWriteCloser.Close())
+			}
+		}()
+		downloadedPieceSize, err = io.Copy(pieceReadWriteCloser, downloadReader)
+		if err != nil {
+			return nil, err
+		}
+		// seek to beginning of file so the repair job starts at the beginning of the piece
+		err = pieceReadWriteCloser.SeekToFront()
+		if err != nil {
+			return nil, err
+		}
+		pieceReadCloser = pieceReadWriteCloser
 	}
 
-	if int64(len(pieceBytes)) != pieceSize {
-		return nil, Error.New("didn't download the correct amount of data, want %d, got %d", pieceSize, len(pieceBytes))
+	if downloadedPieceSize != pieceSize {
+		return nil, Error.New("didn't download the correct amount of data, want %d, got %d", pieceSize, downloadedPieceSize)
 	}
 
 	// get signed piece hash and original order limit
@@ -211,12 +302,12 @@ func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.Addr
 	}
 
 	// verify the hashes from storage node
-	calculatedHash := pkcrypto.SHA256Hash(pieceBytes)
+	calculatedHash := hashWriter.Sum(nil)
 	if err := verifyPieceHash(ctx, originalLimit, hash, calculatedHash); err != nil {
 		return nil, ErrPieceHashVerifyFailed.Wrap(err)
 	}
 
-	return pieceBytes, nil
+	return pieceReadCloser, nil
 }
 
 func verifyPieceHash(ctx context.Context, limit *pb.OrderLimit, hash *pb.PieceHash, expectedHash []byte) (err error) {
diff --git a/satellite/repair/repairer/repairer.go b/satellite/repair/repairer/repairer.go
index d06942c4c..4095aec25 100644
--- a/satellite/repair/repairer/repairer.go
+++ b/satellite/repair/repairer/repairer.go
@@ -35,6 +35,7 @@ type Config struct {
 	TotalTimeout                  time.Duration `help:"time limit for an entire repair job, from queue pop to upload completion" default:"45m"`
 	MaxBufferMem                  memory.Size   `help:"maximum buffer memory (in bytes) to be allocated for read buffers" default:"4M"`
 	MaxExcessRateOptimalThreshold float64       `help:"ratio applied to the optimal threshold to calculate the excess of the maximum number of repaired pieces to upload" default:"0.05"`
+	InMemoryRepair                bool          `help:"whether to download pieces for repair in memory (true) or download to disk (false)" default:"false"`
 }
 
 // Service contains the information needed to run the repair service
diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index 65ab8d5cc..5d3bdaa5b 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -73,7 +73,7 @@ func NewSegmentRepairer(
 	log *zap.Logger, metainfo *metainfo.Service, orders *orders.Service,
 	overlay *overlay.Service, dialer rpc.Dialer, timeout time.Duration,
 	excessOptimalThreshold float64, repairOverride int,
-	downloadTimeout time.Duration,
+	downloadTimeout time.Duration, inMemoryRepair bool,
 	satelliteSignee signing.Signee,
 ) *SegmentRepairer {
 
@@ -86,7 +86,7 @@ func NewSegmentRepairer(
 		metainfo:                   metainfo,
 		orders:                     orders,
 		overlay:                    overlay,
-		ec:                         NewECRepairer(log.Named("ec repairer"), dialer, satelliteSignee, downloadTimeout),
+		ec:                         NewECRepairer(log.Named("ec repairer"), dialer, satelliteSignee, downloadTimeout, inMemoryRepair),
 		timeout:                    timeout,
 		multiplierOptimalThreshold: 1 + excessOptimalThreshold,
 		repairOverride:             repairOverride,
diff --git a/satellite/repairer.go b/satellite/repairer.go
index fd358e6a9..1b37d4a5a 100644
--- a/satellite/repairer.go
+++ b/satellite/repairer.go
@@ -174,6 +174,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
 			config.Repairer.MaxExcessRateOptimalThreshold,
 			config.Checker.RepairOverride,
 			config.Repairer.DownloadTimeout,
+			config.Repairer.InMemoryRepair,
 			signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()),
 		)
 		peer.Repairer = repairer.NewService(log.Named("repairer"), repairQueue, &config.Repairer, peer.SegmentRepairer, irrDB)
diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock
old mode 100644
new mode 100755
index 0c8bdd2b5..24e0d6404
--- a/scripts/testdata/satellite-config.yaml.lock
+++ b/scripts/testdata/satellite-config.yaml.lock
@@ -465,6 +465,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
 # time limit for downloading pieces from a node for repair
 # repairer.download-timeout: 5m0s
 
+# whether to download pieces for repair in memory (true) or download to disk (false)
+# repairer.in-memory-repair: false
+
 # how frequently repairer should try and repair more data
 # repairer.interval: 5m0s
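
Note (illustration, not part of the patch): the disk-backed path above boils down to one pattern: stream the download into a temporary file through io.TeeReader so the piece hash is computed as the bytes pass by, then seek back to the start before handing the file to the repair upload. The following is a minimal standalone sketch of that pattern under assumed names; spoolToDisk, a plain SHA-256 hash, and os.CreateTemp stand in for the patch's tempFileReadWriteCloser and pkcrypto hash, and the Linux-only O_TMPFILE optimization is omitted.

package main

import (
	"crypto/sha256"
	"fmt"
	"io"
	"log"
	"os"
	"strings"
)

// spoolToDisk copies src into a temporary file while hashing the bytes as
// they stream past, then rewinds the file so the caller can read the piece
// back from disk. The caller must Close and Remove the returned file.
func spoolToDisk(src io.Reader) (_ *os.File, _ int64, _ []byte, err error) {
	f, err := os.CreateTemp("", "repair-piece-*")
	if err != nil {
		return nil, 0, nil, err
	}
	defer func() {
		// on any failure, make sure the temp file does not leak
		if err != nil {
			f.Close()
			os.Remove(f.Name())
		}
	}()

	h := sha256.New()
	// TeeReader mirrors every byte into the hash while io.Copy writes to disk,
	// so the piece never has to sit in memory as one large buffer.
	size, err := io.Copy(f, io.TeeReader(src, h))
	if err != nil {
		return nil, 0, nil, err
	}
	// rewind so the upload phase can read the spooled piece from the start
	if _, err = f.Seek(0, io.SeekStart); err != nil {
		return nil, 0, nil, err
	}
	return f, size, h.Sum(nil), nil
}

func main() {
	f, size, sum, err := spoolToDisk(strings.NewReader("example piece data"))
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()
	fmt.Printf("spooled %d bytes, sha256=%x\n", size, sum)
}

The unlinked O_TMPFILE handle the patch prefers on Linux buys automatic cleanup if the process dies mid-repair; the portable fallback has to remove the file explicitly in Close(), which is why tempFileReadWriteCloser keeps the *os.File around to look up its name.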