cmd: remove segment reaper

It was designed to detect and remove zombie segments in the PointerDB.
This tool should no longer be relevant with the move to the MetabaseDB.

Change-Id: I112552203b1329a5a659f69a0043eb1f8dadb551
Kaloyan Raev 2020-12-11 16:12:42 +02:00
parent 7e6e0d3e2e
commit 2bb010e7c5
12 changed files with 13 additions and 1977 deletions

View File

@@ -145,7 +145,7 @@ storagenode-console:
gofmt -w -s storagenode/console/consoleassets/bindata.resource.go
.PHONY: images
images: satellite-image segment-reaper-image storagenode-image uplink-image versioncontrol-image ## Build satellite, segment-reaper, storagenode, uplink, and versioncontrol Docker images
images: satellite-image storagenode-image uplink-image versioncontrol-image ## Build satellite, storagenode, uplink, and versioncontrol Docker images
echo Built version: ${TAG}
.PHONY: satellite-image
@@ -159,17 +159,6 @@ satellite-image: satellite_linux_arm satellite_linux_arm64 satellite_linux_amd64
--build-arg=GOARCH=arm --build-arg=DOCKER_ARCH=aarch64 \
-f cmd/satellite/Dockerfile .
.PHONY: segment-reaper-image
segment-reaper-image: segment-reaper_linux_amd64 segment-reaper_linux_arm segment-reaper_linux_arm64 ## Build segment-reaper Docker image
${DOCKER_BUILD} --pull=true -t storjlabs/segment-reaper:${TAG}${CUSTOMTAG}-amd64 \
-f cmd/segment-reaper/Dockerfile .
${DOCKER_BUILD} --pull=true -t storjlabs/segment-reaper:${TAG}${CUSTOMTAG}-arm32v6 \
--build-arg=GOARCH=arm --build-arg=DOCKER_ARCH=arm32v6 \
-f cmd/segment-reaper/Dockerfile .
${DOCKER_BUILD} --pull=true -t storjlabs/segment-reaper:${TAG}${CUSTOMTAG}-aarch64 \
--build-arg=GOARCH=arm --build-arg=DOCKER_ARCH=aarch64 \
-f cmd/segment-reaper/Dockerfile .
.PHONY: storagenode-image
storagenode-image: storagenode_linux_arm storagenode_linux_arm64 storagenode_linux_amd64 ## Build storagenode Docker image
${DOCKER_BUILD} --pull=true -t storjlabs/storagenode:${TAG}${CUSTOMTAG}-amd64 \
@@ -254,9 +243,6 @@ inspector_%:
.PHONY: satellite_%
satellite_%:
$(MAKE) binary-check COMPONENT=satellite GOARCH=$(word 3, $(subst _, ,$@)) GOOS=$(word 2, $(subst _, ,$@))
.PHONY: segment-reaper_%
segment-reaper_%:
$(MAKE) binary-check COMPONENT=segment-reaper GOARCH=$(word 3, $(subst _, ,$@)) GOOS=$(word 2, $(subst _, ,$@))
.PHONY: storagenode_%
storagenode_%: storagenode-console
$(MAKE) binary-check COMPONENT=storagenode GOARCH=$(word 3, $(subst _, ,$@)) GOOS=$(word 2, $(subst _, ,$@))
@@ -287,7 +273,7 @@ sign-windows-installer:
push-images: ## Push Docker images to Docker Hub (jenkins)
# images have to be pushed before a manifest can be created
# satellite
for c in satellite segment-reaper storagenode uplink versioncontrol ; do \
for c in satellite storagenode uplink versioncontrol ; do \
docker push storjlabs/$$c:${TAG}${CUSTOMTAG}-amd64 \
&& docker push storjlabs/$$c:${TAG}${CUSTOMTAG}-arm32v6 \
&& docker push storjlabs/$$c:${TAG}${CUSTOMTAG}-aarch64 \
@@ -326,7 +312,6 @@ clean-images:
-docker rmi storjlabs/storagenode:${TAG}${CUSTOMTAG}
-docker rmi storjlabs/uplink:${TAG}${CUSTOMTAG}
-docker rmi storjlabs/versioncontrol:${TAG}${CUSTOMTAG}
-docker rmi storjlabs/segment-reaper:${TAG}${CUSTOMTAG}
.PHONY: test-docker-clean
test-docker-clean: ## Clean up Docker environment used in test-docker target

View File

@@ -1,9 +0,0 @@
ARG DOCKER_ARCH
FROM ${DOCKER_ARCH:-amd64}/alpine
ARG TAG
ARG GOARCH
ENV GOARCH ${GOARCH}
WORKDIR /app
COPY release/${TAG}/segment-reaper_linux_${GOARCH:-amd64} /app/segment-reaper
ENTRYPOINT ["/app/segment-reaper"]

View File

@@ -1,108 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"math/bits"
"github.com/zeebo/errs"
)
// errorBitArrayInvalidIdx is the error class returned for invalid indexes of
// the bitArray type.
var errorBitArrayInvalidIdx = errs.Class("invalid index")
// bitArray allows easy access to bit values by indices.
type bitArray []byte
// Set tracks index in mask. It returns an error if index is negative.
// Set will resize the array if you access an index larger than its Length.
func (bytes *bitArray) Set(index int) error {
bitIndex, byteIndex := index%8, index/8
switch {
case index < 0:
return errorBitArrayInvalidIdx.New("negative value (%d)", index)
case byteIndex >= len(*bytes):
sizeToGrow := byteIndex - len(*bytes) + 1
*bytes = append(*bytes, make([]byte, sizeToGrow)...)
}
mask := byte(1) << bitIndex
(*bytes)[byteIndex] |= mask
return nil
}
// Unset removes bit from index in mask. It returns an error if index is negative.
func (bytes *bitArray) Unset(index int) error {
bitIndex, byteIndex := index%8, index/8
switch {
case index < 0:
return errorBitArrayInvalidIdx.New("negative value (%d)", index)
case byteIndex >= len(*bytes):
return nil
}
mask := byte(1) << bitIndex
(*bytes)[byteIndex] &^= mask
return nil
}
// Has returns true if the index is tracked in the mask, otherwise false.
// It returns an error if index is negative.
func (bytes *bitArray) Has(index int) (bool, error) {
bitIndex, byteIndex := index%8, index/8
switch {
case index < 0:
return false, errorBitArrayInvalidIdx.New("negative value (%d)", index)
case byteIndex >= len(*bytes):
return false, nil
}
mask := byte(1) << bitIndex
result := (*bytes)[byteIndex] & mask
return result != 0, nil
}
// Count returns the number of bits which are set.
func (bytes *bitArray) Count() int {
count := 0
for x := 0; x < len(*bytes); x++ {
count += bits.OnesCount8((*bytes)[x])
}
return count
}
// IsSequence returns true if the mask has only tracked a consecutive sequence
// of indexes starting from index 0.
func (bytes *bitArray) IsSequence() bool {
// find the last byte of the sequence that contains a set bit
var i int
for i = len(*bytes) - 1; i >= 0; i-- {
zeros := bits.LeadingZeros8((*bytes)[i])
if zeros == 8 {
continue
}
ones := bits.OnesCount8((*bytes)[i])
if zeros+ones != 8 {
// zeros and ones in this byte aren't in sequence
return false
}
break
}
// The rest of the bytes of the sequence must contain only ones
i--
for ; i >= 0; i-- {
if (*bytes)[i] != 255 {
return false
}
}
return true
}
// Length returns the current size of the array in bits.
func (bytes *bitArray) Length() int {
return len(*bytes) * 8
}
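
A minimal usage sketch of the type above (added for illustration, not part of the removed file; it assumes it compiles in the same package as bitArray):

package main

import "fmt"

func main() {
	var mask bitArray
	_ = mask.Set(0) // Set grows the nil backing slice on demand
	_ = mask.Set(1)
	fmt.Println(mask.IsSequence()) // true: indexes 0-1 are a contiguous prefix from 0
	_ = mask.Set(9)                // grows the array to two bytes
	fmt.Println(mask.IsSequence()) // false: index 9 leaves a gap at indexes 2-8
	fmt.Println(mask.Count())      // 3
	fmt.Println(mask.Length())     // 16
}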

View File

@@ -1,324 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"math"
"math/rand"
"sort"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestBitArray(t *testing.T) {
t.Run("Set", func(t *testing.T) {
t.Run("ok", func(t *testing.T) {
var (
expectedIdx = rand.Intn(64)
bits bitArray
)
err := bits.Set(expectedIdx)
require.NoError(t, err)
})
t.Run("error: negative index", func(t *testing.T) {
var (
invalidIdx = -(rand.Intn(math.MaxInt32-1) + 1)
bits bitArray
)
err := bits.Set(invalidIdx)
assert.Error(t, err)
assert.True(t, errorBitArrayInvalidIdx.Has(err), "errorBitArrayInvalidIdx class")
})
t.Run("error: index > 63", func(t *testing.T) {
var (
invalidIdx = rand.Intn(math.MaxInt16) + 64
bits bitArray
)
err := bits.Set(invalidIdx)
assert.NoError(t, err)
assert.False(t, errorBitArrayInvalidIdx.Has(err), "errorBitArrayInvalidIdx class")
})
})
t.Run("Has", func(t *testing.T) {
t.Run("ok", func(t *testing.T) {
var (
expectedIdx = rand.Intn(64)
bits bitArray
)
has, err := bits.Has(expectedIdx)
require.NoError(t, err)
assert.False(t, has)
})
t.Run("error: negative index", func(t *testing.T) {
var (
invalidIdx = -(rand.Intn(math.MaxInt32-1) + 1)
bits bitArray
)
_, err := bits.Has(invalidIdx)
assert.Error(t, err)
assert.True(t, errorBitArrayInvalidIdx.Has(err), "errorBitArrayInvalidIdx class")
})
})
t.Run("Set and Has", func(t *testing.T) {
t.Run("index not set", func(t *testing.T) {
var (
expectedIdx = rand.Intn(64)
bits bitArray
)
has, err := bits.Has(expectedIdx)
require.NoError(t, err, "Has")
assert.False(t, has, "expected tracked index")
})
t.Run("index is set", func(t *testing.T) {
var (
expectedIdx = rand.Intn(64)
bits bitArray
)
err := bits.Set(expectedIdx)
require.NoError(t, err, "Set")
has, err := bits.Has(expectedIdx)
require.NoError(t, err, "Has")
assert.True(t, has, "expected tracked index")
})
t.Run("same index is set more than once", func(t *testing.T) {
var (
expectedIdx = rand.Intn(63)
times = rand.Intn(10) + 2
bits bitArray
)
for i := 0; i < times; i++ {
err := bits.Set(expectedIdx)
require.NoError(t, err, "Set")
}
has, err := bits.Has(expectedIdx)
require.NoError(t, err, "Has")
assert.True(t, has, "expected tracked index")
// Another index isn't set
has, err = bits.Has(expectedIdx + 1)
require.NoError(t, err, "Has")
assert.False(t, has, "not expected tracked index")
})
t.Run("several indexes are set", func(t *testing.T) {
var (
numIndexes = rand.Intn(61) + 2
indexes = make([]int, numIndexes)
bits bitArray
)
for i := 0; i < numIndexes; i++ {
idx := rand.Intn(63)
indexes[i] = idx
err := bits.Set(idx)
require.NoError(t, err, "Set")
}
for _, idx := range indexes {
has, err := bits.Has(idx)
require.NoError(t, err, "Has")
assert.True(t, has, "expected tracked index")
}
})
})
t.Run("Count", func(t *testing.T) {
t.Run("when initialized", func(t *testing.T) {
var bits bitArray
numIndexes := bits.Count()
assert.Zero(t, numIndexes)
})
t.Run("when several indexes set", func(t *testing.T) {
var (
numSetCalls = rand.Intn(61) + 2
expectedNumIndexes = numSetCalls
bits bitArray
)
for i := 0; i < numSetCalls; i++ {
idx := rand.Intn(63)
ok, err := bits.Has(idx)
require.NoError(t, err, "Has")
if ok {
// idx was already set in previous iteration
expectedNumIndexes--
continue
}
err = bits.Set(idx)
require.NoError(t, err, "Set")
}
numIndexes := bits.Count()
assert.Equal(t, expectedNumIndexes, numIndexes)
})
})
t.Run("IsSequence", func(t *testing.T) {
t.Run("empty", func(t *testing.T) {
var bits bitArray
ok := bits.IsSequence()
assert.True(t, ok)
})
t.Run("no sequence", func(t *testing.T) {
var bits bitArray
for { // loop until getting a list of non-sequenced indexes
var (
numIndexes = rand.Intn(60) + 2
indexes = make([]int, numIndexes)
)
for i := 0; i < numIndexes; i++ {
idx := rand.Intn(63)
indexes[i] = idx
}
sort.Ints(indexes)
areSequenced := true
for i, idx := range indexes {
// not a sequence if the first index isn't 0 or there is a gap between indexes
if (i == 0 && idx != 0) || (i > 0 && idx > indexes[i-1]+1) {
areSequenced = false
}
err := bits.Set(idx)
require.NoError(t, err, "Set")
}
if !areSequenced {
break
}
}
ok := bits.IsSequence()
assert.False(t, ok)
})
testCases := []struct {
name string
startIndex int
numIndexes int
isSequence bool
}{
{
name: "sequence starts at index 0",
startIndex: 0,
numIndexes: rand.Intn(5000) + 1,
isSequence: true,
},
{
name: "sequence starts at index 8 until index 15",
startIndex: 8,
numIndexes: 15,
isSequence: false,
},
{
name: "sequence starts at index 8 until index 16",
startIndex: 8,
numIndexes: 16,
isSequence: false,
},
{
name: "sequence starts at index 8 until index 17",
startIndex: 8,
numIndexes: 17,
isSequence: false,
},
{
name: "sequence starts at index 8 until index 23",
startIndex: 8,
numIndexes: 23,
isSequence: false,
},
{
name: "sequence starts at other index than 0",
startIndex: rand.Intn(1000) + 1,
numIndexes: rand.Intn(5000) + 1002,
isSequence: false,
},
}
for i := range testCases {
tc := testCases[i]
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
var bits bitArray
for i := tc.startIndex; i < tc.numIndexes; i++ {
err := bits.Set(i)
require.NoError(t, err, "Set")
}
require.Equalf(t, tc.isSequence, bits.IsSequence(),
"startIndex: %d, numIndexes: %d", tc.startIndex, tc.numIndexes,
)
})
}
})
t.Run("Unset", func(t *testing.T) {
t.Run("ok", func(t *testing.T) {
var (
expectedUnsetIdx = rand.Intn(32)
expectedSetIdx = rand.Intn(32) + 32
bits bitArray
)
err := bits.Set(expectedUnsetIdx)
require.NoError(t, err)
has, err := bits.Has(expectedUnsetIdx)
require.NoError(t, err)
require.True(t, has)
err = bits.Set(expectedSetIdx)
require.NoError(t, err)
err = bits.Unset(expectedUnsetIdx)
require.NoError(t, err)
has, err = bits.Has(expectedUnsetIdx)
require.NoError(t, err)
require.False(t, has)
has, err = bits.Has(expectedSetIdx)
require.NoError(t, err)
require.True(t, has)
})
t.Run("error: negative index", func(t *testing.T) {
var (
invalidIdx = -(rand.Intn(math.MaxInt32-1) + 1)
bits bitArray
)
err := bits.Unset(invalidIdx)
assert.Error(t, err)
assert.True(t, errorBitArrayInvalidIdx.Has(err), "errorBitArrayInvalidIdx class")
})
})
}

View File

@@ -1,158 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"encoding/base64"
"encoding/csv"
"errors"
"io"
"os"
"time"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/private/process"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/storage"
)
var (
errKnown = errs.Class("known delete error")
deleteCmd = &cobra.Command{
Use: "delete input_file.csv [flags]",
Short: "Deletes zombie segments from DB",
Args: cobra.ExactArgs(1),
RunE: cmdDelete,
}
deleteCfg struct {
DatabaseURL string `help:"the database connection string to use" default:"postgres://"`
DryRun bool `help:"with this option no deletion will be done, only printing results" default:"false"`
}
)
func init() {
rootCmd.AddCommand(deleteCmd)
process.Bind(deleteCmd, &deleteCfg)
}
func cmdDelete(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
log := zap.L()
db, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), deleteCfg.DatabaseURL, "satellite-reaper")
if err != nil {
return errs.New("error connecting database: %+v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
inputFile, err := os.Open(args[0])
if err != nil {
return errs.New("error opening input file: %+v", err)
}
defer func() {
err = errs.Combine(err, inputFile.Close())
}()
csvReader := csv.NewReader(inputFile)
csvReader.FieldsPerRecord = 6
csvReader.ReuseRecord = true
segmentsDeleted := 0
segmentsErrored := 0
segmentsSkipped := 0
for {
record, err := csvReader.Read()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
log.Error("error while reading record", zap.Error(err))
continue
}
projectID := record[0]
segmentIndex := record[1]
bucketName := record[2]
encodedPath := record[3]
creationDateFromReport, err := time.Parse(time.RFC3339Nano, record[4])
if err != nil {
log.Error("error while parsing date", zap.Error(err))
continue
}
encryptedPath, err := base64.StdEncoding.DecodeString(encodedPath)
if err != nil {
log.Error("error while decoding encrypted path", zap.Error(err))
continue
}
path := storj.JoinPaths(projectID, segmentIndex, bucketName, string(encryptedPath))
rawPath := storj.JoinPaths(projectID, segmentIndex, bucketName, encodedPath)
err = deleteSegment(ctx, db, path, creationDateFromReport, deleteCfg.DryRun)
if err != nil {
if errKnown.Has(err) {
segmentsSkipped++
} else {
segmentsErrored++
}
log.Error("error while deleting segment", zap.String("path", rawPath), zap.Error(err))
continue
}
log.Debug("segment deleted", zap.String("path", rawPath))
segmentsDeleted++
}
log.Info("summary", zap.Int("deleted", segmentsDeleted), zap.Int("skipped", segmentsSkipped), zap.Int("errored", segmentsErrored))
return nil
}
func deleteSegment(ctx context.Context, db metainfo.PointerDB, path string, creationDate time.Time, dryRun bool) error {
pointerBytes, err := db.Get(ctx, []byte(path))
if err != nil {
if storage.ErrKeyNotFound.Has(err) {
return errKnown.New("segment already deleted by user: %+v", err)
}
return err
}
pointer := &pb.Pointer{}
err = pb.Unmarshal(pointerBytes, pointer)
if err != nil {
return err
}
// check if pointer has been replaced
if !pointer.GetCreationDate().Equal(creationDate) {
// pointer has been replaced since detection, do not delete it.
return errKnown.New("segment won't be deleted, create date mismatch: %s -> %s", pointer.GetCreationDate(), creationDate)
}
if !dryRun {
// delete the pointer using compare-and-swap
err = db.CompareAndSwap(ctx, []byte(path), pointerBytes, nil)
if err != nil {
if storage.ErrValueChanged.Has(err) {
// race detected while deleting the pointer, do not try deleting it again.
return errKnown.New("segment won't be deleted, race detected while deleting the pointer: %+v", err)
}
return err
}
}
return nil
}
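
For reference, a hypothetical input row in the six-field format parsed above (project ID, segment index, bucket, base64-encoded encrypted path, creation date in RFC3339Nano, size; all values are made up):

7176d6a8-3a83-7ae7-e084-5fdbb1a17ac1,s3,my-bucket,cGF0aDEvcGF0aDI=,2020-12-01T10:15:04.123456789Z,4194304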

View File

@@ -1,89 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/storage"
"storj.io/storj/storage/teststore"
)
func TestDeleteSegment(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
db := teststore.New()
defer ctx.Check(db.Close)
t.Run("segment is deleted", func(t *testing.T) {
_, err := makeSegment(ctx, db, "path1", time.Unix(10, 0))
require.NoError(t, err)
dryRun := false
deleteError := deleteSegment(ctx, db, "path1", time.Unix(10, 0), dryRun)
require.NoError(t, deleteError)
_, err = db.Get(ctx, storage.Key("path1"))
require.Error(t, err)
require.True(t, storage.ErrKeyNotFound.Has(err))
})
t.Run("segment is not deleted because of dryRun", func(t *testing.T) {
expectedPointer, err := makeSegment(ctx, db, "path2", time.Unix(10, 0))
require.NoError(t, err)
dryRun := true
deleteError := deleteSegment(ctx, db, "path2", time.Unix(10, 0), dryRun)
require.NoError(t, deleteError)
pointer, err := db.Get(ctx, storage.Key("path2"))
require.NoError(t, err)
pointerBytes, err := pointer.MarshalBinary()
require.NoError(t, err)
require.Equal(t, expectedPointer, pointerBytes)
})
t.Run("segment is not deleted because of time mismatch", func(t *testing.T) {
expectedPointer, err := makeSegment(ctx, db, "path3", time.Unix(10, 0))
require.NoError(t, err)
dryRun := false
deleteError := deleteSegment(ctx, db, "path3", time.Unix(99, 0), dryRun)
require.Error(t, deleteError)
require.True(t, errKnown.Has(deleteError))
pointer, err := db.Get(ctx, storage.Key("path3"))
require.NoError(t, err)
pointerBytes, err := pointer.MarshalBinary()
require.NoError(t, err)
require.Equal(t, expectedPointer, pointerBytes)
})
t.Run("segment is not deleted because not exists", func(t *testing.T) {
dryRun := false
deleteError := deleteSegment(ctx, db, "not-existing-path", time.Unix(10, 0), dryRun)
require.Error(t, deleteError)
require.True(t, errKnown.Has(deleteError))
})
}
func makeSegment(ctx context.Context, db metainfo.PointerDB, path string, creationDate time.Time) (pointerBytes []byte, err error) {
pointer := &pb.Pointer{
CreationDate: creationDate,
}
pointerBytes, err = pb.Marshal(pointer)
if err != nil {
return []byte{}, err
}
err = db.Put(ctx, storage.Key(path), storage.Value(pointerBytes))
if err != nil {
return []byte{}, err
}
return pointerBytes, nil
}

View File

@@ -1,109 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"encoding/csv"
"os"
"time"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/private/cfgstruct"
"storj.io/private/process"
"storj.io/storj/satellite/metainfo"
)
var (
detectCmd = &cobra.Command{
Use: "detect",
Short: "Detects zombie segments in DB",
Args: cobra.OnlyValidArgs,
RunE: cmdDetect,
}
detectCfg struct {
DatabaseURL string `help:"the database connection string to use" default:"postgres://"`
From string `help:"beginning of the date range for detecting zombie segments (RFC3339)" default:""`
To string `help:"end of date range for detecting zombie segments (RFC3339)" default:""`
File string `help:"location of file with report" default:"zombie-segments.csv"`
}
)
func init() {
rootCmd.AddCommand(detectCmd)
defaults := cfgstruct.DefaultsFlag(rootCmd)
process.Bind(detectCmd, &detectCfg, defaults)
}
func cmdDetect(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
log := zap.L()
if err := process.InitMetricsWithHostname(ctx, log, nil); err != nil {
log.Warn("Failed to initialize telemetry batcher on segment reaper", zap.Error(err))
}
db, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), detectCfg.DatabaseURL, "satellite-reaper")
if err != nil {
return errs.New("error connecting database: %+v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
file, err := os.Create(detectCfg.File)
if err != nil {
return errs.New("error creating result file: %+v", err)
}
defer func() {
err = errs.Combine(err, file.Close())
}()
writer := csv.NewWriter(file)
defer func() {
writer.Flush()
err = errs.Combine(err, writer.Error())
}()
var from, to *time.Time
if detectCfg.From != "" {
fromTime, err := time.Parse(time.RFC3339, detectCfg.From)
if err != nil {
return err
}
from = &fromTime
}
if detectCfg.To != "" {
toTime, err := time.Parse(time.RFC3339, detectCfg.To)
if err != nil {
return err
}
to = &toTime
}
observer, err := newObserver(db, writer, from, to)
if err != nil {
return err
}
err = observer.detectZombieSegments(ctx)
if err != nil {
return err
}
log.Info("number of inline segments", zap.Int("segments", observer.inlineSegments))
log.Info("number of last inline segments", zap.Int("segments", observer.lastInlineSegments))
log.Info("number of remote segments", zap.Int("segments", observer.remoteSegments))
log.Info("number of zombie segments", zap.Int("segments", observer.zombieSegments))
mon.IntVal("zombie_segments").Observe(int64(observer.zombieSegments)) //mon:locked
return process.Report(ctx)
}

View File

@@ -1,25 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"github.com/spacemonkeygo/monkit/v3"
"github.com/spf13/cobra"
"storj.io/private/process"
_ "storj.io/storj/private/version" // This attaches version information during release builds.
)
var (
mon = monkit.Package()
rootCmd = &cobra.Command{
Use: "segment-reaper",
Short: "A tool for detecting and deleting zombie segments",
}
)
func main() {
process.Exec(rootCmd)
}
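
A hedged sketch of how the removed tool was invoked, combining the detect and delete subcommands defined above; the flag names are assumed to be derived from the config struct fields via process.Bind and are not confirmed by this diff:

# write a report of suspected zombie segments for a date range
segment-reaper detect --database-url "postgres://..." --from 2020-11-01T00:00:00Z --to 2020-12-01T00:00:00Z --file zombie-segments.csv
# review the report, then delete (or pass --dry-run to only print the results)
segment-reaper delete zombie-segments.csv --database-url "postgres://..." --dry-run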

View File

@@ -1,370 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"encoding/base64"
"encoding/csv"
"strconv"
"time"
"github.com/zeebo/errs"
"storj.io/common/pb"
"storj.io/common/uuid"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/storage"
)
const (
lastSegment = int(-1)
rateLimit = 0
)
// object represents an object with segments.
type object struct {
segments bitArray
expectedNumberOfSegments int
hasLastSegment bool
// if skip is true then segments from this object shouldn't be treated as zombie segments
// and printed out, e.g. when one of the segments is out of the specified date range
skip bool
}
// bucketsObjects keeps a list of objects associated with their path per bucket
// name.
type bucketsObjects map[string]map[metabase.ObjectKey]*object
func newObserver(db metainfo.PointerDB, w *csv.Writer, from, to *time.Time) (*observer, error) {
headers := []string{
"ProjectID",
"SegmentIndex",
"Bucket",
"EncodedEncryptedPath",
"CreationDate",
"Size",
}
err := w.Write(headers)
if err != nil {
return nil, err
}
return &observer{
db: db,
writer: w,
from: from,
to: to,
zombieBuffer: make([]int, 0),
objects: make(bucketsObjects),
}, nil
}
// observer is a metainfo.Loop observer for the zombie reaper.
type observer struct {
db metainfo.PointerDB
writer *csv.Writer
from *time.Time
to *time.Time
lastProjectID uuid.UUID
zombieBuffer []int
objects bucketsObjects
inlineSegments int
lastInlineSegments int
remoteSegments int
zombieSegments int
}
// RemoteSegment processes a segment to collect data needed to detect zombie segments.
func (obsvr *observer) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) {
return obsvr.processSegment(ctx, segment)
}
// InlineSegment processes a segment to collect data needed to detect zombie segments.
func (obsvr *observer) InlineSegment(ctx context.Context, segment *metainfo.Segment) (err error) {
return obsvr.processSegment(ctx, segment)
}
// Object is not used in this implementation.
func (obsvr *observer) Object(ctx context.Context, object *metainfo.Object) (err error) {
return nil
}
// processSegment aggregates, in the observer's internal state, the objects that
// belong to the same project, tracking their segment indexes and aggregated
// information about them. When a segment of a new project arrives, it calls the
// analyzeProject method on the accumulated state and then resets that state.
//
// It also aggregates some stats about all the segments independently of the
// object to which they belong.
//
// NOTE it's expected that this method is called continually for the objects
// which belong to the same project before calling it with objects of another
// project.
func (obsvr *observer) processSegment(ctx context.Context, segment *metainfo.Segment) error {
if !obsvr.lastProjectID.IsZero() && obsvr.lastProjectID != segment.Location.ProjectID {
err := obsvr.analyzeProject(ctx)
if err != nil {
return err
}
// cleanup map to free memory
obsvr.clearBucketsObjects()
}
obsvr.lastProjectID = segment.Location.ProjectID
isLastSegment := segment.Location.IsLast()
// collect number of pointers for reporting
if segment.Inline {
obsvr.inlineSegments++
if isLastSegment {
obsvr.lastInlineSegments++
}
} else {
obsvr.remoteSegments++
}
object := findOrCreate(segment.Location.BucketName, segment.Location.ObjectKey, obsvr.objects)
if obsvr.from != nil && segment.CreationDate.Before(*obsvr.from) {
object.skip = true
// release the memory consumed by the segments because it won't be used
// for skipped objects
object.segments = nil
return nil
} else if obsvr.to != nil && segment.CreationDate.After(*obsvr.to) {
object.skip = true
// release the memory consumed by the segments because it won't be used
// for skipped objects
object.segments = nil
return nil
}
if isLastSegment {
object.hasLastSegment = true
if segment.MetadataNumberOfSegments > 0 {
object.expectedNumberOfSegments = segment.MetadataNumberOfSegments
}
} else {
segmentIndex := int(segment.Location.Index)
if int64(segmentIndex) != segment.Location.Index {
return errs.New("unsupported segment index: %d", segment.Location.Index)
}
ok, err := object.segments.Has(segmentIndex)
if err != nil {
return err
}
if ok {
// TODO make location displayable
return errs.New("fatal error this segment is duplicated: %s", segment.Location.Encode())
}
err = object.segments.Set(segmentIndex)
if err != nil {
return err
}
}
return nil
}
func (obsvr *observer) detectZombieSegments(ctx context.Context) error {
// TODO set bucketsDB and metabaseDB
err := metainfo.IterateDatabase(ctx, rateLimit, obsvr.db, nil, nil, obsvr)
if err != nil {
return err
}
return obsvr.analyzeProject(ctx)
}
// analyzeProject analyzes the objects in the obsvr.objects field, detecting bad
// segments and writing them to obsvr.writer.
func (obsvr *observer) analyzeProject(ctx context.Context) error {
for bucket, objects := range obsvr.objects {
for key, object := range objects {
if object.skip {
continue
}
err := obsvr.findZombieSegments(object)
if err != nil {
return err
}
for _, segmentIndex := range obsvr.zombieBuffer {
err = obsvr.printSegment(ctx, segmentIndex, bucket, key)
if err != nil {
return err
}
}
}
}
return nil
}
func (obsvr *observer) findZombieSegments(object *object) error {
obsvr.resetZombieBuffer()
if !object.hasLastSegment {
obsvr.appendAllObjectSegments(object)
return nil
}
segmentsCount := object.segments.Count()
switch {
// this case is only for old-style pointers with an encrypted number of segments;
// value 0 means that we don't know how many segments the object should have
case object.expectedNumberOfSegments == 0:
sequenceLength := firstSequenceLength(object.segments)
for index := sequenceLength; index < object.segments.Length(); index++ {
has, err := object.segments.Has(index)
if err != nil {
panic(err)
}
if has {
obsvr.appendSegment(index)
}
}
// using 'expectedNumberOfSegments-1' because 'segments' doesn't contain the last segment
case segmentsCount > object.expectedNumberOfSegments-1:
sequenceLength := firstSequenceLength(object.segments)
if sequenceLength == object.expectedNumberOfSegments-1 {
for index := sequenceLength; index < object.segments.Length(); index++ {
has, err := object.segments.Has(index)
if err != nil {
panic(err)
}
if has {
obsvr.appendSegment(index)
}
}
} else {
obsvr.appendAllObjectSegments(object)
obsvr.appendSegment(lastSegment)
}
case segmentsCount < object.expectedNumberOfSegments-1,
segmentsCount == object.expectedNumberOfSegments-1 && !object.segments.IsSequence():
obsvr.appendAllObjectSegments(object)
obsvr.appendSegment(lastSegment)
}
return nil
}
func (obsvr *observer) printSegment(ctx context.Context, segmentIndex int, bucket string, key metabase.ObjectKey) error {
var segmentIndexStr string
if segmentIndex == lastSegment {
segmentIndexStr = "l"
} else {
segmentIndexStr = "s" + strconv.Itoa(segmentIndex)
}
segmentKey := metabase.SegmentLocation{
ProjectID: obsvr.lastProjectID,
BucketName: bucket,
Index: int64(segmentIndex),
ObjectKey: key,
}.Encode()
creationDate, size, err := pointerCreationDateAndSize(ctx, obsvr.db, segmentKey)
if err != nil {
return err
}
encodedPath := base64.StdEncoding.EncodeToString([]byte(key))
err = obsvr.writer.Write([]string{
obsvr.lastProjectID.String(),
segmentIndexStr,
bucket,
encodedPath,
creationDate,
strconv.FormatInt(size, 10),
})
if err != nil {
return err
}
obsvr.zombieSegments++
return nil
}
func pointerCreationDateAndSize(ctx context.Context, db metainfo.PointerDB, key metabase.SegmentKey,
) (creationDate string, size int64, _ error) {
pointerBytes, err := db.Get(ctx, storage.Key(key))
if err != nil {
return "", 0, err
}
pointer := &pb.Pointer{}
err = pb.Unmarshal(pointerBytes, pointer)
if err != nil {
return "", 0, err
}
return pointer.CreationDate.Format(time.RFC3339Nano), pointer.SegmentSize, nil
}
func (obsvr *observer) resetZombieBuffer() {
obsvr.zombieBuffer = obsvr.zombieBuffer[:0]
}
func (obsvr *observer) appendSegment(segmentIndex int) {
obsvr.zombieBuffer = append(obsvr.zombieBuffer, segmentIndex)
}
func (obsvr *observer) appendAllObjectSegments(object *object) {
for index := 0; index < object.segments.Length(); index++ {
has, err := object.segments.Has(index)
if err != nil {
panic(err)
}
if has {
obsvr.appendSegment(index)
}
}
}
// clearBucketsObjects clears up the buckets objects map for reusing it.
func (obsvr *observer) clearBucketsObjects() {
// This is an idiomatic way of not having to destroy and recreate a new map
// each time that an empty map is required.
// See https://github.com/golang/go/issues/20138
for b := range obsvr.objects {
delete(obsvr.objects, b)
}
}
func findOrCreate(bucketName string, key metabase.ObjectKey, buckets bucketsObjects) *object {
objects, ok := buckets[bucketName]
if !ok {
objects = make(map[metabase.ObjectKey]*object)
buckets[bucketName] = objects
}
obj, ok := objects[key]
if !ok {
obj = &object{segments: bitArray{}}
objects[key] = obj
}
return obj
}
func firstSequenceLength(segments bitArray) int {
for index := 0; index < segments.Length(); index++ {
has, err := segments.Has(index)
if err != nil {
panic(err)
}
if !has {
return index
}
}
return segments.Length()
}
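
To make the cases in findZombieSegments concrete, a worked example based on the logic above: for an object with a last segment, expectedNumberOfSegments = 6, and non-last segment mask 11011 (indexes 0, 1, 3, 4 set), Count() returns 4, which is less than expectedNumberOfSegments-1 = 5, so every tracked segment plus the last segment is reported as a zombie. With the same mask but expectedNumberOfSegments = 3, Count() = 4 exceeds 2 while the first sequence length equals expectedNumberOfSegments-1 = 2, so only the segments after that initial sequence (indexes 3 and 4) are reported.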

View File

@@ -1,755 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"bytes"
"context"
"encoding/base64"
"encoding/csv"
"fmt"
"math"
"math/rand"
"os"
"strconv"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/storage"
"storj.io/storj/storage/teststore"
)
func TestMain(m *testing.M) {
rand.Seed(time.Now().UnixNano())
os.Exit(m.Run())
}
func TestObserver_processSegment(t *testing.T) {
t.Run("valid objects of different projects", func(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
obsvr := &observer{objects: make(bucketsObjects)}
testdata1 := generateTestdataObjects(ctx, t, false)
// Call processSegment with testdata objects of the first project
for _, objSeg := range testdata1.objSegments {
err := obsvr.processSegment(ctx, objSeg)
require.NoError(t, err)
}
testdata2 := generateTestdataObjects(ctx, t, false)
// Call processSegment with testdata objects of the second project
for _, objSeg := range testdata2.objSegments {
err := obsvr.processSegment(ctx, objSeg)
require.NoError(t, err)
}
// Inspect observer internal state to assert that it only has the state
// related to the second project
assertObserver(t, obsvr, testdata2)
// Assert that the observer keeps track of global stats for all the segments
// received through processSegment calls
assert.Equal(t, testdata1.expectedInlineSegments+testdata2.expectedInlineSegments,
obsvr.inlineSegments, "inlineSegments")
assert.Equal(t, testdata1.expectedInlineSegments+testdata2.expectedInlineSegments,
obsvr.lastInlineSegments, "lastInlineSegments")
assert.Equal(t, testdata1.expectedRemoteSegments+testdata2.expectedRemoteSegments,
obsvr.remoteSegments, "remoteSegments")
})
t.Run("object without last segment", func(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
var testdata = generateTestdataObjects(ctx, t, true)
var obsvr = &observer{objects: make(bucketsObjects)}
// Call processSegment with the testdata
for _, objSeg := range testdata.objSegments {
err := obsvr.processSegment(ctx, objSeg)
require.NoError(t, err)
}
// Assert observer internal state
assertObserver(t, obsvr, testdata)
// Assert observer global stats
assert.Equal(t, testdata.expectedInlineSegments, obsvr.inlineSegments, "inlineSegments")
assert.Equal(t, testdata.expectedInlineSegments, obsvr.lastInlineSegments, "lastInlineSegments")
assert.Equal(t, testdata.expectedRemoteSegments, obsvr.remoteSegments, "remoteSegments")
})
t.Run("objects in the same project with a random number segments", func(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
var (
testdata = generateTestdataObjects(ctx, t, false)
obsvr = &observer{
objects: make(bucketsObjects),
}
)
for _, objSeg := range testdata.objSegments {
err := obsvr.processSegment(ctx, objSeg)
require.NoError(t, err)
}
// Assert observer internal state
assertObserver(t, obsvr, testdata)
// Assert observer global stats
assert.Equal(t, testdata.expectedInlineSegments, obsvr.inlineSegments, "inlineSegments")
assert.Equal(t, testdata.expectedInlineSegments, obsvr.lastInlineSegments, "lastInlineSegments")
assert.Equal(t, testdata.expectedRemoteSegments, obsvr.remoteSegments, "remoteSegments")
})
t.Run("objects where one has segments before from", func(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
var (
to = time.Now().Add(time.Hour)
from = to.Add(-2 * time.Hour)
diffFromTo = to.Sub(from)
bucketName = "test-bucket"
projectID = testrand.UUID()
numSegmentsObjOutDateRange = rand.Intn(50) + 15
numSegmentsBeforeDate = rand.Intn(numSegmentsObjOutDateRange-1) + 1
obsvr = observer{
objects: make(bucketsObjects),
from: &from,
to: &to,
}
)
pathObjOutDateRange, objSegmentsRefs := createNewObjectSegments(
ctx, t, numSegmentsObjOutDateRange, &projectID, bucketName, true, false,
)
for i := 0; i < numSegmentsObjOutDateRange; i++ {
if i < numSegmentsBeforeDate {
// Assign a creation date before the from
decrement := -time.Duration(rand.Int63n(math.MaxInt64-1) + 1)
creationDate := from.Add(decrement)
objSegmentsRefs[i].CreationDate = creationDate
continue
}
// Assign a creation date between from and to (both included)
increment := time.Duration(rand.Int63n(int64(diffFromTo) + 1))
objSegmentsRefs[i].CreationDate = from.Add(increment)
}
numSegmentsObjInDateRange := rand.Intn(50) + 15
var pathObjInDateRange metabase.ObjectKey
{ // Object with all the segments with creation date between the from/to range
var otherObjSegments []*metainfo.Segment
pathObjInDateRange, otherObjSegments = createNewObjectSegments(
ctx, t, numSegmentsObjInDateRange, &projectID, bucketName, true, false,
)
objSegmentsRefs = append(objSegmentsRefs, otherObjSegments...)
}
totalSegments := len(objSegmentsRefs)
rand.Shuffle(totalSegments, func(i, j int) {
objSegmentsRefs[i], objSegmentsRefs[j] = objSegmentsRefs[j], objSegmentsRefs[i]
})
for _, objSeg := range objSegmentsRefs {
err := obsvr.processSegment(ctx, objSeg)
require.NoError(t, err)
}
// Assert observer internal state
assert.Equal(t, projectID, obsvr.lastProjectID, "lastProjectID")
assert.Equal(t, 1, len(obsvr.objects), "objects number")
require.Contains(t, obsvr.objects, bucketName, "bucket in objects map")
require.Equal(t, 2, len(obsvr.objects[bucketName]), "objects in object map")
require.Contains(t, obsvr.objects[bucketName], pathObjOutDateRange, "path in bucket objects map")
obj := obsvr.objects[bucketName][pathObjOutDateRange]
assert.Zero(t, obj.expectedNumberOfSegments, "Object.expectedNumSegments")
assert.True(t, obj.hasLastSegment, "Object.hasLastSegment")
assert.True(t, obj.skip, "Object.skip")
require.Contains(t, obsvr.objects[bucketName], pathObjInDateRange, "path in bucket objects map")
obj = obsvr.objects[bucketName][pathObjInDateRange]
assert.Zero(t, obj.expectedNumberOfSegments, "Object.expectedNumSegments")
assert.True(t, obj.hasLastSegment, "Object.hasLastSegment")
assert.False(t, obj.skip, "Object.skip")
// Assert observer global stats
assert.Equal(t, 2, obsvr.inlineSegments, "inlineSegments")
assert.Equal(t, 2, obsvr.lastInlineSegments, "lastInlineSegments")
assert.Equal(t, totalSegments-2, obsvr.remoteSegments, "remoteSegments")
})
t.Run("objects where one has segments after to", func(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
var (
to = time.Now().Add(time.Hour)
from = to.Add(-2 * time.Hour)
diffFromTo = to.Sub(from)
bucketName = "test-bucket"
projectID = testrand.UUID()
numSegmentsObjOutDateRange = rand.Intn(50) + 15
numSegmentsBeforeDate = rand.Intn(numSegmentsObjOutDateRange-1) + 1
obsvr = observer{
objects: make(bucketsObjects),
from: &from,
to: &to,
}
)
pathObjOutDateRange, objSegmentsRefs := createNewObjectSegments(
ctx, t, numSegmentsObjOutDateRange, &projectID, bucketName, false, true,
)
for i := 0; i < numSegmentsObjOutDateRange; i++ {
if i < numSegmentsBeforeDate {
// Assign a creation date after the to
increment := time.Duration(rand.Int63n(math.MaxInt64-1) + 1)
creationDate := to.Add(increment)
objSegmentsRefs[i].CreationDate = creationDate
continue
}
// Assign a creation date between from and to (both included)
increment := time.Duration(rand.Int63n(int64(diffFromTo) + 1))
objSegmentsRefs[i].CreationDate = from.Add(increment)
}
numSegmentsObjInDateRange := rand.Intn(50) + 15
var pathObjInDateRange metabase.ObjectKey
{ // Object with all the segments with creation date between the from/to range
var otherObjSegments []*metainfo.Segment
pathObjInDateRange, otherObjSegments = createNewObjectSegments(
ctx, t, numSegmentsObjInDateRange, &projectID, bucketName, false, true,
)
objSegmentsRefs = append(objSegmentsRefs, otherObjSegments...)
}
totalSegments := len(objSegmentsRefs)
rand.Shuffle(totalSegments, func(i, j int) {
objSegmentsRefs[i], objSegmentsRefs[j] = objSegmentsRefs[j], objSegmentsRefs[i]
})
for _, objSeg := range objSegmentsRefs {
err := obsvr.processSegment(ctx, objSeg)
require.NoError(t, err)
}
// Assert observer internal state
assert.Equal(t, projectID, obsvr.lastProjectID, "lastProjectID")
assert.Equal(t, 1, len(obsvr.objects), "objects number")
require.Contains(t, obsvr.objects, bucketName, "bucket in objects map")
require.Equal(t, 2, len(obsvr.objects[bucketName]), "objects in object map")
require.Contains(t, obsvr.objects[bucketName], pathObjOutDateRange, "path in bucket objects map")
obj := obsvr.objects[bucketName][pathObjOutDateRange]
assert.Equal(t, numSegmentsObjOutDateRange, obj.expectedNumberOfSegments, "Object.expectedNumSegments")
assert.True(t, obj.hasLastSegment, "Object.hasLastSegment")
assert.True(t, obj.skip, "Object.skip")
require.Contains(t, obsvr.objects[bucketName], pathObjInDateRange, "path in bucket objects map")
obj = obsvr.objects[bucketName][pathObjInDateRange]
assert.Equal(t, numSegmentsObjInDateRange, obj.expectedNumberOfSegments, "Object.expectedNumSegments")
assert.True(t, obj.hasLastSegment, "Object.hasLastSegment")
assert.False(t, obj.skip, "Object.skip")
// Assert observer global stats
assert.Zero(t, obsvr.inlineSegments, "inlineSegments")
assert.Zero(t, obsvr.lastInlineSegments, "lastInlineSegments")
assert.Equal(t, totalSegments, obsvr.remoteSegments, "remoteSegments")
})
}
func TestObserver_processSegment_from_to(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
var (
notSet = time.Time{}
now = time.Now()
)
tests := []struct {
from time.Time
to time.Time
pointerCreateDate time.Time
skipObject bool
}{
// not skipped
{notSet, notSet, now, false},
{notSet, now, now, false},
{now, now, now, false},
{now, notSet, now, false},
{now.Add(-time.Minute), now.Add(time.Minute), now, false},
{now.Add(-time.Minute), now.Add(time.Minute), now.Add(time.Minute), false},
{now.Add(-time.Minute), now.Add(time.Minute), now.Add(-time.Minute), false},
// skipped
{notSet, now, now.Add(time.Second), true},
{now, notSet, now.Add(-time.Second), true},
{now.Add(-time.Minute), now.Add(time.Minute), now.Add(time.Hour), true},
{now.Add(-time.Minute), now.Add(time.Minute), now.Add(-time.Hour), true},
}
for _, tt := range tests {
var from *time.Time
var to *time.Time
if tt.from != notSet {
from = &tt.from
}
if tt.to != notSet {
to = &tt.to
}
observer := &observer{
objects: make(bucketsObjects),
from: from,
to: to,
}
objSeg := metainfo.Segment{
Location: metabase.SegmentLocation{
ProjectID: testrand.UUID(),
BucketName: "bucket1",
Index: metabase.LastSegmentIndex,
ObjectKey: metabase.ObjectKey("path1"),
},
CreationDate: tt.pointerCreateDate,
}
err := observer.processSegment(ctx, &objSeg)
require.NoError(t, err)
objectsMap, ok := observer.objects["bucket1"]
require.True(t, ok)
object, ok := objectsMap["path1"]
require.True(t, ok)
require.Equal(t, tt.skipObject, object.skip)
}
}
func TestObserver_processSegment_switch_project(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
db := teststore.New()
buffer := new(bytes.Buffer)
writer := csv.NewWriter(buffer)
defer ctx.Check(writer.Error)
observer, err := newObserver(db, writer, nil, nil)
require.NoError(t, err)
// project IDs are pregenerated to avoid issues with iteration order
now := time.Now()
project1 := "7176d6a8-3a83-7ae7-e084-5fdbb1a17ac1"
project2 := "890dd9f9-6461-eb1b-c3d1-73af7252b9a4"
// zombie segment for project 1
_, err = makeSegment(ctx, db, storj.JoinPaths(project1, "s0", "bucket1", "path1"), now)
require.NoError(t, err)
// zombie segment for project 2
_, err = makeSegment(ctx, db, storj.JoinPaths(project2, "s0", "bucket1", "path1"), now)
require.NoError(t, err)
err = observer.detectZombieSegments(ctx)
require.NoError(t, err)
writer.Flush()
result := buffer.String()
for _, projectID := range []string{project1, project2} {
encodedPath := base64.StdEncoding.EncodeToString([]byte("path1"))
pathPrefix := strings.Join([]string{projectID, "s0", "bucket1", encodedPath, now.UTC().Format(time.RFC3339Nano)}, ",")
assert.Containsf(t, result, pathPrefix, "entry for projectID %s not found", projectID)
}
}
func TestObserver_processSegment_single_project(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
type object struct {
bucket string
segments []int
numberOfSegments int
expected string
}
project1 := testrand.UUID().String()
tests := []struct {
objects []object
}{
// expected = `object.expectedNumberOfSegments`_`object.segments`_`object.hasLastSegment`
{
objects: []object{},
},
{
objects: []object{
{bucket: "b1", segments: []int{lastSegment}, numberOfSegments: 0, expected: "0_000_l"},
{bucket: "b1", segments: []int{lastSegment}, numberOfSegments: 1, expected: "1_000_l"},
{bucket: "b2", segments: []int{0}, numberOfSegments: 0, expected: "0_100_0"},
{bucket: "b1", segments: []int{0}, numberOfSegments: 5, expected: "0_100_0"},
{bucket: "b3", segments: []int{0, 1, 2, lastSegment}, numberOfSegments: 4, expected: "4_111_l"},
{bucket: "b1", segments: []int{0, 1, 2}, numberOfSegments: 0, expected: "0_111_0"},
{bucket: "b5", segments: []int{2, lastSegment}, numberOfSegments: 1, expected: "1_001_l"},
{bucket: "b1", segments: []int{2}, numberOfSegments: 1, expected: "0_001_0"},
{bucket: "b1", segments: []int{0, lastSegment}, numberOfSegments: 3, expected: "3_100_l"},
},
},
}
for i, tt := range tests {
i := i
tt := tt
t.Run("#"+strconv.Itoa(i), func(t *testing.T) {
// need boltdb to have DB with concurrent access support
db := teststore.New()
for i, ttObject := range tt.objects {
for _, segment := range ttObject.segments {
streamMeta := &pb.StreamMeta{}
segmentIndex := "s" + strconv.Itoa(segment)
if segment == lastSegment {
segmentIndex = "l"
streamMeta.NumberOfSegments = int64(ttObject.numberOfSegments)
}
path := storj.JoinPaths(project1, segmentIndex, ttObject.bucket, "path"+strconv.Itoa(i))
metadata, err := pb.Marshal(streamMeta)
require.NoError(t, err)
pointerBytes, err := pb.Marshal(&pb.Pointer{
Metadata: metadata,
})
require.NoError(t, err)
err = db.Put(ctx, storage.Key(path), storage.Value(pointerBytes))
require.NoError(t, err)
}
}
observer := &observer{
db: db,
objects: make(bucketsObjects),
writer: csv.NewWriter(new(bytes.Buffer)),
}
err := observer.detectZombieSegments(ctx)
require.NoError(t, err)
for i, ttObject := range tt.objects {
objectsMap, ok := observer.objects[ttObject.bucket]
require.True(t, ok)
object, ok := objectsMap[metabase.ObjectKey("path"+strconv.Itoa(i))]
require.True(t, ok)
expectedParts := strings.Split(ttObject.expected, "_")
expectedNumberOfSegments, err := strconv.Atoi(expectedParts[0])
require.NoError(t, err)
assert.Equal(t, expectedNumberOfSegments, object.expectedNumberOfSegments)
expectedSegments := bitArray{}
for i, char := range expectedParts[1] {
if char == '_' {
break
}
if char == '1' {
err := expectedSegments.Set(i)
require.NoError(t, err)
}
}
assert.Equal(t, expectedSegments, object.segments)
expectedLastSegment := expectedParts[2] == "l"
assert.Equal(t, expectedLastSegment, object.hasLastSegment)
}
})
}
}
func TestObserver_findZombieSegments(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
allSegments64 := string(bytes.ReplaceAll(make([]byte, 64), []byte{0}, []byte{'1'}))
tests := []struct {
segments string
expectedNumberOfSegments int
segmentsAfter string
}{
// this visualizes which segments will NOT be selected as zombie segments
// known number of segments
{"11111_l", 6, "11111_l"}, // #0
{"00000_l", 1, "00000_l"}, // #1
{"1111100", 6, "0000000"}, // #2
{"11011_l", 6, "00000_0"}, // #3
{"11011_l", 3, "11000_l"}, // #4
{"11110_l", 6, "00000_0"}, // #5
{"00011_l", 4, "00000_0"}, // #6
{"10011_l", 4, "00000_0"}, // #7
{"11011_l", 4, "00000_0"}, // #8
// unknown number of segments
{"11111_l", 0, "11111_l"}, // #9
{"00000_l", 0, "00000_l"}, // #10
{"10000_l", 0, "10000_l"}, // #11
{"1111100", 0, "0000000"}, // #12
{"00111_l", 0, "00000_l"}, // #13
{"10111_l", 0, "10000_l"}, // #14
{"10101_l", 0, "10000_l"}, // #15
{"11011_l", 0, "11000_l"}, // #16
// special cases
{allSegments64 + "_l", 65, allSegments64 + "_l"}, // #17
}
for testNum, tt := range tests {
testNum := testNum
tt := tt
t.Run("case_"+strconv.Itoa(testNum), func(t *testing.T) {
bucketObjects := make(bucketsObjects)
singleObjectMap := make(map[metabase.ObjectKey]*object)
segments := bitArray{}
for i, char := range tt.segments {
if char == '_' {
break
}
if char == '1' {
err := segments.Set(i)
require.NoError(t, err)
}
}
object := &object{
segments: segments,
hasLastSegment: strings.HasSuffix(tt.segments, "_l"),
expectedNumberOfSegments: tt.expectedNumberOfSegments,
}
singleObjectMap["test-path"] = object
bucketObjects["test-bucket"] = singleObjectMap
observer := &observer{
objects: bucketObjects,
lastProjectID: testrand.UUID(),
zombieBuffer: make([]int, 0),
}
err := observer.findZombieSegments(object)
require.NoError(t, err)
indexes := observer.zombieBuffer
segmentsAfter := tt.segments
for _, segmentIndex := range indexes {
if segmentIndex == lastSegment {
segmentsAfter = segmentsAfter[:len(segmentsAfter)-1] + "0"
} else {
segmentsAfter = segmentsAfter[:segmentIndex] + "0" + segmentsAfter[segmentIndex+1:]
}
}
require.Equalf(t, tt.segmentsAfter, segmentsAfter, "segments before and after comparison failed: want %s got %s, case %d ", tt.segmentsAfter, segmentsAfter, testNum)
})
}
}
// createNewObjectSegments creates a list of segment references which belong to
// the same object.
//
// If inline is true the last segment will be of INLINE type.
//
// If withNumSegments is true the last segment pointer will have the
// NumberOfSegments set.
//
// It returns the object path and the list of object segment references.
func createNewObjectSegments(
ctx context.Context, t *testing.T, numSegments int, projectID *uuid.UUID, bucketName string, inline bool, withNumSegments bool,
) (objectKey metabase.ObjectKey, _ []*metainfo.Segment) {
t.Helper()
var (
objectID = metabase.ObjectKey(testrand.UUID().String())
references = make([]*metainfo.Segment, 0, numSegments)
)
for i := 0; i < (numSegments - 1); i++ {
references = append(references, &metainfo.Segment{
Location: metabase.SegmentLocation{
ProjectID: *projectID,
BucketName: bucketName,
Index: int64(i),
ObjectKey: objectID,
},
CreationDate: time.Now(),
})
}
var pointerNumSegments int
if withNumSegments {
pointerNumSegments = numSegments
}
references = append(references, &metainfo.Segment{
Location: metabase.SegmentLocation{
ProjectID: *projectID,
BucketName: bucketName,
Index: metabase.LastSegmentIndex,
ObjectKey: objectID,
},
Inline: inline,
MetadataNumberOfSegments: pointerNumSegments,
CreationDate: time.Now(),
})
return objectID, references
}
type testdataObjects struct {
// expectations
expectedNumSegments int
expectedInlineSegments int
expectedRemoteSegments int
expectedObjects bucketsObjects
// data used for calling processSegment
objSegments []*metainfo.Segment
projectID *uuid.UUID
}
// generateTestdataObjects generates a testdataObjects value with a random number
// of segments for a random number of objects and buckets, all under the same
// project.
//
// When withoutLastSegment is true, some objects will be without a last segment;
// otherwise all of them will have a last segment.
func generateTestdataObjects(
ctx context.Context, t *testing.T, withoutLastSegment bool) testdataObjects {
t.Helper()
var (
testdata = testdataObjects{
expectedObjects: make(bucketsObjects),
}
bucketName = "0"
numObjs = rand.Intn(10) + 2
projID = testrand.UUID()
withoutLastSegmentCount = 0
)
testdata.projectID = &projID
for i := 0; i < numObjs; i++ {
var (
inline = (rand.Int() % 2) == 0
withNumSegments = (rand.Int() % 2) == 0
numSegments = rand.Intn(1000) + 2
)
if rand.Int()%2 == 0 {
bucketName = fmt.Sprintf("bucket-%d", i)
}
objPath, objSegmentsProj := createNewObjectSegments(
ctx, t, numSegments, &projID, bucketName, inline, withNumSegments,
)
testdata.objSegments = append(testdata.objSegments, objSegmentsProj...)
expectedObj := findOrCreate(bucketName, objPath, testdata.expectedObjects)
// segments mask doesn't contain the last segment, hence numSegments-1
b := make([]byte, ((numSegments-1)+8-1)/8)
for x := 0; x < numSegments-1; x++ {
bitIndex, byteIndex := x%8, x/8
b[byteIndex] |= byte(1) << bitIndex
}
expectedObj.segments = bitArray(b)
// If withoutLastSegment is true, then randomly choose objects to be without a
// last segment, forcing the removal on the object generated in the last
// iteration if every object of the previous iterations kept its last
// segment
if withoutLastSegment &&
((rand.Int()%2) == 0 || (i == (numObjs-1) && withoutLastSegmentCount == 0)) {
withoutLastSegmentCount++
expectedObj.hasLastSegment = false
numSegments--
testdata.objSegments = testdata.objSegments[:len(testdata.objSegments)-1]
testdata.expectedRemoteSegments += numSegments
} else {
expectedObj.hasLastSegment = true
if inline {
testdata.expectedInlineSegments++
testdata.expectedRemoteSegments += (numSegments - 1)
} else {
testdata.expectedRemoteSegments += numSegments
}
if withNumSegments {
expectedObj.expectedNumberOfSegments = numSegments
}
}
testdata.expectedNumSegments += numSegments
}
// Shuffle the segments so the object segments aren't in serial order
rand.Shuffle(len(testdata.objSegments), func(i, j int) {
testdata.objSegments[i], testdata.objSegments[j] = testdata.objSegments[j], testdata.objSegments[i]
})
return testdata
}
// assertObserver asserts the observer values against the testdata ones.
func assertObserver(t *testing.T, obsvr *observer, testdata testdataObjects) {
t.Helper()
assert.Equal(t, *testdata.projectID, obsvr.lastProjectID, "lastProjectID")
if assert.Equal(t, len(testdata.expectedObjects), len(obsvr.objects), "objects number") {
for bucket, bucketObjs := range obsvr.objects {
expBucketObjs, ok := testdata.expectedObjects[bucket]
if !assert.Truef(t, ok, "bucket '%s' shouldn't exist in objects map", bucket) {
continue
}
if !assert.Equalf(t, len(expBucketObjs), len(bucketObjs), "objects per bucket (%s) number", bucket) {
continue
}
for expPath, expObj := range expBucketObjs {
if !assert.Contains(t, bucketObjs, expPath, "path in bucket objects map") {
continue
}
obj := bucketObjs[expPath]
assert.Equal(t, expObj.expectedNumberOfSegments, obj.expectedNumberOfSegments, "Object.expectedNumSegments")
assert.Equal(t, expObj.hasLastSegment, obj.hasLastSegment, "Object.hasLastSegment")
assert.Equal(t, expObj.skip, obj.skip, "Object.skip")
if !expObj.skip {
assert.Equal(t, expObj.segments, obj.segments, "Object.segments")
}
}
}
}
}

View File

@@ -1,4 +1,3 @@
storj.io/storj/cmd/segment-reaper."zombie_segments" IntVal
storj.io/storj/satellite/accounting/tally."bucket_bytes" IntVal
storj.io/storj/satellite/accounting/tally."bucket_inline_bytes" IntVal
storj.io/storj/satellite/accounting/tally."bucket_inline_segments" IntVal

View File

@@ -40,18 +40,17 @@ func (object *Object) Expired(now time.Time) bool {
// Segment is the segment info passed to Observer by metainfo loop.
type Segment struct {
Location metabase.SegmentLocation // tally, repair, graceful exit, audit, segment reaper
DataSize int // tally, graceful exit
MetadataSize int // tally
Inline bool // metrics, segment reaper
Redundancy storj.RedundancyScheme // tally, graceful exit, repair
RootPieceID storj.PieceID // gc, graceful exit
Pieces metabase.Pieces // tally, audit, gc, graceful exit, repair
CreationDate time.Time // repair, segment reaper
expirationDate time.Time // tally, repair
LastRepaired time.Time // repair
Pointer *pb.Pointer // repair
MetadataNumberOfSegments int // segment reaper
Location metabase.SegmentLocation // tally, repair, graceful exit, audit
DataSize int // tally, graceful exit
MetadataSize int // tally
Inline bool // metrics
Redundancy storj.RedundancyScheme // tally, graceful exit, repair
RootPieceID storj.PieceID // gc, graceful exit
Pieces metabase.Pieces // tally, audit, gc, graceful exit, repair
CreationDate time.Time // repair
expirationDate time.Time // tally, repair
LastRepaired time.Time // repair
Pointer *pb.Pointer // repair
}
// Expired checks if segment is expired relative to now.