Re-enable repair checker tests (#1172)
* Re-enable repair checker tests * simpler NodeID construction * adjust benchmark
This commit is contained in:
parent
5e1ab841cf
commit
14434594c4
@ -56,11 +56,6 @@ type checker struct {
|
||||
// NewChecker creates a new instance of checker
|
||||
func NewChecker(pointerdb *pointerdb.Service, sdb statdb.DB, repairQueue queue.RepairQueue, overlay pb.OverlayServer, irrdb irreparable.DB, limit int, logger *zap.Logger, interval time.Duration) Checker {
|
||||
// TODO: reorder arguments
|
||||
return newChecker(pointerdb, sdb, repairQueue, overlay, irrdb, limit, logger, interval)
|
||||
}
|
||||
|
||||
// newChecker creates a new instance of checker
|
||||
func newChecker(pointerdb *pointerdb.Service, sdb statdb.DB, repairQueue queue.RepairQueue, overlay pb.OverlayServer, irrdb irreparable.DB, limit int, logger *zap.Logger, interval time.Duration) *checker {
|
||||
return &checker{
|
||||
statdb: sdb,
|
||||
pointerdb: pointerdb,
|
||||
|
@ -1,15 +1,9 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
//TODO: reenable
|
||||
// +build ignore
|
||||
|
||||
package checker_test
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sort"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -18,22 +12,78 @@ import (
|
||||
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testplanet"
|
||||
"storj.io/storj/internal/teststorj"
|
||||
"storj.io/storj/pkg/auth"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/satellite/satellitedb"
|
||||
)
|
||||
|
||||
func TestIdentifyInjuredSegments(t *testing.T) {
|
||||
t.Skip("needs update")
|
||||
// TODO note satellite's: own sub-systems need to be disabled
|
||||
// TODO test irreparable ??
|
||||
|
||||
// logic should be roughly:
|
||||
// use storagenodes as the valid and
|
||||
// generate invalid ids
|
||||
// identify should then find the invalid ones
|
||||
// note satellite's: own sub-systems need to be disabled
|
||||
tctx := testcontext.New(t)
|
||||
defer tctx.Cleanup()
|
||||
|
||||
const numberOfNodes = 10
|
||||
planet, err := testplanet.New(t, 1, 4, 0)
|
||||
require.NoError(t, err)
|
||||
defer tctx.Check(planet.Shutdown)
|
||||
|
||||
planet.Start(tctx)
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
pieces := make([]*pb.RemotePiece, 0, numberOfNodes)
|
||||
// use online nodes
|
||||
for i, storagenode := range planet.StorageNodes {
|
||||
pieces = append(pieces, &pb.RemotePiece{
|
||||
PieceNum: int32(i),
|
||||
NodeId: storagenode.Identity.ID,
|
||||
})
|
||||
}
|
||||
|
||||
// simulate offline nodes
|
||||
expectedLostPieces := make(map[int32]bool)
|
||||
for i := len(pieces); i < numberOfNodes; i++ {
|
||||
pieces = append(pieces, &pb.RemotePiece{
|
||||
PieceNum: int32(i),
|
||||
NodeId: storj.NodeID{byte(i)},
|
||||
})
|
||||
expectedLostPieces[int32(i)] = true
|
||||
}
|
||||
pointer := &pb.Pointer{
|
||||
Remote: &pb.RemoteSegment{
|
||||
Redundancy: &pb.RedundancyScheme{
|
||||
MinReq: int32(4),
|
||||
RepairThreshold: int32(8),
|
||||
},
|
||||
PieceId: "fake-piece-id",
|
||||
RemotePieces: pieces,
|
||||
},
|
||||
}
|
||||
|
||||
// put test pointer to db
|
||||
pointerdb := planet.Satellites[0].Metainfo.Service
|
||||
err = pointerdb.Put(pointer.Remote.PieceId, pointer)
|
||||
assert.NoError(t, err)
|
||||
|
||||
checker := planet.Satellites[0].Repair.Checker
|
||||
err = checker.IdentifyInjuredSegments(tctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
//check if the expected segments were added to the queue
|
||||
repairQueue := planet.Satellites[0].DB.RepairQueue()
|
||||
injuredSegment, err := repairQueue.Dequeue(tctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, "fake-piece-id", injuredSegment.Path)
|
||||
assert.Equal(t, len(expectedLostPieces), len(injuredSegment.LostPieces))
|
||||
for _, lostPiece := range injuredSegment.LostPieces {
|
||||
if !expectedLostPieces[lostPiece] {
|
||||
t.Error("should be lost: ", lostPiece)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestOfflineNodes(t *testing.T) {
|
||||
tctx := testcontext.New(t)
|
||||
defer tctx.Cleanup()
|
||||
|
||||
@ -44,222 +94,87 @@ func TestIdentifyInjuredSegments(t *testing.T) {
|
||||
planet.Start(tctx)
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
pointerdb := planet.Satellites[0].Metainfo.Endpoint
|
||||
repairQueue := planet.Satellites[0].DB.RepairQueue()
|
||||
|
||||
ctx := auth.WithAPIKey(tctx, nil)
|
||||
|
||||
const N = 25
|
||||
nodes := []*pb.Node{}
|
||||
segs := []*pb.InjuredSegment{}
|
||||
//fill a pointerdb
|
||||
for i := 0; i < N; i++ {
|
||||
s := strconv.Itoa(i)
|
||||
ids := teststorj.NodeIDsFromStrings([]string{s + "a", s + "b", s + "c", s + "d"}...)
|
||||
|
||||
p := &pb.Pointer{
|
||||
Remote: &pb.RemoteSegment{
|
||||
Redundancy: &pb.RedundancyScheme{
|
||||
RepairThreshold: int32(2),
|
||||
},
|
||||
PieceId: strconv.Itoa(i),
|
||||
RemotePieces: []*pb.RemotePiece{
|
||||
{PieceNum: 0, NodeId: ids[0]},
|
||||
{PieceNum: 1, NodeId: ids[1]},
|
||||
{PieceNum: 2, NodeId: ids[2]},
|
||||
{PieceNum: 3, NodeId: ids[3]},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
req := &pb.PutRequest{
|
||||
Path: p.Remote.PieceId,
|
||||
Pointer: p,
|
||||
}
|
||||
|
||||
resp, err := pointerdb.Put(ctx, req)
|
||||
assert.NotNil(t, resp)
|
||||
assert.NoError(t, err)
|
||||
|
||||
//nodes for cache
|
||||
selection := rand.Intn(4)
|
||||
for _, v := range ids[:selection] {
|
||||
n := &pb.Node{Id: v, Type: pb.NodeType_STORAGE, Address: &pb.NodeAddress{Address: ""}}
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
|
||||
pieces := []int32{0, 1, 2, 3}
|
||||
//expected injured segments
|
||||
if len(ids[:selection]) < int(p.Remote.Redundancy.RepairThreshold) {
|
||||
seg := &pb.InjuredSegment{
|
||||
Path: p.Remote.PieceId,
|
||||
LostPieces: pieces[selection:],
|
||||
}
|
||||
segs = append(segs, seg)
|
||||
}
|
||||
}
|
||||
|
||||
checker := planet.Satellites[0].Repair.Checker
|
||||
assert.NoError(t, err)
|
||||
err = checker.IdentifyInjuredSegments(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
expected := map[string]*pb.InjuredSegment{}
|
||||
for _, seg := range segs {
|
||||
expected[seg.Path] = seg
|
||||
}
|
||||
|
||||
//check if the expected segments were added to the queue
|
||||
dequeued := []*pb.InjuredSegment{}
|
||||
for i := 0; i < len(segs); i++ {
|
||||
injSeg, err := repairQueue.Dequeue(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
if _, ok := expected[injSeg.Path]; ok {
|
||||
t.Log("got", injSeg.Path)
|
||||
delete(expected, injSeg.Path)
|
||||
} else {
|
||||
t.Error("unexpected", injSeg)
|
||||
}
|
||||
dequeued = append(dequeued, &injSeg)
|
||||
}
|
||||
|
||||
for _, missing := range expected {
|
||||
t.Error("did not get", missing)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOfflineNodes(t *testing.T) {
|
||||
t.Skip("needs update")
|
||||
|
||||
tctx := testcontext.New(t)
|
||||
defer tctx.Cleanup()
|
||||
|
||||
planet, err := testplanet.New(t, 1, 0, 0)
|
||||
require.NoError(t, err)
|
||||
defer tctx.Check(planet.Shutdown)
|
||||
|
||||
planet.Start(tctx)
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
const N = 50
|
||||
nodes := []*pb.Node{}
|
||||
const numberOfNodes = 10
|
||||
nodeIDs := storj.NodeIDList{}
|
||||
expectedOffline := []int32{}
|
||||
for i := 0; i < N; i++ {
|
||||
id := teststorj.NodeIDFromString(strconv.Itoa(i))
|
||||
n := &pb.Node{Id: id, Type: pb.NodeType_STORAGE, Address: &pb.NodeAddress{Address: ""}}
|
||||
nodes = append(nodes, n)
|
||||
if i%(rand.Intn(5)+2) == 0 {
|
||||
nodeIDs = append(nodeIDs, teststorj.NodeIDFromString("id"+id.String()))
|
||||
expectedOffline = append(expectedOffline, int32(i))
|
||||
} else {
|
||||
nodeIDs = append(nodeIDs, id)
|
||||
|
||||
// use online nodes
|
||||
for _, storagenode := range planet.StorageNodes {
|
||||
nodeIDs = append(nodeIDs, storagenode.Identity.ID)
|
||||
}
|
||||
|
||||
// simulate offline nodes
|
||||
expectedOffline := make([]int32, 0)
|
||||
for i := len(nodeIDs); i < numberOfNodes; i++ {
|
||||
nodeIDs = append(nodeIDs, storj.NodeID{byte(i)})
|
||||
expectedOffline = append(expectedOffline, int32(i))
|
||||
}
|
||||
|
||||
checker := planet.Satellites[0].Repair.Checker
|
||||
assert.NoError(t, err)
|
||||
offline, err := checker.OfflineNodes(tctx, nodeIDs)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, expectedOffline, offline)
|
||||
}
|
||||
|
||||
func BenchmarkIdentifyInjuredSegments(b *testing.B) {
|
||||
b.Skip("needs update")
|
||||
|
||||
tctx := testcontext.New(b)
|
||||
defer tctx.Cleanup()
|
||||
|
||||
planet, err := testplanet.New(b, 1, 0, 0)
|
||||
const numberOfNodes = 10
|
||||
planet, err := testplanet.New(b, 1, 4, 0)
|
||||
require.NoError(b, err)
|
||||
defer tctx.Check(planet.Shutdown)
|
||||
|
||||
planet.Start(tctx)
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
pointerdb := planet.Satellites[0].Metainfo.Endpoint
|
||||
repairQueue := planet.Satellites[0].DB.RepairQueue()
|
||||
pieces := make([]*pb.RemotePiece, 0, numberOfNodes)
|
||||
// use online nodes
|
||||
for i, storagenode := range planet.StorageNodes {
|
||||
pieces = append(pieces, &pb.RemotePiece{
|
||||
PieceNum: int32(i),
|
||||
NodeId: storagenode.Identity.ID,
|
||||
})
|
||||
}
|
||||
|
||||
ctx := auth.WithAPIKey(tctx, nil)
|
||||
|
||||
// creating in-memory db and opening connection
|
||||
db, err := satellitedb.NewInMemory()
|
||||
defer func() {
|
||||
err = db.Close()
|
||||
assert.NoError(b, err)
|
||||
}()
|
||||
err = db.CreateTables()
|
||||
assert.NoError(b, err)
|
||||
|
||||
const N = 25
|
||||
nodes := []*pb.Node{}
|
||||
segs := []*pb.InjuredSegment{}
|
||||
//fill a pointerdb
|
||||
for i := 0; i < N; i++ {
|
||||
s := strconv.Itoa(i)
|
||||
ids := teststorj.NodeIDsFromStrings([]string{s + "a", s + "b", s + "c", s + "d"}...)
|
||||
|
||||
p := &pb.Pointer{
|
||||
// simulate offline nodes
|
||||
expectedLostPieces := make(map[int32]bool)
|
||||
for i := len(pieces); i < numberOfNodes; i++ {
|
||||
pieces = append(pieces, &pb.RemotePiece{
|
||||
PieceNum: int32(i),
|
||||
NodeId: storj.NodeID{byte(i)},
|
||||
})
|
||||
expectedLostPieces[int32(i)] = true
|
||||
}
|
||||
pointer := &pb.Pointer{
|
||||
Remote: &pb.RemoteSegment{
|
||||
Redundancy: &pb.RedundancyScheme{
|
||||
RepairThreshold: int32(2),
|
||||
MinReq: int32(4),
|
||||
RepairThreshold: int32(8),
|
||||
},
|
||||
PieceId: strconv.Itoa(i),
|
||||
RemotePieces: []*pb.RemotePiece{
|
||||
{PieceNum: 0, NodeId: ids[0]},
|
||||
{PieceNum: 1, NodeId: ids[1]},
|
||||
{PieceNum: 2, NodeId: ids[2]},
|
||||
{PieceNum: 3, NodeId: ids[3]},
|
||||
PieceId: "fake-piece-id",
|
||||
RemotePieces: pieces,
|
||||
},
|
||||
},
|
||||
}
|
||||
req := &pb.PutRequest{
|
||||
Path: p.Remote.PieceId,
|
||||
Pointer: p,
|
||||
}
|
||||
|
||||
resp, err := pointerdb.Put(ctx, req)
|
||||
assert.NotNil(b, resp)
|
||||
pointerdb := planet.Satellites[0].Metainfo.Service
|
||||
err = pointerdb.Put(pointer.Remote.PieceId, pointer)
|
||||
assert.NoError(b, err)
|
||||
|
||||
//nodes for cache
|
||||
selection := rand.Intn(4)
|
||||
for _, v := range ids[:selection] {
|
||||
n := &pb.Node{Id: v, Type: pb.NodeType_STORAGE, Address: &pb.NodeAddress{Address: ""}}
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
pieces := []int32{0, 1, 2, 3}
|
||||
//expected injured segments
|
||||
if len(ids[:selection]) < int(p.Remote.Redundancy.RepairThreshold) {
|
||||
seg := &pb.InjuredSegment{
|
||||
Path: p.Remote.PieceId,
|
||||
LostPieces: pieces[selection:],
|
||||
}
|
||||
segs = append(segs, seg)
|
||||
}
|
||||
}
|
||||
//fill a overlay cache
|
||||
repairQueue := planet.Satellites[0].DB.RepairQueue()
|
||||
checker := planet.Satellites[0].Repair.Checker
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
checker := planet.Satellites[0].Repair.Checker
|
||||
err = checker.IdentifyInjuredSegments(tctx)
|
||||
assert.NoError(b, err)
|
||||
|
||||
err = checker.IdentifyInjuredSegments(ctx)
|
||||
injuredSegment, err := repairQueue.Dequeue(tctx)
|
||||
assert.NoError(b, err)
|
||||
|
||||
//check if the expected segments were added to the queue
|
||||
dequeued := []*pb.InjuredSegment{}
|
||||
for i := 0; i < len(segs); i++ {
|
||||
injSeg, err := repairQueue.Dequeue(ctx)
|
||||
assert.NoError(b, err)
|
||||
dequeued = append(dequeued, &injSeg)
|
||||
}
|
||||
sort.Slice(segs, func(i, k int) bool { return segs[i].Path < segs[k].Path })
|
||||
sort.Slice(dequeued, func(i, k int) bool { return dequeued[i].Path < dequeued[k].Path })
|
||||
|
||||
for i := 0; i < len(segs); i++ {
|
||||
assert.True(b, pb.Equal(segs[i], dequeued[i]))
|
||||
assert.Equal(b, "fake-piece-id", injuredSegment.Path)
|
||||
assert.Equal(b, len(expectedLostPieces), len(injuredSegment.LostPieces))
|
||||
for _, lostPiece := range injuredSegment.LostPieces {
|
||||
assert.Equal(b, true, expectedLostPieces[lostPiece])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user