storj/cmd/satellite/fetchpieces.go

210 lines
6.0 KiB
Go
Raw Normal View History

// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"encoding/json"
"fmt"
"io"
"os"
"path"
"strconv"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/context2"
"storj.io/common/pb"
"storj.io/common/uuid"
"storj.io/private/process"
"storj.io/private/version"
"storj.io/storj/private/revocation"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb"
)
// cmdFetchPieces is the cobra handler that fetches the pieces of one segment
// and stores the results in a local directory.
//
// Positional arguments (the handler indexes args[0..2] directly, so the
// command definition is presumably expected to enforce exactly three
// arguments — confirm against the cobra command setup):
//
//   - args[0]: stream ID in UUID form.
//   - args[1]: encoded segment position as a base-10 unsigned integer.
//   - args[2]: an existing directory that receives the output files.
//
// For every returned piece that carries a GET order limit the handler writes
// piece.<index>.metainfo.json, plus piece.<index>.error.txt when fetching the
// piece failed and piece.<index>.contents when a reader for the piece data is
// available. Problems while writing individual files are only logged by the
// helper functions; they do not abort the command.
//
// The returned error combines the primary failure (if any) with errors from
// closing the databases and flushing the orders write cache.
func cmdFetchPieces(cmd *cobra.Command, args []string) (err error) {
	ctx, _ := process.Ctx(cmd)
	log := zap.L()

	streamID, err := uuid.FromString(args[0])
	if err != nil {
		return errs.New("invalid stream-id (should be in UUID form): %w", err)
	}
	streamPosition, err := strconv.ParseUint(args[1], 10, 64)
	if err != nil {
		return errs.New("stream position must be a number: %w", err)
	}

	saveDir := args[2]
	destDirStat, err := os.Stat(saveDir)
	if err != nil {
		// os.Stat returns a nil FileInfo on failure, so this must be checked
		// before calling IsDir; otherwise a missing directory would panic.
		return errs.New("destination dir %q can not be accessed: %w", saveDir, err)
	}
	if !destDirStat.IsDir() {
		return errs.New("destination dir %q is not a directory", saveDir)
	}

	identity, err := runCfg.Identity.Load()
	if err != nil {
		log.Error("Failed to load identity.", zap.Error(err))
		return errs.New("Failed to load identity: %+v", err)
	}

	db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-pieces-fetcher"})
	if err != nil {
		return errs.New("Error starting master database: %+v", err)
	}
	defer func() {
		err = errs.Combine(err, db.Close())
	}()

	metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL,
		runCfg.Config.Metainfo.Metabase("satellite-pieces-fetcher"))
	if err != nil {
		return errs.New("Error creating metabase connection: %+v", err)
	}
	defer func() {
		err = errs.Combine(err, metabaseDB.Close())
	}()

	revocationDB, err := revocation.OpenDBFromCfg(ctx, runCfg.Server.Config)
	if err != nil {
		return errs.New("Error creating revocation database: %+v", err)
	}
	defer func() {
		err = errs.Combine(err, revocationDB.Close())
	}()

	rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
	defer func() {
		// Flush with a context that ignores cancellation so the flush is
		// still attempted when ctx has already been canceled.
		err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
	}()

	peer, err := satellite.NewRepairer(
		log,
		identity,
		metabaseDB,
		revocationDB,
		db.RepairQueue(),
		db.Buckets(),
		db.OverlayCache(),
		db.Reputation(),
		db.Containment(),
		rollupsWriteCache,
		version.Build,
		&runCfg.Config,
		process.AtomicLevel(cmd),
	)
	if err != nil {
		return err
	}

	segmentInfo, err := metabaseDB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
		StreamID: streamID,
		Position: metabase.SegmentPositionFromEncoded(streamPosition),
	})
	if err != nil {
		return err
	}

	pieceInfos, err := peer.SegmentRepairer.AdminFetchPieces(ctx, &segmentInfo, saveDir)
	if err != nil {
		return err
	}

	for pieceIndex, pieceInfo := range pieceInfos {
		// Entries without a GET limit produce no output files at all.
		if pieceInfo.GetLimit == nil {
			continue
		}
		log := log.With(zap.Int("piece-index", pieceIndex))

		if err := pieceInfo.FetchError; err != nil {
			writeErrorMessageToFile(log, err, saveDir, pieceIndex)
		}
		writeMetaInfoToFile(log, pieceInfo.GetLimit, pieceInfo.OriginalLimit, pieceInfo.Hash, saveDir, pieceIndex)
		if pieceInfo.Reader != nil {
			writePieceToFile(log, pieceInfo.Reader, saveDir, pieceIndex)
			if err := pieceInfo.Reader.Close(); err != nil {
				log.Error("could not close piece reader", zap.Error(err))
			}
		}
	}

	return nil
}
// writeErrorMessageToFile stores the message of err in the file
// piece.<pieceIndex>.error.txt inside saveDir.
//
// The function is best effort: failures to open, write or close the file are
// reported through log and are not returned to the caller. err must be
// non-nil, since its Error method is called unconditionally.
func writeErrorMessageToFile(log *zap.Logger, err error, saveDir string, pieceIndex int) {
	errorMessage := err.Error()
	filename := path.Join(saveDir, fmt.Sprintf("piece.%d.error.txt", pieceIndex))
	log = log.With(zap.String("file-path", filename))

	// O_TRUNC discards any previous content; without it a shorter message
	// written over an existing file would leave stale trailing bytes behind.
	errorFile, openErr := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o700)
	if openErr != nil {
		log.Error("could not open error file", zap.Error(openErr))
		return
	}
	defer func() {
		if closeErr := errorFile.Close(); closeErr != nil {
			log.Error("could not close error file", zap.Error(closeErr))
		}
	}()

	if _, writeErr := errorFile.WriteString(errorMessage); writeErr != nil {
		log.Error("could not write to error file", zap.Error(writeErr))
	}
}
// pieceMetaInfo is the JSON document that writeMetaInfoToFile stores in each
// piece.<index>.metainfo.json file. Fields are emitted in declaration order.
type pieceMetaInfo struct {
// Hash is the piece hash handed to writeMetaInfoToFile; JSON key "hash".
Hash *pb.PieceHash `json:"hash"`
// GetLimit is the addressed GET order limit for the piece; JSON key "get_limit".
GetLimit *pb.AddressedOrderLimit `json:"get_limit"`
// OriginalLimit is the order limit passed alongside the piece; JSON key
// "original_limit". (Presumably the limit used when the piece was first
// uploaded — confirm against AdminFetchPieces.)
OriginalLimit *pb.OrderLimit `json:"original_limit"`
}
// writeMetaInfoToFile serializes the given limits and hash as a pieceMetaInfo
// JSON object into piece.<pieceIndex>.metainfo.json inside saveDir.
//
// The file is created exclusively, so an already existing file is reported as
// an open failure. All failures (open, marshal, write, close) are logged via
// log and never returned.
func writeMetaInfoToFile(log *zap.Logger, getLimit *pb.AddressedOrderLimit, originalLimit *pb.OrderLimit, hash *pb.PieceHash, saveDir string, pieceIndex int) {
	baseName := fmt.Sprintf("piece.%d.metainfo.json", pieceIndex)
	target := path.Join(saveDir, baseName)
	log = log.With(zap.String("file-path", target))

	out, openErr := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o700)
	if openErr != nil {
		log.Error("could not open metainfo file", zap.Error(openErr))
		return
	}
	defer func() {
		closeErr := out.Close()
		if closeErr != nil {
			log.Error("could not close metainfo file", zap.Error(closeErr))
		}
	}()

	// Marshal only after the file exists, matching the established order of
	// side effects: a marshal failure leaves an empty file behind.
	encoded, marshalErr := json.Marshal(&pieceMetaInfo{
		Hash:          hash,
		GetLimit:      getLimit,
		OriginalLimit: originalLimit,
	})
	if marshalErr != nil {
		log.Error("could not marshal metainfo JSON object", zap.Error(marshalErr))
		return
	}

	_, writeErr := out.Write(encoded)
	if writeErr != nil {
		log.Error("could not write JSON to metainfo file", zap.Error(writeErr))
	}
}
// writePieceToFile streams everything readable from pieceReader into
// piece.<pieceIndex>.contents inside saveDir.
//
// The file is created exclusively, so an already existing file is reported as
// an open failure. pieceReader is not closed here; that stays with the
// caller. Failures are logged via log and never returned.
func writePieceToFile(log *zap.Logger, pieceReader io.ReadCloser, saveDir string, pieceIndex int) {
	baseName := fmt.Sprintf("piece.%d.contents", pieceIndex)
	target := path.Join(saveDir, baseName)
	log = log.With(zap.String("file-path", target))

	dst, openErr := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o700)
	if openErr != nil {
		log.Error("could not open content file", zap.Error(openErr))
		return
	}
	defer func() {
		closeErr := dst.Close()
		if closeErr != nil {
			log.Error("could not close content file", zap.Error(closeErr))
		}
	}()

	_, copyErr := io.Copy(dst, pieceReader)
	if copyErr != nil {
		log.Error("could not copy from piece reader to contents file", zap.Error(copyErr))
	}
}