diff --git a/cmd/segment-reaper/delete.go b/cmd/segment-reaper/delete.go index cd1e143b3..2b869d19b 100644 --- a/cmd/segment-reaper/delete.go +++ b/cmd/segment-reaper/delete.go @@ -4,18 +4,28 @@ package main import ( + "context" + "encoding/base64" "encoding/csv" + "io" "os" + "time" + "github.com/gogo/protobuf/proto" "github.com/spf13/cobra" "github.com/zeebo/errs" "go.uber.org/zap" + "storj.io/storj/pkg/pb" "storj.io/storj/pkg/process" + "storj.io/storj/pkg/storj" "storj.io/storj/satellite/metainfo" + "storj.io/storj/storage" ) var ( + errKnown = errs.Class("known delete error") + deleteCmd = &cobra.Command{ Use: "delete input_file.csv [flags]", Short: "Deletes zombie segments from DB", @@ -36,8 +46,10 @@ func init() { } func cmdDelete(cmd *cobra.Command, args []string) (err error) { + ctx, _ := process.Ctx(cmd) + log := zap.L() - db, err := metainfo.NewStore(log.Named("pointerdb"), detectCfg.DatabaseURL) + db, err := metainfo.NewStore(log.Named("pointerdb"), deleteCfg.DatabaseURL) if err != nil { return errs.New("error connecting database: %+v", err) } @@ -53,9 +65,94 @@ func cmdDelete(cmd *cobra.Command, args []string) (err error) { err = errs.Combine(err, inputFile.Close()) }() - _ = csv.NewReader(inputFile) + csvReader := csv.NewReader(inputFile) + csvReader.FieldsPerRecord = 5 + csvReader.ReuseRecord = true - // TODO logice will be added in next PR + segmentsDeleted := 0 + segmentsErrored := 0 + segmentsSkipped := 0 + for { + record, err := csvReader.Read() + if err == io.EOF { + break + } + if err != nil { + log.Error("error while reading record", zap.Error(err)) + continue + } + + projectID := record[0] + segmentIndex := record[1] + bucketName := record[2] + encodedPath := record[3] + creationDateFromReport, err := time.Parse(time.RFC3339Nano, record[4]) + if err != nil { + log.Error("error while parsing date", zap.Error(err)) + continue + } + + encryptedPath, err := base64.StdEncoding.DecodeString(encodedPath) + if err != nil { + log.Error("error while decoding encrypted path", zap.Error(err)) + continue + } + + path := storj.JoinPaths(projectID, segmentIndex, bucketName, string(encryptedPath)) + rawPath := storj.JoinPaths(projectID, segmentIndex, bucketName, encodedPath) + + err = deleteSegment(ctx, db, path, creationDateFromReport, deleteCfg.DryRun) + if err != nil { + if errKnown.Has(err) { + segmentsSkipped++ + } else { + segmentsErrored++ + } + log.Error("error while deleting segment", zap.String("path", rawPath), zap.Error(err)) + continue + } + + log.Debug("segment deleted", zap.String("path", rawPath)) + segmentsDeleted++ + } + + log.Info("summary", zap.Int("deleted", segmentsDeleted), zap.Int("skipped", segmentsSkipped), zap.Int("errored", segmentsSkipped)) + + return nil +} + +func deleteSegment(ctx context.Context, db metainfo.PointerDB, path string, creationDate time.Time, dryRun bool) error { + pointerBytes, err := db.Get(ctx, []byte(path)) + if err != nil { + if storage.ErrKeyNotFound.Has(err) { + return errKnown.New("segment already deleted by user: %+v", err) + } + return err + } + + pointer := &pb.Pointer{} + err = proto.Unmarshal(pointerBytes, pointer) + if err != nil { + return err + } + + // check if pointer has been replaced + if !pointer.GetCreationDate().Equal(creationDate) { + // pointer has been replaced since detection, do not delete it. + return errKnown.New("segment won't be deleted, create date mismatch: %s -> %s", pointer.GetCreationDate(), creationDate) + } + + if !dryRun { + // delete the pointer using compare-and-swap + err = db.CompareAndSwap(ctx, []byte(path), pointerBytes, nil) + if err != nil { + if storage.ErrValueChanged.Has(err) { + // race detected while deleting the pointer, do not try deleting it again. + return errKnown.New("segment won't be deleted, race detected while deleting the pointer: %+v", err) + } + return err + } + } return nil } diff --git a/cmd/segment-reaper/delete_test.go b/cmd/segment-reaper/delete_test.go new file mode 100644 index 000000000..a33d370d1 --- /dev/null +++ b/cmd/segment-reaper/delete_test.go @@ -0,0 +1,90 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "context" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" + + "storj.io/storj/pkg/pb" + "storj.io/storj/private/testcontext" + "storj.io/storj/satellite/metainfo" + "storj.io/storj/storage" + "storj.io/storj/storage/teststore" +) + +func TestDeleteSegment(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + db := teststore.New() + defer ctx.Check(db.Close) + + t.Run("segment is deleted", func(t *testing.T) { + _, err := makeSegment(ctx, db, "path1", time.Unix(10, 0)) + require.NoError(t, err) + + dryRun := false + deleteError := deleteSegment(ctx, db, "path1", time.Unix(10, 0), dryRun) + require.NoError(t, deleteError) + _, err = db.Get(ctx, storage.Key("path1")) + require.Error(t, err) + require.True(t, storage.ErrKeyNotFound.Has(err)) + }) + t.Run("segment is not deleted because of dryRun", func(t *testing.T) { + expectedPointer, err := makeSegment(ctx, db, "path2", time.Unix(10, 0)) + require.NoError(t, err) + + dryRun := true + deleteError := deleteSegment(ctx, db, "path2", time.Unix(10, 0), dryRun) + require.NoError(t, deleteError) + pointer, err := db.Get(ctx, storage.Key("path2")) + require.NoError(t, err) + pointerBytes, err := pointer.MarshalBinary() + require.NoError(t, err) + require.Equal(t, expectedPointer, pointerBytes) + }) + t.Run("segment is not deleted because of time mismatch", func(t *testing.T) { + expectedPointer, err := makeSegment(ctx, db, "path3", time.Unix(10, 0)) + require.NoError(t, err) + + dryRun := false + deleteError := deleteSegment(ctx, db, "path3", time.Unix(99, 0), dryRun) + require.Error(t, deleteError) + require.True(t, errKnown.Has(deleteError)) + pointer, err := db.Get(ctx, storage.Key("path3")) + require.NoError(t, err) + pointerBytes, err := pointer.MarshalBinary() + require.NoError(t, err) + require.Equal(t, expectedPointer, pointerBytes) + }) + t.Run("segment is not deleted because not exists", func(t *testing.T) { + dryRun := false + deleteError := deleteSegment(ctx, db, "not-existing-path", time.Unix(10, 0), dryRun) + require.Error(t, deleteError) + require.True(t, errKnown.Has(deleteError)) + }) +} + +func makeSegment(ctx context.Context, db metainfo.PointerDB, path string, creationDate time.Time) (pointerBytes []byte, err error) { + pointer := &pb.Pointer{ + CreationDate: creationDate, + } + + pointerBytes, err = proto.Marshal(pointer) + if err != nil { + return []byte{}, err + } + + err = db.Put(ctx, storage.Key(path), storage.Value(pointerBytes)) + if err != nil { + return []byte{}, err + } + + return pointerBytes, nil +}