f61230a670
A chore responsible for purging data from the console DB has been implemented. Currently, it removes old records for unverified user accounts. We plan to extend this functionality to include expired project member invitations in the future. Resolves #5790 References #5816 Change-Id: I1f3ef62fc96c10a42a383804b3b1d2846d7813f7
511 lines
15 KiB
Go
511 lines
15 KiB
Go
// Copyright (C) 2022 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package main_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"sort"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/zeebo/errs"
|
|
|
|
"storj.io/common/pb"
|
|
"storj.io/common/storj"
|
|
"storj.io/common/testcontext"
|
|
"storj.io/common/uuid"
|
|
segmentverify "storj.io/storj/cmd/tools/segment-verify"
|
|
"storj.io/storj/private/testplanet"
|
|
"storj.io/storj/satellite/metabase"
|
|
"storj.io/storj/satellite/overlay"
|
|
)
|
|
|
|
func TestService_EmptyRange(t *testing.T) {
|
|
ctx := testcontext.New(t)
|
|
log := testplanet.NewLogger(t)
|
|
|
|
config := segmentverify.ServiceConfig{
|
|
NotFoundPath: ctx.File("not-found.csv"),
|
|
RetryPath: ctx.File("retry.csv"),
|
|
ProblemPiecesPath: ctx.File("problem-pieces.csv"),
|
|
MaxOffline: 2,
|
|
}
|
|
|
|
metabase := newMetabaseMock(map[metabase.NodeAlias]storj.NodeID{})
|
|
verifier := &verifierMock{allSuccess: true}
|
|
|
|
service, err := segmentverify.NewService(log.Named("segment-verify"), metabase, verifier, metabase, config)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, service)
|
|
|
|
defer ctx.Check(service.Close)
|
|
|
|
err = service.ProcessRange(ctx, uuid.UUID{}, uuid.Max())
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
func TestService_Success(t *testing.T) {
|
|
ctx := testcontext.New(t)
|
|
log := testplanet.NewLogger(t)
|
|
|
|
config := segmentverify.ServiceConfig{
|
|
NotFoundPath: ctx.File("not-found.csv"),
|
|
RetryPath: ctx.File("retry.csv"),
|
|
ProblemPiecesPath: ctx.File("problem-pieces.csv"),
|
|
PriorityNodesPath: ctx.File("priority-nodes.txt"),
|
|
|
|
Check: 3,
|
|
BatchSize: 100,
|
|
Concurrency: 3,
|
|
MaxOffline: 2,
|
|
}
|
|
|
|
// the node 1 is going to be priority
|
|
err := os.WriteFile(config.PriorityNodesPath, []byte((storj.NodeID{1}).String()+"\n"), 0755)
|
|
require.NoError(t, err)
|
|
|
|
func() {
|
|
nodes := map[metabase.NodeAlias]storj.NodeID{}
|
|
for i := 1; i <= 0xFF; i++ {
|
|
nodes[metabase.NodeAlias(i)] = storj.NodeID{byte(i)}
|
|
}
|
|
|
|
segments := []metabase.VerifySegment{
|
|
{
|
|
StreamID: uuid.UUID{0x10, 0x10},
|
|
AliasPieces: metabase.AliasPieces{{Number: 1, Alias: 8}, {Number: 3, Alias: 9}, {Number: 5, Alias: 10}, {Number: 0, Alias: 1}},
|
|
},
|
|
{
|
|
StreamID: uuid.UUID{0x20, 0x20},
|
|
AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
|
|
},
|
|
{ // this won't get processed due to the high limit
|
|
StreamID: uuid.UUID{0x30, 0x30},
|
|
AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
|
|
},
|
|
}
|
|
|
|
metabase := newMetabaseMock(nodes, segments...)
|
|
verifier := &verifierMock{allSuccess: true}
|
|
|
|
service, err := segmentverify.NewService(log.Named("segment-verify"), metabase, verifier, metabase, config)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, service)
|
|
|
|
defer ctx.Check(service.Close)
|
|
|
|
err = service.ProcessRange(ctx, uuid.UUID{0x10, 0x10}, uuid.UUID{0x30, 0x30})
|
|
require.NoError(t, err)
|
|
|
|
for node, list := range verifier.processed {
|
|
assert.True(t, isUnique(list), "each node should process only once: %v %#v", node, list)
|
|
}
|
|
|
|
// node 1 is a priority node in the segments[0]
|
|
assert.Len(t, verifier.processed[nodes[1]], 1)
|
|
// we should get two other checks against the nodes in segments[8-10]
|
|
assert.Equal(t, 2,
|
|
len(verifier.processed[nodes[8]])+len(verifier.processed[nodes[9]])+len(verifier.processed[nodes[10]]),
|
|
)
|
|
// these correspond to checks against segment[1]
|
|
assert.Len(t, verifier.processed[nodes[2]], 1)
|
|
assert.Len(t, verifier.processed[nodes[3]], 1)
|
|
assert.Len(t, verifier.processed[nodes[4]], 1)
|
|
}()
|
|
|
|
retryCSV, err := os.ReadFile(config.RetryPath)
|
|
require.NoError(t, err)
|
|
require.Equal(t, "stream id,position,found,not found,retry\n", string(retryCSV))
|
|
|
|
notFoundCSV, err := os.ReadFile(config.NotFoundPath)
|
|
require.NoError(t, err)
|
|
require.Equal(t, "stream id,position,found,not found,retry\n", string(notFoundCSV))
|
|
}
|
|
|
|
func TestService_Buckets_Success(t *testing.T) {
|
|
ctx := testcontext.New(t)
|
|
log := testplanet.NewLogger(t)
|
|
|
|
config := segmentverify.ServiceConfig{
|
|
NotFoundPath: ctx.File("not-found.csv"),
|
|
RetryPath: ctx.File("retry.csv"),
|
|
ProblemPiecesPath: ctx.File("problem-pieces.csv"),
|
|
PriorityNodesPath: ctx.File("priority-nodes.txt"),
|
|
|
|
Check: 3,
|
|
BatchSize: 100,
|
|
Concurrency: 3,
|
|
MaxOffline: 2,
|
|
}
|
|
|
|
// the node 1 is going to be priority
|
|
err := os.WriteFile(config.PriorityNodesPath, []byte((storj.NodeID{1}).String()+"\n"), 0755)
|
|
require.NoError(t, err)
|
|
|
|
bucketListPath := ctx.File("buckets.csv")
|
|
err = os.WriteFile(bucketListPath, []byte(`
|
|
00000000000000000000000000000001,67616c617879
|
|
00000000000000000000000000000002,7368696e6f6269`), 0755)
|
|
require.NoError(t, err)
|
|
|
|
func() {
|
|
nodes := map[metabase.NodeAlias]storj.NodeID{}
|
|
for i := 1; i <= 0xFF; i++ {
|
|
nodes[metabase.NodeAlias(i)] = storj.NodeID{byte(i)}
|
|
}
|
|
|
|
segments := []metabase.VerifySegment{
|
|
{
|
|
StreamID: uuid.UUID{0x10, 0x10},
|
|
AliasPieces: metabase.AliasPieces{{Number: 1, Alias: 8}, {Number: 3, Alias: 9}, {Number: 5, Alias: 10}, {Number: 0, Alias: 1}},
|
|
},
|
|
{
|
|
StreamID: uuid.UUID{0x20, 0x20},
|
|
AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
|
|
},
|
|
{ // this won't get processed due to the high limit
|
|
StreamID: uuid.UUID{0x30, 0x30},
|
|
AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
|
|
},
|
|
}
|
|
|
|
metabase := newMetabaseMock(nodes, segments...)
|
|
verifier := &verifierMock{allSuccess: true}
|
|
|
|
metabase.AddStreamIDToBucket(uuid.UUID{1}, "67616c617879", uuid.UUID{0x10, 0x10})
|
|
metabase.AddStreamIDToBucket(uuid.UUID{2}, "7368696e6f6269", uuid.UUID{0x20, 0x20})
|
|
metabase.AddStreamIDToBucket(uuid.UUID{2}, "7777777", uuid.UUID{0x30, 0x30})
|
|
|
|
service, err := segmentverify.NewService(log.Named("segment-verify"), metabase, verifier, metabase, config)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, service)
|
|
|
|
defer ctx.Check(service.Close)
|
|
|
|
bucketList, err := service.ParseBucketFile(bucketListPath)
|
|
require.NoError(t, err)
|
|
|
|
err = service.ProcessBuckets(ctx, bucketList.Buckets)
|
|
require.NoError(t, err)
|
|
|
|
for node, list := range verifier.processed {
|
|
assert.True(t, isUnique(list), "each node should process only once: %v %#v", node, list)
|
|
}
|
|
|
|
// node 1 is a priority node in the segments[0]
|
|
assert.Len(t, verifier.processed[nodes[1]], 1)
|
|
// we should get two other checks against the nodes in segments[8-10]
|
|
assert.Equal(t, 2,
|
|
len(verifier.processed[nodes[8]])+len(verifier.processed[nodes[9]])+len(verifier.processed[nodes[10]]),
|
|
)
|
|
}()
|
|
|
|
retryCSV, err := os.ReadFile(config.RetryPath)
|
|
require.NoError(t, err)
|
|
require.Equal(t, "stream id,position,found,not found,retry\n", string(retryCSV))
|
|
|
|
notFoundCSV, err := os.ReadFile(config.NotFoundPath)
|
|
require.NoError(t, err)
|
|
require.Equal(t, "stream id,position,found,not found,retry\n", string(notFoundCSV))
|
|
}
|
|
|
|
func TestService_Failures(t *testing.T) {
|
|
ctx := testcontext.New(t)
|
|
log := testplanet.NewLogger(t)
|
|
|
|
config := segmentverify.ServiceConfig{
|
|
NotFoundPath: ctx.File("not-found.csv"),
|
|
RetryPath: ctx.File("retry.csv"),
|
|
ProblemPiecesPath: ctx.File("problem-pieces.csv"),
|
|
PriorityNodesPath: ctx.File("priority-nodes.txt"),
|
|
|
|
Check: 2,
|
|
BatchSize: 100,
|
|
Concurrency: 3,
|
|
MaxOffline: 2,
|
|
}
|
|
|
|
// the node 1 is going to be priority
|
|
err := os.WriteFile(config.PriorityNodesPath, []byte((storj.NodeID{1}).String()+"\n"), 0755)
|
|
require.NoError(t, err)
|
|
|
|
func() {
|
|
nodes := map[metabase.NodeAlias]storj.NodeID{}
|
|
for i := 1; i <= 0xFF; i++ {
|
|
nodes[metabase.NodeAlias(i)] = storj.NodeID{byte(i)}
|
|
}
|
|
|
|
segments := []metabase.VerifySegment{
|
|
{
|
|
StreamID: uuid.UUID{0x10, 0x10},
|
|
AliasPieces: metabase.AliasPieces{{Number: 1, Alias: 8}, {Number: 3, Alias: 9}, {Number: 5, Alias: 10}, {Number: 0, Alias: 1}},
|
|
},
|
|
{
|
|
StreamID: uuid.UUID{0x20, 0x20},
|
|
AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
|
|
},
|
|
{
|
|
StreamID: uuid.UUID{0x30, 0x30},
|
|
AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
|
|
},
|
|
}
|
|
|
|
metabase := newMetabaseMock(nodes, segments...)
|
|
verifier := &verifierMock{
|
|
offline: []storj.NodeID{{0x02}, {0x08}, {0x09}, {0x0A}},
|
|
success: []uuid.UUID{segments[0].StreamID, segments[2].StreamID},
|
|
notFound: []uuid.UUID{segments[1].StreamID},
|
|
}
|
|
|
|
service, err := segmentverify.NewService(log.Named("segment-verify"), metabase, verifier, metabase, config)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, service)
|
|
|
|
defer ctx.Check(service.Close)
|
|
|
|
err = service.ProcessRange(ctx, uuid.UUID{}, uuid.Max())
|
|
require.NoError(t, err)
|
|
|
|
for node, list := range verifier.processed {
|
|
assert.True(t, isUnique(list), "each node should process only once: %v %#v", node, list)
|
|
}
|
|
}()
|
|
|
|
retryCSV, err := os.ReadFile(config.RetryPath)
|
|
require.NoError(t, err)
|
|
require.Equal(t, ""+
|
|
"stream id,position,found,not found,retry\n"+
|
|
"10100000-0000-0000-0000-000000000000,0,1,0,1\n",
|
|
string(retryCSV))
|
|
|
|
notFoundCSV, err := os.ReadFile(config.NotFoundPath)
|
|
require.NoError(t, err)
|
|
require.Equal(t, ""+
|
|
"stream id,position,found,not found,retry\n"+
|
|
"20200000-0000-0000-0000-000000000000,0,0,2,0\n",
|
|
string(notFoundCSV))
|
|
}
|
|
|
|
func isUnique(segments []*segmentverify.Segment) bool {
|
|
type segmentID struct {
|
|
StreamID uuid.UUID
|
|
Position metabase.SegmentPosition
|
|
}
|
|
seen := map[segmentID]bool{}
|
|
for _, seg := range segments {
|
|
id := segmentID{StreamID: seg.StreamID, Position: seg.Position}
|
|
if seen[id] {
|
|
return false
|
|
}
|
|
seen[id] = true
|
|
}
|
|
return true
|
|
}
|
|
|
|
type metabaseMock struct {
|
|
nodeIDToAlias map[storj.NodeID]metabase.NodeAlias
|
|
aliasToNodeID map[metabase.NodeAlias]storj.NodeID
|
|
streamIDsPerBucket map[metabase.BucketLocation][]uuid.UUID
|
|
segments []metabase.VerifySegment
|
|
}
|
|
|
|
func newMetabaseMock(nodes map[metabase.NodeAlias]storj.NodeID, segments ...metabase.VerifySegment) *metabaseMock {
|
|
mock := &metabaseMock{
|
|
nodeIDToAlias: map[storj.NodeID]metabase.NodeAlias{},
|
|
aliasToNodeID: nodes,
|
|
segments: segments,
|
|
streamIDsPerBucket: make(map[metabase.BucketLocation][]uuid.UUID),
|
|
}
|
|
for n, id := range nodes {
|
|
mock.nodeIDToAlias[id] = n
|
|
}
|
|
return mock
|
|
}
|
|
|
|
func (db *metabaseMock) AddStreamIDToBucket(projectID uuid.UUID, bucketName string, streamIDs ...uuid.UUID) {
|
|
bucket := metabase.BucketLocation{ProjectID: projectID, BucketName: bucketName}
|
|
db.streamIDsPerBucket[bucket] = append(db.streamIDsPerBucket[bucket], streamIDs...)
|
|
}
|
|
|
|
func (db *metabaseMock) Get(ctx context.Context, nodeID storj.NodeID) (*overlay.NodeDossier, error) {
|
|
return &overlay.NodeDossier{
|
|
Node: pb.Node{
|
|
Id: nodeID,
|
|
Address: &pb.NodeAddress{
|
|
Address: fmt.Sprintf("nodeid:%v", nodeID),
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (db *metabaseMock) SelectAllStorageNodesDownload(ctx context.Context, onlineWindow time.Duration, asOf overlay.AsOfSystemTimeConfig) ([]*overlay.SelectedNode, error) {
|
|
var xs []*overlay.SelectedNode
|
|
for nodeID := range db.nodeIDToAlias {
|
|
xs = append(xs, &overlay.SelectedNode{
|
|
ID: nodeID,
|
|
Address: &pb.NodeAddress{
|
|
Address: fmt.Sprintf("nodeid:%v", nodeID),
|
|
},
|
|
LastNet: "nodeid",
|
|
LastIPPort: fmt.Sprintf("nodeid:%v", nodeID),
|
|
CountryCode: 0,
|
|
})
|
|
}
|
|
return xs, nil
|
|
}
|
|
|
|
func (db *metabaseMock) LatestNodesAliasMap(ctx context.Context) (*metabase.NodeAliasMap, error) {
|
|
var entries []metabase.NodeAliasEntry
|
|
for id, alias := range db.nodeIDToAlias {
|
|
entries = append(entries, metabase.NodeAliasEntry{
|
|
ID: id,
|
|
Alias: alias,
|
|
})
|
|
}
|
|
return metabase.NewNodeAliasMap(entries), nil
|
|
}
|
|
|
|
func (db *metabaseMock) DeleteSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) error {
|
|
for i, s := range db.segments {
|
|
if opts.StreamID == s.StreamID && opts.Position == s.Position {
|
|
db.segments = append(db.segments[:i], db.segments[i+1:]...)
|
|
return nil
|
|
}
|
|
}
|
|
return metabase.ErrSegmentNotFound.New("%v", opts)
|
|
}
|
|
|
|
func (db *metabaseMock) GetSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) (segment metabase.Segment, err error) {
|
|
for _, s := range db.segments {
|
|
if opts.StreamID == s.StreamID && opts.Position == s.Position {
|
|
var pieces metabase.Pieces
|
|
for _, p := range s.AliasPieces {
|
|
pieces = append(pieces, metabase.Piece{
|
|
Number: p.Number,
|
|
StorageNode: db.aliasToNodeID[p.Alias],
|
|
})
|
|
}
|
|
|
|
return metabase.Segment{
|
|
StreamID: s.StreamID,
|
|
Position: s.Position,
|
|
Pieces: pieces,
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
return metabase.Segment{}, metabase.ErrSegmentNotFound.New("%v", opts)
|
|
}
|
|
|
|
func (db *metabaseMock) ListBucketsStreamIDs(ctx context.Context, opts metabase.ListBucketsStreamIDs) (metabase.ListBucketsStreamIDsResult, error) {
|
|
result := metabase.ListBucketsStreamIDsResult{}
|
|
|
|
for _, bucket := range opts.BucketList.Buckets {
|
|
if bucket.ProjectID.Compare(opts.CursorBucket.ProjectID) <= 0 {
|
|
continue
|
|
}
|
|
if bucket.BucketName <= opts.CursorBucket.BucketName {
|
|
continue
|
|
}
|
|
if opts.CursorStreamID.IsZero() {
|
|
result.StreamIDs = append(result.StreamIDs, db.streamIDsPerBucket[bucket]...)
|
|
continue
|
|
}
|
|
for cursorIndex, streamID := range db.streamIDsPerBucket[bucket] {
|
|
if opts.CursorStreamID.Less(streamID) {
|
|
result.StreamIDs = append(result.StreamIDs, db.streamIDsPerBucket[bucket][cursorIndex:]...)
|
|
break
|
|
}
|
|
}
|
|
if len(result.StreamIDs) > opts.Limit {
|
|
break
|
|
}
|
|
}
|
|
if len(result.StreamIDs) > opts.Limit {
|
|
result.StreamIDs = result.StreamIDs[:opts.Limit]
|
|
}
|
|
sort.Slice(result.StreamIDs, func(i, j int) bool {
|
|
return result.StreamIDs[i].Less(result.StreamIDs[j])
|
|
})
|
|
return result, nil
|
|
}
|
|
|
|
func (db *metabaseMock) ListVerifySegments(ctx context.Context, opts metabase.ListVerifySegments) (result metabase.ListVerifySegmentsResult, err error) {
|
|
r := metabase.ListVerifySegmentsResult{}
|
|
|
|
for _, s := range db.segments {
|
|
if s.StreamID.Less(opts.CursorStreamID) {
|
|
continue
|
|
}
|
|
if s.StreamID == opts.CursorStreamID && !opts.CursorPosition.Less(s.Position) {
|
|
continue
|
|
}
|
|
|
|
r.Segments = append(r.Segments, s)
|
|
if len(r.Segments) >= opts.Limit {
|
|
break
|
|
}
|
|
}
|
|
|
|
return r, nil
|
|
}
|
|
|
|
type verifierMock struct {
|
|
allSuccess bool
|
|
fail error
|
|
offline []storj.NodeID
|
|
success []uuid.UUID
|
|
notFound []uuid.UUID
|
|
|
|
mu sync.Mutex
|
|
processed map[storj.NodeID][]*segmentverify.Segment
|
|
}
|
|
|
|
func (v *verifierMock) Verify(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, targetVersion string, segments []*segmentverify.Segment, _ bool) (int, error) {
|
|
v.mu.Lock()
|
|
if v.processed == nil {
|
|
v.processed = map[storj.NodeID][]*segmentverify.Segment{}
|
|
}
|
|
v.processed[target.ID] = append(v.processed[target.ID], segments...)
|
|
v.mu.Unlock()
|
|
|
|
for _, n := range v.offline {
|
|
if n == target.ID {
|
|
return 0, segmentverify.ErrNodeOffline.New("node did not respond %v", target)
|
|
}
|
|
}
|
|
if v.fail != nil {
|
|
return 0, errs.Wrap(v.fail)
|
|
}
|
|
|
|
if v.allSuccess {
|
|
for _, seg := range segments {
|
|
seg.Status.MarkFound()
|
|
}
|
|
return len(segments), nil
|
|
}
|
|
|
|
for _, seg := range v.success {
|
|
for _, t := range segments {
|
|
if t.StreamID == seg {
|
|
t.Status.MarkFound()
|
|
}
|
|
}
|
|
}
|
|
for _, seg := range v.notFound {
|
|
for _, t := range segments {
|
|
if t.StreamID == seg {
|
|
t.Status.MarkNotFound()
|
|
}
|
|
}
|
|
}
|
|
|
|
return len(segments), nil
|
|
}
|