Improve stability of TestDownloadSharesDownloadTimeout (#2210)
This commit is contained in:
parent
b5f8a536e5
commit
f9ed0dc1a8
96
internal/testblobs/slow.go
Normal file
96
internal/testblobs/slow.go
Normal file
@ -0,0 +1,96 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package testblobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/storage"
|
||||
"storj.io/storj/storagenode"
|
||||
)
|
||||
|
||||
// SlowDB implements slow storage node DB.
// It embeds an underlying storagenode.DB and overrides Pieces so that all
// piece (blob) operations go through a latency-injecting SlowBlobs wrapper.
type SlowDB struct {
	storagenode.DB
	blobs *SlowBlobs  // latency-injecting wrapper returned by Pieces
	log   *zap.Logger // logger passed through to the blob wrapper
}
|
||||
|
||||
// NewSlowDB creates a new slow storage node DB wrapping the provided db.
|
||||
// Use SetLatency to dynamically configure the latency of all piece operations.
|
||||
func NewSlowDB(log *zap.Logger, db storagenode.DB) *SlowDB {
|
||||
return &SlowDB{
|
||||
DB: db,
|
||||
blobs: NewSlowBlobs(log, db.Pieces()),
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
// Pieces returns the blob store.
|
||||
func (slow *SlowDB) Pieces() storage.Blobs {
|
||||
return slow.blobs
|
||||
}
|
||||
|
||||
// SetLatency enables a sleep for delay duration for all piece operations.
|
||||
// A zero or negative delay means no sleep.
|
||||
func (slow *SlowDB) SetLatency(delay time.Duration) {
|
||||
slow.blobs.SetLatency(delay)
|
||||
}
|
||||
|
||||
// SlowBlobs implements a slow blob store.
// Every operation first sleeps for the configured delay, then delegates
// to the wrapped storage.Blobs.
type SlowBlobs struct {
	delay int64 // time.Duration in nanoseconds; stored as int64 for atomic access
	blobs storage.Blobs
	log   *zap.Logger
}
|
||||
|
||||
// NewSlowBlobs creates a new slow blob store wrapping the provided blobs.
|
||||
// Use SetLatency to dynamically configure the latency of all operations.
|
||||
func NewSlowBlobs(log *zap.Logger, blobs storage.Blobs) *SlowBlobs {
|
||||
return &SlowBlobs{
|
||||
log: log,
|
||||
blobs: blobs,
|
||||
}
|
||||
}
|
||||
|
||||
// Create creates a new blob that can be written optionally takes a size
|
||||
// argument for performance improvements, -1 is unknown size.
|
||||
func (slow *SlowBlobs) Create(ctx context.Context, ref storage.BlobRef, size int64) (storage.BlobWriter, error) {
|
||||
slow.sleep()
|
||||
return slow.blobs.Create(ctx, ref, size)
|
||||
}
|
||||
|
||||
// Open opens a reader with the specified namespace and key.
|
||||
func (slow *SlowBlobs) Open(ctx context.Context, ref storage.BlobRef) (storage.BlobReader, error) {
|
||||
slow.sleep()
|
||||
return slow.blobs.Open(ctx, ref)
|
||||
}
|
||||
|
||||
// Delete deletes the blob with the namespace and key.
|
||||
func (slow *SlowBlobs) Delete(ctx context.Context, ref storage.BlobRef) error {
|
||||
slow.sleep()
|
||||
return slow.blobs.Delete(ctx, ref)
|
||||
}
|
||||
|
||||
// FreeSpace return how much free space left for writing.
|
||||
func (slow *SlowBlobs) FreeSpace() (int64, error) {
|
||||
slow.sleep()
|
||||
return slow.blobs.FreeSpace()
|
||||
}
|
||||
|
||||
// SetLatency configures the blob store to sleep for delay duration for all
|
||||
// operations. A zero or negative delay means no sleep.
|
||||
func (slow *SlowBlobs) SetLatency(delay time.Duration) {
|
||||
atomic.StoreInt64(&slow.delay, int64(delay))
|
||||
}
|
||||
|
||||
// sleep sleeps for the duration set to slow.delay
|
||||
func (slow *SlowBlobs) sleep() {
|
||||
delay := time.Duration(atomic.LoadInt64(&slow.delay))
|
||||
time.Sleep(delay)
|
||||
}
|
@ -19,7 +19,7 @@ type Reconfigure struct {
|
||||
NewSatelliteDB func(log *zap.Logger, index int) (satellite.DB, error)
|
||||
Satellite func(log *zap.Logger, index int, config *satellite.Config)
|
||||
|
||||
NewStorageNodeDB func(index int) (storagenode.DB, error)
|
||||
NewStorageNodeDB func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error)
|
||||
StorageNode func(index int, config *storagenode.Config)
|
||||
NewIPCount int
|
||||
}
|
||||
|
@ -55,7 +55,6 @@ func Run(t *testing.T, config Config, test func(t *testing.T, ctx *testcontext.C
|
||||
schema: schema,
|
||||
}, nil
|
||||
}
|
||||
planetConfig.Reconfigure.NewStorageNodeDB = nil
|
||||
|
||||
planet, err := NewCustom(zaptest.NewLogger(t), planetConfig)
|
||||
if err != nil {
|
||||
|
@ -51,15 +51,18 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
|
||||
}
|
||||
|
||||
var db storagenode.DB
|
||||
if planet.config.Reconfigure.NewStorageNodeDB != nil {
|
||||
db, err = planet.config.Reconfigure.NewStorageNodeDB(i)
|
||||
} else {
|
||||
db, err = storagenodedb.NewInMemory(log.Named("db"), storageDir)
|
||||
}
|
||||
db, err = storagenodedb.NewInMemory(log.Named("db"), storageDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if planet.config.Reconfigure.NewStorageNodeDB != nil {
|
||||
db, err = planet.config.Reconfigure.NewStorageNodeDB(i, db, planet.log)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
err = db.CreateTables()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -11,10 +11,12 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
"storj.io/storj/internal/errs2"
|
||||
"storj.io/storj/internal/memory"
|
||||
"storj.io/storj/internal/testblobs"
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testplanet"
|
||||
"storj.io/storj/internal/testrand"
|
||||
@ -22,7 +24,7 @@ import (
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/transport"
|
||||
"storj.io/storj/uplink"
|
||||
"storj.io/storj/storagenode"
|
||||
)
|
||||
|
||||
// TestDownloadSharesHappyPath checks that the Share.Error field of all shares
|
||||
@ -210,8 +212,7 @@ func TestDownloadSharesDialTimeout(t *testing.T) {
|
||||
require.NotNil(t, slowClient)
|
||||
|
||||
// This config value will create a very short timeframe allowed for receiving
|
||||
// data from storage nodes. This will cause context to cancel and start
|
||||
// downloading from new nodes.
|
||||
// data from storage nodes. This will cause context to cancel with timeout.
|
||||
minBytesPerSecond := 100 * memory.KiB
|
||||
|
||||
verifier := audit.NewVerifier(
|
||||
@ -249,23 +250,22 @@ func TestDownloadSharesDialTimeout(t *testing.T) {
|
||||
// change that affects the audit service.
|
||||
func TestDownloadSharesDownloadTimeout(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
NewStorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
||||
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
storageNodeDB := planet.StorageNodes[0].DB.(*testblobs.SlowDB)
|
||||
audits := planet.Satellites[0].Audit.Service
|
||||
err := audits.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
upl := planet.Uplinks[0]
|
||||
testData := testrand.Bytes(32 * memory.KiB)
|
||||
testData := testrand.Bytes(8 * memory.KiB)
|
||||
|
||||
// Upload with larger erasure share size to simulate longer download over slow transport client
|
||||
err = upl.UploadWithConfig(ctx, planet.Satellites[0], &uplink.RSConfig{
|
||||
MinThreshold: 1,
|
||||
RepairThreshold: 2,
|
||||
SuccessThreshold: 3,
|
||||
MaxThreshold: 4,
|
||||
ErasureShareSize: 32 * memory.KiB,
|
||||
}, "testbucket", "test/path", testData)
|
||||
err = upl.Upload(ctx, planet.Satellites[0], "testbucket", "test/path", testData)
|
||||
require.NoError(t, err)
|
||||
|
||||
projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx)
|
||||
@ -276,44 +276,36 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) {
|
||||
stripe, _, err := audits.Cursor.NextStripe(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// set stripe index to 0 to ensure we are auditing large enough stripe
|
||||
// instead of the last stripe, which could be smaller
|
||||
stripe.Index = 0
|
||||
|
||||
network := &transport.SimulatedNetwork{
|
||||
BytesPerSecond: 128 * memory.KiB,
|
||||
}
|
||||
|
||||
slowClient := network.NewClient(planet.Satellites[0].Transport)
|
||||
require.NotNil(t, slowClient)
|
||||
|
||||
// This config value will create a very short timeframe allowed for receiving
|
||||
// data from storage nodes. This will cause context to cancel and start
|
||||
// downloading from new nodes.
|
||||
minBytesPerSecond := 1 * memory.MiB
|
||||
// data from storage nodes. This will cause context to cancel with timeout.
|
||||
minBytesPerSecond := 100 * memory.KiB
|
||||
|
||||
verifier := audit.NewVerifier(
|
||||
planet.Satellites[0].Log.Named("verifier"),
|
||||
planet.Satellites[0].Metainfo.Service,
|
||||
slowClient,
|
||||
planet.Satellites[0].Transport,
|
||||
planet.Satellites[0].Overlay.Service,
|
||||
planet.Satellites[0].DB.Containment(),
|
||||
planet.Satellites[0].Orders.Service,
|
||||
planet.Satellites[0].Identity,
|
||||
minBytesPerSecond,
|
||||
100*time.Millisecond)
|
||||
50*time.Millisecond)
|
||||
|
||||
shareSize := stripe.Segment.GetRemote().GetRedundancy().GetErasureShareSize()
|
||||
limits, err := planet.Satellites[0].Orders.Service.CreateAuditOrderLimits(ctx, planet.Satellites[0].Identity.PeerIdentity(), bucketID, stripe.Segment, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// make downloads slow
|
||||
delay := 100 * time.Millisecond
|
||||
storageNodeDB.SetLatency(delay)
|
||||
|
||||
shares, err := verifier.DownloadShares(ctx, limits, stripe.Index, shareSize)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, share := range shares {
|
||||
assert.True(t, errs2.IsRPC(share.Error, codes.DeadlineExceeded), "unexpected error: %+v", share.Error)
|
||||
assert.False(t, transport.Error.Has(share.Error), "unexpected error: %+v", share.Error)
|
||||
}
|
||||
require.Len(t, shares, 1)
|
||||
share := shares[0]
|
||||
assert.True(t, errs2.IsRPC(share.Error, codes.DeadlineExceeded), "unexpected error: %+v", share.Error)
|
||||
assert.False(t, transport.Error.Has(share.Error), "unexpected error: %+v", share.Error)
|
||||
})
|
||||
}
|
||||
|
||||
@ -445,8 +437,7 @@ func TestVerifierDialTimeout(t *testing.T) {
|
||||
require.NotNil(t, slowClient)
|
||||
|
||||
// This config value will create a very short timeframe allowed for receiving
|
||||
// data from storage nodes. This will cause context to cancel and start
|
||||
// downloading from new nodes.
|
||||
// data from storage nodes. This will cause context to cancel with timeout.
|
||||
minBytesPerSecond := 100 * memory.KiB
|
||||
|
||||
verifier := audit.NewVerifier(
|
||||
|
Loading…
Reference in New Issue
Block a user