segment-reaper: delete command logic (#3660)
This commit is contained in:
parent
c400471bbc
commit
b17d40ffd4
@ -4,18 +4,28 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/csv"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/process"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
var (
|
||||
errKnown = errs.Class("known delete error")
|
||||
|
||||
deleteCmd = &cobra.Command{
|
||||
Use: "delete input_file.csv [flags]",
|
||||
Short: "Deletes zombie segments from DB",
|
||||
@ -36,8 +46,10 @@ func init() {
|
||||
}
|
||||
|
||||
func cmdDelete(cmd *cobra.Command, args []string) (err error) {
|
||||
ctx, _ := process.Ctx(cmd)
|
||||
|
||||
log := zap.L()
|
||||
db, err := metainfo.NewStore(log.Named("pointerdb"), detectCfg.DatabaseURL)
|
||||
db, err := metainfo.NewStore(log.Named("pointerdb"), deleteCfg.DatabaseURL)
|
||||
if err != nil {
|
||||
return errs.New("error connecting database: %+v", err)
|
||||
}
|
||||
@ -53,9 +65,94 @@ func cmdDelete(cmd *cobra.Command, args []string) (err error) {
|
||||
err = errs.Combine(err, inputFile.Close())
|
||||
}()
|
||||
|
||||
_ = csv.NewReader(inputFile)
|
||||
csvReader := csv.NewReader(inputFile)
|
||||
csvReader.FieldsPerRecord = 5
|
||||
csvReader.ReuseRecord = true
|
||||
|
||||
// TODO logice will be added in next PR
|
||||
segmentsDeleted := 0
|
||||
segmentsErrored := 0
|
||||
segmentsSkipped := 0
|
||||
for {
|
||||
record, err := csvReader.Read()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
log.Error("error while reading record", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
projectID := record[0]
|
||||
segmentIndex := record[1]
|
||||
bucketName := record[2]
|
||||
encodedPath := record[3]
|
||||
creationDateFromReport, err := time.Parse(time.RFC3339Nano, record[4])
|
||||
if err != nil {
|
||||
log.Error("error while parsing date", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
encryptedPath, err := base64.StdEncoding.DecodeString(encodedPath)
|
||||
if err != nil {
|
||||
log.Error("error while decoding encrypted path", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
path := storj.JoinPaths(projectID, segmentIndex, bucketName, string(encryptedPath))
|
||||
rawPath := storj.JoinPaths(projectID, segmentIndex, bucketName, encodedPath)
|
||||
|
||||
err = deleteSegment(ctx, db, path, creationDateFromReport, deleteCfg.DryRun)
|
||||
if err != nil {
|
||||
if errKnown.Has(err) {
|
||||
segmentsSkipped++
|
||||
} else {
|
||||
segmentsErrored++
|
||||
}
|
||||
log.Error("error while deleting segment", zap.String("path", rawPath), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debug("segment deleted", zap.String("path", rawPath))
|
||||
segmentsDeleted++
|
||||
}
|
||||
|
||||
log.Info("summary", zap.Int("deleted", segmentsDeleted), zap.Int("skipped", segmentsSkipped), zap.Int("errored", segmentsSkipped))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func deleteSegment(ctx context.Context, db metainfo.PointerDB, path string, creationDate time.Time, dryRun bool) error {
|
||||
pointerBytes, err := db.Get(ctx, []byte(path))
|
||||
if err != nil {
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
return errKnown.New("segment already deleted by user: %+v", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
pointer := &pb.Pointer{}
|
||||
err = proto.Unmarshal(pointerBytes, pointer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// check if pointer has been replaced
|
||||
if !pointer.GetCreationDate().Equal(creationDate) {
|
||||
// pointer has been replaced since detection, do not delete it.
|
||||
return errKnown.New("segment won't be deleted, create date mismatch: %s -> %s", pointer.GetCreationDate(), creationDate)
|
||||
}
|
||||
|
||||
if !dryRun {
|
||||
// delete the pointer using compare-and-swap
|
||||
err = db.CompareAndSwap(ctx, []byte(path), pointerBytes, nil)
|
||||
if err != nil {
|
||||
if storage.ErrValueChanged.Has(err) {
|
||||
// race detected while deleting the pointer, do not try deleting it again.
|
||||
return errKnown.New("segment won't be deleted, race detected while deleting the pointer: %+v", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
90
cmd/segment-reaper/delete_test.go
Normal file
90
cmd/segment-reaper/delete_test.go
Normal file
@ -0,0 +1,90 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/private/testcontext"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/storage"
|
||||
"storj.io/storj/storage/teststore"
|
||||
)
|
||||
|
||||
func TestDeleteSegment(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
db := teststore.New()
|
||||
defer ctx.Check(db.Close)
|
||||
|
||||
t.Run("segment is deleted", func(t *testing.T) {
|
||||
_, err := makeSegment(ctx, db, "path1", time.Unix(10, 0))
|
||||
require.NoError(t, err)
|
||||
|
||||
dryRun := false
|
||||
deleteError := deleteSegment(ctx, db, "path1", time.Unix(10, 0), dryRun)
|
||||
require.NoError(t, deleteError)
|
||||
_, err = db.Get(ctx, storage.Key("path1"))
|
||||
require.Error(t, err)
|
||||
require.True(t, storage.ErrKeyNotFound.Has(err))
|
||||
})
|
||||
t.Run("segment is not deleted because of dryRun", func(t *testing.T) {
|
||||
expectedPointer, err := makeSegment(ctx, db, "path2", time.Unix(10, 0))
|
||||
require.NoError(t, err)
|
||||
|
||||
dryRun := true
|
||||
deleteError := deleteSegment(ctx, db, "path2", time.Unix(10, 0), dryRun)
|
||||
require.NoError(t, deleteError)
|
||||
pointer, err := db.Get(ctx, storage.Key("path2"))
|
||||
require.NoError(t, err)
|
||||
pointerBytes, err := pointer.MarshalBinary()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedPointer, pointerBytes)
|
||||
})
|
||||
t.Run("segment is not deleted because of time mismatch", func(t *testing.T) {
|
||||
expectedPointer, err := makeSegment(ctx, db, "path3", time.Unix(10, 0))
|
||||
require.NoError(t, err)
|
||||
|
||||
dryRun := false
|
||||
deleteError := deleteSegment(ctx, db, "path3", time.Unix(99, 0), dryRun)
|
||||
require.Error(t, deleteError)
|
||||
require.True(t, errKnown.Has(deleteError))
|
||||
pointer, err := db.Get(ctx, storage.Key("path3"))
|
||||
require.NoError(t, err)
|
||||
pointerBytes, err := pointer.MarshalBinary()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedPointer, pointerBytes)
|
||||
})
|
||||
t.Run("segment is not deleted because not exists", func(t *testing.T) {
|
||||
dryRun := false
|
||||
deleteError := deleteSegment(ctx, db, "not-existing-path", time.Unix(10, 0), dryRun)
|
||||
require.Error(t, deleteError)
|
||||
require.True(t, errKnown.Has(deleteError))
|
||||
})
|
||||
}
|
||||
|
||||
func makeSegment(ctx context.Context, db metainfo.PointerDB, path string, creationDate time.Time) (pointerBytes []byte, err error) {
|
||||
pointer := &pb.Pointer{
|
||||
CreationDate: creationDate,
|
||||
}
|
||||
|
||||
pointerBytes, err = proto.Marshal(pointer)
|
||||
if err != nil {
|
||||
return []byte{}, err
|
||||
}
|
||||
|
||||
err = db.Put(ctx, storage.Key(path), storage.Value(pointerBytes))
|
||||
if err != nil {
|
||||
return []byte{}, err
|
||||
}
|
||||
|
||||
return pointerBytes, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user