Add RetainStatus to storagenode config (#2633)
--storage2.retain-status = "disabled" (default), "debug", or "enabled"
This commit is contained in:
parent
3b6c69c594
commit
906c77b55a
@ -122,6 +122,7 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
|
||||
MinimumBandwidth: 100 * memory.MB,
|
||||
MinimumDiskSpace: 100 * memory.MB,
|
||||
},
|
||||
RetainStatus: piecestore.RetainEnabled,
|
||||
},
|
||||
Vouchers: vouchers.Config{
|
||||
Interval: time.Hour,
|
||||
|
@ -59,11 +59,56 @@ type Config struct {
|
||||
MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected." default:"6"`
|
||||
OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"1h0m0s"`
|
||||
RetainTimeBuffer time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"1h0m0s"`
|
||||
RetainStatus RetainStatus `help:"allows configuration to enable, disable, or test retain requests from the satellite. Options: (disabled/enabled/debug)" default:"disabled"`
|
||||
|
||||
Monitor monitor.Config
|
||||
Sender orders.SenderConfig
|
||||
}
|
||||
|
||||
// RetainStatus is a type defining the enabled/disabled status of retain requests
|
||||
type RetainStatus uint32
|
||||
|
||||
const (
|
||||
// RetainDisabled means we do not do anything with retain requests
|
||||
RetainDisabled RetainStatus = iota + 1
|
||||
// RetainEnabled means we fully enable retain requests and delete data not defined by bloom filter
|
||||
RetainEnabled
|
||||
// RetainDebug means we partially enable retain requests, and print out pieces we should delete, without actually deleting them
|
||||
RetainDebug
|
||||
)
|
||||
|
||||
// Set implements pflag.Value
|
||||
func (v *RetainStatus) Set(s string) error {
|
||||
switch s {
|
||||
case "disabled":
|
||||
*v = RetainDisabled
|
||||
case "enabled":
|
||||
*v = RetainEnabled
|
||||
case "debug":
|
||||
*v = RetainDebug
|
||||
default:
|
||||
return Error.New("invalid RetainStatus %q", s)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Type implements pflag.Value
|
||||
func (*RetainStatus) Type() string { return "storj.RetainStatus" }
|
||||
|
||||
// String implements pflag.Value
|
||||
func (v *RetainStatus) String() string {
|
||||
switch *v {
|
||||
case RetainDisabled:
|
||||
return "disabled"
|
||||
case RetainEnabled:
|
||||
return "enabled"
|
||||
case RetainDebug:
|
||||
return "debug"
|
||||
default:
|
||||
return "invalid"
|
||||
}
|
||||
}
|
||||
|
||||
// Endpoint implements uploading, downloading and deleting for a storage node.
|
||||
type Endpoint struct {
|
||||
log *zap.Logger
|
||||
@ -542,6 +587,11 @@ func (endpoint *Endpoint) SaveOrder(ctx context.Context, limit *pb.OrderLimit, o
|
||||
func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainRequest) (res *pb.RetainResponse, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// if retain status is disabled, quit immediately
|
||||
if endpoint.config.RetainStatus == RetainDisabled {
|
||||
return &pb.RetainResponse{}, nil
|
||||
}
|
||||
|
||||
peer, err := identity.PeerIdentityFromContext(ctx)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Unauthenticated, Error.Wrap(err).Error())
|
||||
@ -572,18 +622,25 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques
|
||||
}
|
||||
for _, pieceID := range pieceIDs {
|
||||
if !filter.Contains(pieceID) {
|
||||
if err = endpoint.store.Delete(ctx, peer.ID, pieceID); err != nil {
|
||||
endpoint.log.Error("failed to delete a piece", zap.Error(Error.Wrap(err)))
|
||||
// continue because if we fail to delete from file system,
|
||||
// we need to keep the pieceinfo so we can delete next time
|
||||
continue
|
||||
}
|
||||
if err = endpoint.pieceinfo.Delete(ctx, peer.ID, pieceID); err != nil {
|
||||
endpoint.log.Error("failed to delete piece info", zap.Error(Error.Wrap(err)))
|
||||
endpoint.log.Sugar().Debugf("About to delete piece id (%s) from satellite (%s). RetainStatus: %s", pieceID.String(), peer.ID.String(), endpoint.config.RetainStatus.String())
|
||||
|
||||
// if retain status is enabled, delete pieceid
|
||||
if endpoint.config.RetainStatus == RetainEnabled {
|
||||
if err = endpoint.store.Delete(ctx, peer.ID, pieceID); err != nil {
|
||||
endpoint.log.Error("failed to delete a piece", zap.Error(err))
|
||||
// continue because if we fail to delete from file system,
|
||||
// we need to keep the pieceinfo so we can delete next time
|
||||
continue
|
||||
}
|
||||
if err = endpoint.pieceinfo.Delete(ctx, peer.ID, pieceID); err != nil {
|
||||
endpoint.log.Error("failed to delete piece info", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
numDeleted++
|
||||
}
|
||||
}
|
||||
|
||||
hasMorePieces = (len(pieceIDs) == limit)
|
||||
offset += len(pieceIDs)
|
||||
offset -= numDeleted
|
||||
@ -591,6 +648,9 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques
|
||||
// so other goroutines can continue serving requests.
|
||||
runtime.Gosched()
|
||||
}
|
||||
|
||||
endpoint.log.Sugar().Debugf("Deleted %d pieces during retain. RetainStatus: %s", numDeleted, endpoint.config.RetainStatus.String())
|
||||
|
||||
return &pb.RetainResponse{}, nil
|
||||
}
|
||||
|
||||
|
@ -515,7 +515,8 @@ func TestRetain(t *testing.T) {
|
||||
// have a recent timestamp and thus should not be deleted
|
||||
const numOldPieces = 5
|
||||
|
||||
filter := bloomfilter.NewOptimal(numPiecesToKeep, 0.1)
|
||||
// for this test, we set the false positive rate very low, so we can test which pieces should be deleted with precision
|
||||
filter := bloomfilter.NewOptimal(numPieces, 0.000000001)
|
||||
|
||||
pieceIDs := generateTestIDs(numPieces)
|
||||
|
||||
@ -531,12 +532,25 @@ func TestRetain(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
uplink := testidentity.MustPregeneratedSignedIdentity(3, storj.LatestIDVersion())
|
||||
endpoint, err := ps.NewEndpoint(zaptest.NewLogger(t), nil, trusted, nil, store, pieceInfos, nil, nil, nil, ps.Config{})
|
||||
endpointEnabled, err := ps.NewEndpoint(zaptest.NewLogger(t), nil, trusted, nil, store, pieceInfos, nil, nil, nil, ps.Config{
|
||||
RetainStatus: ps.RetainEnabled,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
endpointDisabled, err := ps.NewEndpoint(zaptest.NewLogger(t), nil, trusted, nil, store, pieceInfos, nil, nil, nil, ps.Config{
|
||||
RetainStatus: ps.RetainDisabled,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
endpointDebug, err := ps.NewEndpoint(zaptest.NewLogger(t), nil, trusted, nil, store, pieceInfos, nil, nil, nil, ps.Config{
|
||||
RetainStatus: ps.RetainDebug,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
recentTime := time.Now()
|
||||
oldTime := recentTime.Add(-time.Duration(48) * time.Hour)
|
||||
|
||||
// keep pieceIDs[0 : numPiecesToKeep] (old + in filter)
|
||||
// delete pieceIDs[numPiecesToKeep : numPiecesToKeep+numOldPieces] (old + not in filter)
|
||||
// keep pieceIDs[numPiecesToKeep+numOldPieces : numPieces] (recent + not in filter)
|
||||
var pieceCreation time.Time
|
||||
// add all pieces to the node pieces info DB - but only count piece ids in filter
|
||||
for index, id := range pieceIDs {
|
||||
@ -603,17 +617,35 @@ func TestRetain(t *testing.T) {
|
||||
retainReq.Filter = filter.Bytes()
|
||||
retainReq.CreationDate = recentTime
|
||||
|
||||
_, err = endpoint.Retain(ctxSatellite0, &retainReq)
|
||||
// expect that disabled and debug endpoints do not delete any pieces
|
||||
_, err = endpointDisabled.Retain(ctxSatellite0, &retainReq)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = endpointDebug.Retain(ctxSatellite0, &retainReq)
|
||||
require.NoError(t, err)
|
||||
|
||||
// check we have deleted nothing for satellite1
|
||||
satellite1Pieces, err := pieceInfos.GetPieceIDs(ctx, satellite1.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, 0)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, numPieces, len(satellite1Pieces))
|
||||
|
||||
// check we did not delete recent pieces
|
||||
satellite0Pieces, err := pieceInfos.GetPieceIDs(ctx, satellite0.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, 0)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, numPieces, len(satellite0Pieces))
|
||||
|
||||
// expect that enabled endpoint deletes the correct pieces
|
||||
_, err = endpointEnabled.Retain(ctxSatellite0, &retainReq)
|
||||
require.NoError(t, err)
|
||||
|
||||
// check we have deleted nothing for satellite1
|
||||
satellite1Pieces, err = pieceInfos.GetPieceIDs(ctx, satellite1.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, 0)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, numPieces, len(satellite1Pieces))
|
||||
|
||||
// check we did not delete recent pieces or retained pieces for satellite0
|
||||
// also check that we deleted the correct pieces for satellite0
|
||||
satellite0Pieces, err = pieceInfos.GetPieceIDs(ctx, satellite0.ID, recentTime.Add(time.Duration(5)*time.Second), numPieces, 0)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, numPieces-numOldPieces, len(satellite0Pieces))
|
||||
|
||||
for _, id := range pieceIDs[:numPiecesToKeep] {
|
||||
require.Contains(t, satellite0Pieces, id, "piece should not have been deleted (not in bloom filter)")
|
||||
@ -622,6 +654,10 @@ func TestRetain(t *testing.T) {
|
||||
for _, id := range pieceIDs[numPiecesToKeep+numOldPieces:] {
|
||||
require.Contains(t, satellite0Pieces, id, "piece should not have been deleted (recent piece)")
|
||||
}
|
||||
|
||||
for _, id := range pieceIDs[numPiecesToKeep : numPiecesToKeep+numOldPieces] {
|
||||
require.NotContains(t, satellite0Pieces, id, "piece should have been deleted")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user