storj/cmd/tools/segment-verify/batch.go

154 lines
4.1 KiB
Go
Raw Normal View History

// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"sort"
"storj.io/storj/satellite/metabase"
)
// CreateBatches creates load-balanced queues of segments to verify.
//
// Every segment first has its offline pieces removed and its priority-node
// pieces moved to the front. Each segment is then queued on the nodes holding
// its first Status.Retry pieces. Finally the longest queues hand their surplus
// segments over to the shortest queues, wherever a segment has an otherwise
// unused piece on one of those nodes.
//
// The segments are modified in place (their AliasPieces are filtered and
// reordered). The returned batches are ordered by the queue length they had
// before redistribution, longest first.
//
// It panics when a segment is left with fewer pieces than Status.Retry.
func (service *Service) CreateBatches(ctx context.Context, segments []*Segment) (_ []*Batch, err error) {
	defer mon.Task()(&ctx)(&err)

	// Remove offline nodes and prioritize nodes.
	for _, segment := range segments {
		service.selectOnlinePieces(segment)
		service.sortPriorityToFirst(segment)
	}

	batches := map[metabase.NodeAlias]*Batch{}
	enqueue := func(alias metabase.NodeAlias, segment *Segment) {
		q, ok := batches[alias]
		if !ok {
			q = &Batch{Alias: alias}
			batches[alias] = q
		}
		q.Items = append(q.Items, segment)
	}

	// Distribute things randomly into batches.
	// We assume that segment.AliasPieces is randomly ordered in terms of nodes.
	for _, segment := range segments {
		if len(segment.AliasPieces) < int(segment.Status.Retry) {
			panic("segment contains too few pieces")
		}
		for _, piece := range segment.AliasPieces[:segment.Status.Retry] {
			enqueue(piece.Alias, segment)
		}
	}

	allQueues := make([]*Batch, 0, len(batches))
	for _, q := range batches {
		allQueues = append(allQueues, q)
	}

	// With fewer than two queues there is nothing to rebalance, and the
	// percentile indexes computed below would point past the end of the slice.
	if len(allQueues) < 2 {
		return allQueues, nil
	}

	// Sort queues by queue length descending. The comparison must be strict:
	// a less function that reports true for equal elements is not a valid
	// ordering for the sort package.
	sort.Slice(allQueues, func(i, k int) bool {
		return allQueues[i].Len() > allQueues[k].Len()
	})

	// Try to redistribute segments between queues. With the queues ordered
	// longest first:
	//   - queues before highIndex (roughly the longest 35%) give away the
	//     items they hold beyond highLen,
	//   - queues from lowIndex on (roughly the shortest 40%) receive items,
	//   - a receiving queue stops accepting once it reaches midLen, the
	//     length of the queue in the middle of the ordering.
	highIndex := len(allQueues) - len(allQueues)*65/100
	midIndex := len(allQueues) - len(allQueues)*50/100
	lowIndex := len(allQueues) - len(allQueues)*40/100

	midLen := allQueues[midIndex].Len()
	highLen := allQueues[highIndex].Len()

	smallBatches := mapFromBatches(allQueues[lowIndex:])

	// Iterate over all the long queues.
	for _, large := range allQueues[:highIndex] {
		// Don't redistribute priority nodes.
		if service.PriorityNodes.Contains(large.Alias) {
			continue
		}

		// The first highLen items always stay. newItems shares its backing
		// array with large.Items, which is safe because it never grows past
		// the position currently being read.
		newItems := large.Items[:highLen]

	nextSegment:
		for _, segment := range large.Items[highLen:] {
			// Try to find a piece that can be moved into a small batch.
			// Only pieces past the first Status.Retry are considered, because
			// the earlier ones were already queued above.
			for _, piece := range segment.AliasPieces[segment.Status.Retry:] {
				if q, ok := smallBatches[piece.Alias]; ok {
					// Move to the other queue.
					q.Items = append(q.Items, segment)
					if q.Len() >= midLen {
						// The queue is full enough; stop feeding it.
						delete(smallBatches, piece.Alias)
					}
					continue nextSegment
				}
			}

			// Keep the segment in the slice,
			// because we didn't find a good alternative.
			newItems = append(newItems, segment)
		}
		large.Items = newItems

		// No queue is left that could accept more segments.
		if len(smallBatches) == 0 {
			break
		}
	}

	return allQueues, nil
}
// selectOnlinePieces drops, in place, every piece of the segment whose node is
// listed in service.OfflineNodes. The relative order of the remaining pieces is
// preserved, and the segment is left untouched when no piece is offline.
func (service *Service) selectOnlinePieces(segment *Segment) {
	pieces := segment.AliasPieces

	// Locate the first offline piece; everything before it stays where it is.
	firstOffline := -1
	for i := range pieces {
		if service.OfflineNodes.Contains(pieces[i].Alias) {
			firstOffline = i
			break
		}
	}
	if firstOffline < 0 {
		return
	}

	// Slide the remaining online pieces down over the offline ones.
	next := firstOffline
	for _, piece := range pieces[firstOffline+1:] {
		if service.OfflineNodes.Contains(piece.Alias) {
			continue
		}
		pieces[next] = piece
		next++
	}
	segment.AliasPieces = pieces[:next]
}
// removePriorityPieces filters the segment's pieces in place, discarding every
// piece whose node is listed in service.PriorityNodes. The remaining pieces
// keep their relative order.
func (service *Service) removePriorityPieces(segment *Segment) {
	// kept reuses the backing array of the original slice; it can never grow
	// past the element currently being read.
	kept := segment.AliasPieces[:0]
	for _, piece := range segment.AliasPieces {
		if !service.PriorityNodes.Contains(piece.Alias) {
			kept = append(kept, piece)
		}
	}
	segment.AliasPieces = kept
}
// sortPriorityToFirst reorders the segment's pieces in place so that every
// piece on a node listed in service.PriorityNodes comes before all other
// pieces. Priority pieces keep their relative order; the other pieces may be
// reordered by the swaps.
func (service *Service) sortPriorityToFirst(segment *Segment) {
	pieces := segment.AliasPieces

	// front is the position the next priority piece is swapped into.
	front := 0
	for i := range pieces {
		if !service.PriorityNodes.Contains(pieces[i].Alias) {
			continue
		}
		pieces[front], pieces[i] = pieces[i], pieces[front]
		front++
	}
}
// mapFromBatches indexes the given batches by their Alias.
// If several batches share an alias, the one latest in the slice wins.
func mapFromBatches(batches []*Batch) map[metabase.NodeAlias]*Batch {
	byAlias := make(map[metabase.NodeAlias]*Batch, len(batches))
	for i := range batches {
		batch := batches[i]
		byAlias[batch.Alias] = batch
	}
	return byAlias
}