071d1c4313
* uplink/storage/segments: return error no optimal threshold Return an error if the store gets fewer uploaded pieces than indicated by the optimal threshold. * satellite/metainfo: Fix gRPC status error & add reason This commit fixes the CommitSegment endpoint method to return an "Invalid Argument" status code when uplink submits invalid data, which is detected when filtering invalid pieces with the filterInvalidPieces endpoint method. Because filterInvalidPieces is also used by CommitSegmentOld, that part of the method has been changed accordingly. * Add an initial check in CommitSegment to detect earlier if uplink sends an invalid number of uploaded pieces. * Add more information to some log messages. * Return more information to uplink when it sends a number of invalid pieces that makes it impossible to finish the operation successfully. * satellite/metainfo: Swap some "sugar" loggers to normal ones Swap "sugar" loggers to normal ones because they impact performance in production systems and should only be used under specific circumstances, none of which applied to the ones changed.
262 lines
7.2 KiB
Go
262 lines
7.2 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package segments
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"math/rand"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/vivint/infectious"
|
|
"gopkg.in/spacemonkeygo/monkit.v2"
|
|
|
|
"storj.io/storj/pkg/pb"
|
|
"storj.io/storj/pkg/ranger"
|
|
"storj.io/storj/pkg/storj"
|
|
"storj.io/storj/uplink/ecclient"
|
|
"storj.io/storj/uplink/eestream"
|
|
"storj.io/storj/uplink/metainfo"
|
|
)
|
|
|
|
var (
|
|
mon = monkit.Package()
|
|
)
|
|
|
|
// Meta holds the metadata describing a segment.
type Meta struct {
	// Modified and Expiration are the timestamps attached to the segment.
	Modified   time.Time
	Expiration time.Time
	// Size is the segment size (presumably in bytes; confirm with callers).
	Size int64
	// Data is an opaque metadata payload carried along with the segment.
	Data []byte
}
|
|
|
|
// ListItem is a single item in a listing
|
|
type ListItem struct {
|
|
Path storj.Path
|
|
Meta Meta
|
|
IsPrefix bool
|
|
}
|
|
|
|
// Store for segments
|
|
type Store interface {
|
|
Get(ctx context.Context, streamID storj.StreamID, segmentIndex int32, objectRS storj.RedundancyScheme) (rr ranger.Ranger, encryption storj.SegmentEncryption, err error)
|
|
Put(ctx context.Context, streamID storj.StreamID, data io.Reader, expiration time.Time, segmentInfo func() (int64, storj.SegmentEncryption, error)) (meta Meta, err error)
|
|
Delete(ctx context.Context, streamID storj.StreamID, segmentIndex int32) (err error)
|
|
}
|
|
|
|
type segmentStore struct {
|
|
metainfo *metainfo.Client
|
|
ec ecclient.Client
|
|
rs eestream.RedundancyStrategy
|
|
thresholdSize int
|
|
maxEncryptedSegmentSize int64
|
|
rngMu sync.Mutex
|
|
rng *rand.Rand
|
|
}
|
|
|
|
// NewSegmentStore creates a new instance of segmentStore
|
|
func NewSegmentStore(metainfo *metainfo.Client, ec ecclient.Client, rs eestream.RedundancyStrategy, threshold int, maxEncryptedSegmentSize int64) Store {
|
|
return &segmentStore{
|
|
metainfo: metainfo,
|
|
ec: ec,
|
|
rs: rs,
|
|
thresholdSize: threshold,
|
|
maxEncryptedSegmentSize: maxEncryptedSegmentSize,
|
|
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
|
|
}
|
|
}
|
|
|
|
// Put uploads a segment to an erasure code client
|
|
func (s *segmentStore) Put(ctx context.Context, streamID storj.StreamID, data io.Reader, expiration time.Time, segmentInfo func() (int64, storj.SegmentEncryption, error)) (meta Meta, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
peekReader := NewPeekThresholdReader(data)
|
|
remoteSized, err := peekReader.IsLargerThan(s.thresholdSize)
|
|
if err != nil {
|
|
return Meta{}, err
|
|
}
|
|
|
|
if !remoteSized {
|
|
segmentIndex, encryption, err := segmentInfo()
|
|
if err != nil {
|
|
return Meta{}, Error.Wrap(err)
|
|
}
|
|
|
|
err = s.metainfo.MakeInlineSegment(ctx, metainfo.MakeInlineSegmentParams{
|
|
StreamID: streamID,
|
|
Position: storj.SegmentPosition{
|
|
Index: int32(segmentIndex),
|
|
},
|
|
Encryption: encryption,
|
|
EncryptedInlineData: peekReader.thresholdBuf,
|
|
})
|
|
if err != nil {
|
|
return Meta{}, Error.Wrap(err)
|
|
}
|
|
return Meta{}, nil
|
|
}
|
|
|
|
segmentIndex, encryption, err := segmentInfo()
|
|
if err != nil {
|
|
return Meta{}, Error.Wrap(err)
|
|
}
|
|
|
|
segmentID, limits, piecePrivateKey, err := s.metainfo.BeginSegment(ctx, metainfo.BeginSegmentParams{
|
|
StreamID: streamID,
|
|
MaxOrderLimit: s.maxEncryptedSegmentSize,
|
|
Position: storj.SegmentPosition{
|
|
Index: int32(segmentIndex),
|
|
},
|
|
})
|
|
if err != nil {
|
|
return Meta{}, Error.Wrap(err)
|
|
}
|
|
|
|
sizedReader := SizeReader(peekReader)
|
|
|
|
successfulNodes, successfulHashes, err := s.ec.Put(ctx, limits, piecePrivateKey, s.rs, sizedReader, expiration)
|
|
if err != nil {
|
|
return Meta{}, Error.Wrap(err)
|
|
}
|
|
|
|
uploadResults := make([]*pb.SegmentPieceUploadResult, 0, len(successfulNodes))
|
|
for i := range successfulNodes {
|
|
if successfulNodes[i] == nil {
|
|
continue
|
|
}
|
|
uploadResults = append(uploadResults, &pb.SegmentPieceUploadResult{
|
|
PieceNum: int32(i),
|
|
NodeId: successfulNodes[i].Id,
|
|
Hash: successfulHashes[i],
|
|
})
|
|
}
|
|
|
|
if l := len(uploadResults); l < s.rs.OptimalThreshold() {
|
|
return Meta{}, Error.New("uploaded results (%d) are below the optimal threshold (%d)", l, s.rs.OptimalThreshold())
|
|
}
|
|
|
|
err = s.metainfo.CommitSegmentNew(ctx, metainfo.CommitSegmentParams{
|
|
SegmentID: segmentID,
|
|
SizeEncryptedData: sizedReader.Size(),
|
|
Encryption: encryption,
|
|
UploadResult: uploadResults,
|
|
})
|
|
if err != nil {
|
|
return Meta{}, Error.Wrap(err)
|
|
}
|
|
|
|
return Meta{}, nil
|
|
}
|
|
|
|
// Get requests the satellite to read a segment and downloaded the pieces from the storage nodes
|
|
func (s *segmentStore) Get(ctx context.Context, streamID storj.StreamID, segmentIndex int32, objectRS storj.RedundancyScheme) (rr ranger.Ranger, _ storj.SegmentEncryption, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
info, limits, err := s.metainfo.DownloadSegment(ctx, metainfo.DownloadSegmentParams{
|
|
StreamID: streamID,
|
|
Position: storj.SegmentPosition{
|
|
Index: segmentIndex,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, storj.SegmentEncryption{}, Error.Wrap(err)
|
|
}
|
|
|
|
switch {
|
|
// no order limits also means its inline segment
|
|
case len(info.EncryptedInlineData) != 0 || len(limits) == 0:
|
|
return ranger.ByteRanger(info.EncryptedInlineData), info.SegmentEncryption, nil
|
|
default:
|
|
needed := CalcNeededNodes(objectRS)
|
|
selected := make([]*pb.AddressedOrderLimit, len(limits))
|
|
s.rngMu.Lock()
|
|
perm := s.rng.Perm(len(limits))
|
|
s.rngMu.Unlock()
|
|
|
|
for _, i := range perm {
|
|
limit := limits[i]
|
|
if limit == nil {
|
|
continue
|
|
}
|
|
|
|
selected[i] = limit
|
|
|
|
needed--
|
|
if needed <= 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
fc, err := infectious.NewFEC(int(objectRS.RequiredShares), int(objectRS.TotalShares))
|
|
if err != nil {
|
|
return nil, storj.SegmentEncryption{}, err
|
|
}
|
|
es := eestream.NewRSScheme(fc, int(objectRS.ShareSize))
|
|
redundancy, err := eestream.NewRedundancyStrategy(es, int(objectRS.RepairShares), int(objectRS.OptimalShares))
|
|
if err != nil {
|
|
return nil, storj.SegmentEncryption{}, err
|
|
}
|
|
|
|
rr, err = s.ec.Get(ctx, selected, info.PiecePrivateKey, redundancy, info.Size)
|
|
if err != nil {
|
|
return nil, storj.SegmentEncryption{}, Error.Wrap(err)
|
|
}
|
|
|
|
return rr, info.SegmentEncryption, nil
|
|
}
|
|
}
|
|
|
|
// Delete requests the satellite to delete a segment and tells storage nodes
|
|
// to delete the segment's pieces.
|
|
func (s *segmentStore) Delete(ctx context.Context, streamID storj.StreamID, segmentIndex int32) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
_, limits, privateKey, err := s.metainfo.BeginDeleteSegment(ctx, metainfo.BeginDeleteSegmentParams{
|
|
StreamID: streamID,
|
|
Position: storj.SegmentPosition{
|
|
Index: segmentIndex,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
if len(limits) != 0 {
|
|
// remote segment - delete the pieces from storage nodes
|
|
err = s.ec.Delete(ctx, limits, privateKey)
|
|
if err != nil {
|
|
return Error.Wrap(err)
|
|
}
|
|
}
|
|
|
|
// don't do FinishDeleteSegment at the moment to avoid satellite round trip
|
|
// FinishDeleteSegment doesn't implement any specific logic at the moment
|
|
|
|
return nil
|
|
}
|
|
|
|
// CalcNeededNodes calculate how many minimum nodes are needed for download,
|
|
// based on t = k + (n-o)k/o
|
|
func CalcNeededNodes(rs storj.RedundancyScheme) int32 {
|
|
extra := int32(1)
|
|
|
|
if rs.OptimalShares > 0 {
|
|
extra = int32(((rs.TotalShares - rs.OptimalShares) * rs.RequiredShares) / rs.OptimalShares)
|
|
if extra == 0 {
|
|
// ensure there is at least one extra node, so we can have error detection/correction
|
|
extra = 1
|
|
}
|
|
}
|
|
|
|
needed := int32(rs.RequiredShares) + extra
|
|
|
|
if needed > int32(rs.TotalShares) {
|
|
needed = int32(rs.TotalShares)
|
|
}
|
|
|
|
return needed
|
|
}
|