2022-10-28 13:53:12 +01:00
|
|
|
// Copyright (C) 2022 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
2022-11-15 10:02:14 +00:00
|
|
|
"bytes"
|
|
|
|
"context"
|
2022-11-18 15:57:44 +00:00
|
|
|
"encoding/csv"
|
2022-11-15 10:02:14 +00:00
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"net"
|
2022-11-18 15:57:44 +00:00
|
|
|
"os"
|
2022-10-28 13:53:12 +01:00
|
|
|
"strconv"
|
2022-11-15 10:02:14 +00:00
|
|
|
"time"
|
2022-10-28 13:53:12 +01:00
|
|
|
|
|
|
|
"github.com/spf13/cobra"
|
2022-11-15 10:02:14 +00:00
|
|
|
"github.com/vivint/infectious"
|
2022-10-28 13:53:12 +01:00
|
|
|
"github.com/zeebo/errs"
|
|
|
|
"go.uber.org/zap"
|
2022-11-15 10:02:14 +00:00
|
|
|
"golang.org/x/sync/errgroup"
|
2022-10-28 13:53:12 +01:00
|
|
|
|
2022-11-15 10:02:14 +00:00
|
|
|
"storj.io/common/errs2"
|
|
|
|
"storj.io/common/pb"
|
|
|
|
"storj.io/common/peertls/tlsopts"
|
|
|
|
"storj.io/common/rpc"
|
|
|
|
"storj.io/common/rpc/rpcstatus"
|
|
|
|
"storj.io/common/signing"
|
|
|
|
"storj.io/common/storj"
|
2022-10-28 13:53:12 +01:00
|
|
|
"storj.io/common/uuid"
|
|
|
|
"storj.io/private/process"
|
|
|
|
"storj.io/storj/private/revocation"
|
|
|
|
"storj.io/storj/satellite"
|
|
|
|
"storj.io/storj/satellite/metabase"
|
|
|
|
"storj.io/storj/satellite/orders"
|
2022-11-15 10:02:14 +00:00
|
|
|
"storj.io/storj/satellite/overlay"
|
|
|
|
"storj.io/storj/satellite/repair/repairer"
|
2022-10-28 13:53:12 +01:00
|
|
|
"storj.io/storj/satellite/satellitedb"
|
2022-11-15 10:02:14 +00:00
|
|
|
"storj.io/uplink/private/eestream"
|
2022-10-28 13:53:12 +01:00
|
|
|
)
|
|
|
|
|
2022-11-18 15:57:44 +00:00
|
|
|
// segment identifies a single segment to repair: the stream it belongs to
// plus the segment's position within that stream in encoded (uint64) form,
// as accepted by metabase.SegmentPositionFromEncoded.
type segment struct {
	StreamID uuid.UUID
	Position uint64
}
|
|
|
|
|
2022-10-28 13:53:12 +01:00
|
|
|
func cmdRepairSegment(cmd *cobra.Command, args []string) (err error) {
|
|
|
|
ctx, _ := process.Ctx(cmd)
|
|
|
|
log := zap.L()
|
|
|
|
|
2022-11-18 15:57:44 +00:00
|
|
|
segments, err := collectInputSegments(args)
|
2022-10-28 13:53:12 +01:00
|
|
|
if err != nil {
|
2022-11-18 15:57:44 +00:00
|
|
|
return err
|
2022-10-28 13:53:12 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
identity, err := runCfg.Identity.Load()
|
|
|
|
if err != nil {
|
|
|
|
log.Error("Failed to load identity.", zap.Error(err))
|
|
|
|
return errs.New("Failed to load identity: %+v", err)
|
|
|
|
}
|
|
|
|
|
2023-01-12 12:16:36 +00:00
|
|
|
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-repair-segment"})
|
2022-10-28 13:53:12 +01:00
|
|
|
if err != nil {
|
|
|
|
return errs.New("Error starting master database: %+v", err)
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
err = errs.Combine(err, db.Close())
|
|
|
|
}()
|
|
|
|
|
2022-10-28 15:56:20 +01:00
|
|
|
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL,
|
|
|
|
runCfg.Config.Metainfo.Metabase("satellite-repair-segment"))
|
2022-10-28 13:53:12 +01:00
|
|
|
if err != nil {
|
|
|
|
return errs.New("Error creating metabase connection: %+v", err)
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
err = errs.Combine(err, metabaseDB.Close())
|
|
|
|
}()
|
|
|
|
|
|
|
|
revocationDB, err := revocation.OpenDBFromCfg(ctx, runCfg.Server.Config)
|
|
|
|
if err != nil {
|
|
|
|
return errs.New("Error creating revocation database: %+v", err)
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
err = errs.Combine(err, revocationDB.Close())
|
|
|
|
}()
|
|
|
|
|
2022-11-15 10:02:14 +00:00
|
|
|
config := runCfg
|
2022-10-28 13:53:12 +01:00
|
|
|
|
2022-11-15 10:02:14 +00:00
|
|
|
tlsOptions, err := tlsopts.NewOptions(identity, config.Server.Config, revocationDB)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
dialer := rpc.NewDefaultDialer(tlsOptions)
|
|
|
|
|
satellite/overlay: fix placement selection config parsing
When we do `satellite run api --placement '...'`, the placement rules are not parsed well.
The problem is based on `viper.AllSettings()`, and the main logic is sg. like this (from a new unit test):
```
r := ConfigurablePlacementRule{}
err := r.Set(p)
require.NoError(t, err)
serialized := r.String()
r2 := ConfigurablePlacementRule{}
err = r2.Set(serialized)
require.NoError(t, err)
require.Equal(t, p, r2.String())
```
All settings evaluates the placement rules in `ConfigurablePlacementRules` and stores the string representation.
The problem is that we don't have proper `String()` implementation (it prints out the structs instead of the original definition.
There are two main solutions for this problem:
1. We can fix the `String()`. When we parse a placement rule, the `String()` method should print out the original definition
2. We can switch to use pure string as configuration parameter, and parse the rules only when required.
I feel that 1 is error prone, we can do it (and in this patch I added a lot of `String()` implementations, but it's hard to be sure that our `String()` logic is inline with the parsing logic.
Therefore I decided to make the configuration value of the placements a string (or a wrapper around string).
That's the main reason why this patch seems to be big, as I updated all the usages.
But the main part is in beginning of the `placement.go` (configuration parsing is not a pflag.Value implementation any more, but a separated step).
And `filter.go`, (a few more String implementation for filters.
https://github.com/storj/storj/issues/6248
Change-Id: I47c762d3514342b76a2e85683b1c891502a0756a
2023-09-06 10:40:22 +01:00
|
|
|
placement, err := config.Placement.Parse()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
overlayService, err := overlay.NewService(log.Named("overlay"), db.OverlayCache(), db.NodeEvents(), placement.CreateFilters, config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay)
|
2022-11-15 10:02:14 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
orders, err := orders.NewService(
|
|
|
|
log.Named("orders"),
|
|
|
|
signing.SignerFromFullIdentity(identity),
|
2023-07-06 13:35:26 +01:00
|
|
|
overlayService,
|
2023-01-12 12:16:36 +00:00
|
|
|
orders.NewNoopDB(),
|
satellite/overlay: fix placement selection config parsing
When we do `satellite run api --placement '...'`, the placement rules are not parsed well.
The problem is based on `viper.AllSettings()`, and the main logic is sg. like this (from a new unit test):
```
r := ConfigurablePlacementRule{}
err := r.Set(p)
require.NoError(t, err)
serialized := r.String()
r2 := ConfigurablePlacementRule{}
err = r2.Set(serialized)
require.NoError(t, err)
require.Equal(t, p, r2.String())
```
All settings evaluates the placement rules in `ConfigurablePlacementRules` and stores the string representation.
The problem is that we don't have proper `String()` implementation (it prints out the structs instead of the original definition.
There are two main solutions for this problem:
1. We can fix the `String()`. When we parse a placement rule, the `String()` method should print out the original definition
2. We can switch to use pure string as configuration parameter, and parse the rules only when required.
I feel that 1 is error prone, we can do it (and in this patch I added a lot of `String()` implementations, but it's hard to be sure that our `String()` logic is inline with the parsing logic.
Therefore I decided to make the configuration value of the placements a string (or a wrapper around string).
That's the main reason why this patch seems to be big, as I updated all the usages.
But the main part is in beginning of the `placement.go` (configuration parsing is not a pflag.Value implementation any more, but a separated step).
And `filter.go`, (a few more String implementation for filters.
https://github.com/storj/storj/issues/6248
Change-Id: I47c762d3514342b76a2e85683b1c891502a0756a
2023-09-06 10:40:22 +01:00
|
|
|
placement.CreateFilters,
|
2022-11-15 10:02:14 +00:00
|
|
|
config.Orders,
|
2022-10-28 13:53:12 +01:00
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-11-15 10:02:14 +00:00
|
|
|
ecRepairer := repairer.NewECRepairer(
|
|
|
|
log.Named("ec-repair"),
|
|
|
|
dialer,
|
|
|
|
signing.SigneeFromPeerIdentity(identity.PeerIdentity()),
|
2023-06-16 10:42:05 +01:00
|
|
|
config.Repairer.DialTimeout,
|
2022-11-15 10:02:14 +00:00
|
|
|
config.Repairer.DownloadTimeout,
|
|
|
|
true) // force inmemory download of pieces
|
|
|
|
|
|
|
|
segmentRepairer := repairer.NewSegmentRepairer(
|
|
|
|
log.Named("segment-repair"),
|
|
|
|
metabaseDB,
|
|
|
|
orders,
|
2023-07-06 13:35:26 +01:00
|
|
|
overlayService,
|
2022-11-15 10:02:14 +00:00
|
|
|
nil, // TODO add noop version
|
|
|
|
ecRepairer,
|
satellite/overlay: fix placement selection config parsing
When we do `satellite run api --placement '...'`, the placement rules are not parsed well.
The problem is based on `viper.AllSettings()`, and the main logic is sg. like this (from a new unit test):
```
r := ConfigurablePlacementRule{}
err := r.Set(p)
require.NoError(t, err)
serialized := r.String()
r2 := ConfigurablePlacementRule{}
err = r2.Set(serialized)
require.NoError(t, err)
require.Equal(t, p, r2.String())
```
All settings evaluates the placement rules in `ConfigurablePlacementRules` and stores the string representation.
The problem is that we don't have proper `String()` implementation (it prints out the structs instead of the original definition.
There are two main solutions for this problem:
1. We can fix the `String()`. When we parse a placement rule, the `String()` method should print out the original definition
2. We can switch to use pure string as configuration parameter, and parse the rules only when required.
I feel that 1 is error prone, we can do it (and in this patch I added a lot of `String()` implementations, but it's hard to be sure that our `String()` logic is inline with the parsing logic.
Therefore I decided to make the configuration value of the placements a string (or a wrapper around string).
That's the main reason why this patch seems to be big, as I updated all the usages.
But the main part is in beginning of the `placement.go` (configuration parsing is not a pflag.Value implementation any more, but a separated step).
And `filter.go`, (a few more String implementation for filters.
https://github.com/storj/storj/issues/6248
Change-Id: I47c762d3514342b76a2e85683b1c891502a0756a
2023-09-06 10:40:22 +01:00
|
|
|
placement.CreateFilters,
|
2022-11-15 10:02:14 +00:00
|
|
|
config.Checker.RepairOverrides,
|
2022-11-24 13:02:08 +00:00
|
|
|
config.Repairer,
|
2022-11-15 10:02:14 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// TODO reorganize to avoid using peer.
|
|
|
|
|
|
|
|
peer := &satellite.Repairer{}
|
2023-07-06 13:35:26 +01:00
|
|
|
peer.Overlay = overlayService
|
2022-11-15 10:02:14 +00:00
|
|
|
peer.Orders.Service = orders
|
|
|
|
peer.EcRepairer = ecRepairer
|
|
|
|
peer.SegmentRepairer = segmentRepairer
|
|
|
|
|
|
|
|
cancelCtx, cancel := context.WithCancel(ctx)
|
|
|
|
group := errgroup.Group{}
|
|
|
|
group.Go(func() error {
|
|
|
|
return peer.Overlay.UploadSelectionCache.Run(cancelCtx)
|
|
|
|
})
|
|
|
|
defer func() {
|
|
|
|
cancel()
|
|
|
|
err := group.Wait()
|
|
|
|
if err != nil {
|
|
|
|
log.Error("upload cache error", zap.Error(err))
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2022-11-18 15:57:44 +00:00
|
|
|
for _, segment := range segments {
|
|
|
|
segment, err := metabaseDB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: segment.StreamID,
|
|
|
|
Position: metabase.SegmentPositionFromEncoded(segment.Position),
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
if metabase.ErrSegmentNotFound.Has(err) {
|
|
|
|
printOutput(segment.StreamID, segment.Position.Encode(), "segment not found in metabase db", 0, 0)
|
2022-11-21 16:18:13 +00:00
|
|
|
} else {
|
|
|
|
log.Error("unknown error when getting segment metadata",
|
|
|
|
zap.Stringer("stream-id", segment.StreamID),
|
|
|
|
zap.Uint64("position", segment.Position.Encode()),
|
|
|
|
zap.Error(err))
|
|
|
|
printOutput(segment.StreamID, segment.Position.Encode(), "internal", 0, 0)
|
2022-11-18 15:57:44 +00:00
|
|
|
}
|
2022-11-21 16:18:13 +00:00
|
|
|
continue
|
2022-11-15 10:02:14 +00:00
|
|
|
}
|
2022-11-18 15:57:44 +00:00
|
|
|
repairSegment(ctx, log, peer, metabaseDB, segment)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func collectInputSegments(args []string) (segments []segment, err error) {
|
|
|
|
convert := func(streamIDString, positionString string) (segment, error) {
|
|
|
|
streamID, err := uuid.FromString(streamIDString)
|
|
|
|
if err != nil {
|
|
|
|
return segment{}, errs.New("invalid stream-id (should be in UUID form): %w", err)
|
|
|
|
}
|
2022-11-21 09:41:32 +00:00
|
|
|
streamPosition, err := strconv.ParseUint(positionString, 10, 64)
|
2022-11-18 15:57:44 +00:00
|
|
|
if err != nil {
|
|
|
|
return segment{}, errs.New("stream position must be a number: %w", err)
|
|
|
|
}
|
|
|
|
return segment{
|
|
|
|
StreamID: streamID,
|
|
|
|
Position: streamPosition,
|
|
|
|
}, nil
|
2022-11-15 10:02:14 +00:00
|
|
|
}
|
|
|
|
|
2022-11-18 15:57:44 +00:00
|
|
|
if len(args) == 1 {
|
|
|
|
csvFile, err := os.Open(args[0])
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
err = errs.Combine(err, csvFile.Close())
|
|
|
|
}()
|
|
|
|
|
|
|
|
csvReader := csv.NewReader(csvFile)
|
|
|
|
allEntries, err := csvReader.ReadAll()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2022-11-21 09:41:32 +00:00
|
|
|
if len(allEntries) > 1 {
|
|
|
|
// ignore first line with headers
|
|
|
|
for _, entry := range allEntries[1:] {
|
|
|
|
segment, err := convert(entry[0], entry[1])
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
segments = append(segments, segment)
|
2022-11-18 15:57:44 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
segment, err := convert(args[0], args[1])
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
segments = append(segments, segment)
|
|
|
|
}
|
|
|
|
return segments, nil
|
2022-11-15 10:02:14 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// repairSegment will repair selected segment no matter if it's healthy or not.
|
|
|
|
//
|
|
|
|
// Logic for this method is:
|
|
|
|
// * download whole segment into memory, use all available pieces
|
|
|
|
// * reupload segment into new nodes
|
|
|
|
// * replace segment.Pieces field with just new nodes.
|
2022-11-18 15:57:44 +00:00
|
|
|
func repairSegment(ctx context.Context, log *zap.Logger, peer *satellite.Repairer, metabaseDB *metabase.DB, segment metabase.Segment) {
|
2022-11-15 10:02:14 +00:00
|
|
|
log = log.With(zap.Stringer("stream-id", segment.StreamID), zap.Uint64("position", segment.Position.Encode()))
|
|
|
|
segmentData, failedDownloads, err := downloadSegment(ctx, log, peer, metabaseDB, segment)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("download failed", zap.Error(err))
|
|
|
|
|
|
|
|
printOutput(segment.StreamID, segment.Position.Encode(), "download failed", len(segment.Pieces), failedDownloads)
|
2022-11-18 15:57:44 +00:00
|
|
|
return
|
2022-11-15 10:02:14 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if err := reuploadSegment(ctx, log, peer, metabaseDB, segment, segmentData); err != nil {
|
|
|
|
log.Error("upload failed", zap.Error(err))
|
|
|
|
|
|
|
|
printOutput(segment.StreamID, segment.Position.Encode(), "upload failed", len(segment.Pieces), failedDownloads)
|
2022-11-18 15:57:44 +00:00
|
|
|
return
|
2022-11-15 10:02:14 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
printOutput(segment.StreamID, segment.Position.Encode(), "successful", len(segment.Pieces), failedDownloads)
|
|
|
|
}
|
|
|
|
|
|
|
|
func reuploadSegment(ctx context.Context, log *zap.Logger, peer *satellite.Repairer, metabaseDB *metabase.DB, segment metabase.Segment, segmentData []byte) error {
|
|
|
|
excludeNodeIDs := make([]storj.NodeID, 0, len(segment.Pieces))
|
|
|
|
for _, piece := range segment.Pieces {
|
|
|
|
excludeNodeIDs = append(excludeNodeIDs, piece.StorageNode)
|
|
|
|
}
|
|
|
|
|
|
|
|
redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
request := overlay.FindStorageNodesRequest{
|
|
|
|
RequestedCount: redundancy.OptimalThreshold(),
|
|
|
|
ExcludedIDs: excludeNodeIDs,
|
|
|
|
Placement: segment.Placement,
|
|
|
|
}
|
|
|
|
|
|
|
|
newNodes, err := peer.Overlay.FindStorageNodesForUpload(ctx, request)
|
2022-10-28 13:53:12 +01:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-11-15 10:02:14 +00:00
|
|
|
if len(newNodes) < redundancy.RepairThreshold() {
|
|
|
|
return errs.New("not enough new nodes were found for repair: min %v got %v", redundancy.RepairThreshold(), len(newNodes))
|
|
|
|
}
|
|
|
|
|
2023-05-22 13:35:23 +01:00
|
|
|
putLimits, putPrivateKey, err := peer.Orders.Service.CreatePutRepairOrderLimits(ctx, segment, make([]*pb.AddressedOrderLimit, len(newNodes)),
|
satellite/repair: unify repair logic
The repair checker and repair worker both need to determine which pieces
are healthy, which are retrievable, and which should be replaced, but
they have been doing it in different ways in different code, which has
been the cause of bugs. The same term could have very similar but subtly
different meanings between the two, causing much confusion.
With this change, the piece- and node-classification logic is
consolidated into one place within the satellite/repair package, so that
both subsystems can use it. This ought to make decision-making code more
concise and more readable.
The consolidated classification logic has been expanded to create more
sets, so that the decision-making code does not need to do as much
precalculation. It should now be clearer in comments and code that a
piece can belong to multiple sets arbitrarily (except where the
definition of the sets makes this logically impossible), and what the
precise meaning of each set is. These sets include Missing, Suspended,
Clumped, OutOfPlacement, InExcludedCountry, ForcingRepair,
UnhealthyRetrievable, Unhealthy, Retrievable, and Healthy.
Some other side effects of this change:
* CreatePutRepairOrderLimits no longer needs to special-case excluded
countries; it can just create as many order limits as requested (by
way of len(newNodes)).
* The repair checker will now queue a segment for repair when there are
any pieces out of placement. The code calls this "forcing a repair".
* The checker.ReliabilityCache is now accessed by way of a GetNodes()
function similar to the one on the overlay. The classification methods
like MissingPieces(), OutOfPlacementPieces(), and
PiecesNodesLastNetsInOrder() are removed in favor of the
classification logic in satellite/repair/classification.go. This
means the reliability cache no longer needs access to the placement
rules or excluded countries list.
Change-Id: I105109fb94ee126952f07d747c6e11131164fadb
2023-09-11 05:07:39 +01:00
|
|
|
make(map[uint16]struct{}), newNodes)
|
2022-11-15 10:02:14 +00:00
|
|
|
if err != nil {
|
|
|
|
return errs.New("could not create PUT_REPAIR order limits: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
timeout := 5 * time.Minute
|
|
|
|
successfulNeeded := redundancy.OptimalThreshold()
|
|
|
|
successful, _, err := peer.EcRepairer.Repair(ctx, putLimits, putPrivateKey, redundancy, bytes.NewReader(segmentData),
|
|
|
|
timeout, successfulNeeded)
|
2022-10-28 13:53:12 +01:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-11-15 10:02:14 +00:00
|
|
|
var repairedPieces metabase.Pieces
|
|
|
|
for i, node := range successful {
|
|
|
|
if node == nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
repairedPieces = append(repairedPieces, metabase.Piece{
|
|
|
|
Number: uint16(i),
|
|
|
|
StorageNode: node.Id,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(repairedPieces) < redundancy.RepairThreshold() {
|
|
|
|
return errs.New("not enough pieces were uploaded during repair: min %v got %v", redundancy.RepairThreshold(), len(repairedPieces))
|
|
|
|
}
|
|
|
|
|
|
|
|
// UpdateSegmentPieces is doing compare and swap
|
|
|
|
return metabaseDB.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
|
|
|
|
StreamID: segment.StreamID,
|
|
|
|
Position: segment.Position,
|
|
|
|
|
|
|
|
OldPieces: segment.Pieces,
|
|
|
|
NewRedundancy: segment.Redundancy,
|
|
|
|
NewPieces: repairedPieces,
|
|
|
|
|
|
|
|
NewRepairedAt: time.Now(),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// downloadSegment fetches every available piece of the segment and
// reconstructs the original segment data via Reed-Solomon decoding.
// It returns the reconstructed bytes and the number of failed downloads
// (file-not-found plus other errors; offline nodes are counted and logged
// separately and NOT included — NOTE(review): confirm that exclusion is
// intentional).
func downloadSegment(ctx context.Context, log *zap.Logger, peer *satellite.Repairer, metabaseDB *metabase.DB, segment metabase.Segment) ([]byte, int, error) {
	// AdminFetchPieces downloads all pieces for specified segment and returns readers, readers data is kept on disk or inmemory
	pieceInfos, err := peer.SegmentRepairer.AdminFetchPieces(ctx, &segment, "")
	if err != nil {
		return nil, 0, err
	}

	// Classify each piece fetch: not-found, node offline, other failure,
	// or a usable reader.
	numberOfOtherFailures := 0
	numberOfFileNotFound := 0
	numberOfOffline := 0
	pieceReaders := make(map[int]io.ReadCloser, len(pieceInfos))
	for pieceNum, pieceInfo := range pieceInfos {
		// No order limit means no fetch was attempted for this piece slot.
		if pieceInfo.GetLimit == nil {
			continue
		}

		// Deliberate shadow: a per-piece logger carrying the piece number.
		log := log.With(zap.Int("piece num", pieceNum))

		var dnsErr *net.DNSError
		var opError *net.OpError
		if err := pieceInfo.FetchError; err != nil {
			if errs2.IsRPC(err, rpcstatus.NotFound) {
				numberOfFileNotFound++
			} else if errors.As(err, &dnsErr) || errors.As(err, &opError) {
				// DNS or transport-level errors are treated as "node offline".
				numberOfOffline++
			} else {
				numberOfOtherFailures++
			}

			log.Error("unable to fetch piece", zap.Error(pieceInfo.FetchError))
			continue
		}
		if pieceInfo.Reader == nil {
			log.Error("piece reader is empty")
			continue
		}

		pieceReaders[pieceNum] = pieceInfo.Reader
	}

	log.Info("download summary",
		zap.Int("number of pieces", len(segment.Pieces)), zap.Int("pieces downloaded", len(pieceReaders)),
		zap.Int("file not found", numberOfFileNotFound), zap.Int("offline nodes", numberOfOffline),
		zap.Int("other errors", numberOfOtherFailures),
	)

	failedDownloads := numberOfFileNotFound + numberOfOtherFailures

	redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
	if err != nil {
		return nil, failedDownloads, errs.New("invalid redundancy strategy: %w", err)
	}

	// Reconstruction needs at least the RS required count of distinct pieces.
	if len(pieceReaders) < redundancy.RequiredCount() {
		return nil, failedDownloads, errs.New("not enough pieces to reconstruct the segment, pieces: %d required: %d",
			len(pieceReaders), redundancy.RequiredCount())
	}

	fec, err := infectious.NewFEC(redundancy.RequiredCount(), redundancy.TotalCount())
	if err != nil {
		return nil, failedDownloads, err
	}

	esScheme := eestream.NewUnsafeRSScheme(fec, redundancy.ErasureShareSize())
	pieceSize := segment.PieceSize()
	expectedSize := pieceSize * int64(redundancy.RequiredCount())

	// Cancel the decode pipeline (and its readers) once we are done reading.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	segmentReader := eestream.DecodeReaders2(ctx, cancel, pieceReaders, esScheme, expectedSize, 0, false)
	data, err := io.ReadAll(segmentReader)
	return data, failedDownloads, err
}
|
|
|
|
|
|
|
|
// printOutput prints result to standard output in a way to be able to combine
|
|
|
|
// single results into single csv file.
|
|
|
|
func printOutput(streamID uuid.UUID, position uint64, result string, numberOfPieces, failedDownloads int) {
|
|
|
|
fmt.Printf("%s,%d,%s,%d,%d\n", streamID, position, result, numberOfPieces, failedDownloads)
|
2022-10-28 13:53:12 +01:00
|
|
|
}
|