Improve audit cursor NextStripe (#1821)
This commit is contained in:
parent
42116594d1
commit
50a0bffa0a
@ -5,12 +5,14 @@ package audit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"math/big"
|
||||
crand "crypto/rand"
|
||||
"encoding/binary"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/pb"
|
||||
@ -54,17 +56,6 @@ func (cursor *Cursor) NextStripe(ctx context.Context) (stripe *Stripe, err error
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(pointerItems) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
pointerItem, err := getRandomPointer(pointerItems)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
path = pointerItem.Path
|
||||
|
||||
// keep track of last path listed
|
||||
if !more {
|
||||
cursor.lastPath = ""
|
||||
@ -72,31 +63,11 @@ func (cursor *Cursor) NextStripe(ctx context.Context) (stripe *Stripe, err error
|
||||
cursor.lastPath = pointerItems[len(pointerItems)-1].Path
|
||||
}
|
||||
|
||||
// get pointer info
|
||||
pointer, err := cursor.metainfo.Get(path)
|
||||
pointer, path, err := cursor.getRandomValidPointer(pointerItems)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//delete expired items rather than auditing them
|
||||
if expiration := pointer.GetExpirationDate(); expiration != nil {
|
||||
t, err := ptypes.Timestamp(expiration)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if t.Before(time.Now()) {
|
||||
return nil, cursor.metainfo.Delete(path)
|
||||
}
|
||||
}
|
||||
|
||||
if pointer.GetType() != pb.Pointer_REMOTE {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if pointer.GetSegmentSize() == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
index, err := getRandomStripe(pointer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -120,19 +91,72 @@ func getRandomStripe(pointer *pb.Pointer) (index int64, err error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
randomStripeIndex, err := rand.Int(rand.Reader, big.NewInt(pointer.GetSegmentSize()/int64(redundancy.StripeSize())))
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
var src cryptoSource
|
||||
rnd := rand.New(src)
|
||||
numStripes := pointer.GetSegmentSize() / int64(redundancy.StripeSize())
|
||||
randomStripeIndex := rnd.Int63n(numStripes)
|
||||
|
||||
return randomStripeIndex.Int64(), nil
|
||||
return randomStripeIndex, nil
|
||||
}
|
||||
|
||||
func getRandomPointer(pointerItems []*pb.ListResponse_Item) (pointer *pb.ListResponse_Item, err error) {
|
||||
randomNum, err := rand.Int(rand.Reader, big.NewInt(int64(len(pointerItems))))
|
||||
if err != nil {
|
||||
return &pb.ListResponse_Item{}, err
|
||||
// getRandomValidPointer attempts to get a random remote pointer from a list. If it sees expired pointers in the process of looking, deletes them
|
||||
func (cursor *Cursor) getRandomValidPointer(pointerItems []*pb.ListResponse_Item) (pointer *pb.Pointer, path storj.Path, err error) {
|
||||
var src cryptoSource
|
||||
rnd := rand.New(src)
|
||||
errGroup := new(errs.Group)
|
||||
randomNums := rnd.Perm(len(pointerItems))
|
||||
|
||||
for _, randomIndex := range randomNums {
|
||||
pointerItem := pointerItems[randomIndex]
|
||||
path := pointerItem.Path
|
||||
|
||||
// get pointer info
|
||||
pointer, err := cursor.metainfo.Get(path)
|
||||
if err != nil {
|
||||
errGroup.Add(err)
|
||||
continue
|
||||
}
|
||||
|
||||
//delete expired items rather than auditing them
|
||||
if expiration := pointer.GetExpirationDate(); expiration != nil {
|
||||
t, err := ptypes.Timestamp(expiration)
|
||||
if err != nil {
|
||||
errGroup.Add(err)
|
||||
continue
|
||||
}
|
||||
if t.Before(time.Now()) {
|
||||
err := cursor.metainfo.Delete(path)
|
||||
if err != nil {
|
||||
errGroup.Add(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if pointer.GetType() != pb.Pointer_REMOTE || pointer.GetSegmentSize() == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
return pointer, path, nil
|
||||
}
|
||||
|
||||
return pointerItems[randomNum.Int64()], nil
|
||||
errGroup.Add(Error.New("no valid node found in selection"))
|
||||
return nil, "", errGroup.Err()
|
||||
}
|
||||
|
||||
// cryptoSource implements the math/rand Source interface using crypto/rand
|
||||
type cryptoSource struct{}
|
||||
|
||||
func (s cryptoSource) Seed(seed int64) {}
|
||||
|
||||
func (s cryptoSource) Int63() int64 {
|
||||
return int64(s.Uint64() & ^uint64(1<<63))
|
||||
}
|
||||
|
||||
func (s cryptoSource) Uint64() (v uint64) {
|
||||
err := binary.Read(crand.Reader, binary.BigEndian, &v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
@ -36,8 +36,8 @@ func TestAuditSegment(t *testing.T) {
|
||||
// note: to simulate better,
|
||||
// change limit in library to 5 in
|
||||
// list api call, default is 0 == 1000 listing
|
||||
//populate pointerdb with 10 non-expired pointers of test data
|
||||
tests, cursor, pointerdb := populateTestData(t, planet, ×tamp.Timestamp{Seconds: time.Now().Unix() + 3000})
|
||||
//populate metainfo with 10 non-expired pointers of test data
|
||||
tests, cursor, metainfo := populateTestData(t, planet, ×tamp.Timestamp{Seconds: time.Now().Unix() + 3000})
|
||||
|
||||
t.Run("NextStripe", func(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
@ -46,9 +46,8 @@ func TestAuditSegment(t *testing.T) {
|
||||
if err != nil {
|
||||
require.Error(t, err)
|
||||
require.Nil(t, stripe)
|
||||
}
|
||||
if stripe != nil {
|
||||
require.Nil(t, err)
|
||||
} else {
|
||||
require.NotNil(t, stripe)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -56,7 +55,7 @@ func TestAuditSegment(t *testing.T) {
|
||||
|
||||
// test to see how random paths are
|
||||
t.Run("probabilisticTest", func(t *testing.T) {
|
||||
list, _, err := pointerdb.List("", "", "", true, 10, meta.None)
|
||||
list, _, err := metainfo.List("", "", "", true, 10, meta.None)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, list, 10)
|
||||
|
||||
@ -118,20 +117,16 @@ func TestDeleteExpired(t *testing.T) {
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
//populate metainfo with 10 expired pointers of test data
|
||||
tests, cursor, metainfo := populateTestData(t, planet, ×tamp.Timestamp{})
|
||||
_, cursor, metainfo := populateTestData(t, planet, ×tamp.Timestamp{})
|
||||
//make sure it they're in there
|
||||
list, _, err := metainfo.List("", "", "", true, 10, meta.None)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, list, 10)
|
||||
// make sure its all null and no errors
|
||||
// make sure an error and no pointer is returned
|
||||
t.Run("NextStripe", func(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.bm, func(t *testing.T) {
|
||||
stripe, err := cursor.NextStripe(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, stripe)
|
||||
})
|
||||
}
|
||||
stripe, err := cursor.NextStripe(ctx)
|
||||
require.Error(t, err)
|
||||
require.Nil(t, stripe)
|
||||
})
|
||||
//make sure it they're not in there anymore
|
||||
list, _, err = metainfo.List("", "", "", true, 10, meta.None)
|
||||
|
@ -17,15 +17,9 @@ import (
|
||||
)
|
||||
|
||||
func TestVerifierHappyPath(t *testing.T) {
|
||||
t.Skip("flaky")
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
// TODO (back story): the way NextStripe currently works, it will get a random segment
|
||||
// from metainfo. If it picks an inline segment, it will return nil. If this happens
|
||||
// 3 times in a row, the test will fail. Increasing the amount of iterations will
|
||||
// decrease risk of flaking but not eliminate it. Kaloyan and Nat are working on refactoring NextStripe.
|
||||
|
||||
err := planet.Satellites[0].Audit.Service.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -41,16 +35,9 @@ func TestVerifierHappyPath(t *testing.T) {
|
||||
overlay := planet.Satellites[0].Overlay.Service
|
||||
cursor := audit.NewCursor(metainfo)
|
||||
|
||||
var stripe *audit.Stripe
|
||||
maxRetries := 3
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
stripe, err = cursor.NextStripe(ctx)
|
||||
if stripe != nil || err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
stripe, err := cursor.NextStripe(ctx)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, stripe, "unable to get stripe; likely no pointers in pointerdb")
|
||||
require.NotNil(t, stripe)
|
||||
|
||||
transport := planet.Satellites[0].Transport
|
||||
orders := planet.Satellites[0].Orders.Service
|
||||
|
@ -77,9 +77,6 @@ func (service *Service) process(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if stripe == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
verifiedNodes, err := service.Verifier.Verify(ctx, stripe)
|
||||
if err != nil {
|
||||
|
@ -44,12 +44,7 @@ func TestGetShareTimeout(t *testing.T) {
|
||||
cursor := audit.NewCursor(metainfo)
|
||||
|
||||
var stripe *audit.Stripe
|
||||
for {
|
||||
stripe, err = cursor.NextStripe(ctx)
|
||||
if stripe != nil || err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
stripe, err = cursor.NextStripe(ctx)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, stripe)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user