satellite/{metainfo,gracefulexit}: fix failing tests
Change-Id: I3428ea601255c36a316732c9f75135d6e5fa4d79
This commit is contained in:
parent 4d37d14929
commit 18825d1e0b
@@ -29,6 +29,7 @@ import (
	"storj.io/storj/private/testblobs"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite"
	"storj.io/storj/satellite/metainfo"
	"storj.io/storj/satellite/metainfo/metabase"
	"storj.io/storj/satellite/overlay"
	"storj.io/storj/storagenode"
@@ -1419,3 +1420,57 @@ func findNodeToExit(ctx context.Context, planet *testplanet.Planet, objects int)

	return planet.FindNode(exitingNodeID), nil
}

func TestUpdatePiecesCheckDuplicates(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.ReconfigureRS(1, 1, 3, 3),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		satellite := planet.Satellites[0]
		uplinkPeer := planet.Uplinks[0]
		path := "test/path"

		err := uplinkPeer.Upload(ctx, satellite, "test1", path, testrand.Bytes(5*memory.KiB))
		require.NoError(t, err)

		segments, err := satellite.Metainfo.Metabase.TestingAllSegments(ctx)
		require.NoError(t, err)
		require.Len(t, segments, 1)

		pieces := segments[0].Pieces
		require.False(t, hasDuplicates(pieces))

		// Remove second piece in the list and replace it with
		// a piece on the first node.
		// This way we can ensure that we use a valid piece num.
		removePiece := metabase.Piece{
			Number:      pieces[1].Number,
			StorageNode: pieces[1].StorageNode,
		}
		addPiece := metabase.Piece{
			Number:      pieces[1].Number,
			StorageNode: pieces[0].StorageNode,
		}

		// test no duplicates
		err = satellite.GracefulExit.Endpoint.UpdatePiecesCheckDuplicates(ctx, segments[0], metabase.Pieces{addPiece}, metabase.Pieces{removePiece}, true)
		require.True(t, metainfo.ErrNodeAlreadyExists.Has(err))
	})
}

func hasDuplicates(pieces metabase.Pieces) bool {
	nodePieceCounts := make(map[storj.NodeID]int)
	for _, piece := range pieces {
		nodePieceCounts[piece.StorageNode]++
	}

	for _, count := range nodePieceCounts {
		if count > 1 {
			return true
		}
	}

	return false
}
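Aside (not part of the diff): a minimal, self-contained sketch of what the duplicate check in the new test guards against. The metabase.Piece field names (Number, StorageNode) and the hasDuplicates logic mirror the hunk above; the testrand.NodeID helper and the main wrapper are only there to make the sketch runnable on its own.

package main

import (
	"fmt"

	"storj.io/common/storj"
	"storj.io/common/testrand"
	"storj.io/storj/satellite/metainfo/metabase"
)

// hasDuplicates mirrors the helper added above: a segment's piece list has a
// duplicate when the same storage node holds more than one of its pieces.
func hasDuplicates(pieces metabase.Pieces) bool {
	nodePieceCounts := make(map[storj.NodeID]int)
	for _, piece := range pieces {
		nodePieceCounts[piece.StorageNode]++
	}
	for _, count := range nodePieceCounts {
		if count > 1 {
			return true
		}
	}
	return false
}

func main() {
	nodeA, nodeB := testrand.NodeID(), testrand.NodeID()

	distinct := metabase.Pieces{
		{Number: 0, StorageNode: nodeA},
		{Number: 1, StorageNode: nodeB},
	}
	// Same construction as the test: piece 1 is re-added on the node that
	// already holds piece 0, so one node ends up with two pieces.
	duplicated := metabase.Pieces{
		{Number: 0, StorageNode: nodeA},
		{Number: 1, StorageNode: nodeA},
	}

	fmt.Println(hasDuplicates(distinct))   // false
	fmt.Println(hasDuplicates(duplicated)) // true
}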
@@ -540,14 +540,9 @@ func TestDeleteBucket(t *testing.T) {
		err = uplnk.Upload(ctx, planet.Satellites[0], expectedBucketName, "remote-segment-inline-object", testrand.Bytes(33*memory.KiB))
		require.NoError(t, err)

		listResp, err := satelliteSys.API.Metainfo.Endpoint2.ListObjects(ctx, &pb.ObjectListRequest{
			Header: &pb.RequestHeader{
				ApiKey: apiKey.SerializeRaw(),
			},
			Bucket: []byte(expectedBucketName),
		})
		objects, err := satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
		require.NoError(t, err)
		require.Len(t, listResp.GetItems(), 3)
		require.Len(t, objects, 3)

		delResp, err := satelliteSys.API.Metainfo.Endpoint2.DeleteBucket(ctx, &pb.BucketDeleteRequest{
			Header: &pb.RequestHeader{
@@ -396,7 +396,7 @@ func (endpoint *Endpoint) DeleteBucket(ctx context.Context, req *pb.BucketDelete
			return nil, err
		}

		return &pb.BucketDeleteResponse{Bucket: convBucket, DeletedObjectsCount: int64(deletedObjCount)}, nil
		return &pb.BucketDeleteResponse{Bucket: convBucket, DeletedObjectsCount: deletedObjCount}, nil
	}
	if storj.ErrBucketNotFound.Has(err) {
		return &pb.BucketDeleteResponse{Bucket: convBucket}, nil
@@ -409,7 +409,7 @@ func (endpoint *Endpoint) DeleteBucket(ctx context.Context, req *pb.BucketDelete

// deleteBucketNotEmpty deletes all objects from bucket and deletes this bucket.
// On success, it returns only the number of deleted objects.
func (endpoint *Endpoint) deleteBucketNotEmpty(ctx context.Context, projectID uuid.UUID, bucketName []byte) ([]byte, int, error) {
func (endpoint *Endpoint) deleteBucketNotEmpty(ctx context.Context, projectID uuid.UUID, bucketName []byte) ([]byte, int64, error) {
	deletedCount, err := endpoint.deleteBucketObjects(ctx, projectID, bucketName)
	if err != nil {
		return nil, 0, rpcstatus.Error(rpcstatus.Internal, err.Error())
@@ -430,11 +430,11 @@ func (endpoint *Endpoint) deleteBucketNotEmpty(ctx context.Context, projectID uu
}

// deleteBucketObjects deletes all objects in a bucket.
func (endpoint *Endpoint) deleteBucketObjects(ctx context.Context, projectID uuid.UUID, bucketName []byte) (_ int, err error) {
func (endpoint *Endpoint) deleteBucketObjects(ctx context.Context, projectID uuid.UUID, bucketName []byte) (_ int64, err error) {
	defer mon.Task()(&ctx)(&err)

	bucketLocation := metabase.BucketLocation{ProjectID: projectID, BucketName: string(bucketName)}
	_, err = endpoint.metainfo.metabaseDB.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
	deletedObjects, err := endpoint.metainfo.metabaseDB.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
		Bucket: bucketLocation,
		DeletePieces: func(ctx context.Context, deleted []metabase.DeletedSegmentInfo) error {
			endpoint.deleteSegmentPieces(ctx, deleted)
@@ -442,7 +442,7 @@ func (endpoint *Endpoint) deleteBucketObjects(ctx context.Context, projectID uui
		},
	})

	return 0, Error.Wrap(err)
	return deletedObjects, Error.Wrap(err)
}

// ListBuckets returns buckets in a project where the bucket name matches the request cursor.
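Aside (not part of the diff): the signature changes above keep the deleted-object count as int64 from the metabase call all the way into the proto response, which is why the int64(deletedObjCount) cast disappears from DeleteBucket. A minimal sketch of that plumbing, assuming only the metabase types shown in the hunks; bucketObjectDeleter and fakeDB are hypothetical stand-ins used to keep the sketch self-contained.

package main

import (
	"context"
	"fmt"

	"storj.io/storj/satellite/metainfo/metabase"
)

// bucketObjectDeleter is a hypothetical stand-in for the metabase handle the endpoint uses.
type bucketObjectDeleter interface {
	DeleteBucketObjects(ctx context.Context, opts metabase.DeleteBucketObjects) (int64, error)
}

// deleteBucketObjects sketches the reworked helper: the int64 count returned by
// the metabase is passed through unchanged instead of being discarded.
func deleteBucketObjects(ctx context.Context, db bucketObjectDeleter, bucket metabase.BucketLocation) (int64, error) {
	deletedObjects, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{Bucket: bucket})
	return deletedObjects, err
}

// fakeDB lets the sketch run without a real satellite.
type fakeDB struct{ count int64 }

func (f fakeDB) DeleteBucketObjects(ctx context.Context, opts metabase.DeleteBucketObjects) (int64, error) {
	return f.count, nil
}

func main() {
	deleted, err := deleteBucketObjects(context.Background(), fakeDB{count: 3}, metabase.BucketLocation{BucketName: "demo"})
	fmt.Println(deleted, err) // 3 <nil>
}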
@@ -4,7 +4,6 @@
package metainfo_test

import (
	"context"
	"fmt"
	"strconv"
	"testing"
@@ -18,52 +17,11 @@ import (
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite/metainfo"
	"storj.io/storj/satellite/metainfo/metabase"
	"storj.io/storj/storage"
)

const lastSegmentIndex = -1

func TestIterate(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		saPeer := planet.Satellites[0]
		uplinkPeer := planet.Uplinks[0]

		// Setup: create 2 test buckets
		err := uplinkPeer.CreateBucket(ctx, saPeer, "test1")
		require.NoError(t, err)
		err = uplinkPeer.CreateBucket(ctx, saPeer, "test2")
		require.NoError(t, err)

		// Setup: upload an object in one of the buckets
		expectedData := testrand.Bytes(50 * memory.KiB)
		err = uplinkPeer.Upload(ctx, saPeer, "test2", "test/path", expectedData)
		require.NoError(t, err)

		// Test: Confirm that only the objects are in pointerDB
		// and not the bucket metadata
		var itemCount int
		err = saPeer.Metainfo.Database.Iterate(ctx, storage.IterateOptions{Recurse: true},
			func(ctx context.Context, it storage.Iterator) error {
				var item storage.ListItem
				for it.Next(ctx, &item) {
					itemCount++
					pathElements := storj.SplitPath(storj.Path(item.Key))
					// there should not be any objects in pointerDB with less than 4 path
					// elements. i.e buckets should not be stored in pointerDB
					require.True(t, len(pathElements) > 3)
				}
				return nil
			})
		require.NoError(t, err)
		// There should only be 1 item in pointerDB, the one object
		require.Equal(t, 1, itemCount)
	})
}

// TestGetItems_ReturnValueOrder ensures the return value
// of GetItems will always be the same order as the requested paths.
// The test does following steps:
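Aside (not part of the diff): the removed TestIterate asserted that every pointerDB key has more than three path elements, i.e. that bare bucket metadata never shows up in pointerDB. A tiny sketch of that check on a hypothetical key (the key layout below is illustrative, not the exact production format):

package main

import (
	"fmt"

	"storj.io/common/storj"
)

func main() {
	// Hypothetical pointerDB-style key with four path elements:
	// project / segment index / bucket / encrypted object path.
	key := storj.Path("project-id/l/test2/encrypted-object-path")

	pathElements := storj.SplitPath(key)
	fmt.Println(len(pathElements) > 3) // true: looks like an object, not bare bucket metadata
}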
@@ -139,71 +97,6 @@ func TestGetItems_ReturnValueOrder(t *testing.T) {
	})
}

func TestUpdatePiecesCheckDuplicates(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.ReconfigureRS(1, 1, 3, 3),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		satellite := planet.Satellites[0]
		uplinkPeer := planet.Uplinks[0]
		path := "test/path"

		err := uplinkPeer.Upload(ctx, satellite, "test1", path, testrand.Bytes(5*memory.KiB))
		require.NoError(t, err)

		keys, err := satellite.Metainfo.Database.List(ctx, nil, 1)
		require.NoError(t, err)
		require.Equal(t, 1, len(keys))

		encPath, err := metabase.ParseSegmentKey(metabase.SegmentKey(keys[0]))
		require.NoError(t, err)
		pointer, err := satellite.Metainfo.Service.Get(ctx, encPath.Encode())
		require.NoError(t, err)

		pieces := pointer.GetRemote().GetRemotePieces()
		require.False(t, hasDuplicates(pointer.GetRemote().GetRemotePieces()))

		// Remove second piece in the list and replace it with
		// a piece on the first node.
		// This way we can ensure that we use a valid piece num.
		removePiece := &pb.RemotePiece{
			PieceNum: pieces[1].PieceNum,
			NodeId:   pieces[1].NodeId,
		}
		addPiece := &pb.RemotePiece{
			PieceNum: pieces[1].PieceNum,
			NodeId:   pieces[0].NodeId,
		}

		// test no duplicates
		updPointer, err := satellite.Metainfo.Service.UpdatePiecesCheckDuplicates(ctx, encPath.Encode(), pointer, []*pb.RemotePiece{addPiece}, []*pb.RemotePiece{removePiece}, true)
		require.True(t, metainfo.ErrNodeAlreadyExists.Has(err))
		require.False(t, hasDuplicates(updPointer.GetRemote().GetRemotePieces()))

		// test allow duplicates
		updPointer, err = satellite.Metainfo.Service.UpdatePieces(ctx, encPath.Encode(), pointer, []*pb.RemotePiece{addPiece}, []*pb.RemotePiece{removePiece})
		require.NoError(t, err)
		require.True(t, hasDuplicates(updPointer.GetRemote().GetRemotePieces()))
	})
}

func hasDuplicates(pieces []*pb.RemotePiece) bool {
	nodePieceCounts := make(map[storj.NodeID]int)
	for _, piece := range pieces {
		nodePieceCounts[piece.NodeId]++
	}

	for _, count := range nodePieceCounts {
		if count > 1 {
			return true
		}
	}

	return false
}

func TestCountBuckets(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
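Aside (not part of the diff): the test removed here and the one added to satellite/gracefulexit earlier in this commit build the same kind of duplicate scenario against two different piece representations, the old pointerDB pb.RemotePiece and the new metabase.Piece. A minimal side-by-side sketch; field names come from the hunks above, while the import paths and the testrand.NodeID helper are assumptions made to keep the sketch self-contained.

package main

import (
	"fmt"

	"storj.io/common/pb"
	"storj.io/common/testrand"
	"storj.io/storj/satellite/metainfo/metabase"
)

func main() {
	nodeID := testrand.NodeID()

	// Old pointerDB representation, used by the test removed above.
	oldPiece := &pb.RemotePiece{PieceNum: 1, NodeId: nodeID}

	// New metabase representation, used by the test added in satellite/gracefulexit.
	newPiece := metabase.Piece{Number: 1, StorageNode: nodeID}

	// Both describe the same node holding piece number 1 of a segment.
	fmt.Println(oldPiece.NodeId == newPiece.StorageNode) // true
	fmt.Println(oldPiece.PieceNum, newPiece.Number)      // 1 1
}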