storagenode/storagenodedb: Add cursor to pieceInfo.GetPieceIDs (#2724)

This commit is contained in:
Maximillian von Briesen 2019-08-06 13:19:16 -04:00 committed by GitHub
parent c8edeb0257
commit bdcb40fbc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 100 additions and 13 deletions

View File

@@ -56,7 +56,7 @@ type DB interface {
// Get returns Info about a piece.
Get(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (*Info, error)
// GetPieceIDs gets pieceIDs using the satelliteID
GetPieceIDs(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, limit, offset int) (pieceIDs []storj.PieceID, err error)
GetPieceIDs(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, limit int, cursor storj.PieceID) (pieceIDs []storj.PieceID, err error)
// Delete deletes Info about a piece.
Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error
// DeleteFailed marks piece deletion from disk failed

View File

@@ -614,7 +614,7 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques
}
const limit = 1000
offset := 0
cursor := storj.PieceID{}
numDeleted := 0
hasMorePieces := true
@@ -622,11 +622,13 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques
// subtract some time to leave room for clock difference between the satellite and storage node
createdBefore := retainReq.GetCreationDate().Add(-endpoint.config.RetainTimeBuffer)
pieceIDs, err := endpoint.pieceinfo.GetPieceIDs(ctx, peer.ID, createdBefore, limit, offset)
pieceIDs, err := endpoint.pieceinfo.GetPieceIDs(ctx, peer.ID, createdBefore, limit, cursor)
if err != nil {
return nil, status.Error(codes.Internal, Error.Wrap(err).Error())
}
for _, pieceID := range pieceIDs {
cursor = pieceID
if !filter.Contains(pieceID) {
endpoint.log.Sugar().Debugf("About to delete piece id (%s) from satellite (%s). RetainStatus: %s", pieceID.String(), peer.ID.String(), endpoint.config.RetainStatus.String())
@@ -648,8 +650,6 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques
}
hasMorePieces = (len(pieceIDs) == limit)
offset += len(pieceIDs)
offset -= numDeleted
// We call Gosched() here because the GC process is expected to be long and we want to keep it at low priority,
// so other goroutines can continue serving requests.
runtime.Gosched()

View File

@@ -624,11 +624,11 @@ func TestRetain(t *testing.T) {
_, err = endpointDebug.Retain(ctxSatellite0, &retainReq)
require.NoError(t, err)
satellite1Pieces, err := pieceInfos.GetPieceIDs(ctx, satellite1.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, 0)
satellite1Pieces, err := pieceInfos.GetPieceIDs(ctx, satellite1.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, storj.PieceID{})
require.NoError(t, err)
require.Equal(t, numPieces, len(satellite1Pieces))
satellite0Pieces, err := pieceInfos.GetPieceIDs(ctx, satellite0.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, 0)
satellite0Pieces, err := pieceInfos.GetPieceIDs(ctx, satellite0.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, storj.PieceID{})
require.NoError(t, err)
require.Equal(t, numPieces, len(satellite0Pieces))
@@ -637,13 +637,13 @@ func TestRetain(t *testing.T) {
require.NoError(t, err)
// check we have deleted nothing for satellite1
satellite1Pieces, err = pieceInfos.GetPieceIDs(ctx, satellite1.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, 0)
satellite1Pieces, err = pieceInfos.GetPieceIDs(ctx, satellite1.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, storj.PieceID{})
require.NoError(t, err)
require.Equal(t, numPieces, len(satellite1Pieces))
// check we did not delete recent pieces or retained pieces for satellite0
// also check that we deleted the correct pieces for satellite0
satellite0Pieces, err = pieceInfos.GetPieceIDs(ctx, satellite0.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, 0)
satellite0Pieces, err = pieceInfos.GetPieceIDs(ctx, satellite0.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, storj.PieceID{})
require.NoError(t, err)
require.Equal(t, numPieces-numOldPieces, len(satellite0Pieces))

View File

@@ -67,16 +67,16 @@ func (db *pieceinfo) Add(ctx context.Context, info *pieces.Info) (err error) {
}
// GetPieceIDs gets pieceIDs using the satelliteID
func (db *pieceinfo) GetPieceIDs(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, limit, offset int) (pieceIDs []storj.PieceID, err error) {
func (db *pieceinfo) GetPieceIDs(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, limit int, cursor storj.PieceID) (pieceIDs []storj.PieceID, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := db.db.QueryContext(ctx, db.Rebind(`
SELECT piece_id
FROM pieceinfo_
WHERE satellite_id = ? AND datetime(piece_creation) < datetime(?)
WHERE satellite_id = ? AND datetime(piece_creation) < datetime(?) AND piece_id > ?
ORDER BY piece_id
LIMIT ? OFFSET ?
`), satelliteID, createdBefore.UTC(), limit, offset)
LIMIT ?
`), satelliteID, createdBefore.UTC(), cursor, limit)
if err != nil {
return nil, ErrInfo.Wrap(err)
}

View File

@@ -0,0 +1,87 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package storagenodedb_test
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testidentity"
"storj.io/storj/internal/testrand"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/signing"
"storj.io/storj/pkg/storj"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
)
// TestGetPieceIDs exercises cursor-based pagination of pieceinfo.GetPieceIDs:
// it inserts 90 pieces for one satellite, then pages through them 50 at a
// time — expecting batches of 50, then 40, then 0 — while checking that no
// piece ID is ever returned twice.
func TestGetPieceIDs(t *testing.T) {
	storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
		ctx := testcontext.New(t)
		defer ctx.Cleanup()

		pieceInfos := db.PieceInfo()
		satellite := testidentity.MustPregeneratedSignedIdentity(0, storj.LatestIDVersion())
		uplink := testidentity.MustPregeneratedSignedIdentity(3, storj.LatestIDVersion())

		// Populate the table with signed piece infos created one minute ago,
		// so a createdBefore of time.Now() matches all of them.
		const totalPieces = 90
		for i := 0; i < totalPieces; i++ {
			id := testrand.PieceID()
			hash, err := signing.SignPieceHash(ctx,
				signing.SignerFromFullIdentity(uplink),
				&pb.PieceHash{
					PieceId: id,
					Hash:    []byte{0, 2, 3, 4, 5},
				})
			require.NoError(t, err)

			err = pieceInfos.Add(ctx, &pieces.Info{
				SatelliteID:     satellite.ID,
				PieceSize:       4,
				PieceID:         id,
				PieceCreation:   time.Now().Add(-time.Minute),
				UplinkPieceHash: hash,
				OrderLimit:      &pb.OrderLimit{},
			})
			require.NoError(t, err)
		}

		// Page through all pieces, advancing the cursor to the last ID of
		// each batch; batch sizes must be 50, 40, and finally 0.
		const requestSize = 50
		seen := make(map[storj.PieceID]bool)
		cursor := storj.PieceID{}
		for _, expected := range []int{50, 40, 0} {
			ids, err := pieceInfos.GetPieceIDs(ctx, satellite.ID, time.Now(), requestSize, cursor)
			require.NoError(t, err)
			require.Len(t, ids, expected)
			for _, id := range ids {
				require.False(t, seen[id])
				seen[id] = true
				cursor = id
			}
		}
	})
}