cmd/satellite: repair-segment; add option to process csv file directly

Current option is to put stream id and position as an input but
it's not very efficient when we have long list of segments to repair.
This change adds option to read whole csv file and process each entry
one by one.

If command will have single argument then it will treat it as csv file
location and if will have two arguments then it will parse it just as
stream id and position.

Change-Id: I1e91cf57a794d81d74af0091c24a2e7d00d1fab9
This commit is contained in:
Michal Niewrzal 2022-11-18 16:57:44 +01:00 committed by Storj Robot
parent 87660bd9b3
commit 8d5a2a90f2
2 changed files with 78 additions and 28 deletions

View File

@ -6,10 +6,12 @@ package main
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/csv"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net" "net"
"os"
"strconv" "strconv"
"time" "time"
@ -39,17 +41,18 @@ import (
"storj.io/uplink/private/eestream" "storj.io/uplink/private/eestream"
) )
type segment struct {
StreamID uuid.UUID
Position uint64
}
func cmdRepairSegment(cmd *cobra.Command, args []string) (err error) { func cmdRepairSegment(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd) ctx, _ := process.Ctx(cmd)
log := zap.L() log := zap.L()
streamID, err := uuid.FromString(args[0]) segments, err := collectInputSegments(args)
if err != nil { if err != nil {
return errs.New("invalid stream-id (should be in UUID form): %w", err) return err
}
streamPosition, err := strconv.ParseUint(args[1], 10, 64)
if err != nil {
return errs.New("stream position must be a number: %w", err)
} }
identity, err := runCfg.Identity.Load() identity, err := runCfg.Identity.Load()
@ -154,24 +157,74 @@ func cmdRepairSegment(cmd *cobra.Command, args []string) (err error) {
} }
}() }()
segment, err := metabaseDB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ for _, segment := range segments {
StreamID: streamID, segment, err := metabaseDB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
Position: metabase.SegmentPositionFromEncoded(streamPosition), StreamID: segment.StreamID,
}) Position: metabase.SegmentPositionFromEncoded(segment.Position),
if err != nil { })
if metabase.ErrSegmentNotFound.Has(err) { if err != nil {
printOutput(segment.StreamID, segment.Position.Encode(), "segment not found in metabase db", 0, 0) if metabase.ErrSegmentNotFound.Has(err) {
printOutput(segment.StreamID, segment.Position.Encode(), "segment not found in metabase db", 0, 0)
return nil
}
log.Error("unknown error when getting segment metadata",
zap.Stringer("stream-id", segment.StreamID),
zap.Uint64("position", segment.Position.Encode()),
zap.Error(err))
printOutput(segment.StreamID, segment.Position.Encode(), "internal", 0, 0)
return nil return nil
} }
log.Error("unknown error when getting segment metadata", repairSegment(ctx, log, peer, metabaseDB, segment)
zap.Stringer("stream-id", streamID), }
zap.Uint64("position", streamPosition), return nil
zap.Error(err)) }
printOutput(segment.StreamID, segment.Position.Encode(), "internal", 0, 0)
return nil func collectInputSegments(args []string) (segments []segment, err error) {
convert := func(streamIDString, positionString string) (segment, error) {
streamID, err := uuid.FromString(streamIDString)
if err != nil {
return segment{}, errs.New("invalid stream-id (should be in UUID form): %w", err)
}
streamPosition, err := strconv.ParseUint(args[1], 10, 64)
if err != nil {
return segment{}, errs.New("stream position must be a number: %w", err)
}
return segment{
StreamID: streamID,
Position: streamPosition,
}, nil
} }
return repairSegment(ctx, log, peer, metabaseDB, segment) if len(args) == 1 {
csvFile, err := os.Open(args[0])
if err != nil {
return nil, err
}
defer func() {
err = errs.Combine(err, csvFile.Close())
}()
csvReader := csv.NewReader(csvFile)
allEntries, err := csvReader.ReadAll()
if err != nil {
return nil, err
}
// ignore first line with headers
for _, entry := range allEntries[1:] {
segment, err := convert(entry[0], entry[1])
if err != nil {
return nil, err
}
segments = append(segments, segment)
}
} else {
segment, err := convert(args[0], args[1])
if err != nil {
return nil, err
}
segments = append(segments, segment)
}
return segments, nil
} }
// repairSegment will repair selected segment no matter if it's healthy or not. // repairSegment will repair selected segment no matter if it's healthy or not.
@ -180,25 +233,24 @@ func cmdRepairSegment(cmd *cobra.Command, args []string) (err error) {
// * download whole segment into memory, use all available pieces // * download whole segment into memory, use all available pieces
// * reupload segment into new nodes // * reupload segment into new nodes
// * replace segment.Pieces field with just new nodes. // * replace segment.Pieces field with just new nodes.
func repairSegment(ctx context.Context, log *zap.Logger, peer *satellite.Repairer, metabaseDB *metabase.DB, segment metabase.Segment) error { func repairSegment(ctx context.Context, log *zap.Logger, peer *satellite.Repairer, metabaseDB *metabase.DB, segment metabase.Segment) {
log = log.With(zap.Stringer("stream-id", segment.StreamID), zap.Uint64("position", segment.Position.Encode())) log = log.With(zap.Stringer("stream-id", segment.StreamID), zap.Uint64("position", segment.Position.Encode()))
segmentData, failedDownloads, err := downloadSegment(ctx, log, peer, metabaseDB, segment) segmentData, failedDownloads, err := downloadSegment(ctx, log, peer, metabaseDB, segment)
if err != nil { if err != nil {
log.Error("download failed", zap.Error(err)) log.Error("download failed", zap.Error(err))
printOutput(segment.StreamID, segment.Position.Encode(), "download failed", len(segment.Pieces), failedDownloads) printOutput(segment.StreamID, segment.Position.Encode(), "download failed", len(segment.Pieces), failedDownloads)
return nil return
} }
if err := reuploadSegment(ctx, log, peer, metabaseDB, segment, segmentData); err != nil { if err := reuploadSegment(ctx, log, peer, metabaseDB, segment, segmentData); err != nil {
log.Error("upload failed", zap.Error(err)) log.Error("upload failed", zap.Error(err))
printOutput(segment.StreamID, segment.Position.Encode(), "upload failed", len(segment.Pieces), failedDownloads) printOutput(segment.StreamID, segment.Position.Encode(), "upload failed", len(segment.Pieces), failedDownloads)
return nil return
} }
printOutput(segment.StreamID, segment.Position.Encode(), "successful", len(segment.Pieces), failedDownloads) printOutput(segment.StreamID, segment.Position.Encode(), "successful", len(segment.Pieces), failedDownloads)
return nil
} }
func reuploadSegment(ctx context.Context, log *zap.Logger, peer *satellite.Repairer, metabaseDB *metabase.DB, segment metabase.Segment, segmentData []byte) error { func reuploadSegment(ctx context.Context, log *zap.Logger, peer *satellite.Repairer, metabaseDB *metabase.DB, segment metabase.Segment, segmentData []byte) error {

View File

@ -35,8 +35,7 @@ func TestRepairSegment(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Len(t, segments, 1) require.Len(t, segments, 1)
err = repairSegment(ctx, zaptest.NewLogger(t), satellite.Repairer, satellite.Metabase.DB, segments[0]) repairSegment(ctx, zaptest.NewLogger(t), satellite.Repairer, satellite.Metabase.DB, segments[0])
require.NoError(t, err)
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "bucket", "object") data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "bucket", "object")
require.NoError(t, err) require.NoError(t, err)
@ -67,8 +66,7 @@ func TestRepairSegment(t *testing.T) {
// we cannot download segment so repair is not possible // we cannot download segment so repair is not possible
observedZapCore, observedLogs := observer.New(zap.ErrorLevel) observedZapCore, observedLogs := observer.New(zap.ErrorLevel)
observedLogger := zap.New(observedZapCore) observedLogger := zap.New(observedZapCore)
err = repairSegment(ctx, observedLogger, satellite.Repairer, satellite.Metabase.DB, segments[0]) repairSegment(ctx, observedLogger, satellite.Repairer, satellite.Metabase.DB, segments[0])
require.NoError(t, err)
require.Contains(t, "download failed", observedLogs.All()[observedLogs.Len()-1].Message) require.Contains(t, "download failed", observedLogs.All()[observedLogs.Len()-1].Message)
// TODO add more detailed tests // TODO add more detailed tests