diff --git a/storagenode/pieces/store.go b/storagenode/pieces/store.go index 51fd561f5..f3d409ae5 100644 --- a/storagenode/pieces/store.go +++ b/storagenode/pieces/store.go @@ -56,7 +56,7 @@ type DB interface { // Get returns Info about a piece. Get(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (*Info, error) // GetPieceIDs gets pieceIDs using the satelliteID - GetPieceIDs(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, limit, offset int) (pieceIDs []storj.PieceID, err error) + GetPieceIDs(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, limit int, cursor storj.PieceID) (pieceIDs []storj.PieceID, err error) // Delete deletes Info about a piece. Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error // DeleteFailed marks piece deletion from disk failed diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index d6d853753..69ae8d883 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -614,7 +614,7 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques } const limit = 1000 - offset := 0 + cursor := storj.PieceID{} numDeleted := 0 hasMorePieces := true @@ -622,11 +622,13 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques // subtract some time to leave room for clock difference between the satellite and storage node createdBefore := retainReq.GetCreationDate().Add(-endpoint.config.RetainTimeBuffer) - pieceIDs, err := endpoint.pieceinfo.GetPieceIDs(ctx, peer.ID, createdBefore, limit, offset) + pieceIDs, err := endpoint.pieceinfo.GetPieceIDs(ctx, peer.ID, createdBefore, limit, cursor) if err != nil { return nil, status.Error(codes.Internal, Error.Wrap(err).Error()) } for _, pieceID := range pieceIDs { + cursor = pieceID + if !filter.Contains(pieceID) { endpoint.log.Sugar().Debugf("About to delete piece id (%s) from satellite (%s). RetainStatus: %s", pieceID.String(), peer.ID.String(), endpoint.config.RetainStatus.String()) @@ -648,8 +650,6 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques } hasMorePieces = (len(pieceIDs) == limit) - offset += len(pieceIDs) - offset -= numDeleted // We call Gosched() here because the GC process is expected to be long and we want to keep it at low priority, // so other goroutines can continue serving requests. runtime.Gosched() diff --git a/storagenode/piecestore/endpoint_test.go b/storagenode/piecestore/endpoint_test.go index 17c322e17..42e86f81f 100644 --- a/storagenode/piecestore/endpoint_test.go +++ b/storagenode/piecestore/endpoint_test.go @@ -624,11 +624,11 @@ func TestRetain(t *testing.T) { _, err = endpointDebug.Retain(ctxSatellite0, &retainReq) require.NoError(t, err) - satellite1Pieces, err := pieceInfos.GetPieceIDs(ctx, satellite1.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, 0) + satellite1Pieces, err := pieceInfos.GetPieceIDs(ctx, satellite1.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, storj.PieceID{}) require.NoError(t, err) require.Equal(t, numPieces, len(satellite1Pieces)) - satellite0Pieces, err := pieceInfos.GetPieceIDs(ctx, satellite0.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, 0) + satellite0Pieces, err := pieceInfos.GetPieceIDs(ctx, satellite0.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, storj.PieceID{}) require.NoError(t, err) require.Equal(t, numPieces, len(satellite0Pieces)) @@ -637,13 +637,13 @@ func TestRetain(t *testing.T) { require.NoError(t, err) // check we have deleted nothing for satellite1 - satellite1Pieces, err = pieceInfos.GetPieceIDs(ctx, satellite1.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, 0) + satellite1Pieces, err = pieceInfos.GetPieceIDs(ctx, satellite1.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, storj.PieceID{}) require.NoError(t, err) require.Equal(t, numPieces, len(satellite1Pieces)) // check we did not delete recent pieces or retained pieces for satellite0 // also check that we deleted the correct pieces for satellite0 - satellite0Pieces, err = pieceInfos.GetPieceIDs(ctx, satellite0.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, 0) + satellite0Pieces, err = pieceInfos.GetPieceIDs(ctx, satellite0.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, storj.PieceID{}) require.NoError(t, err) require.Equal(t, numPieces-numOldPieces, len(satellite0Pieces)) diff --git a/storagenode/storagenodedb/pieceinfo.go b/storagenode/storagenodedb/pieceinfo.go index 4abb8e01b..36c36ab6d 100644 --- a/storagenode/storagenodedb/pieceinfo.go +++ b/storagenode/storagenodedb/pieceinfo.go @@ -67,16 +67,16 @@ func (db *pieceinfo) Add(ctx context.Context, info *pieces.Info) (err error) { } // GetPieceIDs gets pieceIDs using the satelliteID -func (db *pieceinfo) GetPieceIDs(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, limit, offset int) (pieceIDs []storj.PieceID, err error) { +func (db *pieceinfo) GetPieceIDs(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, limit int, cursor storj.PieceID) (pieceIDs []storj.PieceID, err error) { defer mon.Task()(&ctx)(&err) rows, err := db.db.QueryContext(ctx, db.Rebind(` SELECT piece_id FROM pieceinfo_ - WHERE satellite_id = ? AND datetime(piece_creation) < datetime(?) + WHERE satellite_id = ? AND datetime(piece_creation) < datetime(?) AND piece_id > ? ORDER BY piece_id - LIMIT ? OFFSET ? - `), satelliteID, createdBefore.UTC(), limit, offset) + LIMIT ? + `), satelliteID, createdBefore.UTC(), cursor, limit) if err != nil { return nil, ErrInfo.Wrap(err) } diff --git a/storagenode/storagenodedb/pieceinfo_test.go b/storagenode/storagenodedb/pieceinfo_test.go new file mode 100644 index 000000000..bbb12a288 --- /dev/null +++ b/storagenode/storagenodedb/pieceinfo_test.go @@ -0,0 +1,87 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package storagenodedb_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "storj.io/storj/internal/testcontext" + "storj.io/storj/internal/testidentity" + "storj.io/storj/internal/testrand" + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/signing" + "storj.io/storj/pkg/storj" + "storj.io/storj/storagenode" + "storj.io/storj/storagenode/pieces" + "storj.io/storj/storagenode/storagenodedb/storagenodedbtest" +) + +// TestGetPieceIDs does the following: +// * Create 90 pieces +// * Request 50 pieces starting from the beginning. Expect 50 pieces. +// * Request 50 pieces starting from the end of the previous request. Expect 40 pieces. +// * Request 50 pieces starting from the end of the previous request. Expect 0 pieces. +func TestGetPieceIDs(t *testing.T) { + storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + pieceInfos := db.PieceInfo() + + satellite := testidentity.MustPregeneratedSignedIdentity(0, storj.LatestIDVersion()) + uplink := testidentity.MustPregeneratedSignedIdentity(3, storj.LatestIDVersion()) + totalPieces := 90 + for i := 0; i < totalPieces; i++ { + newID := testrand.PieceID() + + pieceHash, err := signing.SignPieceHash(ctx, + signing.SignerFromFullIdentity(uplink), + &pb.PieceHash{ + PieceId: newID, + Hash: []byte{0, 2, 3, 4, 5}, + }) + require.NoError(t, err) + + err = pieceInfos.Add(ctx, &pieces.Info{ + SatelliteID: satellite.ID, + PieceSize: 4, + PieceID: newID, + PieceCreation: time.Now().Add(-time.Minute), + UplinkPieceHash: pieceHash, + OrderLimit: &pb.OrderLimit{}, + }) + require.NoError(t, err) + } + + seen := make(map[storj.PieceID]bool) + + requestSize := 50 + cursor := storj.PieceID{} + + pieceIDs, err := pieceInfos.GetPieceIDs(ctx, satellite.ID, time.Now(), requestSize, cursor) + require.NoError(t, err) + require.Len(t, pieceIDs, 50) + for _, id := range pieceIDs { + require.False(t, seen[id]) + seen[id] = true + cursor = id + } + + pieceIDs, err = pieceInfos.GetPieceIDs(ctx, satellite.ID, time.Now(), requestSize, cursor) + require.NoError(t, err) + require.Len(t, pieceIDs, 40) + for _, id := range pieceIDs { + require.False(t, seen[id]) + seen[id] = true + cursor = id + } + + pieceIDs, err = pieceInfos.GetPieceIDs(ctx, satellite.ID, time.Now(), requestSize, cursor) + require.NoError(t, err) + require.Len(t, pieceIDs, 0) + }) +}