// Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. package segments import ( "context" "io" "math/rand" "sync" "time" "github.com/vivint/infectious" "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/ranger" "storj.io/storj/pkg/storj" "storj.io/storj/uplink/ecclient" "storj.io/storj/uplink/eestream" "storj.io/storj/uplink/metainfo" ) var ( mon = monkit.Package() ) // Meta info about a segment type Meta struct { Modified time.Time Expiration time.Time Size int64 Data []byte } // ListItem is a single item in a listing type ListItem struct { Path storj.Path Meta Meta IsPrefix bool } // Store for segments type Store interface { Get(ctx context.Context, streamID storj.StreamID, segmentIndex int32, objectRS storj.RedundancyScheme) (rr ranger.Ranger, encryption storj.SegmentEncryption, err error) Put(ctx context.Context, streamID storj.StreamID, data io.Reader, expiration time.Time, segmentInfo func() (int64, storj.SegmentEncryption, error)) (meta Meta, err error) Delete(ctx context.Context, streamID storj.StreamID, segmentIndex int32) (err error) } type segmentStore struct { metainfo *metainfo.Client ec ecclient.Client rs eestream.RedundancyStrategy thresholdSize int maxEncryptedSegmentSize int64 rngMu sync.Mutex rng *rand.Rand } // NewSegmentStore creates a new instance of segmentStore func NewSegmentStore(metainfo *metainfo.Client, ec ecclient.Client, rs eestream.RedundancyStrategy, threshold int, maxEncryptedSegmentSize int64) Store { return &segmentStore{ metainfo: metainfo, ec: ec, rs: rs, thresholdSize: threshold, maxEncryptedSegmentSize: maxEncryptedSegmentSize, rng: rand.New(rand.NewSource(time.Now().UnixNano())), } } // Put uploads a segment to an erasure code client func (s *segmentStore) Put(ctx context.Context, streamID storj.StreamID, data io.Reader, expiration time.Time, segmentInfo func() (int64, storj.SegmentEncryption, error)) (meta Meta, err error) { defer mon.Task()(&ctx)(&err) peekReader := NewPeekThresholdReader(data) remoteSized, err := peekReader.IsLargerThan(s.thresholdSize) if err != nil { return Meta{}, err } if !remoteSized { segmentIndex, encryption, err := segmentInfo() if err != nil { return Meta{}, Error.Wrap(err) } err = s.metainfo.MakeInlineSegment(ctx, metainfo.MakeInlineSegmentParams{ StreamID: streamID, Position: storj.SegmentPosition{ Index: int32(segmentIndex), }, Encryption: encryption, EncryptedInlineData: peekReader.thresholdBuf, }) if err != nil { return Meta{}, Error.Wrap(err) } return Meta{}, nil } segmentIndex, encryption, err := segmentInfo() if err != nil { return Meta{}, Error.Wrap(err) } segmentID, limits, piecePrivateKey, err := s.metainfo.BeginSegment(ctx, metainfo.BeginSegmentParams{ StreamID: streamID, MaxOrderLimit: s.maxEncryptedSegmentSize, Position: storj.SegmentPosition{ Index: int32(segmentIndex), }, }) if err != nil { return Meta{}, Error.Wrap(err) } sizedReader := SizeReader(peekReader) successfulNodes, successfulHashes, err := s.ec.Put(ctx, limits, piecePrivateKey, s.rs, sizedReader, expiration) if err != nil { return Meta{}, Error.Wrap(err) } uploadResults := make([]*pb.SegmentPieceUploadResult, 0, len(successfulNodes)) for i := range successfulNodes { if successfulNodes[i] == nil { continue } uploadResults = append(uploadResults, &pb.SegmentPieceUploadResult{ PieceNum: int32(i), NodeId: successfulNodes[i].Id, Hash: successfulHashes[i], }) } if l := len(uploadResults); l < s.rs.OptimalThreshold() { return Meta{}, Error.New("uploaded results (%d) are below the optimal threshold (%d)", l, s.rs.OptimalThreshold()) } err = s.metainfo.CommitSegmentNew(ctx, metainfo.CommitSegmentParams{ SegmentID: segmentID, SizeEncryptedData: sizedReader.Size(), Encryption: encryption, UploadResult: uploadResults, }) if err != nil { return Meta{}, Error.Wrap(err) } return Meta{}, nil } // Get requests the satellite to read a segment and downloaded the pieces from the storage nodes func (s *segmentStore) Get(ctx context.Context, streamID storj.StreamID, segmentIndex int32, objectRS storj.RedundancyScheme) (rr ranger.Ranger, _ storj.SegmentEncryption, err error) { defer mon.Task()(&ctx)(&err) info, limits, err := s.metainfo.DownloadSegment(ctx, metainfo.DownloadSegmentParams{ StreamID: streamID, Position: storj.SegmentPosition{ Index: segmentIndex, }, }) if err != nil { return nil, storj.SegmentEncryption{}, Error.Wrap(err) } switch { // no order limits also means its inline segment case len(info.EncryptedInlineData) != 0 || len(limits) == 0: return ranger.ByteRanger(info.EncryptedInlineData), info.SegmentEncryption, nil default: needed := CalcNeededNodes(objectRS) selected := make([]*pb.AddressedOrderLimit, len(limits)) s.rngMu.Lock() perm := s.rng.Perm(len(limits)) s.rngMu.Unlock() for _, i := range perm { limit := limits[i] if limit == nil { continue } selected[i] = limit needed-- if needed <= 0 { break } } fc, err := infectious.NewFEC(int(objectRS.RequiredShares), int(objectRS.TotalShares)) if err != nil { return nil, storj.SegmentEncryption{}, err } es := eestream.NewRSScheme(fc, int(objectRS.ShareSize)) redundancy, err := eestream.NewRedundancyStrategy(es, int(objectRS.RepairShares), int(objectRS.OptimalShares)) if err != nil { return nil, storj.SegmentEncryption{}, err } rr, err = s.ec.Get(ctx, selected, info.PiecePrivateKey, redundancy, info.Size) if err != nil { return nil, storj.SegmentEncryption{}, Error.Wrap(err) } return rr, info.SegmentEncryption, nil } } // Delete requests the satellite to delete a segment and tells storage nodes // to delete the segment's pieces. func (s *segmentStore) Delete(ctx context.Context, streamID storj.StreamID, segmentIndex int32) (err error) { defer mon.Task()(&ctx)(&err) _, limits, privateKey, err := s.metainfo.BeginDeleteSegment(ctx, metainfo.BeginDeleteSegmentParams{ StreamID: streamID, Position: storj.SegmentPosition{ Index: segmentIndex, }, }) if err != nil { return Error.Wrap(err) } if len(limits) != 0 { // remote segment - delete the pieces from storage nodes err = s.ec.Delete(ctx, limits, privateKey) if err != nil { return Error.Wrap(err) } } // don't do FinishDeleteSegment at the moment to avoid satellite round trip // FinishDeleteSegment doesn't implement any specific logic at the moment return nil } // CalcNeededNodes calculate how many minimum nodes are needed for download, // based on t = k + (n-o)k/o func CalcNeededNodes(rs storj.RedundancyScheme) int32 { extra := int32(1) if rs.OptimalShares > 0 { extra = int32(((rs.TotalShares - rs.OptimalShares) * rs.RequiredShares) / rs.OptimalShares) if extra == 0 { // ensure there is at least one extra node, so we can have error detection/correction extra = 1 } } needed := int32(rs.RequiredShares) + extra if needed > int32(rs.TotalShares) { needed = int32(rs.TotalShares) } return needed }