satellite/gc: Upload bloomfilters with prefix and update LATEST when complete
Change he bloomfilter generation process to prefix the objects with a date and update the LATEST object with the prefix. The sender will read the LATEST file to get the prefix to process. Change-Id: Iae0d3c49015d57f391d87789fb799a7d774066bf
This commit is contained in:
parent
4efde65c9e
commit
9a09d8920e
@ -22,6 +22,9 @@ import (
|
||||
"storj.io/uplink"
|
||||
)
|
||||
|
||||
// LATEST is the name of the file that contains the most recently completed bloomfilter generation prefix.
|
||||
const LATEST = "LATEST"
|
||||
|
||||
var mon = monkit.Package()
|
||||
|
||||
// Config contains configurable values for garbage collection.
|
||||
@ -126,6 +129,8 @@ func (service *Service) uploadBloomFilters(ctx context.Context, latestCreationDa
|
||||
return nil
|
||||
}
|
||||
|
||||
prefix := time.Now().Format(time.RFC3339)
|
||||
|
||||
expirationTime := time.Now().Add(service.config.ExpireIn)
|
||||
|
||||
accessGrant, err := uplink.ParseAccess(service.config.AccessGrant)
|
||||
@ -141,7 +146,7 @@ func (service *Service) uploadBloomFilters(ctx context.Context, latestCreationDa
|
||||
// do cleanup in case of any error while uploading bloom filters
|
||||
if err != nil {
|
||||
// TODO should we drop whole bucket if cleanup will fail
|
||||
err = errs.Combine(err, service.cleanup(ctx, project))
|
||||
err = errs.Combine(err, service.cleanup(ctx, project, prefix))
|
||||
}
|
||||
err = errs.Combine(err, project.Close())
|
||||
}()
|
||||
@ -152,7 +157,10 @@ func (service *Service) uploadBloomFilters(ctx context.Context, latestCreationDa
|
||||
}
|
||||
|
||||
// TODO move it before segment loop is started
|
||||
iterator := project.ListObjects(ctx, service.config.Bucket, nil)
|
||||
o := uplink.ListObjectsOptions{
|
||||
Prefix: prefix + "/",
|
||||
}
|
||||
iterator := project.ListObjects(ctx, service.config.Bucket, &o)
|
||||
for iterator.Next() {
|
||||
if iterator.Item().IsPrefix {
|
||||
continue
|
||||
@ -175,7 +183,7 @@ func (service *Service) uploadBloomFilters(ctx context.Context, latestCreationDa
|
||||
})
|
||||
|
||||
if len(infos) == service.config.ZipBatchSize {
|
||||
err = service.uploadPack(ctx, project, batchNumber, expirationTime, infos)
|
||||
err = service.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -186,22 +194,32 @@ func (service *Service) uploadBloomFilters(ctx context.Context, latestCreationDa
|
||||
}
|
||||
|
||||
// upload rest of infos if any
|
||||
if err := service.uploadPack(ctx, project, batchNumber, expirationTime, infos); err != nil {
|
||||
if err := service.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
// update LATEST file
|
||||
upload, err := project.UploadObject(ctx, service.config.Bucket, LATEST, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = upload.Write([]byte(prefix))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return upload.Commit()
|
||||
}
|
||||
|
||||
// uploadPack uploads single zip pack with multiple bloom filters.
|
||||
func (service *Service) uploadPack(ctx context.Context, project *uplink.Project, batchNumber int, expirationTime time.Time, infos []internalpb.RetainInfo) (err error) {
|
||||
func (service *Service) uploadPack(ctx context.Context, project *uplink.Project, prefix string, batchNumber int, expirationTime time.Time, infos []internalpb.RetainInfo) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if len(infos) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
upload, err := project.UploadObject(ctx, service.config.Bucket, "bloomfilters-"+strconv.Itoa(batchNumber)+".zip", &uplink.UploadOptions{
|
||||
upload, err := project.UploadObject(ctx, service.config.Bucket, prefix+"/bloomfilters-"+strconv.Itoa(batchNumber)+".zip", &uplink.UploadOptions{
|
||||
Expires: expirationTime,
|
||||
})
|
||||
if err != nil {
|
||||
@ -243,11 +261,14 @@ func (service *Service) uploadPack(ctx context.Context, project *uplink.Project,
|
||||
|
||||
// cleanup moves all objects from root location to unique prefix. Objects will be deleted
|
||||
// automatically when expires.
|
||||
func (service *Service) cleanup(ctx context.Context, project *uplink.Project) (err error) {
|
||||
func (service *Service) cleanup(ctx context.Context, project *uplink.Project, prefix string) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
prefix := "upload-error-" + time.Now().Format(time.RFC3339)
|
||||
iterator := project.ListObjects(ctx, service.config.Bucket, nil)
|
||||
errPrefix := "upload-error-" + time.Now().Format(time.RFC3339)
|
||||
o := uplink.ListObjectsOptions{
|
||||
Prefix: prefix + "/",
|
||||
}
|
||||
iterator := project.ListObjects(ctx, service.config.Bucket, &o)
|
||||
|
||||
for iterator.Next() {
|
||||
item := iterator.Item()
|
||||
@ -255,7 +276,7 @@ func (service *Service) cleanup(ctx context.Context, project *uplink.Project) (e
|
||||
continue
|
||||
}
|
||||
|
||||
err := project.MoveObject(ctx, service.config.Bucket, item.Key, service.config.Bucket, prefix+"/"+item.Key, nil)
|
||||
err := project.MoveObject(ctx, service.config.Bucket, item.Key, service.config.Bucket, prefix+"/"+errPrefix+"/"+item.Key, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/gc/bloomfilter"
|
||||
"storj.io/storj/satellite/internalpb"
|
||||
"storj.io/uplink"
|
||||
)
|
||||
|
||||
func TestGarbageCollectionBloomFilters(t *testing.T) {
|
||||
@ -73,7 +74,19 @@ func TestGarbageCollectionBloomFilters(t *testing.T) {
|
||||
err = service.RunOnce(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
iterator := project.ListObjects(ctx, tc.Bucket, nil)
|
||||
download, err := project.DownloadObject(ctx, tc.Bucket, bloomfilter.LATEST, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
value, err := io.ReadAll(download)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = download.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
prefix := string(value)
|
||||
iterator := project.ListObjects(ctx, tc.Bucket, &uplink.ListObjectsOptions{
|
||||
Prefix: prefix + "/",
|
||||
})
|
||||
|
||||
count := 0
|
||||
nodeIds := []string{}
|
||||
@ -113,7 +126,7 @@ func TestGarbageCollectionBloomFilters(t *testing.T) {
|
||||
|
||||
expectedPackNames := []string{}
|
||||
for i := 0; i < tc.ExpectedPacks; i++ {
|
||||
expectedPackNames = append(expectedPackNames, "bloomfilters-"+strconv.Itoa(i)+".zip")
|
||||
expectedPackNames = append(expectedPackNames, prefix+"/bloomfilters-"+strconv.Itoa(i)+".zip")
|
||||
}
|
||||
sort.Strings(expectedPackNames)
|
||||
sort.Strings(packNames)
|
||||
@ -130,7 +143,7 @@ func TestGarbageCollectionBloomFilters(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestGarbageCollectionBloomFilters_NotEmptyBucket(t *testing.T) {
|
||||
func TestGarbageCollectionBloomFilters_AllowNotEmptyBucket(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1,
|
||||
StorageNodeCount: 4,
|
||||
@ -165,12 +178,16 @@ func TestGarbageCollectionBloomFilters_NotEmptyBucket(t *testing.T) {
|
||||
err = service.RunOnce(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// check that there are 2 objects and the names match
|
||||
iterator := project.ListObjects(ctx, "bloomfilters", nil)
|
||||
|
||||
// no new uploads, bucket structure was not changed
|
||||
require.True(t, iterator.Next())
|
||||
require.Equal(t, "some object", iterator.Item().Key)
|
||||
require.False(t, iterator.Next())
|
||||
require.NoError(t, iterator.Err())
|
||||
keys := []string{}
|
||||
for iterator.Next() {
|
||||
if !iterator.Item().IsPrefix {
|
||||
keys = append(keys, iterator.Item().Key)
|
||||
}
|
||||
}
|
||||
require.Len(t, keys, 2)
|
||||
require.Contains(t, keys, "some object")
|
||||
require.Contains(t, keys, bloomfilter.LATEST)
|
||||
})
|
||||
}
|
||||
|
@ -23,14 +23,15 @@ func IterateZipObjectKeys(
|
||||
ctx context.Context,
|
||||
project uplink.Project,
|
||||
bucket string,
|
||||
prefix string,
|
||||
fn func(objectKey string) error,
|
||||
) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
objects := project.ListObjects(ctx, bucket, &uplink.ListObjectsOptions{
|
||||
System: true,
|
||||
// the previously read archives are stored under prefixes, we want to skip those
|
||||
System: true,
|
||||
Recursive: false,
|
||||
Prefix: prefix,
|
||||
})
|
||||
|
||||
for objects.Next() {
|
||||
|
@ -6,6 +6,8 @@ package sender
|
||||
import (
|
||||
"archive/zip"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/spacemonkeygo/monkit/v3"
|
||||
@ -16,6 +18,7 @@ import (
|
||||
"storj.io/common/rpc"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/sync2"
|
||||
"storj.io/storj/satellite/gc/bloomfilter"
|
||||
"storj.io/storj/satellite/internalpb"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/uplink"
|
||||
@ -107,7 +110,26 @@ func (service *Service) RunOnce(ctx context.Context) (err error) {
|
||||
err = errs.Combine(err, project.Close())
|
||||
}()
|
||||
|
||||
return IterateZipObjectKeys(ctx, *project, service.Config.Bucket, func(objectKey string) error {
|
||||
download, err := project.DownloadObject(ctx, service.Config.Bucket, bloomfilter.LATEST, nil)
|
||||
if err != nil {
|
||||
if errors.Is(err, uplink.ErrObjectNotFound) {
|
||||
service.log.Info("LATEST file does not exist in bucket", zap.String("bucket", service.Config.Bucket))
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err = errs.Combine(err, download.Close())
|
||||
}()
|
||||
|
||||
value, err := io.ReadAll(download)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
prefix := string(value) + "/"
|
||||
|
||||
return IterateZipObjectKeys(ctx, *project, service.Config.Bucket, prefix, func(objectKey string) error {
|
||||
limiter := sync2.NewLimiter(service.Config.ConcurrentSends)
|
||||
err := IterateZipContent(ctx, *project, service.Config.Bucket, objectKey, func(zipEntry *zip.File) error {
|
||||
retainInfo, err := UnpackZipEntry(zipEntry)
|
||||
@ -172,7 +194,7 @@ func (service *Service) sendRetainRequest(ctx context.Context, retainInfo *inter
|
||||
func (service *Service) moveToErrorPrefix(
|
||||
ctx context.Context, project *uplink.Project, objectKey string, previousErr error, timeStamp time.Time,
|
||||
) error {
|
||||
newObjectKey := "error-" + timeStamp.Format(time.RFC3339) + "/" + objectKey
|
||||
newObjectKey := "error-" + objectKey
|
||||
|
||||
err := project.MoveObject(ctx, service.Config.Bucket, objectKey, service.Config.Bucket, newObjectKey, nil)
|
||||
if err != nil {
|
||||
@ -211,7 +233,7 @@ func (service *Service) uploadError(
|
||||
func (service *Service) moveToSentPrefix(
|
||||
ctx context.Context, project *uplink.Project, objectKey string, timeStamp time.Time,
|
||||
) error {
|
||||
newObjectKey := "sent-" + timeStamp.Format(time.RFC3339) + "/" + objectKey
|
||||
newObjectKey := "sent-" + objectKey
|
||||
|
||||
return project.MoveObject(ctx, service.Config.Bucket, objectKey, service.Config.Bucket, newObjectKey, nil)
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"io"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
@ -69,9 +70,19 @@ func TestSendRetainFilters(t *testing.T) {
|
||||
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[1])
|
||||
require.NoError(t, err)
|
||||
|
||||
download, err := project.DownloadObject(ctx, gcsender.Config.Bucket, bloomfilter.LATEST, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
prefix, err := io.ReadAll(download)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = download.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
var keys []string
|
||||
it := project.ListObjects(ctx, gcsender.Config.Bucket, &uplink.ListObjectsOptions{
|
||||
Recursive: true,
|
||||
Prefix: "sent-" + string(prefix) + "/",
|
||||
})
|
||||
require.True(t, it.Next())
|
||||
keys = append(keys, it.Item().Key)
|
||||
@ -103,8 +114,13 @@ func TestSendInvalidZip(t *testing.T) {
|
||||
gcsender := planet.Satellites[0].GarbageCollection.Sender
|
||||
gcsender.Config.AccessGrant = accessString
|
||||
|
||||
// update LATEST file
|
||||
prefix := time.Now().Format(time.RFC3339)
|
||||
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[1], gcsender.Config.Bucket, bloomfilter.LATEST, []byte(prefix))
|
||||
require.NoError(t, err)
|
||||
|
||||
// upload invalid zip file
|
||||
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[1], gcsender.Config.Bucket, "wasd.zip", []byte("wasd"))
|
||||
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[1], gcsender.Config.Bucket, prefix+"/wasd.zip", []byte("wasd"))
|
||||
require.NoError(t, err)
|
||||
|
||||
// send to storagenode
|
||||
@ -118,6 +134,7 @@ func TestSendInvalidZip(t *testing.T) {
|
||||
var keys []string
|
||||
it := project.ListObjects(ctx, gcsender.Config.Bucket, &uplink.ListObjectsOptions{
|
||||
Recursive: true,
|
||||
Prefix: "error-" + prefix + "/",
|
||||
})
|
||||
require.True(t, it.Next())
|
||||
keys = append(keys, it.Item().Key)
|
||||
|
Loading…
Reference in New Issue
Block a user