satellite/gc/sender: new service to send retain filters

Implement a new service to read retain filter from a bucket and
send them out to storagenodes.

This allows the retain filters to be generated by a separate command on
a backup of the database.

Paralellism (setting ConcurrentSends) and end-to-end garbage collection
tests will be restored in a subsequent commit.

Solves https://github.com/storj/team-metainfo/issues/121

Change-Id: Iaf8a33fbf6987676cc3cf74a18a8078916fe673d
This commit is contained in:
Erik van Velzen 2022-08-29 11:44:54 +02:00 committed by Storj Robot
parent 7788170234
commit e6b5501f9b
13 changed files with 539 additions and 681 deletions

View File

@ -40,8 +40,8 @@ import (
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/console/consoleweb"
"storj.io/storj/satellite/contact"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gc/bloomfilter"
"storj.io/storj/satellite/gc/sender"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/inspector"
"storj.io/storj/satellite/mailservice"
@ -136,7 +136,7 @@ type Satellite struct {
}
GarbageCollection struct {
Service *gc.Service
Sender *sender.Service
BloomFilters *bloomfilter.Service
}
@ -599,7 +599,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.Audit.Verifier = peer.Audit.Verifier
system.Audit.Reporter = peer.Audit.Reporter
system.GarbageCollection.Service = gcPeer.GarbageCollection.Service
system.GarbageCollection.Sender = gcPeer.GarbageCollection.Sender
system.GarbageCollection.BloomFilters = gcBFPeer.GarbageCollection.Service
system.ExpiredDeletion.Chore = peer.ExpiredDeletion.Chore

View File

@ -23,7 +23,7 @@ import (
"storj.io/private/version"
"storj.io/storj/private/lifecycle"
version_checker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gc/sender"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/overlay"
@ -61,7 +61,7 @@ type GarbageCollection struct {
}
GarbageCollection struct {
Service *gc.Service
Sender *sender.Service
}
}
@ -146,19 +146,19 @@ func NewGarbageCollection(log *zap.Logger, full *identity.FullIdentity, db DB,
}
{ // setup garbage collection
peer.GarbageCollection.Service = gc.NewService(
peer.Log.Named("garbage-collection"),
peer.GarbageCollection.Sender = sender.NewService(
peer.Log.Named("gc-sender"),
config.GarbageCollection,
peer.Dialer,
peer.Overlay.DB,
peer.Metainfo.SegmentLoop,
)
peer.Services.Add(lifecycle.Item{
Name: "garbage-collection",
Run: peer.GarbageCollection.Service.Run,
Name: "gc-sender",
Run: peer.GarbageCollection.Sender.Run,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Garbage Collection", peer.GarbageCollection.Service.Loop))
debug.Cycle("Garbage Collection", peer.GarbageCollection.Sender.Loop))
}
return peer, nil

View File

@ -35,7 +35,7 @@ type Config struct {
FalsePositiveRate float64 `help:"the false positive rate used for creating a garbage collection bloom filter" releaseDefault:"0.1" devDefault:"0.1"`
AccessGrant string `help:"Access Grant which will be used to upload bloom filters to the bucket" default:""`
Bucket string `help:"Bucket which will be used to upload bloom filters" default:""` // TODO do we need full location?
Bucket string `help:"Bucket which will be used to upload bloom filters" default:"" testDefault:"gc-queue"` // TODO do we need full location?
ZipBatchSize int `help:"how many bloom filters will be packed in a single zip" default:"500" testDefault:"2"`
ExpireIn time.Duration `help:"how quickly uploaded bloom filters will be automatically deleted" default:"336h"`
}
@ -116,6 +116,7 @@ func (service *Service) RunOnce(ctx context.Context) (err error) {
return nil
}
// uploadBloomFilters stores a zipfile with multiple bloom filters in a bucket.
func (service *Service) uploadBloomFilters(ctx context.Context, latestCreationDate time.Time, retainInfos map[storj.NodeID]*RetainInfo) (err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -1,389 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gc_test
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/common/base58"
"storj.io/common/encryption"
"storj.io/common/memory"
"storj.io/common/paths"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/metabase"
"storj.io/storj/storage"
"storj.io/storj/storagenode"
"storj.io/uplink/private/eestream"
"storj.io/uplink/private/testuplink"
)
// TestGarbageCollection does the following:
// * Set up a network with one storagenode
// * Upload two objects
// * Delete one object from the metainfo service on the satellite
// * Wait for bloom filter generation
// * Check that pieces of the deleted object are deleted on the storagenode
// * Check that pieces of the kept object are not deleted on the storagenode.
func TestGarbageCollection(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.GarbageCollection.FalsePositiveRate = 0.000000001
config.GarbageCollection.Interval = 500 * time.Millisecond
},
StorageNode: func(index int, config *storagenode.Config) {
config.Retain.MaxTimeSkew = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
upl := planet.Uplinks[0]
targetNode := planet.StorageNodes[0]
gcService := satellite.GarbageCollection.Service
gcService.Loop.Pause()
// Upload two objects
testData1 := testrand.Bytes(8 * memory.KiB)
testData2 := testrand.Bytes(8 * memory.KiB)
err := upl.Upload(ctx, satellite, "testbucket", "test/path/1", testData1)
require.NoError(t, err)
objectLocationToDelete, segmentToDelete := getSegment(ctx, t, satellite, upl, "testbucket", "test/path/1")
var deletedPieceID storj.PieceID
for _, p := range segmentToDelete.Pieces {
if p.StorageNode == targetNode.ID() {
deletedPieceID = segmentToDelete.RootPieceID.Derive(p.StorageNode, int32(p.Number))
break
}
}
require.NotZero(t, deletedPieceID)
err = upl.Upload(ctx, satellite, "testbucket", "test/path/2", testData2)
require.NoError(t, err)
_, segmentToKeep := getSegment(ctx, t, satellite, upl, "testbucket", "test/path/2")
var keptPieceID storj.PieceID
for _, p := range segmentToKeep.Pieces {
if p.StorageNode == targetNode.ID() {
keptPieceID = segmentToKeep.RootPieceID.Derive(p.StorageNode, int32(p.Number))
break
}
}
require.NotZero(t, keptPieceID)
// Delete one object from metainfo service on satellite
_, err = satellite.Metabase.DB.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{
ObjectLocation: objectLocationToDelete,
Version: metabase.DefaultVersion,
})
require.NoError(t, err)
// Check that piece of the deleted object is on the storagenode
pieceAccess, err := targetNode.DB.Pieces().Stat(ctx, storage.BlobRef{
Namespace: satellite.ID().Bytes(),
Key: deletedPieceID.Bytes(),
})
require.NoError(t, err)
require.NotNil(t, pieceAccess)
// The pieceInfo.GetPieceIDs query converts piece creation and the filter creation timestamps
// to datetime in sql. This chops off all precision beyond seconds.
// In this test, the amount of time that elapses between piece uploads and the gc loop is
// less than a second, meaning datetime(piece_creation) < datetime(filter_creation) is false unless we sleep
// for a second.
time.Sleep(1 * time.Second)
// Wait for next iteration of garbage collection to finish
gcService.Loop.Restart()
gcService.Loop.TriggerWait()
// Wait for the storagenode's RetainService queue to be empty
targetNode.Storage2.RetainService.TestWaitUntilEmpty()
// Check that piece of the deleted object is not on the storagenode
pieceAccess, err = targetNode.DB.Pieces().Stat(ctx, storage.BlobRef{
Namespace: satellite.ID().Bytes(),
Key: deletedPieceID.Bytes(),
})
require.Error(t, err)
require.Nil(t, pieceAccess)
// Check that piece of the kept object is on the storagenode
pieceAccess, err = targetNode.DB.Pieces().Stat(ctx, storage.BlobRef{
Namespace: satellite.ID().Bytes(),
Key: keptPieceID.Bytes(),
})
require.NoError(t, err)
require.NotNil(t, pieceAccess)
})
}
// TestGarbageCollectionWithCopies checkes that server-side copy elements are not
// affecting GC and nothing unexpected was deleted from storage nodes.
func TestGarbageCollectionWithCopies(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 3, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
gcService := satellite.GarbageCollection.Service
gcService.Loop.Pause()
project, err := planet.Uplinks[0].OpenProject(ctx, satellite)
require.NoError(t, err)
defer ctx.Check(project.Close)
allSpaceUsedForPieces := func() (all int64) {
for _, node := range planet.StorageNodes {
_, piecesContent, _, err := node.Storage2.Store.SpaceUsedTotalAndBySatellite(ctx)
require.NoError(t, err)
all += piecesContent
}
return all
}
expectedRemoteData := testrand.Bytes(8 * memory.KiB)
expectedInlineData := testrand.Bytes(1 * memory.KiB)
encryptedSize, err := encryption.CalcEncryptedSize(int64(len(expectedRemoteData)), storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 29 * 256 * memory.B.Int32(), // hardcoded value from uplink
})
require.NoError(t, err)
redundancyStrategy, err := planet.Satellites[0].Config.Metainfo.RS.RedundancyStrategy()
require.NoError(t, err)
pieceSize := eestream.CalcPieceSize(encryptedSize, redundancyStrategy.ErasureScheme)
singleRemoteUsed := pieceSize * int64(len(planet.StorageNodes))
totalUsedByNodes := 2 * singleRemoteUsed // two remote objects
require.NoError(t, planet.Uplinks[0].Upload(ctx, satellite, "testbucket", "remote", expectedRemoteData))
require.NoError(t, planet.Uplinks[0].Upload(ctx, satellite, "testbucket", "inline", expectedInlineData))
require.NoError(t, planet.Uplinks[0].Upload(ctx, satellite, "testbucket", "remote-no-copy", expectedRemoteData))
_, err = project.CopyObject(ctx, "testbucket", "remote", "testbucket", "remote-copy", nil)
require.NoError(t, err)
_, err = project.CopyObject(ctx, "testbucket", "inline", "testbucket", "inline-copy", nil)
require.NoError(t, err)
require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))
afterTotalUsedByNodes := allSpaceUsedForPieces()
require.Equal(t, totalUsedByNodes, afterTotalUsedByNodes)
// run GC
gcService.Loop.TriggerWait()
for _, node := range planet.StorageNodes {
node.Storage2.RetainService.TestWaitUntilEmpty()
}
// we should see all space used by all objects
afterTotalUsedByNodes = allSpaceUsedForPieces()
require.Equal(t, totalUsedByNodes, afterTotalUsedByNodes)
for _, toDelete := range []string{
// delete ancestors, no change in used space
"remote",
"inline",
// delete object without copy, used space should be decreased
"remote-no-copy",
} {
_, err = project.DeleteObject(ctx, "testbucket", toDelete)
require.NoError(t, err)
}
planet.WaitForStorageNodeDeleters(ctx)
// run GC
gcService.Loop.TriggerWait()
for _, node := range planet.StorageNodes {
node.Storage2.RetainService.TestWaitUntilEmpty()
}
// verify that we deleted only pieces for "remote-no-copy" object
afterTotalUsedByNodes = allSpaceUsedForPieces()
require.Equal(t, singleRemoteUsed, afterTotalUsedByNodes)
// delete rest of objects to verify that everything will be removed also from SNs
for _, toDelete := range []string{
"remote-copy",
"inline-copy",
} {
_, err = project.DeleteObject(ctx, "testbucket", toDelete)
require.NoError(t, err)
}
planet.WaitForStorageNodeDeleters(ctx)
// run GC
gcService.Loop.TriggerWait()
for _, node := range planet.StorageNodes {
node.Storage2.RetainService.TestWaitUntilEmpty()
}
// verify that nothing more was deleted from storage nodes after GC
afterTotalUsedByNodes = allSpaceUsedForPieces()
require.EqualValues(t, 0, afterTotalUsedByNodes)
})
}
func getSegment(ctx *testcontext.Context, t *testing.T, satellite *testplanet.Satellite, upl *testplanet.Uplink, bucket, path string) (_ metabase.ObjectLocation, _ metabase.Segment) {
access := upl.Access[satellite.ID()]
serializedAccess, err := access.Serialize()
require.NoError(t, err)
store, err := encryptionAccess(serializedAccess)
require.NoError(t, err)
encryptedPath, err := encryption.EncryptPathWithStoreCipher(bucket, paths.NewUnencrypted(path), store)
require.NoError(t, err)
objectLocation :=
metabase.ObjectLocation{
ProjectID: upl.Projects[0].ID,
BucketName: "testbucket",
ObjectKey: metabase.ObjectKey(encryptedPath.Raw()),
}
lastSegment, err := satellite.Metabase.DB.GetLatestObjectLastSegment(ctx, metabase.GetLatestObjectLastSegment{
ObjectLocation: objectLocation,
})
require.NoError(t, err)
return objectLocation, lastSegment
}
func encryptionAccess(access string) (*encryption.Store, error) {
data, version, err := base58.CheckDecode(access)
if err != nil || version != 0 {
return nil, errors.New("invalid access grant format")
}
p := new(pb.Scope)
if err := pb.Unmarshal(data, p); err != nil {
return nil, err
}
key, err := storj.NewKey(p.EncryptionAccess.DefaultKey)
if err != nil {
return nil, err
}
store := encryption.NewStore()
store.SetDefaultKey(key)
store.SetDefaultPathCipher(storj.EncAESGCM)
return store, nil
}
func TestGarbageCollection_PendingObject(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.Combine(
func(log *zap.Logger, index int, config *satellite.Config) {
config.GarbageCollection.FalsePositiveRate = 0.000000001
config.GarbageCollection.Interval = 500 * time.Millisecond
},
testplanet.MaxSegmentSize(20*memory.KiB),
),
StorageNode: func(index int, config *storagenode.Config) {
config.Retain.MaxTimeSkew = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
upl := planet.Uplinks[0]
testData := testrand.Bytes(15 * memory.KiB)
pendingStreamID := startMultipartUpload(ctx, t, upl, satellite, "testbucket", "multi", testData)
segments, err := satellite.Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
require.Len(t, segments[0].Pieces, 1)
// The pieceInfo.GetPieceIDs query converts piece creation and the filter creation timestamps
// to datetime in sql. This chops off all precision beyond seconds.
// In this test, the amount of time that elapses between piece uploads and the gc loop is
// less than a second, meaning datetime(piece_creation) < datetime(filter_creation) is false unless we sleep
// for a second.
lastPieceCounts := map[storj.NodeID]int{}
pieceTracker := gc.NewPieceTracker(satellite.Log.Named("gc observer"), gc.Config{
FalsePositiveRate: 0.000000001,
InitialPieces: 10,
}, lastPieceCounts)
err = satellite.Metabase.SegmentLoop.Join(ctx, pieceTracker)
require.NoError(t, err)
require.NotEmpty(t, pieceTracker.RetainInfos)
info := pieceTracker.RetainInfos[planet.StorageNodes[0].ID()]
require.NotNil(t, info)
require.Equal(t, 1, info.Count)
completeMultipartUpload(ctx, t, upl, satellite, "testbucket", "multi", pendingStreamID)
gotData, err := upl.Download(ctx, satellite, "testbucket", "multi")
require.NoError(t, err)
require.Equal(t, testData, gotData)
})
}
func startMultipartUpload(ctx context.Context, t *testing.T, uplink *testplanet.Uplink, satellite *testplanet.Satellite, bucketName string, path storj.Path, data []byte) string {
_, found := testuplink.GetMaxSegmentSize(ctx)
if !found {
ctx = testuplink.WithMaxSegmentSize(ctx, satellite.Config.Metainfo.MaxSegmentSize)
}
project, err := uplink.GetProject(ctx, satellite)
require.NoError(t, err)
defer func() { require.NoError(t, project.Close()) }()
_, err = project.EnsureBucket(ctx, bucketName)
require.NoError(t, err)
info, err := project.BeginUpload(ctx, bucketName, path, nil)
require.NoError(t, err)
upload, err := project.UploadPart(ctx, bucketName, path, info.UploadID, 1)
require.NoError(t, err)
_, err = upload.Write(data)
require.NoError(t, err)
require.NoError(t, upload.Commit())
return info.UploadID
}
func completeMultipartUpload(ctx context.Context, t *testing.T, uplink *testplanet.Uplink, satellite *testplanet.Satellite, bucketName string, path storj.Path, streamID string) {
_, found := testuplink.GetMaxSegmentSize(ctx)
if !found {
ctx = testuplink.WithMaxSegmentSize(ctx, satellite.Config.Metainfo.MaxSegmentSize)
}
project, err := uplink.GetProject(ctx, satellite)
require.NoError(t, err)
defer func() { require.NoError(t, project.Close()) }()
_, err = project.CommitUpload(ctx, bucketName, path, streamID, nil)
require.NoError(t, err)
}

View File

@ -1,94 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gc
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/bloomfilter"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/storj/satellite/metabase/segmentloop"
)
var remoteSegmentFunc = mon.Task()
var _ segmentloop.Observer = (*PieceTracker)(nil)
// PieceTracker implements the metainfo loop observer interface for garbage collection.
//
// architecture: Observer
type PieceTracker struct {
log *zap.Logger
config Config
creationDate time.Time
// TODO: should we use int or int64 consistently for piece count (db type is int64)?
pieceCounts map[storj.NodeID]int
RetainInfos map[storj.NodeID]*RetainInfo
}
// NewPieceTracker instantiates a new gc piece tracker to be subscribed to the metainfo loop.
func NewPieceTracker(log *zap.Logger, config Config, pieceCounts map[storj.NodeID]int) *PieceTracker {
return &PieceTracker{
log: log,
config: config,
creationDate: time.Now().UTC(),
pieceCounts: pieceCounts,
RetainInfos: make(map[storj.NodeID]*RetainInfo, len(pieceCounts)),
}
}
// LoopStarted is called at each start of a loop.
func (pieceTracker *PieceTracker) LoopStarted(ctx context.Context, info segmentloop.LoopInfo) (err error) {
if pieceTracker.creationDate.After(info.Started) {
return errs.New("Creation date after loop starting time.")
}
return nil
}
// RemoteSegment takes a remote segment found in metabase and adds pieces to bloom filters.
func (pieceTracker *PieceTracker) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error {
defer remoteSegmentFunc(&ctx)(nil) // method always returns nil
deriver := segment.RootPieceID.Deriver()
for _, piece := range segment.Pieces {
pieceID := deriver.Derive(piece.StorageNode, int32(piece.Number))
pieceTracker.add(piece.StorageNode, pieceID)
}
return nil
}
// InlineSegment returns nil because we're only doing gc for storage nodes for now.
func (pieceTracker *PieceTracker) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
return nil
}
// adds a pieceID to the relevant node's RetainInfo.
func (pieceTracker *PieceTracker) add(nodeID storj.NodeID, pieceID storj.PieceID) {
info, ok := pieceTracker.RetainInfos[nodeID]
if !ok {
// If we know how many pieces a node should be storing, use that number. Otherwise use default.
numPieces := pieceTracker.config.InitialPieces
if pieceTracker.pieceCounts[nodeID] > 0 {
numPieces = pieceTracker.pieceCounts[nodeID]
}
// limit size of bloom filter to ensure we are under the limit for RPC
filter := bloomfilter.NewOptimalMaxSize(numPieces, pieceTracker.config.FalsePositiveRate, 2*memory.MiB)
info = &RetainInfo{
Filter: filter,
CreationDate: pieceTracker.creationDate,
}
pieceTracker.RetainInfos[nodeID] = info
}
info.Filter.Add(pieceID)
info.Count++
}

View File

@ -0,0 +1,150 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package sender
import (
"archive/zip"
"bytes"
"context"
"io"
"strings"
"time"
"github.com/zeebo/errs"
"storj.io/common/pb"
"storj.io/storj/satellite/internalpb"
"storj.io/uplink"
)
// IterateZipObjectKeys checks inside the top-level of a bucket and yields the keys which look like zip files.
func IterateZipObjectKeys(
ctx context.Context,
project uplink.Project,
bucket string,
fn func(objectKey string) error,
) (err error) {
defer mon.Task()(&ctx)(&err)
objects := project.ListObjects(ctx, bucket, &uplink.ListObjectsOptions{
System: true,
// the previously read archives are stored under prefixes, we want to skip those
Recursive: false,
})
for objects.Next() {
object := objects.Item()
if object.IsPrefix {
continue
}
if !strings.HasSuffix(object.Key, ".zip") {
continue
}
if err := fn(object.Key); err != nil {
return err
}
}
return objects.Err()
}
// IterateZipContent opens a zip file at an object key and yields the files inside.
func IterateZipContent(
ctx context.Context,
project uplink.Project,
bucket string,
objectKey string,
fn func(file *zip.File) error,
) (err error) {
download, err := project.DownloadObject(ctx, bucket, objectKey, nil)
if err != nil {
return err
}
defer func() {
err = errs.Combine(err, download.Close())
}()
zipContents, err := io.ReadAll(download)
if err != nil {
return err
}
reader, err := zip.NewReader(bytes.NewReader(zipContents), int64(len(zipContents)))
if err != nil {
return err
}
for _, file := range reader.File {
err = fn(file)
if err != nil {
return err
}
}
return nil
}
// UnpackZipEntry deserialized a retainInfo struct from a single file inside a zip file.
func UnpackZipEntry(
file *zip.File,
) (retainInfo *internalpb.RetainInfo, err error) {
reader, err := file.Open()
if err != nil {
return nil, err
}
defer func() {
err = errs.Combine(err, reader.Close())
}()
uncompressedBytes, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
retainInfo = &internalpb.RetainInfo{}
err = pb.Unmarshal(uncompressedBytes, retainInfo)
if err != nil {
return nil, err
}
err = validate(file.Name, *retainInfo)
if err != nil {
return nil, err
}
return retainInfo, nil
}
func validate(fileName string, retainInfo internalpb.RetainInfo) error {
if retainInfo.StorageNodeId.IsZero() {
return errs.New("Storage Node ID is missing from file %s", fileName)
}
if fileName != retainInfo.StorageNodeId.String() {
return errs.New("Storage Node ID %s is not equal to file name %s", retainInfo.StorageNodeId.String(), fileName)
}
if retainInfo.PieceCount == 0 {
return errs.New("Retain filter count is zero for storage node %s", retainInfo.StorageNodeId.String())
}
if retainInfo.Filter == nil || len(retainInfo.Filter) == 0 {
return errs.New("Retain filter is missing for storage node %s", retainInfo.StorageNodeId.String())
}
if len(bytes.Trim(retainInfo.Filter, "\x00")) == 0 {
return errs.New("Retain filter is zeroes for storage node %s", retainInfo.StorageNodeId.String())
}
year2020 := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
if retainInfo.CreationDate.Before(year2020) {
return errs.New("Retain filter creation date is too long ago: %s", retainInfo.CreationDate.String())
}
return nil
}

View File

@ -0,0 +1,213 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package sender
import (
"archive/zip"
"context"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/overlay"
"storj.io/uplink"
"storj.io/uplink/private/piecestore"
)
var (
// Error defines the gc service errors class.
Error = errs.Class("gc")
mon = monkit.Package()
)
// Config contains configurable values for garbage collection.
type Config struct {
Interval time.Duration `help:"the time between each attempt to download and send garbage collection retain filters to storage nodes" releaseDefault:"48h" devDefault:"5m" testDefault:"$TESTINTERVAL"`
Enabled bool `help:"set if loop to send garbage collection retain filters is enabled" default:"false" devDefault:"false"`
// We suspect this currently not to be the critical path w.r.t. garbage collection, so no paralellism is implemented.
ConcurrentSends int `help:"the number of nodes to concurrently send garbage collection retain filters to" releaseDefault:"1" devDefault:"1"`
RetainSendTimeout time.Duration `help:"the amount of time to allow a node to handle a retain request" default:"1m"`
AccessGrant string `help:"Access to download the bloom filters. Needs read and write permission."`
Bucket string `help:"bucket where retain info is stored" default:"" testDefault:"gc-queue"`
ExpireIn time.Duration `help:"Expiration of newly created objects. These objects store error messages." default:"336h"`
}
// NewService creates a new instance of the gc sender service.
func NewService(log *zap.Logger, config Config, dialer rpc.Dialer, overlay overlay.DB) *Service {
return &Service{
log: log,
Config: config,
Loop: sync2.NewCycle(config.Interval),
dialer: dialer,
overlay: overlay,
}
}
// Service reads bloom filters of piece IDs to retain from a Storj bucket
// and sends them out to the storage nodes.
//
// The split between creating retain info and sending it out to storagenodes
// is made so that the bloom filter can be created from a backup database.
//
// architecture: Service
type Service struct {
log *zap.Logger
Config Config
Loop *sync2.Cycle
dialer rpc.Dialer
overlay overlay.DB
}
// Run continuously polls for new retain filters and sends them out.
func (service *Service) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
if !service.Config.Enabled {
return nil
}
return service.Loop.Run(ctx, service.RunOnce)
}
// RunOnce opens the bucket and sends out all the retain filters located in it to the storage nodes.
func (service *Service) RunOnce(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
loopStartTime := time.Now()
switch {
case service.Config.AccessGrant == "":
return errs.New("Access Grant is not set")
case service.Config.Bucket == "":
return errs.New("Bucket is not set")
}
access, err := uplink.ParseAccess(service.Config.AccessGrant)
if err != nil {
return err
}
project, err := uplink.OpenProject(ctx, access)
if err != nil {
return err
}
defer func() {
err = errs.Combine(err, project.Close())
}()
return IterateZipObjectKeys(ctx, *project, service.Config.Bucket, func(objectKey string) error {
err := IterateZipContent(ctx, *project, service.Config.Bucket, objectKey, func(zipEntry *zip.File) error {
retainInfo, err := UnpackZipEntry(zipEntry)
if err != nil {
service.log.Warn("Skipping retain filter entry: %s", zap.Error(err))
return nil
}
err = service.sendRetainRequest(ctx, retainInfo)
if err != nil {
service.log.Error("Error sending retain filter: %s", zap.Error(err))
}
return nil
})
if err != nil {
// We store the error in the bucket and then continue with the next zip file.
return service.moveToErrorPrefix(ctx, project, objectKey, err, loopStartTime)
}
return service.moveToSentPrefix(ctx, project, objectKey, loopStartTime)
})
}
func (service *Service) sendRetainRequest(ctx context.Context, retainInfo *internalpb.RetainInfo) (err error) {
defer mon.Task()(&ctx)(&err)
dossier, err := service.overlay.Get(ctx, retainInfo.StorageNodeId)
if err != nil {
return Error.Wrap(err)
}
if service.Config.RetainSendTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, service.Config.RetainSendTimeout)
defer cancel()
}
nodeurl := storj.NodeURL{
ID: retainInfo.StorageNodeId,
Address: dossier.Address.Address,
}
client, err := piecestore.Dial(ctx, service.dialer, nodeurl, piecestore.DefaultConfig)
if err != nil {
return Error.Wrap(err)
}
defer func() {
err = errs.Combine(err, Error.Wrap(client.Close()))
}()
err = client.Retain(ctx, &pb.RetainRequest{
CreationDate: retainInfo.CreationDate,
Filter: retainInfo.Filter,
})
return Error.Wrap(err)
}
// moveToErrorPrefix moves an object to prefix "error" and attaches the error to the metadata.
func (service *Service) moveToErrorPrefix(
ctx context.Context, project *uplink.Project, objectKey string, previousErr error, timeStamp time.Time,
) error {
newObjectKey := "error-" + timeStamp.Format(time.RFC3339) + "/" + objectKey
err := project.MoveObject(ctx, service.Config.Bucket, objectKey, service.Config.Bucket, newObjectKey, nil)
if err != nil {
return err
}
return service.uploadError(ctx, project, newObjectKey+".error.txt", previousErr)
}
// uploadError saves an error under an object key.
func (service *Service) uploadError(
ctx context.Context, project *uplink.Project, destinationObjectKey string, previousErr error,
) (err error) {
upload, err := project.UploadObject(ctx, service.Config.Bucket, destinationObjectKey, &uplink.UploadOptions{
Expires: time.Now().Add(service.Config.ExpireIn),
})
if err != nil {
return err
}
defer func() {
if err != nil {
err = errs.Combine(err, upload.Abort())
}
}()
_, err = upload.Write([]byte(previousErr.Error()))
if err != nil {
return err
}
return upload.Commit()
}
// moveToErrorPrefix moves an object to prefix "sent-[timestamp]".
func (service *Service) moveToSentPrefix(
ctx context.Context, project *uplink.Project, objectKey string, timeStamp time.Time,
) error {
newObjectKey := "sent-" + timeStamp.Format(time.RFC3339) + "/" + objectKey
return project.MoveObject(ctx, service.Config.Bucket, objectKey, service.Config.Bucket, newObjectKey, nil)
}

View File

@ -0,0 +1,143 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package sender_test
import (
"io"
"sort"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/memory"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/gc/bloomfilter"
"storj.io/storj/storagenode"
"storj.io/uplink"
)
func TestSendRetainFilters(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 2,
StorageNodeCount: 1,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNode: func(index int, config *storagenode.Config) {
// stop processing at storagenode side so it can be inspected
config.Retain.Concurrency = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// Set satellite 1 to store bloom filters of satellite 0
access := planet.Uplinks[0].Access[planet.Satellites[1].NodeURL().ID]
accessString, err := access.Serialize()
require.NoError(t, err)
// configure sender
gcsender := planet.Satellites[0].GarbageCollection.Sender
gcsender.Config.AccessGrant = accessString
// upload 1 piece
upl := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err = upl.Upload(ctx, planet.Satellites[0], "testbucket", "test/path/1", testData)
require.NoError(t, err)
// configure filter uploader
config := planet.Satellites[0].Config.GarbageCollectionBF
config.AccessGrant = accessString
config.ZipBatchSize = 2
bloomFilterService := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop)
err = bloomFilterService.RunOnce(ctx)
require.NoError(t, err)
storageNode0 := planet.StorageNodes[0]
require.Zero(t, storageNode0.Peer.Storage2.RetainService.HowManyQueued())
// send to storagenode
err = gcsender.RunOnce(ctx)
require.NoError(t, err)
require.Equal(t, 1, storageNode0.Peer.Storage2.RetainService.HowManyQueued())
// check that zip was moved to sent
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[1])
require.NoError(t, err)
var keys []string
it := project.ListObjects(ctx, gcsender.Config.Bucket, &uplink.ListObjectsOptions{
Recursive: true,
})
require.True(t, it.Next())
keys = append(keys, it.Item().Key)
require.True(t, it.Next())
keys = append(keys, it.Item().Key)
require.False(t, it.Next())
sort.Strings(keys)
require.Equal(t, "gc-done", keys[0])
require.Regexp(t, "sent-.*/.*.zip$", keys[1])
})
}
func TestSendInvalidZip(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 2,
StorageNodeCount: 1,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNode: func(index int, config *storagenode.Config) {
// stop processing at storagenode side so it can be inspected
config.Retain.Concurrency = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// Set satellite 1 to store bloom filters of satellite 0
access := planet.Uplinks[0].Access[planet.Satellites[1].NodeURL().ID]
accessString, err := access.Serialize()
require.NoError(t, err)
// configure sender
gcsender := planet.Satellites[0].GarbageCollection.Sender
gcsender.Config.AccessGrant = accessString
// upload invalid zip file
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[1], gcsender.Config.Bucket, "wasd.zip", []byte("wasd"))
require.NoError(t, err)
// send to storagenode
err = gcsender.RunOnce(ctx)
require.NoError(t, err)
// check that error is stored
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[1])
require.NoError(t, err)
var keys []string
it := project.ListObjects(ctx, gcsender.Config.Bucket, &uplink.ListObjectsOptions{
Recursive: true,
})
require.True(t, it.Next())
keys = append(keys, it.Item().Key)
require.True(t, it.Next())
keys = append(keys, it.Item().Key)
require.False(t, it.Next())
// first is corrupt zip file and second is error text
sort.Strings(keys)
require.Regexp(t, "^error-.*/wasd.zip$", keys[0])
require.Regexp(t, "^error-.*/wasd.zip.error.txt$", keys[1])
object, err := project.DownloadObject(ctx, gcsender.Config.Bucket, keys[1], nil)
require.NoError(t, err)
all, err := io.ReadAll(object)
require.NoError(t, err)
require.Equal(t, "zip: not a valid zip file", string(all))
})
}

View File

@ -1,172 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gc
import (
"context"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/bloomfilter"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/overlay"
"storj.io/uplink/private/piecestore"
)
var (
// Error defines the gc service errors class.
Error = errs.Class("gc")
mon = monkit.Package()
)
// Config contains configurable values for garbage collection.
type Config struct {
Interval time.Duration `help:"the time between each send of garbage collection filters to storage nodes" releaseDefault:"120h" devDefault:"10m" testDefault:"$TESTINTERVAL"`
Enabled bool `help:"set if garbage collection is enabled or not" releaseDefault:"true" devDefault:"true"`
// value for InitialPieces currently based on average pieces per node
InitialPieces int `help:"the initial number of pieces expected for a storage node to have, used for creating a filter" releaseDefault:"400000" devDefault:"10"`
FalsePositiveRate float64 `help:"the false positive rate used for creating a garbage collection bloom filter" releaseDefault:"0.1" devDefault:"0.1"`
ConcurrentSends int `help:"the number of nodes to concurrently send garbage collection bloom filters to" releaseDefault:"1" devDefault:"1"`
RetainSendTimeout time.Duration `help:"the amount of time to allow a node to handle a retain request" default:"1m"`
}
// Service implements the garbage collection service.
//
// architecture: Chore
type Service struct {
log *zap.Logger
config Config
Loop *sync2.Cycle
dialer rpc.Dialer
overlay overlay.DB
segmentLoop *segmentloop.Service
}
// RetainInfo contains info needed for a storage node to retain important data and delete garbage data.
type RetainInfo struct {
Filter *bloomfilter.Filter
CreationDate time.Time
Count int
}
// NewService creates a new instance of the gc service.
func NewService(log *zap.Logger, config Config, dialer rpc.Dialer, overlay overlay.DB, loop *segmentloop.Service) *Service {
return &Service{
log: log,
config: config,
Loop: sync2.NewCycle(config.Interval),
dialer: dialer,
overlay: overlay,
segmentLoop: loop,
}
}
// Run starts the gc loop service.
func (service *Service) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
if !service.config.Enabled {
return nil
}
// load last piece counts from overlay db
lastPieceCounts, err := service.overlay.AllPieceCounts(ctx)
if err != nil {
service.log.Error("error getting last piece counts", zap.Error(err))
}
if lastPieceCounts == nil {
lastPieceCounts = make(map[storj.NodeID]int)
}
return service.Loop.Run(ctx, func(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
pieceTracker := NewPieceTracker(service.log.Named("gc observer"), service.config, lastPieceCounts)
// collect things to retain
err = service.segmentLoop.Join(ctx, pieceTracker)
if err != nil {
service.log.Error("error joining metainfoloop", zap.Error(err))
return nil
}
// save piece counts in memory for next iteration
for id := range lastPieceCounts {
delete(lastPieceCounts, id)
}
for id, info := range pieceTracker.RetainInfos {
lastPieceCounts[id] = info.Count
}
// save piece counts to db for next satellite restart
err = service.overlay.UpdatePieceCounts(ctx, lastPieceCounts)
if err != nil {
service.log.Error("error updating piece counts", zap.Error(err))
}
// monitor information
for _, info := range pieceTracker.RetainInfos {
mon.IntVal("node_piece_count").Observe(int64(info.Count))
mon.IntVal("retain_filter_size_bytes").Observe(info.Filter.Size())
}
// send retain requests
limiter := sync2.NewLimiter(service.config.ConcurrentSends)
for id, info := range pieceTracker.RetainInfos {
id, info := id, info
limiter.Go(ctx, func() {
err := service.sendRetainRequest(ctx, id, info)
if err != nil {
service.log.Warn("error sending retain info to node", zap.Stringer("Node ID", id), zap.Error(err))
}
})
}
limiter.Wait()
return nil
})
}
func (service *Service) sendRetainRequest(ctx context.Context, id storj.NodeID, info *RetainInfo) (err error) {
defer mon.Task()(&ctx, id.String())(&err)
dossier, err := service.overlay.Get(ctx, id)
if err != nil {
return Error.Wrap(err)
}
if service.config.RetainSendTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, service.config.RetainSendTimeout)
defer cancel()
}
nodeurl := storj.NodeURL{
ID: id,
Address: dossier.Address.Address,
}
client, err := piecestore.Dial(ctx, service.dialer, nodeurl, piecestore.DefaultConfig)
if err != nil {
return Error.Wrap(err)
}
defer func() {
err = errs.Combine(err, Error.Wrap(client.Close()))
}()
err = client.Retain(ctx, &pb.RetainRequest{
CreationDate: info.CreationDate,
Filter: info.Filter.Bytes(),
})
return Error.Wrap(err)
}

View File

@ -125,8 +125,6 @@ func TestExpiredDeletionForCopiedObject(t *testing.T) {
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
gcService := satellite.GarbageCollection.Service
gcService.Loop.Pause()
upl := planet.Uplinks[0]
expiredChore := satellite.Core.ExpiredDeletion.Chore

View File

@ -37,8 +37,8 @@ import (
"storj.io/storj/satellite/console/emailreminders"
"storj.io/storj/satellite/console/restkeys"
"storj.io/storj/satellite/contact"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gc/bloomfilter"
"storj.io/storj/satellite/gc/sender"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/mailservice/simulate"
@ -149,7 +149,7 @@ type Config struct {
Repairer repairer.Config
Audit audit.Config
GarbageCollection gc.Config
GarbageCollection sender.Config
GarbageCollectionBF bloomfilter.Config
ExpiredDeletion expireddeletion.Config

View File

@ -403,20 +403,23 @@ contact.external-address: ""
# how many bloom filters will be packed in a single zip
# garbage-collection-bf.zip-batch-size: 500
# the number of nodes to concurrently send garbage collection bloom filters to
# Access to download the bloom filters. Needs read and write permission.
# garbage-collection.access-grant: ""
# bucket where retain info is stored
# garbage-collection.bucket: ""
# the number of nodes to concurrently send garbage collection retain filters to
# garbage-collection.concurrent-sends: 1
# set if garbage collection is enabled or not
# garbage-collection.enabled: true
# set if loop to send garbage collection retain filters is enabled
# garbage-collection.enabled: false
# the false positive rate used for creating a garbage collection bloom filter
# garbage-collection.false-positive-rate: 0.1
# Expiration of newly created objects. These objects store error messages.
# garbage-collection.expire-in: 336h0m0s
# the initial number of pieces expected for a storage node to have, used for creating a filter
# garbage-collection.initial-pieces: 400000
# the time between each send of garbage collection filters to storage nodes
# garbage-collection.interval: 120h0m0s
# the time between each attempt to download and send garbage collection retain filters to storage nodes
# garbage-collection.interval: 48h0m0s
# the amount of time to allow a node to handle a retain request
# garbage-collection.retain-send-timeout: 1m0s

View File

@ -440,3 +440,8 @@ func (s *Service) trash(ctx context.Context, satelliteID storj.NodeID, pieceID s
defer mon.Task()(&ctx, satelliteID)(&err)
return s.store.Trash(ctx, satelliteID, pieceID)
}
// HowManyQueued peeks at the number of bloom filters queued.
func (s *Service) HowManyQueued() int {
return len(s.queued)
}