261 lines
7.0 KiB
Go
261 lines
7.0 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package segments
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"math/rand"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/vivint/infectious"
|
|
"gopkg.in/spacemonkeygo/monkit.v2"
|
|
|
|
"storj.io/storj/pkg/pb"
|
|
"storj.io/storj/pkg/ranger"
|
|
"storj.io/storj/pkg/storj"
|
|
"storj.io/storj/uplink/ecclient"
|
|
"storj.io/storj/uplink/eestream"
|
|
"storj.io/storj/uplink/metainfo"
|
|
)
|
|
|
|
var (
|
|
mon = monkit.Package()
|
|
)
|
|
|
|
// Meta carries the metadata describing a segment.
type Meta struct {
	// Modified and Expiration are the timestamps attached to the segment.
	Modified, Expiration time.Time
	// Size is the segment size (presumably in bytes; confirm against the
	// code that populates it).
	Size int64
	// Data is an opaque metadata payload associated with the segment.
	Data []byte
}
|
|
|
|
// ListItem is a single item in a listing
|
|
type ListItem struct {
|
|
Path storj.Path
|
|
Meta Meta
|
|
IsPrefix bool
|
|
}
|
|
|
|
// Store for segments
|
|
type Store interface {
|
|
Get(ctx context.Context, streamID storj.StreamID, segmentIndex int32, objectRS storj.RedundancyScheme) (rr ranger.Ranger, encryption storj.SegmentEncryption, err error)
|
|
Put(ctx context.Context, streamID storj.StreamID, data io.Reader, expiration time.Time, segmentInfo func() (int64, storj.SegmentEncryption, error)) (meta Meta, err error)
|
|
Delete(ctx context.Context, streamID storj.StreamID, segmentIndex int32) (err error)
|
|
}
|
|
|
|
type segmentStore struct {
|
|
metainfo *metainfo.Client
|
|
ec ecclient.Client
|
|
rs eestream.RedundancyStrategy
|
|
thresholdSize int
|
|
maxEncryptedSegmentSize int64
|
|
rngMu sync.Mutex
|
|
rng *rand.Rand
|
|
}
|
|
|
|
// NewSegmentStore creates a new instance of segmentStore
|
|
func NewSegmentStore(metainfo *metainfo.Client, ec ecclient.Client, rs eestream.RedundancyStrategy, threshold int, maxEncryptedSegmentSize int64) Store {
|
|
return &segmentStore{
|
|
metainfo: metainfo,
|
|
ec: ec,
|
|
rs: rs,
|
|
thresholdSize: threshold,
|
|
maxEncryptedSegmentSize: maxEncryptedSegmentSize,
|
|
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
|
|
}
|
|
}
|
|
|
|
// Put uploads a segment to an erasure code client
|
|
func (s *segmentStore) Put(ctx context.Context, streamID storj.StreamID, data io.Reader, expiration time.Time, segmentInfo func() (int64, storj.SegmentEncryption, error)) (meta Meta, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
peekReader := NewPeekThresholdReader(data)
|
|
remoteSized, err := peekReader.IsLargerThan(s.thresholdSize)
|
|
if err != nil {
|
|
return Meta{}, err
|
|
}
|
|
|
|
if !remoteSized {
|
|
segmentIndex, encryption, err := segmentInfo()
|
|
if err != nil {
|
|
return Meta{}, Error.Wrap(err)
|
|
}
|
|
|
|
err = s.metainfo.MakeInlineSegment(ctx, metainfo.MakeInlineSegmentParams{
|
|
StreamID: streamID,
|
|
Position: storj.SegmentPosition{
|
|
Index: int32(segmentIndex),
|
|
},
|
|
Encryption: encryption,
|
|
EncryptedInlineData: peekReader.thresholdBuf,
|
|
})
|
|
if err != nil {
|
|
return Meta{}, Error.Wrap(err)
|
|
}
|
|
return Meta{}, nil
|
|
}
|
|
|
|
segmentIndex, encryption, err := segmentInfo()
|
|
if err != nil {
|
|
return Meta{}, Error.Wrap(err)
|
|
}
|
|
|
|
segmentID, limits, piecePrivateKey, err := s.metainfo.BeginSegment(ctx, metainfo.BeginSegmentParams{
|
|
StreamID: streamID,
|
|
MaxOrderLimit: s.maxEncryptedSegmentSize,
|
|
Position: storj.SegmentPosition{
|
|
Index: int32(segmentIndex),
|
|
},
|
|
})
|
|
if err != nil {
|
|
return Meta{}, Error.Wrap(err)
|
|
}
|
|
|
|
sizedReader := SizeReader(peekReader)
|
|
|
|
successfulNodes, successfulHashes, err := s.ec.Put(ctx, limits, piecePrivateKey, s.rs, sizedReader, expiration)
|
|
if err != nil {
|
|
return Meta{}, Error.Wrap(err)
|
|
}
|
|
|
|
uploadResults := make([]*pb.SegmentPieceUploadResult, 0, len(successfulNodes))
|
|
for i := range successfulNodes {
|
|
if successfulNodes[i] == nil {
|
|
continue
|
|
}
|
|
uploadResults = append(uploadResults, &pb.SegmentPieceUploadResult{
|
|
PieceNum: int32(i),
|
|
NodeId: successfulNodes[i].Id,
|
|
Hash: successfulHashes[i],
|
|
})
|
|
}
|
|
err = s.metainfo.CommitSegmentNew(ctx, metainfo.CommitSegmentParams{
|
|
SegmentID: segmentID,
|
|
SizeEncryptedData: sizedReader.Size(),
|
|
Encryption: encryption,
|
|
UploadResult: uploadResults,
|
|
})
|
|
if err != nil {
|
|
return Meta{}, Error.Wrap(err)
|
|
}
|
|
|
|
return Meta{}, nil
|
|
}
|
|
|
|
// Get requests the satellite to read a segment and downloaded the pieces from the storage nodes
|
|
func (s *segmentStore) Get(ctx context.Context, streamID storj.StreamID, segmentIndex int32, objectRS storj.RedundancyScheme) (rr ranger.Ranger, _ storj.SegmentEncryption, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
info, limits, err := s.metainfo.DownloadSegment(ctx, metainfo.DownloadSegmentParams{
|
|
StreamID: streamID,
|
|
Position: storj.SegmentPosition{
|
|
Index: segmentIndex,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, storj.SegmentEncryption{}, Error.Wrap(err)
|
|
}
|
|
|
|
switch {
|
|
case len(info.EncryptedInlineData) != 0:
|
|
return ranger.ByteRanger(info.EncryptedInlineData), info.SegmentEncryption, nil
|
|
default:
|
|
needed := CalcNeededNodes(objectRS)
|
|
selected := make([]*pb.AddressedOrderLimit, len(limits))
|
|
s.rngMu.Lock()
|
|
perm := s.rng.Perm(len(limits))
|
|
s.rngMu.Unlock()
|
|
|
|
for _, i := range perm {
|
|
limit := limits[i]
|
|
if limit == nil {
|
|
continue
|
|
}
|
|
|
|
selected[i] = limit
|
|
|
|
needed--
|
|
if needed <= 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
fc, err := infectious.NewFEC(int(objectRS.RequiredShares), int(objectRS.TotalShares))
|
|
if err != nil {
|
|
return nil, storj.SegmentEncryption{}, err
|
|
}
|
|
es := eestream.NewRSScheme(fc, int(objectRS.ShareSize))
|
|
redundancy, err := eestream.NewRedundancyStrategy(es, int(objectRS.RepairShares), int(objectRS.OptimalShares))
|
|
if err != nil {
|
|
return nil, storj.SegmentEncryption{}, err
|
|
}
|
|
|
|
rr, err = s.ec.Get(ctx, selected, info.PiecePrivateKey, redundancy, info.Size)
|
|
if err != nil {
|
|
return nil, storj.SegmentEncryption{}, Error.Wrap(err)
|
|
}
|
|
|
|
return rr, info.SegmentEncryption, nil
|
|
}
|
|
}
|
|
|
|
// Delete requests the satellite to delete a segment and tells storage nodes
|
|
// to delete the segment's pieces.
|
|
func (s *segmentStore) Delete(ctx context.Context, streamID storj.StreamID, segmentIndex int32) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
segmentID, limits, privateKey, err := s.metainfo.BeginDeleteSegment(ctx, metainfo.BeginDeleteSegmentParams{
|
|
StreamID: streamID,
|
|
Position: storj.SegmentPosition{
|
|
Index: segmentIndex,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
if len(limits) != 0 {
|
|
// remote segment - delete the pieces from storage nodes
|
|
err = s.ec.Delete(ctx, limits, privateKey)
|
|
if err != nil {
|
|
return Error.Wrap(err)
|
|
}
|
|
}
|
|
|
|
err = s.metainfo.FinishDeleteSegment(ctx, metainfo.FinishDeleteSegmentParams{
|
|
SegmentID: segmentID,
|
|
// TODO add delete results
|
|
})
|
|
if err != nil {
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CalcNeededNodes calculate how many minimum nodes are needed for download,
|
|
// based on t = k + (n-o)k/o
|
|
func CalcNeededNodes(rs storj.RedundancyScheme) int32 {
|
|
extra := int32(1)
|
|
|
|
if rs.OptimalShares > 0 {
|
|
extra = int32(((rs.TotalShares - rs.OptimalShares) * rs.RequiredShares) / rs.OptimalShares)
|
|
if extra == 0 {
|
|
// ensure there is at least one extra node, so we can have error detection/correction
|
|
extra = 1
|
|
}
|
|
}
|
|
|
|
needed := int32(rs.RequiredShares) + extra
|
|
|
|
if needed > int32(rs.TotalShares) {
|
|
needed = int32(rs.TotalShares)
|
|
}
|
|
|
|
return needed
|
|
}
|