satellite/gc: add test for pending object

Change-Id: Ifb076ab38442f88f94a3e0c2ae1b19528a55f724
Egon Elbre 2020-12-21 16:59:11 +02:00
parent 66d4d5eb48
commit c4578eb3ec
3 changed files with 103 additions and 9 deletions

@@ -4,6 +4,8 @@
package gc_test
import (
"bytes"
"context"
"errors"
"testing"
"time"
@@ -21,9 +23,11 @@ import (
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/storage"
"storj.io/storj/storagenode"
"storj.io/uplink/private/testuplink"
)
// TestGarbageCollection does the following:
@@ -177,3 +181,93 @@ func encryptionAccess(access string) (*encryption.Store, error) {
return store, nil
}
func TestGarbageCollection_PendingObject(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.Combine(
func(log *zap.Logger, index int, config *satellite.Config) {
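// A very low false positive rate keeps the bloom filter precise, and a
// short interval makes GC run frequently during the test.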
config.GarbageCollection.FalsePositiveRate = 0.000000001
config.GarbageCollection.Interval = 500 * time.Millisecond
},
testplanet.MaxSegmentSize(20*memory.KiB),
),
StorageNode: func(index int, config *storagenode.Config) {
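// Disable the allowed clock skew so the retain filter is applied to
// pieces immediately, without a grace window.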
config.Retain.MaxTimeSkew = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
upl := planet.Uplinks[0]
testData := testrand.Bytes(15 * memory.KiB)
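// Start a multipart upload and upload a single part without completing it,
// leaving the object pending on the satellite.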
pendingStreamID := startMultipartUpload(ctx, t, upl, satellite, "testbucket", "multi", testData)
segments, err := satellite.Metainfo.Metabase.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
require.Len(t, segments[0].Pieces, 1)
// The pieceInfo.GetPieceIDs query converts piece creation and the filter creation timestamps
// to datetime in sql. This chops off all precision beyond seconds.
// In this test, the amount of time that elapses between piece uploads and the gc loop is
// less than a second, meaning datetime(piece_creation) < datetime(filter_creation) is false unless we sleep
// for a second.
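// Run the GC piece tracker manually as a metainfo loop observer rather than
// waiting for the garbage collection service to run on its interval.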
lastPieceCounts := map[storj.NodeID]int{}
pieceTracker := gc.NewPieceTracker(satellite.Log.Named("gc observer"), gc.Config{
FalsePositiveRate: 0.000000001,
InitialPieces: 10,
}, lastPieceCounts)
err = satellite.Metainfo.Loop.Join(ctx, pieceTracker)
require.NoError(t, err)
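// The piece belonging to the pending object must be present in the retain
// info, otherwise the storage node would garbage collect it.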
require.NotEmpty(t, pieceTracker.RetainInfos)
info := pieceTracker.RetainInfos[planet.StorageNodes[0].ID()]
require.NotNil(t, info)
require.Equal(t, 1, info.Count)
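// Complete the multipart upload and download the object to confirm its
// pieces survived garbage collection.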
completeMultipartUpload(ctx, t, upl, satellite, "testbucket", "multi", pendingStreamID)
gotData, err := upl.Download(ctx, satellite, "testbucket", "multi")
require.NoError(t, err)
require.Equal(t, testData, gotData)
})
}
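// startMultipartUpload creates the bucket if needed, starts a multipart upload
// for the given path, uploads the data as a single part, and returns the
// stream ID of the pending object.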
func startMultipartUpload(ctx context.Context, t *testing.T, uplink *testplanet.Uplink, satellite *testplanet.Satellite, bucketName string, path storj.Path, data []byte) string {
_, found := testuplink.GetMaxSegmentSize(ctx)
if !found {
ctx = testuplink.WithMaxSegmentSize(ctx, satellite.Config.Metainfo.MaxSegmentSize)
}
project, err := uplink.GetProject(ctx, satellite)
require.NoError(t, err)
defer func() { require.NoError(t, project.Close()) }()
_, err = project.EnsureBucket(ctx, bucketName)
require.NoError(t, err)
info, err := project.NewMultipartUpload(ctx, bucketName, path, nil)
require.NoError(t, err)
_, err = project.PutObjectPart(ctx, bucketName, path, info.StreamID, 1, bytes.NewReader(data))
require.NoError(t, err)
return info.StreamID
}
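// completeMultipartUpload commits the pending multipart upload identified by
// streamID, making the object available for download.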
func completeMultipartUpload(ctx context.Context, t *testing.T, uplink *testplanet.Uplink, satellite *testplanet.Satellite, bucketName string, path storj.Path, streamID string) {
_, found := testuplink.GetMaxSegmentSize(ctx)
if !found {
ctx = testuplink.WithMaxSegmentSize(ctx, satellite.Config.Metainfo.MaxSegmentSize)
}
project, err := uplink.GetProject(ctx, satellite)
require.NoError(t, err)
defer func() { require.NoError(t, project.Close()) }()
_, err = project.CompleteMultipartUpload(ctx, bucketName, path, streamID, nil)
require.NoError(t, err)
}

@@ -27,7 +27,7 @@ type PieceTracker struct
// TODO: should we use int or int64 consistently for piece count (db type is int64)?
pieceCounts map[storj.NodeID]int
retainInfos map[storj.NodeID]*RetainInfo
RetainInfos map[storj.NodeID]*RetainInfo
}
// NewPieceTracker instantiates a new gc piece tracker to be subscribed to the metainfo loop.
@@ -38,7 +38,7 @@ func NewPieceTracker(log *zap.Logger, config Config, pieceCounts map[storj.NodeI
creationDate: time.Now().UTC(),
pieceCounts: pieceCounts,
retainInfos: make(map[storj.NodeID]*RetainInfo, len(pieceCounts)),
RetainInfos: make(map[storj.NodeID]*RetainInfo, len(pieceCounts)),
}
}
@@ -66,7 +66,7 @@ func (pieceTracker *PieceTracker) InlineSegment(ctx context.Context, segment *me
// adds a pieceID to the relevant node's RetainInfo.
func (pieceTracker *PieceTracker) add(nodeID storj.NodeID, pieceID storj.PieceID) {
if _, ok := pieceTracker.retainInfos[nodeID]; !ok {
if _, ok := pieceTracker.RetainInfos[nodeID]; !ok {
// If we know how many pieces a node should be storing, use that number. Otherwise use default.
numPieces := pieceTracker.config.InitialPieces
if pieceTracker.pieceCounts[nodeID] > 0 {
@@ -74,12 +74,12 @@ func (pieceTracker *PieceTracker) add(nodeID storj.NodeID, pieceID storj.PieceID
}
// limit size of bloom filter to ensure we are under the limit for RPC
filter := bloomfilter.NewOptimalMaxSize(numPieces, pieceTracker.config.FalsePositiveRate, 2*memory.MiB)
pieceTracker.retainInfos[nodeID] = &RetainInfo{
pieceTracker.RetainInfos[nodeID] = &RetainInfo{
Filter: filter,
CreationDate: pieceTracker.creationDate,
}
}
pieceTracker.retainInfos[nodeID].Filter.Add(pieceID)
pieceTracker.retainInfos[nodeID].Count++
pieceTracker.RetainInfos[nodeID].Filter.Add(pieceID)
pieceTracker.RetainInfos[nodeID].Count++
}

@@ -114,7 +114,7 @@ func (service *Service) Run(ctx context.Context) (err error) {
for id := range lastPieceCounts {
delete(lastPieceCounts, id)
}
for id, info := range pieceTracker.retainInfos {
for id, info := range pieceTracker.RetainInfos {
lastPieceCounts[id] = info.Count
}
@@ -125,14 +125,14 @@ func (service *Service) Run(ctx context.Context) (err error) {
}
// monitor information
for _, info := range pieceTracker.retainInfos {
for _, info := range pieceTracker.RetainInfos {
mon.IntVal("node_piece_count").Observe(int64(info.Count))
mon.IntVal("retain_filter_size_bytes").Observe(info.Filter.Size())
}
// send retain requests
limiter := sync2.NewLimiter(service.config.ConcurrentSends)
for id, info := range pieceTracker.retainInfos {
for id, info := range pieceTracker.RetainInfos {
id, info := id, info
limiter.Go(ctx, func() {
err := service.sendRetainRequest(ctx, id, info)