cmd/segment-reaper: add detecting and printing zombie segments (#3598)

This commit is contained in:
Michal Niewrzal 2019-12-09 03:30:52 -08:00 committed by GitHub
parent 8d49a99ad8
commit 27462f68e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 367 additions and 18 deletions

View File

@ -31,6 +31,21 @@ func (mask *bitmask) Set(index int) error {
return nil
}
// Unset removes bit from index in mask. It returns an error if index is negative or it's
// greater than 63.
func (mask *bitmask) Unset(index int) error {
	switch {
	case index < 0:
		return errorBitmaskInvalidIdx.New("negative value (%d)", index)
	case index > 63:
		return errorBitmaskInvalidIdx.New("index is greater than 63 (%d)", index)
	}

	// Clear the bit with AND NOT (&^). Unlike XOR, which toggles and would
	// SET the bit when it wasn't already set, &^ leaves the bit at 0 either
	// way, making Unset idempotent.
	bit := uint64(1) << index
	*mask = bitmask(uint64(*mask) &^ bit)
	return nil
}
// Has returns true if the index is tracked in mask otherwise false.
// It returns an error if index is negative or it's greater than 63.
func (mask *bitmask) Has(index int) (bool, error) {

View File

@ -260,5 +260,56 @@ func TestBitmask(t *testing.T) {
ok := mask.IsSequence()
assert.False(t, ok)
})
})
t.Run("Unset", func(t *testing.T) {
t.Run("ok", func(t *testing.T) {
var (
expectedUnsetIdx = rand.Intn(32)
expectedSetIdx = rand.Intn(32) + 32
mask bitmask
)
err := mask.Set(expectedUnsetIdx)
require.NoError(t, err)
has, err := mask.Has(expectedUnsetIdx)
require.NoError(t, err)
require.True(t, has)
err = mask.Set(expectedSetIdx)
require.NoError(t, err)
err = mask.Unset(expectedUnsetIdx)
require.NoError(t, err)
has, err = mask.Has(expectedUnsetIdx)
require.NoError(t, err)
require.False(t, has)
has, err = mask.Has(expectedSetIdx)
require.NoError(t, err)
require.True(t, has)
})
t.Run("error: negative index", func(t *testing.T) {
var (
invalidIdx = -(rand.Intn(math.MaxInt32-1) + 1)
mask bitmask
)
err := mask.Unset(invalidIdx)
assert.Error(t, err)
assert.True(t, errorBitmaskInvalidIdx.Has(err), "errorBitmaskInvalidIdx class")
})
t.Run("error: index > 63", func(t *testing.T) {
var (
invalidIdx = rand.Intn(math.MaxInt16) + 64
mask bitmask
)
err := mask.Unset(invalidIdx)
assert.Error(t, err)
assert.True(t, errorBitmaskInvalidIdx.Has(err), "errorBitmaskInvalidIdx class")
})
})
}

View File

@ -6,6 +6,7 @@ package main
import (
"encoding/csv"
"os"
"time"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
@ -16,7 +17,7 @@ import (
"storj.io/storj/satellite/metainfo"
)
const maxNumOfSegments = byte(64)
const maxNumOfSegments = 64
var (
detectCmd = &cobra.Command{
@ -28,8 +29,8 @@ var (
detectCfg struct {
DatabaseURL string `help:"the database connection string to use" default:"postgres://"`
From string `help:"begin of date range for detecting zombie segments" default:""`
To string `help:"end of date range for detecting zombie segments" default:""`
From string `help:"begin of date range for detecting zombie segments (RFC3339)" default:""`
To string `help:"end of date range for detecting zombie segments (RFC3339)" default:""`
File string `help:"location of file with report" default:"zombie-segments.csv"`
}
)
@ -67,20 +68,28 @@ func cmdDetect(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, writer.Error())
}()
headers := []string{
"ProjectID",
"SegmentIndex",
"Bucket",
"EncodedEncryptedPath",
"CreationDate",
var from, to *time.Time
if detectCfg.From != "" {
fromTime, err := time.Parse(time.RFC3339, detectCfg.From)
if err != nil {
return err
}
from = &fromTime
}
err = writer.Write(headers)
if detectCfg.To != "" {
toTime, err := time.Parse(time.RFC3339, detectCfg.To)
if err != nil {
return err
}
to = &toTime
}
observer, err := newObserver(db, writer, from, to)
if err != nil {
return err
}
observer := newObserver(db, writer)
err = metainfo.IterateDatabase(ctx, db, observer)
if err != nil {
return err
@ -89,5 +98,6 @@ func cmdDetect(cmd *cobra.Command, args []string) (err error) {
log.Info("number of inline segments", zap.Int("segments", observer.inlineSegments))
log.Info("number of last inline segments", zap.Int("segments", observer.lastInlineSegments))
log.Info("number of remote segments", zap.Int("segments", observer.remoteSegments))
log.Info("number of zombie segments", zap.Int("segments", observer.zombieSegments))
return nil
}

View File

@ -5,8 +5,10 @@ package main
import (
"context"
"encoding/base64"
"encoding/csv"
"strconv"
"time"
"github.com/gogo/protobuf/proto"
"github.com/zeebo/errs"
@ -17,6 +19,8 @@ import (
"storj.io/storj/satellite/metainfo"
)
// lastSegment is the sentinel segment index used to refer to an object's
// last segment (stored under the "l" path component) instead of a numbered
// segment ("s0", "s1", ...).
const lastSegment = int(-1)
// object represents object with segments.
type object struct {
// TODO verify if we have more than 64 segments for object in network
@ -34,26 +38,45 @@ type object struct {
// name.
type bucketsObjects map[string]map[storj.Path]*object
func newObserver(db metainfo.PointerDB, w *csv.Writer) *observer {
func newObserver(db metainfo.PointerDB, w *csv.Writer, from, to *time.Time) (*observer, error) {
headers := []string{
"ProjectID",
"SegmentIndex",
"Bucket",
"EncodedEncryptedPath",
"CreationDate",
}
err := w.Write(headers)
if err != nil {
return nil, err
}
return &observer{
db: db,
writer: w,
db: db,
writer: w,
from: from,
to: to,
zombieBuffer: make([]int, 0, maxNumOfSegments),
objects: make(bucketsObjects),
}
}, nil
}
// observer metainfo.Loop observer for zombie reaper.
type observer struct {
	db     metainfo.PointerDB // pointer DB; used to re-fetch pointers for report creation dates
	writer *csv.Writer        // destination of the zombie-segment CSV report
	from   *time.Time         // lower bound of the creation-date filter; nil means no lower bound
	to     *time.Time         // upper bound of the creation-date filter; nil means no upper bound

	lastProjectID string // project ID of the objects currently accumulated in objects
	zombieBuffer  []int  // scratch buffer of zombie segment indexes, reset per analyzed object

	objects bucketsObjects // segments collected so far, grouped by bucket and object path

	// Counters reported in the summary log after the iteration finishes.
	inlineSegments     int
	lastInlineSegments int
	remoteSegments     int
	zombieSegments     int // number of zombie segments written to the report
}
// RemoteSegment processes a segment to collect data needed to detect zombie segment.
@ -127,6 +150,12 @@ func (obsvr *observer) processSegment(ctx context.Context, path metainfo.ScopedP
}
}
if obsvr.from != nil && pointer.CreationDate.Before(*obsvr.from) {
object.skip = true
} else if obsvr.to != nil && pointer.CreationDate.After(*obsvr.to) {
object.skip = true
}
// collect number of pointers for report
if pointer.Type == pb.Pointer_INLINE {
obsvr.inlineSegments++
@ -143,11 +172,141 @@ func (obsvr *observer) processSegment(ctx context.Context, path metainfo.ScopedP
// analyzeProject analyzes the objects collected in obsvr.objects, detecting
// their zombie segments and writing every one of them to obsvr.writer.
func (obsvr *observer) analyzeProject(ctx context.Context) error {
	for bucket, pathObjects := range obsvr.objects {
		for path, obj := range pathObjects {
			// Objects outside the configured date range were flagged during
			// collection and are not analyzed.
			if obj.skip {
				continue
			}

			if err := obsvr.findZombieSegments(obj); err != nil {
				return err
			}

			// Report every zombie segment found for this object.
			for _, zombieIndex := range obsvr.zombieBuffer {
				if err := obsvr.printSegment(ctx, zombieIndex, bucket, path); err != nil {
					return err
				}
			}
		}
	}
	return nil
}
// findZombieSegments inspects a single object and fills obsvr.zombieBuffer
// with the indexes of its zombie segments (lastSegment stands for the
// object's last segment). The buffer is reset on every call.
func (obsvr *observer) findZombieSegments(object *object) error {
	obsvr.resetZombieBuffer()

	// No last segment: every segment the object has is reported as zombie.
	if !object.hasLastSegment {
		obsvr.appendAllObjectSegments(object)
		return nil
	}

	segmentsCount := object.segments.Count()

	// using 'expectedNumberOfSegments-1' because 'segments' doesn't contain last segment
	switch {
	// this case is only for old style pointers with encrypted number of segments
	// value 0 means that we don't know how much segments object should have
	case object.expectedNumberOfSegments == 0:
		// Keep the leading contiguous run of segments; everything at or after
		// the first gap is reported as zombie (the last segment is kept).
		sequenceLength := firstSequenceLength(object.segments)

		for index := sequenceLength; index < maxNumOfSegments; index++ {
			has, err := object.segments.Has(index)
			if err != nil {
				// Unreachable: index always stays within [0, 63].
				panic(err)
			}
			if has {
				obsvr.appendSegment(index)
			}
		}
	case segmentsCount > int(object.expectedNumberOfSegments)-1:
		// More segments than expected. If the expected prefix is fully
		// present, only the surplus after it is zombie; otherwise the object
		// is broken and all segments plus the last one are zombie.
		sequenceLength := firstSequenceLength(object.segments)

		if sequenceLength == int(object.expectedNumberOfSegments-1) {
			for index := sequenceLength; index < maxNumOfSegments; index++ {
				has, err := object.segments.Has(index)
				if err != nil {
					// Unreachable: index always stays within [0, 63].
					panic(err)
				}
				if has {
					obsvr.appendSegment(index)
				}
			}
		} else {
			obsvr.appendAllObjectSegments(object)
			obsvr.appendSegment(lastSegment)
		}
	case segmentsCount < int(object.expectedNumberOfSegments)-1,
		segmentsCount == int(object.expectedNumberOfSegments)-1 && !object.segments.IsSequence():
		// Segments missing or not contiguous: report all segments plus the
		// last one as zombie.
		obsvr.appendAllObjectSegments(object)
		obsvr.appendSegment(lastSegment)
	}
	return nil
}
// printSegment writes one zombie segment as a CSV row (project ID, segment
// position, bucket, base64-encoded path, creation date) and increments the
// zombie-segment counter.
func (obsvr *observer) printSegment(ctx context.Context, segmentIndex int, bucket, path string) error {
	// Last segments are stored under "l"; numbered ones under "s<index>".
	segmentIndexStr := "s" + strconv.Itoa(segmentIndex)
	if segmentIndex == lastSegment {
		segmentIndexStr = "l"
	}

	creationDate, err := pointerCreationDate(ctx, obsvr.db, obsvr.lastProjectID, segmentIndexStr, bucket, path)
	if err != nil {
		return err
	}

	record := []string{
		obsvr.lastProjectID,
		segmentIndexStr,
		bucket,
		base64.StdEncoding.EncodeToString([]byte(path)),
		creationDate,
	}
	if err := obsvr.writer.Write(record); err != nil {
		return err
	}

	obsvr.zombieSegments++
	return nil
}
// pointerCreationDate looks up the pointer stored under the given
// project/segment/bucket/path key and returns its creation date formatted as
// RFC3339 with nanoseconds.
func pointerCreationDate(ctx context.Context, db metainfo.PointerDB, projectID, segmentIndex, bucket, path string) (string, error) {
	pointerBytes, err := db.Get(ctx, []byte(storj.JoinPaths(projectID, segmentIndex, bucket, path)))
	if err != nil {
		return "", err
	}

	var pointer pb.Pointer
	if err := proto.Unmarshal(pointerBytes, &pointer); err != nil {
		return "", err
	}
	return pointer.CreationDate.Format(time.RFC3339Nano), nil
}
// resetZombieBuffer empties the zombie buffer, keeping its backing array so
// it can be reused for the next analyzed object.
func (obsvr *observer) resetZombieBuffer() {
	obsvr.zombieBuffer = obsvr.zombieBuffer[:0]
}
// appendSegment records segmentIndex as a zombie segment of the object
// currently being analyzed.
func (obsvr *observer) appendSegment(segmentIndex int) {
	obsvr.zombieBuffer = append(obsvr.zombieBuffer, segmentIndex)
}
// appendAllObjectSegments marks every segment tracked in the object's
// segments bitmask as zombie.
func (obsvr *observer) appendAllObjectSegments(object *object) {
	for index := 0; index < maxNumOfSegments; index++ {
		switch has, err := object.segments.Has(index); {
		case err != nil:
			panic(err) // unreachable: index always stays within [0, 63]
		case has:
			obsvr.appendSegment(index)
		}
	}
}
// clearBucketsObjects clears up the buckets objects map for reusing it.
func (obsvr *observer) clearBucketsObjects() {
// This is an idiomatic way of not having to destroy and recreate a new map
@ -173,3 +332,16 @@ func findOrCreate(bucketName string, path string, buckets bucketsObjects) *objec
return obj
}
// firstSequenceLength returns the length of the leading contiguous run of
// set bits in segments, i.e. the index of the first gap; it returns
// maxNumOfSegments when every segment is present.
func firstSequenceLength(segments bitmask) int {
	index := 0
	for index < maxNumOfSegments {
		has, err := segments.Has(index)
		if err != nil {
			panic(err) // unreachable: index always stays within [0, 63]
		}
		if !has {
			break
		}
		index++
	}
	return index
}

View File

@ -0,0 +1,101 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"bytes"
"strconv"
"strings"
"testing"
"github.com/stretchr/testify/require"
"storj.io/storj/pkg/storj"
"storj.io/storj/private/testcontext"
"storj.io/storj/private/testrand"
)
// Test_observer_analyzeProject exercises findZombieSegments against a table
// of segment layouts. Each case encodes the object's segments as a string of
// '1'/'0' flags, optionally suffixed with "_l" when the object has a last
// segment; segmentsAfter shows which segments survive, i.e. are NOT flagged
// as zombies.
func Test_observer_analyzeProject(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	allSegments64 := string(bytes.ReplaceAll(make([]byte, 64), []byte{0}, []byte{'1'}))

	tests := []struct {
		segments                 string
		expectedNumberOfSegments byte
		segmentsAfter            string
	}{
		// this visualizes which segments will NOT be selected as zombie segments

		// known number of segments
		{"11111_l", 6, "11111_l"}, // #0
		{"00000_l", 1, "00000_l"}, // #1
		{"1111100", 6, "0000000"}, // #2
		{"11011_l", 6, "00000_0"}, // #3
		{"11011_l", 3, "11000_l"}, // #4
		{"11110_l", 6, "00000_0"}, // #5
		{"00011_l", 4, "00000_0"}, // #6
		{"10011_l", 4, "00000_0"}, // #7
		// unknown number of segments
		{"11111_l", 0, "11111_l"}, // #8
		{"00000_l", 0, "00000_l"}, // #9
		{"10000_l", 0, "10000_l"}, // #10
		{"1111100", 0, "0000000"}, // #11
		{"00111_l", 0, "00000_l"}, // #12
		{"10111_l", 0, "10000_l"}, // #13
		{"10101_l", 0, "10000_l"}, // #14
		{"11011_l", 0, "11000_l"}, // #15
		// special cases
		{allSegments64 + "_l", 65, allSegments64 + "_l"}, // #16
	}
	for testNum, tt := range tests {
		testNum := testNum
		tt := tt
		t.Run("case_"+strconv.Itoa(testNum), func(t *testing.T) {
			// Build the segments bitmask from the '1' flags; "_l" only marks
			// the presence of a last segment and isn't part of the mask.
			segments := bitmask(0)
			for i, char := range tt.segments {
				if char == '_' {
					break
				}
				if char == '1' {
					err := segments.Set(i)
					require.NoError(t, err)
				}
			}

			// Locals renamed (obj, obsvr) so they don't shadow the
			// package-level object and observer types.
			obj := &object{
				segments:                 segments,
				hasLastSegment:           strings.HasSuffix(tt.segments, "_l"),
				expectedNumberOfSegments: tt.expectedNumberOfSegments,
			}
			singleObjectMap := map[storj.Path]*object{"test-path": obj}
			bucketObjects := make(bucketsObjects)
			bucketObjects["test-bucket"] = singleObjectMap

			obsvr := &observer{
				objects:       bucketObjects,
				lastProjectID: testrand.UUID().String(),
				zombieBuffer:  make([]int, 0, maxNumOfSegments),
			}
			err := obsvr.findZombieSegments(obj)
			require.NoError(t, err)

			// Clear every reported zombie index in the textual representation
			// and compare against the expected survivors.
			segmentsAfter := tt.segments
			for _, segmentIndex := range obsvr.zombieBuffer {
				if segmentIndex == lastSegment {
					segmentsAfter = segmentsAfter[:len(segmentsAfter)-1] + "0"
				} else {
					segmentsAfter = segmentsAfter[:segmentIndex] + "0" + segmentsAfter[segmentIndex+1:]
				}
			}
			require.Equalf(t, tt.segmentsAfter, segmentsAfter, "segments before and after comparison failed: want %s got %s, case %d ", tt.segmentsAfter, segmentsAfter, testNum)
		})
	}
}