// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package segments

import (
	"context"
	"io"
	"math/rand"
	"strconv"
	"strings"
	"time"

	"github.com/golang/protobuf/ptypes"
	"github.com/golang/protobuf/ptypes/timestamp"
	"go.uber.org/zap"
	monkit "gopkg.in/spacemonkeygo/monkit.v2"

	"storj.io/storj/pkg/eestream"
	"storj.io/storj/pkg/pb"
	"storj.io/storj/pkg/ranger"
	ecclient "storj.io/storj/pkg/storage/ec"
	"storj.io/storj/pkg/storj"
	"storj.io/storj/uplink/metainfo"
)

var (
	mon = monkit.Package()
)

// Meta info about a segment
type Meta struct {
	Modified   time.Time
	Expiration time.Time
	Size       int64
	Data       []byte
}

// ListItem is a single item in a listing
type ListItem struct {
	Path     storj.Path
	Meta     Meta
	IsPrefix bool
}

// Store for segments
type Store interface {
	Meta(ctx context.Context, path storj.Path) (meta Meta, err error)
	Get(ctx context.Context, path storj.Path) (rr ranger.Ranger, meta Meta, err error)
	Put(ctx context.Context, data io.Reader, expiration time.Time, segmentInfo func() (storj.Path, []byte, error)) (meta Meta, err error)
	Delete(ctx context.Context, path storj.Path) (err error)
	List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
}

type segmentStore struct {
	metainfo                metainfo.Client
	ec                      ecclient.Client
	rs                      eestream.RedundancyStrategy
	thresholdSize           int
	maxEncryptedSegmentSize int64
}

// NewSegmentStore creates a new instance of segmentStore
func NewSegmentStore(metainfo metainfo.Client, ec ecclient.Client, rs eestream.RedundancyStrategy, threshold int, maxEncryptedSegmentSize int64) Store {
	return &segmentStore{
		metainfo:                metainfo,
		ec:                      ec,
		rs:                      rs,
		thresholdSize:           threshold,
		maxEncryptedSegmentSize: maxEncryptedSegmentSize,
	}
}
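
// A minimal usage sketch (hypothetical clients, reader and path; the
// segmentInfo callback supplies the final segment path and metadata only
// after the data has been consumed):
//
//	store := NewSegmentStore(metainfoClient, ecClient, rs, thresholdSize, maxEncryptedSegmentSize)
//	_, err := store.Put(ctx, reader, expiration, func() (storj.Path, []byte, error) {
//		return "s0/bucket/encrypted/object/path", nil, nil
//	})
//	rr, meta, err := store.Get(ctx, "s0/bucket/encrypted/object/path")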

// Meta retrieves the metadata of the segment
func (s *segmentStore) Meta(ctx context.Context, path storj.Path) (meta Meta, err error) {
	defer mon.Task()(&ctx)(&err)

	bucket, objectPath, segmentIndex, err := split(path)
	if err != nil {
		return Meta{}, err
	}

	pointer, err := s.metainfo.SegmentInfo(ctx, bucket, objectPath, segmentIndex)
	if err != nil {
		return Meta{}, Error.Wrap(err)
	}

	return convertMeta(pointer), nil
}

// Put uploads a segment to an erasure code client
func (s *segmentStore) Put(ctx context.Context, data io.Reader, expiration time.Time, segmentInfo func() (storj.Path, []byte, error)) (meta Meta, err error) {
	defer mon.Task()(&ctx)(&err)

	redundancy := &pb.RedundancyScheme{
		Type:             pb.RedundancyScheme_RS,
		MinReq:           int32(s.rs.RequiredCount()),
		Total:            int32(s.rs.TotalCount()),
		RepairThreshold:  int32(s.rs.RepairThreshold()),
		SuccessThreshold: int32(s.rs.OptimalThreshold()),
		ErasureShareSize: int32(s.rs.ErasureShareSize()),
	}

	exp, err := ptypes.TimestampProto(expiration)
	if err != nil {
		return Meta{}, Error.Wrap(err)
	}

	peekReader := NewPeekThresholdReader(data)
	remoteSized, err := peekReader.IsLargerThan(s.thresholdSize)
	if err != nil {
		return Meta{}, err
	}

	var path storj.Path
	var pointer *pb.Pointer
	var originalLimits []*pb.OrderLimit2
	if !remoteSized {
		// the segment fits within the threshold - store it inline in the pointer
		p, metadata, err := segmentInfo()
		if err != nil {
			return Meta{}, Error.Wrap(err)
		}
		path = p

		pointer = &pb.Pointer{
			Type:           pb.Pointer_INLINE,
			InlineSegment:  peekReader.thresholdBuf,
			SegmentSize:    int64(len(peekReader.thresholdBuf)),
			ExpirationDate: exp,
			Metadata:       metadata,
		}
	} else {
		// the segment is remote - upload the erasure-coded pieces to storage nodes
		limits, rootPieceID, err := s.metainfo.CreateSegment(ctx, "", "", -1, redundancy, s.maxEncryptedSegmentSize, expiration) // bucket, path and segment index are not known at this point
		if err != nil {
			return Meta{}, Error.Wrap(err)
		}

		sizedReader := SizeReader(peekReader)

		successfulNodes, successfulHashes, err := s.ec.Put(ctx, limits, s.rs, sizedReader, expiration)
		if err != nil {
			return Meta{}, Error.Wrap(err)
		}

		p, metadata, err := segmentInfo()
		if err != nil {
			return Meta{}, Error.Wrap(err)
		}
		path = p

		pointer, err = makeRemotePointer(successfulNodes, successfulHashes, s.rs, rootPieceID, sizedReader.Size(), exp, metadata)
		if err != nil {
			return Meta{}, Error.Wrap(err)
		}

		originalLimits = make([]*pb.OrderLimit2, len(limits))
		for i, addressedLimit := range limits {
			originalLimits[i] = addressedLimit.GetLimit()
		}
	}

	bucket, objectPath, segmentIndex, err := split(path)
	if err != nil {
		return Meta{}, err
	}

	savedPointer, err := s.metainfo.CommitSegment(ctx, bucket, objectPath, segmentIndex, pointer, originalLimits)
	if err != nil {
		return Meta{}, Error.Wrap(err)
	}

	return convertMeta(savedPointer), nil
}

// Get requests the satellite to read a segment and downloads the pieces from the storage nodes
func (s *segmentStore) Get(ctx context.Context, path storj.Path) (rr ranger.Ranger, meta Meta, err error) {
	defer mon.Task()(&ctx)(&err)

	bucket, objectPath, segmentIndex, err := split(path)
	if err != nil {
		return nil, Meta{}, err
	}

	pointer, limits, err := s.metainfo.ReadSegment(ctx, bucket, objectPath, segmentIndex)
	if err != nil {
		return nil, Meta{}, Error.Wrap(err)
	}

	switch pointer.GetType() {
	case pb.Pointer_INLINE:
		return ranger.ByteRanger(pointer.InlineSegment), convertMeta(pointer), nil
	case pb.Pointer_REMOTE:
		// randomly select just enough order limits to download the segment
		needed := CalcNeededNodes(pointer.GetRemote().GetRedundancy())
		selected := make([]*pb.AddressedOrderLimit, len(limits))

		for _, i := range rand.Perm(len(limits)) {
			limit := limits[i]
			if limit == nil {
				continue
			}

			selected[i] = limit

			needed--
			if needed <= 0 {
				break
			}
		}

		redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
		if err != nil {
			return nil, Meta{}, err
		}

		rr, err = s.ec.Get(ctx, selected, redundancy, pointer.GetSegmentSize())
		if err != nil {
			return nil, Meta{}, Error.Wrap(err)
		}

		return rr, convertMeta(pointer), nil
	default:
		return nil, Meta{}, Error.New("unsupported pointer type: %d", pointer.GetType())
	}
}

// makeRemotePointer creates a pointer of type remote
func makeRemotePointer(nodes []*pb.Node, hashes []*pb.PieceHash, rs eestream.RedundancyStrategy, pieceID storj.PieceID, readerSize int64, exp *timestamp.Timestamp, metadata []byte) (pointer *pb.Pointer, err error) {
	if len(nodes) != len(hashes) {
		return nil, Error.New("unable to make pointer: size of nodes != size of hashes")
	}

	var remotePieces []*pb.RemotePiece
	for i := range nodes {
		if nodes[i] == nil {
			continue
		}
		nodes[i].Type.DPanicOnInvalid("makeremotepointer")
		remotePieces = append(remotePieces, &pb.RemotePiece{
			PieceNum: int32(i),
			NodeId:   nodes[i].Id,
			Hash:     hashes[i],
		})
	}

	pointer = &pb.Pointer{
		Type: pb.Pointer_REMOTE,
		Remote: &pb.RemoteSegment{
			Redundancy: &pb.RedundancyScheme{
				Type:             pb.RedundancyScheme_RS,
				MinReq:           int32(rs.RequiredCount()),
				Total:            int32(rs.TotalCount()),
				RepairThreshold:  int32(rs.RepairThreshold()),
				SuccessThreshold: int32(rs.OptimalThreshold()),
				ErasureShareSize: int32(rs.ErasureShareSize()),
			},
			RootPieceId:  pieceID,
			RemotePieces: remotePieces,
		},
		SegmentSize:    readerSize,
		ExpirationDate: exp,
		Metadata:       metadata,
	}

	return pointer, nil
}

// Delete requests the satellite to delete a segment and tells storage nodes
// to delete the segment's pieces.
func (s *segmentStore) Delete(ctx context.Context, path storj.Path) (err error) {
	defer mon.Task()(&ctx)(&err)

	bucket, objectPath, segmentIndex, err := split(path)
	if err != nil {
		return err
	}

	limits, err := s.metainfo.DeleteSegment(ctx, bucket, objectPath, segmentIndex)
	if err != nil {
		return Error.Wrap(err)
	}

	if len(limits) == 0 {
		// inline segment - nothing else to do
		return
	}

	// remote segment - delete the pieces from storage nodes
	err = s.ec.Delete(ctx, limits)
	if err != nil {
		return Error.Wrap(err)
	}

	return nil
}

// List retrieves paths to segments and their metadata stored in the pointerdb
func (s *segmentStore) List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) {
	defer mon.Task()(&ctx)(&err)

	bucket, strippedPrefix, _, err := split(prefix)
	if err != nil {
		return nil, false, Error.Wrap(err)
	}

	list, more, err := s.metainfo.ListSegments(ctx, bucket, strippedPrefix, startAfter, endBefore, recursive, int32(limit), metaFlags)
	if err != nil {
		return nil, false, Error.Wrap(err)
	}

	items = make([]ListItem, len(list))
	for i, itm := range list {
		items[i] = ListItem{
			Path:     itm.Path,
			Meta:     convertMeta(itm.Pointer),
			IsPrefix: itm.IsPrefix,
		}
	}

	return items, more, nil
}

// CalcNeededNodes calculates the minimum number of nodes needed for download,
// based on t = k + (n-o)k/o
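//
// For example (hypothetical redundancy values, to illustrate the formula):
// with k = MinReq = 29, o = SuccessThreshold = 80 and n = Total = 130,
// extra = (130-80)*29/80 = 18, so 29+18 = 47 nodes are needed.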
func CalcNeededNodes(rs *pb.RedundancyScheme) int32 {
	extra := int32(1)

	if rs.GetSuccessThreshold() > 0 {
		extra = ((rs.GetTotal() - rs.GetSuccessThreshold()) * rs.GetMinReq()) / rs.GetSuccessThreshold()
		if extra == 0 {
			// ensure there is at least one extra node, so we can have error detection/correction
			extra = 1
		}
	}

	needed := rs.GetMinReq() + extra
	if needed > rs.GetTotal() {
		needed = rs.GetTotal()
	}

	return needed
}

// convertMeta converts pointer to segment metadata
func convertMeta(pr *pb.Pointer) Meta {
	return Meta{
		Modified:   convertTime(pr.GetCreationDate()),
		Expiration: convertTime(pr.GetExpirationDate()),
		Size:       pr.GetSegmentSize(),
		Data:       pr.GetMetadata(),
	}
}

// convertTime converts gRPC timestamp to Go time
func convertTime(ts *timestamp.Timestamp) time.Time {
	if ts == nil {
		return time.Time{}
	}
	t, err := ptypes.Timestamp(ts)
	if err != nil {
		zap.S().Warnf("Failed converting timestamp %v: %v", ts, err)
	}
	return t
}
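
// split breaks a segment path of the form "<segment>/<bucket>/<object path>"
// into its components. The segment component is either "l" (mapped to index -1)
// or "s<n>" for segment n, e.g. the hypothetical path "s0/mybucket/videos/clip.mp4"
// yields segment index 0, bucket "mybucket" and object path "videos/clip.mp4".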
func split(path storj.Path) (bucket string, objectPath storj.Path, segmentIndex int64, err error) {
	components := storj.SplitPath(path)
	if len(components) < 1 {
		return "", "", -2, Error.New("empty path")
	}

	segmentIndex, err = convertSegmentIndex(components[0])
	if err != nil {
		return "", "", -2, err
	}

	if len(components) > 1 {
		bucket = components[1]
		objectPath = storj.JoinPaths(components[2:]...)
	}

	return bucket, objectPath, segmentIndex, nil
}
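
// convertSegmentIndex parses a segment path component: "l" yields -1,
// "s<n>" yields n, and anything else is an error.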
func convertSegmentIndex(segmentComp string) (segmentIndex int64, err error) {
	switch {
	case segmentComp == "l":
		return -1, nil
	case strings.HasPrefix(segmentComp, "s"):
		num, err := strconv.Atoi(segmentComp[1:])
		if err != nil {
			return -2, Error.Wrap(err)
		}
		return int64(num), nil
	default:
		return -2, Error.New("invalid segment component: %s", segmentComp)
	}
}