cmd/tools/segment-verify: add logic for iterating over segments
This adds parts for: (1) iterating over the segments, (2) using an interface for writing the segments, and (3) stubs for handling deleted segments. Change-Id: I76a17cac6deb0b6c042a8ab7c4155a890db9da84
This commit is contained in:
parent
a60c83c1de
commit
6127f465dc
@ -28,12 +28,12 @@ func (service *Service) CreateBatches(segments []*Segment) ([]*Batch, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Distribute things randomly into batches.
|
// Distribute things randomly into batches.
|
||||||
// We assume that segment.Pieces is randomly ordered in terms of nodes.
|
// We assume that segment.AliasPieces is randomly ordered in terms of nodes.
|
||||||
for _, segment := range segments {
|
for _, segment := range segments {
|
||||||
if len(segment.Pieces) < VerifyPieces {
|
if len(segment.AliasPieces) < int(segment.Status.Retry) {
|
||||||
panic("segment contains too few pieces")
|
panic("segment contains too few pieces")
|
||||||
}
|
}
|
||||||
for _, piece := range segment.Pieces[:segment.Status.Retry] {
|
for _, piece := range segment.AliasPieces[:segment.Status.Retry] {
|
||||||
enqueue(piece.Alias, segment)
|
enqueue(piece.Alias, segment)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -71,7 +71,7 @@ func (service *Service) CreateBatches(segments []*Segment) ([]*Batch, error) {
|
|||||||
nextSegment:
|
nextSegment:
|
||||||
for _, segment := range large.Items[highLen:] {
|
for _, segment := range large.Items[highLen:] {
|
||||||
// try to find a piece that can be moved into a small batch.
|
// try to find a piece that can be moved into a small batch.
|
||||||
for _, piece := range segment.Pieces[segment.Status.Retry:] {
|
for _, piece := range segment.AliasPieces[segment.Status.Retry:] {
|
||||||
if q, ok := smallBatches[piece.Alias]; ok {
|
if q, ok := smallBatches[piece.Alias]; ok {
|
||||||
// move to the other queue
|
// move to the other queue
|
||||||
q.Items = append(q.Items, segment)
|
q.Items = append(q.Items, segment)
|
||||||
@ -98,19 +98,19 @@ func (service *Service) CreateBatches(segments []*Segment) ([]*Batch, error) {
|
|||||||
|
|
||||||
// selectOnlinePieces modifies slice such that it only contains online pieces.
|
// selectOnlinePieces modifies slice such that it only contains online pieces.
|
||||||
func (service *Service) selectOnlinePieces(segment *Segment) {
|
func (service *Service) selectOnlinePieces(segment *Segment) {
|
||||||
for i, x := range segment.Pieces {
|
for i, x := range segment.AliasPieces {
|
||||||
if !service.OfflineNodes.Contains(x.Alias) {
|
if !service.OfflineNodes.Contains(x.Alias) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// found an offline node, start removing
|
// found an offline node, start removing
|
||||||
rs := segment.Pieces[:i]
|
rs := segment.AliasPieces[:i]
|
||||||
for _, x := range segment.Pieces[i+1:] {
|
for _, x := range segment.AliasPieces[i+1:] {
|
||||||
if !service.OfflineNodes.Contains(x.Alias) {
|
if !service.OfflineNodes.Contains(x.Alias) {
|
||||||
rs = append(rs, x)
|
rs = append(rs, x)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
segment.Pieces = rs
|
segment.AliasPieces = rs
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -118,19 +118,19 @@ func (service *Service) selectOnlinePieces(segment *Segment) {
|
|||||||
// removePriorityPieces modifies slice such that it only contains non-priority pieces.
|
// removePriorityPieces modifies slice such that it only contains non-priority pieces.
|
||||||
func (service *Service) removePriorityPieces(segment *Segment) {
|
func (service *Service) removePriorityPieces(segment *Segment) {
|
||||||
target := 0
|
target := 0
|
||||||
for _, x := range segment.Pieces {
|
for _, x := range segment.AliasPieces {
|
||||||
if service.PriorityNodes.Contains(x.Alias) {
|
if service.PriorityNodes.Contains(x.Alias) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
segment.Pieces[target] = x
|
segment.AliasPieces[target] = x
|
||||||
target++
|
target++
|
||||||
}
|
}
|
||||||
segment.Pieces = segment.Pieces[:target]
|
segment.AliasPieces = segment.AliasPieces[:target]
|
||||||
}
|
}
|
||||||
|
|
||||||
// sortPriorityToFirst moves priority node pieces at the front of the list.
|
// sortPriorityToFirst moves priority node pieces at the front of the list.
|
||||||
func (service *Service) sortPriorityToFirst(segment *Segment) {
|
func (service *Service) sortPriorityToFirst(segment *Segment) {
|
||||||
xs := segment.Pieces
|
xs := segment.AliasPieces
|
||||||
target := 0
|
target := 0
|
||||||
for i, x := range xs {
|
for i, x := range xs {
|
||||||
if service.PriorityNodes.Contains(x.Alias) {
|
if service.PriorityNodes.Contains(x.Alias) {
|
||||||
|
@ -35,7 +35,7 @@ func (service *Service) Verify(ctx context.Context, segments []*Segment) error {
|
|||||||
|
|
||||||
// Reverse the pieces slice to ensure we pick different nodes this time.
|
// Reverse the pieces slice to ensure we pick different nodes this time.
|
||||||
for _, segment := range retrySegments {
|
for _, segment := range retrySegments {
|
||||||
xs := segment.Pieces
|
xs := segment.AliasPieces
|
||||||
for i, j := 0, len(xs)-1; i < j; i, j = i+1, j-1 {
|
for i, j := 0, len(xs)-1; i < j; i, j = i+1, j-1 {
|
||||||
xs[i], xs[j] = xs[j], xs[i]
|
xs[i], xs[j] = xs[j], xs[i]
|
||||||
}
|
}
|
||||||
|
@ -4,23 +4,46 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/zeebo/errs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/common/storj"
|
||||||
"storj.io/common/uuid"
|
"storj.io/common/uuid"
|
||||||
"storj.io/storj/satellite/metabase"
|
"storj.io/storj/satellite/metabase"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Error is global error class.
|
||||||
|
var Error = errs.Class("segment-verify")
|
||||||
|
|
||||||
// VerifyPieces defines how many pieces we check per segment.
|
// VerifyPieces defines how many pieces we check per segment.
|
||||||
const VerifyPieces = 3
|
const VerifyPieces = 3
|
||||||
|
|
||||||
// ConcurrentRequests defines how many concurrent requests we do to the storagenodes.
|
// ConcurrentRequests defines how many concurrent requests we do to the storagenodes.
|
||||||
const ConcurrentRequests = 10000
|
const ConcurrentRequests = 10000
|
||||||
|
|
||||||
|
// Metabase defines implementation dependencies we need from metabase.
|
||||||
|
type Metabase interface {
|
||||||
|
ConvertAliasesToNodes(ctx context.Context, aliases []metabase.NodeAlias) ([]storj.NodeID, error)
|
||||||
|
GetSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) (segment metabase.Segment, err error)
|
||||||
|
ListVerifySegments(ctx context.Context, opts metabase.ListVerifySegments) (result metabase.ListVerifySegmentsResult, err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SegmentWriter allows writing segments to some output.
|
||||||
|
type SegmentWriter interface {
|
||||||
|
Write(ctx context.Context, segments []*Segment) error
|
||||||
|
}
|
||||||
|
|
||||||
// Service implements segment verification logic.
|
// Service implements segment verification logic.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
|
metabase Metabase
|
||||||
|
|
||||||
|
notFound SegmentWriter
|
||||||
|
retry SegmentWriter
|
||||||
|
|
||||||
PriorityNodes NodeAliasSet
|
PriorityNodes NodeAliasSet
|
||||||
OfflineNodes NodeAliasSet
|
OfflineNodes NodeAliasSet
|
||||||
@ -36,12 +59,128 @@ func NewService(log *zap.Logger) *Service {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Process processes segments between low and high uuid.UUID with the specified batchSize.
|
||||||
|
func (service *Service) Process(ctx context.Context, low, high uuid.UUID, batchSize int) error {
|
||||||
|
cursorStreamID := low
|
||||||
|
if !low.IsZero() {
|
||||||
|
cursorStreamID = uuidBefore(low)
|
||||||
|
}
|
||||||
|
cursorPosition := metabase.SegmentPosition{Part: 0xFFFFFFFF, Index: 0xFFFFFFFF}
|
||||||
|
|
||||||
|
for {
|
||||||
|
result, err := service.metabase.ListVerifySegments(ctx, metabase.ListVerifySegments{
|
||||||
|
CursorStreamID: cursorStreamID,
|
||||||
|
CursorPosition: cursorPosition,
|
||||||
|
Limit: batchSize,
|
||||||
|
|
||||||
|
// TODO: add AS OF SYSTEM time.
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
verifySegments := result.Segments
|
||||||
|
result.Segments = nil
|
||||||
|
|
||||||
|
// drop any segment that's equal or beyond "high".
|
||||||
|
for len(verifySegments) > 0 && !verifySegments[len(verifySegments)-1].StreamID.Less(high) {
|
||||||
|
verifySegments = verifySegments[:len(verifySegments)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
// All done?
|
||||||
|
if len(verifySegments) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert to struct that contains the status.
|
||||||
|
segmentsData := make([]Segment, len(verifySegments))
|
||||||
|
segments := make([]*Segment, len(verifySegments))
|
||||||
|
for i := range segments {
|
||||||
|
segmentsData[i].VerifySegment = verifySegments[i]
|
||||||
|
segments[i] = &segmentsData[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process the data.
|
||||||
|
err = service.ProcessSegments(ctx, segments)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProcessSegments processes a collection of segments.
|
||||||
|
func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment) error {
|
||||||
|
service.log.Info("processing segments",
|
||||||
|
zap.Int("count", len(segments)),
|
||||||
|
zap.Stringer("first", segments[0].StreamID),
|
||||||
|
zap.Stringer("last", segments[len(segments)-1].StreamID),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Verify all the segments against storage nodes.
|
||||||
|
err := service.Verify(ctx, segments)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
notFound := []*Segment{}
|
||||||
|
retry := []*Segment{}
|
||||||
|
|
||||||
|
// Find out which of the segments we did not find
|
||||||
|
// or there was some other failure.
|
||||||
|
for _, segment := range segments {
|
||||||
|
if segment.Status.NotFound > 0 {
|
||||||
|
notFound = append(notFound, segment)
|
||||||
|
} else if segment.Status.Retry > 0 {
|
||||||
|
// TODO: should we do a smarter check here?
|
||||||
|
// e.g. if at least half did find, then consider it ok?
|
||||||
|
retry = append(retry, segment)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Some segments might have been deleted during the
|
||||||
|
// processing, so cross-reference and remove any deleted
|
||||||
|
// segments from the list.
|
||||||
|
notFound, err = service.RemoveDeleted(ctx, notFound)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
retry, err = service.RemoveDeleted(ctx, retry)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Output the problematic segments:
|
||||||
|
errNotFound := service.notFound.Write(ctx, notFound)
|
||||||
|
errRetry := service.retry.Write(ctx, retry)
|
||||||
|
|
||||||
|
return errs.Combine(errNotFound, errRetry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveDeleted modifies the slice and returns only the segments that
|
||||||
|
// still exist in the database.
|
||||||
|
func (service *Service) RemoveDeleted(ctx context.Context, segments []*Segment) (_ []*Segment, err error) {
|
||||||
|
valid := segments[:0]
|
||||||
|
for _, seg := range segments {
|
||||||
|
_, err := service.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
||||||
|
StreamID: seg.StreamID,
|
||||||
|
Position: seg.Position,
|
||||||
|
})
|
||||||
|
if metabase.ErrSegmentNotFound.Has(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
service.log.Error("get segment by id failed", zap.Stringer("stream-id", seg.StreamID), zap.String("position", fmt.Sprint(seg.Position)))
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return valid, ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
valid = append(valid, seg)
|
||||||
|
}
|
||||||
|
return valid, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Segment contains minimal information necessary for verifying a single Segment.
|
// Segment contains minimal information necessary for verifying a single Segment.
|
||||||
type Segment struct {
|
type Segment struct {
|
||||||
StreamID uuid.UUID
|
metabase.VerifySegment
|
||||||
Position metabase.SegmentPosition
|
|
||||||
Pieces []metabase.AliasPiece
|
|
||||||
|
|
||||||
Status Status
|
Status Status
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -72,3 +211,15 @@ type Batch struct {
|
|||||||
|
|
||||||
// Len returns the length of the batch.
|
// Len returns the length of the batch.
|
||||||
func (b *Batch) Len() int { return len(b.Items) }
|
func (b *Batch) Len() int { return len(b.Items) }
|
||||||
|
|
||||||
|
// uuidBefore returns an uuid.UUID that's immediately before v.
|
||||||
|
// It might not be a valid uuid after this operation.
|
||||||
|
func uuidBefore(v uuid.UUID) uuid.UUID {
|
||||||
|
for i := len(v) - 1; i >= 0; i-- {
|
||||||
|
v[i]--
|
||||||
|
if v[i] != 0xFF { // we didn't wrap around
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user