2019-03-18 10:55:06 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"bytes"
"context"
2019-04-02 15:55:58 +01:00
"errors"
2019-03-18 10:55:06 +00:00
"strconv"
2019-04-02 19:21:18 +01:00
"time"
2019-03-18 10:55:06 +00:00
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/zeebo/errs"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
2019-04-02 19:21:18 +01:00
"storj.io/storj/pkg/accounting"
2019-03-18 10:55:06 +00:00
"storj.io/storj/pkg/auth"
"storj.io/storj/pkg/eestream"
"storj.io/storj/pkg/identity"
2019-05-24 17:51:27 +01:00
"storj.io/storj/pkg/macaroon"
2019-03-18 10:55:06 +00:00
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/console"
2019-03-27 10:24:35 +00:00
"storj.io/storj/satellite/orders"
2019-03-18 10:55:06 +00:00
"storj.io/storj/storage"
)
var (
	mon = monkit.Package()

	// Error is the general metainfo error class.
	Error = errs.Class("metainfo error")
)
// APIKeys is the API keys store interface used by the endpoint.
type APIKeys interface {
	// GetByHead returns the API key info whose head matches the given bytes.
	GetByHead(ctx context.Context, head []byte) (*console.APIKeyInfo, error)
}
// Revocations is the revocations store interface used by the endpoint.
type Revocations interface {
	// GetByProjectID returns the revoked key data for the given project.
	GetByProjectID(ctx context.Context, projectID uuid.UUID) ([][]byte, error)
}
2019-05-24 20:56:08 +01:00
// Containment is a copy/paste of the containment interface to avoid an import
// cycle error.
type Containment interface {
	// Delete removes the containment entry for the given node.
	// NOTE(review): presumably the bool reports whether an entry existed —
	// confirm against the implementing package.
	Delete(ctx context.Context, nodeID pb.NodeID) (bool, error)
}
2019-03-18 10:55:06 +00:00
// Endpoint implements the metainfo endpoint.
type Endpoint struct {
	log                     *zap.Logger
	metainfo                *Service                         // pointer (segment metadata) store
	orders                  *orders.Service                  // creates and verifies order limits
	cache                   *overlay.Cache                   // storage node selection
	projectUsage            *accounting.ProjectUsage         // per-project storage/bandwidth limits
	containment             Containment                      // containment-mode cleanup on delete
	apiKeys                 APIKeys                          // API key lookup for auth
	storagenodeAccountingDB accounting.StoragenodeAccounting // unused in this file; kept for callers
}
// NewEndpoint creates new metainfo endpoint instance
2019-05-24 20:56:08 +01:00
func NewEndpoint ( log * zap . Logger , metainfo * Service , orders * orders . Service , cache * overlay . Cache , containment Containment ,
2019-05-24 17:51:27 +01:00
apiKeys APIKeys , sdb accounting . StoragenodeAccounting ,
2019-05-28 16:36:52 +01:00
projectUsage * accounting . ProjectUsage ) * Endpoint {
2019-03-18 10:55:06 +00:00
// TODO do something with too many params
return & Endpoint {
2019-05-10 20:05:42 +01:00
log : log ,
metainfo : metainfo ,
orders : orders ,
cache : cache ,
2019-05-24 20:56:08 +01:00
containment : containment ,
2019-05-10 20:05:42 +01:00
apiKeys : apiKeys ,
storagenodeAccountingDB : sdb ,
2019-05-28 16:36:52 +01:00
projectUsage : projectUsage ,
2019-03-18 10:55:06 +00:00
}
}
// Close closes resources. The endpoint currently holds nothing that needs
// releasing, so this always succeeds.
func (endpoint *Endpoint) Close() error {
	return nil
}
2019-05-24 17:51:27 +01:00
func ( endpoint * Endpoint ) validateAuth ( ctx context . Context , action macaroon . Action ) ( * console . APIKeyInfo , error ) {
keyData , ok := auth . GetAPIKey ( ctx )
2019-03-18 10:55:06 +00:00
if ! ok {
2019-05-24 17:51:27 +01:00
endpoint . log . Error ( "unauthorized request" , zap . Error ( status . Errorf ( codes . Unauthenticated , "Invalid API credential" ) ) )
return nil , status . Errorf ( codes . Unauthenticated , "Invalid API credential" )
}
key , err := macaroon . ParseAPIKey ( string ( keyData ) )
if err != nil {
endpoint . log . Error ( "unauthorized request" , zap . Error ( status . Errorf ( codes . Unauthenticated , "Invalid API credential" ) ) )
2019-03-18 10:55:06 +00:00
return nil , status . Errorf ( codes . Unauthenticated , "Invalid API credential" )
}
2019-05-24 17:51:27 +01:00
keyInfo , err := endpoint . apiKeys . GetByHead ( ctx , key . Head ( ) )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-05-24 17:51:27 +01:00
endpoint . log . Error ( "unauthorized request" , zap . Error ( status . Errorf ( codes . Unauthenticated , err . Error ( ) ) ) )
2019-03-18 10:55:06 +00:00
return nil , status . Errorf ( codes . Unauthenticated , "Invalid API credential" )
}
2019-05-24 17:51:27 +01:00
// Revocations are currently handled by just deleting the key.
err = key . Check ( keyInfo . Secret , action , nil )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-05-24 17:51:27 +01:00
endpoint . log . Error ( "unauthorized request" , zap . Error ( status . Errorf ( codes . Unauthenticated , err . Error ( ) ) ) )
2019-03-18 10:55:06 +00:00
return nil , status . Errorf ( codes . Unauthenticated , "Invalid API credential" )
}
return keyInfo , nil
}
// SegmentInfo returns segment metadata info
func ( endpoint * Endpoint ) SegmentInfo ( ctx context . Context , req * pb . SegmentInfoRequest ) ( resp * pb . SegmentInfoResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-05-24 17:51:27 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , macaroon . Action {
Op : macaroon . ActionRead ,
Bucket : req . Bucket ,
EncryptedPath : req . Path ,
Time : time . Now ( ) ,
} )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . Unauthenticated , err . Error ( ) )
}
err = endpoint . validateBucket ( req . Bucket )
if err != nil {
return nil , status . Errorf ( codes . InvalidArgument , err . Error ( ) )
}
2019-04-02 15:55:58 +01:00
path , err := CreatePath ( keyInfo . ProjectID , req . Segment , req . Bucket , req . Path )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . InvalidArgument , err . Error ( ) )
}
// TODO refactor to use []byte directly
2019-04-25 09:46:32 +01:00
pointer , err := endpoint . metainfo . Get ( path )
2019-03-18 10:55:06 +00:00
if err != nil {
if storage . ErrKeyNotFound . Has ( err ) {
return nil , status . Errorf ( codes . NotFound , err . Error ( ) )
}
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
return & pb . SegmentInfoResponse { Pointer : pointer } , nil
}
// CreateSegment will generate requested number of OrderLimit with coresponding node addresses for them
func ( endpoint * Endpoint ) CreateSegment ( ctx context . Context , req * pb . SegmentWriteRequest ) ( resp * pb . SegmentWriteResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-05-24 17:51:27 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , macaroon . Action {
Op : macaroon . ActionWrite ,
Bucket : req . Bucket ,
EncryptedPath : req . Path ,
Time : time . Now ( ) ,
} )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . Unauthenticated , err . Error ( ) )
}
2019-04-01 21:14:58 +01:00
err = endpoint . validateBucket ( req . Bucket )
if err != nil {
return nil , status . Errorf ( codes . InvalidArgument , err . Error ( ) )
}
2019-04-09 14:31:19 +01:00
err = endpoint . validateRedundancy ( req . Redundancy )
if err != nil {
return nil , status . Errorf ( codes . InvalidArgument , err . Error ( ) )
}
2019-05-28 16:36:52 +01:00
exceeded , limit , err := endpoint . projectUsage . ExceedsStorageUsage ( ctx , keyInfo . ProjectID )
2019-04-02 19:21:18 +01:00
if err != nil {
2019-05-10 02:39:21 +01:00
endpoint . log . Error ( "retrieving project storage totals" , zap . Error ( err ) )
2019-04-02 19:21:18 +01:00
}
if exceeded {
2019-05-28 16:36:52 +01:00
endpoint . log . Sugar ( ) . Errorf ( "monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for storage for projectID %s" ,
limit , keyInfo . ProjectID ,
2019-04-02 19:21:18 +01:00
)
2019-05-28 16:36:52 +01:00
return nil , status . Errorf ( codes . ResourceExhausted , "Exceeded Usage Limit" )
2019-04-02 19:21:18 +01:00
}
2019-03-18 10:55:06 +00:00
redundancy , err := eestream . NewRedundancyStrategyFromProto ( req . GetRedundancy ( ) )
if err != nil {
return nil , err
}
maxPieceSize := eestream . CalcPieceSize ( req . GetMaxEncryptedSegmentSize ( ) , redundancy )
2019-03-23 08:06:11 +00:00
request := overlay . FindStorageNodesRequest {
RequestedCount : int ( req . Redundancy . Total ) ,
FreeBandwidth : maxPieceSize ,
FreeDisk : maxPieceSize ,
2019-03-18 10:55:06 +00:00
}
2019-03-23 08:06:11 +00:00
nodes , err := endpoint . cache . FindStorageNodes ( ctx , request )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
uplinkIdentity , err := identity . PeerIdentityFromContext ( ctx )
if err != nil {
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
2019-05-10 02:39:21 +01:00
bucketID := createBucketID ( keyInfo . ProjectID , req . Bucket )
2019-03-28 20:09:23 +00:00
rootPieceID , addressedLimits , err := endpoint . orders . CreatePutOrderLimits ( ctx , uplinkIdentity , bucketID , nodes , req . Expiration , maxPieceSize )
2019-03-27 10:24:35 +00:00
if err != nil {
2019-03-28 20:09:23 +00:00
return nil , Error . Wrap ( err )
2019-03-27 10:24:35 +00:00
}
return & pb . SegmentWriteResponse { AddressedLimits : addressedLimits , RootPieceId : rootPieceID } , nil
2019-03-18 10:55:06 +00:00
}
2019-05-10 02:39:21 +01:00
// calculateSpaceUsed computes the inline and remote storage consumed by a
// pointer. At most one of the two results is non-zero: inline segments count
// their payload length, remote segments count pieceSize * numPieces.
func calculateSpaceUsed(ptr *pb.Pointer) (inlineSpace, remoteSpace int64) {
	inline := ptr.GetInlineSegment()
	if inline != nil {
		return int64(len(inline)), 0
	}

	remote := ptr.GetRemote()
	if remote == nil {
		return 0, 0
	}

	minReq := remote.GetRedundancy().GetMinReq()
	if minReq <= 0 {
		// Guard against a missing or invalid redundancy scheme; dividing by
		// zero below would panic the satellite.
		return 0, 0
	}

	segmentSize := ptr.GetSegmentSize()
	pieceSize := segmentSize / int64(minReq)
	pieces := remote.GetRemotePieces()
	return 0, pieceSize * int64(len(pieces))
}
2019-03-18 10:55:06 +00:00
// CommitSegment commits segment metadata
func ( endpoint * Endpoint ) CommitSegment ( ctx context . Context , req * pb . SegmentCommitRequest ) ( resp * pb . SegmentCommitResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-05-24 17:51:27 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , macaroon . Action {
Op : macaroon . ActionWrite ,
Bucket : req . Bucket ,
EncryptedPath : req . Path ,
Time : time . Now ( ) ,
} )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . Unauthenticated , err . Error ( ) )
}
err = endpoint . validateBucket ( req . Bucket )
if err != nil {
return nil , status . Errorf ( codes . InvalidArgument , err . Error ( ) )
}
err = endpoint . validateCommit ( req )
if err != nil {
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
2019-03-30 11:21:49 +00:00
err = endpoint . filterValidPieces ( req . Pointer )
if err != nil {
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
2019-03-18 10:55:06 +00:00
2019-04-02 15:55:58 +01:00
path , err := CreatePath ( keyInfo . ProjectID , req . Segment , req . Bucket , req . Path )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . InvalidArgument , err . Error ( ) )
}
2019-05-10 02:39:21 +01:00
inlineUsed , remoteUsed := calculateSpaceUsed ( req . Pointer )
2019-05-28 16:36:52 +01:00
if err := endpoint . projectUsage . AddProjectStorageUsage ( ctx , keyInfo . ProjectID , inlineUsed , remoteUsed ) ; err != nil {
2019-05-10 02:39:21 +01:00
endpoint . log . Sugar ( ) . Errorf ( "Could not track new storage usage by project %v: %v" , keyInfo . ProjectID , err )
// but continue. it's most likely our own fault that we couldn't track it, and the only thing
// that will be affected is our per-project bandwidth and storage limits.
}
2019-04-25 09:46:32 +01:00
err = endpoint . metainfo . Put ( path , req . Pointer )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
2019-04-05 08:42:56 +01:00
if req . Pointer . Type == pb . Pointer_INLINE {
bucketID := createBucketID ( keyInfo . ProjectID , req . Bucket )
// TODO or maybe use pointer.SegmentSize ??
err = endpoint . orders . UpdatePutInlineOrder ( ctx , bucketID , int64 ( len ( req . Pointer . InlineSegment ) ) )
if err != nil {
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
}
2019-04-25 09:46:32 +01:00
pointer , err := endpoint . metainfo . Get ( path )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
return & pb . SegmentCommitResponse { Pointer : pointer } , nil
}
// DownloadSegment gets Pointer incase of INLINE data or list of OrderLimit necessary to download remote data
func ( endpoint * Endpoint ) DownloadSegment ( ctx context . Context , req * pb . SegmentDownloadRequest ) ( resp * pb . SegmentDownloadResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-05-24 17:51:27 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , macaroon . Action {
Op : macaroon . ActionRead ,
Bucket : req . Bucket ,
EncryptedPath : req . Path ,
Time : time . Now ( ) ,
} )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . Unauthenticated , err . Error ( ) )
}
err = endpoint . validateBucket ( req . Bucket )
if err != nil {
return nil , status . Errorf ( codes . InvalidArgument , err . Error ( ) )
}
2019-04-02 19:21:18 +01:00
bucketID := createBucketID ( keyInfo . ProjectID , req . Bucket )
2019-05-28 16:36:52 +01:00
exceeded , limit , err := endpoint . projectUsage . ExceedsBandwidthUsage ( ctx , keyInfo . ProjectID , bucketID )
2019-04-02 19:21:18 +01:00
if err != nil {
2019-05-28 16:36:52 +01:00
endpoint . log . Error ( "retrieving project bandwidth total" , zap . Error ( err ) )
2019-04-02 19:21:18 +01:00
}
if exceeded {
2019-05-28 16:36:52 +01:00
endpoint . log . Sugar ( ) . Errorf ( "monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for bandwidth for projectID %s." ,
limit , keyInfo . ProjectID ,
2019-04-02 19:21:18 +01:00
)
2019-05-28 16:36:52 +01:00
return nil , status . Errorf ( codes . ResourceExhausted , "Exceeded Usage Limit" )
2019-04-02 19:21:18 +01:00
}
2019-04-02 15:55:58 +01:00
path , err := CreatePath ( keyInfo . ProjectID , req . Segment , req . Bucket , req . Path )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . InvalidArgument , err . Error ( ) )
}
// TODO refactor to use []byte directly
2019-04-25 09:46:32 +01:00
pointer , err := endpoint . metainfo . Get ( path )
2019-03-18 10:55:06 +00:00
if err != nil {
if storage . ErrKeyNotFound . Has ( err ) {
return nil , status . Errorf ( codes . NotFound , err . Error ( ) )
}
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
if pointer . Type == pb . Pointer_INLINE {
2019-04-05 08:42:56 +01:00
// TODO or maybe use pointer.SegmentSize ??
err := endpoint . orders . UpdateGetInlineOrder ( ctx , bucketID , int64 ( len ( pointer . InlineSegment ) ) )
if err != nil {
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
2019-03-18 10:55:06 +00:00
return & pb . SegmentDownloadResponse { Pointer : pointer } , nil
} else if pointer . Type == pb . Pointer_REMOTE && pointer . Remote != nil {
2019-03-28 20:09:23 +00:00
uplinkIdentity , err := identity . PeerIdentityFromContext ( ctx )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
2019-03-28 20:09:23 +00:00
limits , err := endpoint . orders . CreateGetOrderLimits ( ctx , uplinkIdentity , bucketID , pointer )
if err != nil {
2019-03-27 10:24:35 +00:00
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
2019-03-18 10:55:06 +00:00
return & pb . SegmentDownloadResponse { Pointer : pointer , AddressedLimits : limits } , nil
}
return & pb . SegmentDownloadResponse { } , nil
}
// DeleteSegment deletes segment metadata from satellite and returns OrderLimit array to remove them from storage node
func ( endpoint * Endpoint ) DeleteSegment ( ctx context . Context , req * pb . SegmentDeleteRequest ) ( resp * pb . SegmentDeleteResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-05-24 17:51:27 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , macaroon . Action {
Op : macaroon . ActionDelete ,
Bucket : req . Bucket ,
EncryptedPath : req . Path ,
Time : time . Now ( ) ,
} )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . Unauthenticated , err . Error ( ) )
}
err = endpoint . validateBucket ( req . Bucket )
if err != nil {
return nil , status . Errorf ( codes . InvalidArgument , err . Error ( ) )
}
2019-04-02 15:55:58 +01:00
path , err := CreatePath ( keyInfo . ProjectID , req . Segment , req . Bucket , req . Path )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . InvalidArgument , err . Error ( ) )
}
// TODO refactor to use []byte directly
2019-04-25 09:46:32 +01:00
pointer , err := endpoint . metainfo . Get ( path )
2019-03-18 10:55:06 +00:00
if err != nil {
if storage . ErrKeyNotFound . Has ( err ) {
return nil , status . Errorf ( codes . NotFound , err . Error ( ) )
}
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
2019-04-25 09:46:32 +01:00
err = endpoint . metainfo . Delete ( path )
2019-05-24 20:56:08 +01:00
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
if pointer . Type == pb . Pointer_REMOTE && pointer . Remote != nil {
2019-03-28 20:09:23 +00:00
uplinkIdentity , err := identity . PeerIdentityFromContext ( ctx )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
2019-05-24 20:56:08 +01:00
for _ , piece := range pointer . GetRemote ( ) . GetRemotePieces ( ) {
_ , err := endpoint . containment . Delete ( ctx , piece . NodeId )
if err != nil {
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
}
}
2019-03-28 20:09:23 +00:00
bucketID := createBucketID ( keyInfo . ProjectID , req . Bucket )
limits , err := endpoint . orders . CreateDeleteOrderLimits ( ctx , uplinkIdentity , bucketID , pointer )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-03-28 20:09:23 +00:00
return nil , status . Errorf ( codes . Internal , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-03-28 20:09:23 +00:00
return & pb . SegmentDeleteResponse { AddressedLimits : limits } , nil
2019-03-18 10:55:06 +00:00
}
2019-03-28 20:09:23 +00:00
return & pb . SegmentDeleteResponse { } , nil
2019-03-18 10:55:06 +00:00
}
// ListSegments returns all Path keys in the Pointers bucket
func ( endpoint * Endpoint ) ListSegments ( ctx context . Context , req * pb . ListSegmentsRequest ) ( resp * pb . ListSegmentsResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-05-24 17:51:27 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , macaroon . Action {
Op : macaroon . ActionList ,
Bucket : req . Bucket ,
EncryptedPath : req . Prefix ,
Time : time . Now ( ) ,
} )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . Unauthenticated , err . Error ( ) )
}
2019-04-02 15:55:58 +01:00
prefix , err := CreatePath ( keyInfo . ProjectID , - 1 , req . Bucket , req . Prefix )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . InvalidArgument , err . Error ( ) )
}
2019-04-25 09:46:32 +01:00
items , more , err := endpoint . metainfo . List ( prefix , string ( req . StartAfter ) , string ( req . EndBefore ) , req . Recursive , req . Limit , req . MetaFlags )
2019-03-18 10:55:06 +00:00
if err != nil {
return nil , status . Errorf ( codes . Internal , "ListV2: %v" , err )
}
segmentItems := make ( [ ] * pb . ListSegmentsResponse_Item , len ( items ) )
for i , item := range items {
segmentItems [ i ] = & pb . ListSegmentsResponse_Item {
Path : [ ] byte ( item . Path ) ,
Pointer : item . Pointer ,
IsPrefix : item . IsPrefix ,
}
}
return & pb . ListSegmentsResponse { Items : segmentItems , More : more } , nil
}
2019-03-28 20:09:23 +00:00
// createBucketID builds the "<projectID>/<bucket>" identifier passed to the
// orders service and project usage accounting.
func createBucketID(projectID uuid.UUID, bucket []byte) []byte {
	// JoinPaths is variadic; building an intermediate slice with repeated
	// appends was unnecessary.
	return []byte(storj.JoinPaths(projectID.String(), string(bucket)))
}
2019-03-18 10:55:06 +00:00
func ( endpoint * Endpoint ) filterValidPieces ( pointer * pb . Pointer ) error {
if pointer . Type == pb . Pointer_REMOTE {
var remotePieces [ ] * pb . RemotePiece
remote := pointer . Remote
for _ , piece := range remote . RemotePieces {
// TODO enable verification
// err := auth.VerifyMsg(piece.Hash, piece.NodeId)
// if err == nil {
// // set to nil after verification to avoid storing in DB
// piece.Hash = nil
// remotePieces = append(remotePieces, piece)
// } else {
// // TODO satellite should send Delete request for piece that failed
// s.logger.Warn("unable to verify piece hash: %v", zap.Error(err))
// }
remotePieces = append ( remotePieces , piece )
}
2019-05-17 20:02:40 +01:00
// we repair when the number of healthy files is less than or equal to the repair threshold
// except for the case when the repair and success thresholds are the same (a case usually seen during testing)
if int32 ( len ( remotePieces ) ) <= remote . Redundancy . RepairThreshold && remote . Redundancy . RepairThreshold != remote . Redundancy . SuccessThreshold {
return Error . New ( "Number of valid pieces is less than or equal to the repair threshold: %v < %v" ,
2019-03-18 10:55:06 +00:00
len ( remotePieces ) ,
2019-04-05 11:19:20 +01:00
remote . Redundancy . RepairThreshold ,
2019-03-18 10:55:06 +00:00
)
}
remote . RemotePieces = remotePieces
}
return nil
}
func ( endpoint * Endpoint ) validateBucket ( bucket [ ] byte ) error {
if len ( bucket ) == 0 {
return errs . New ( "bucket not specified" )
}
2019-03-19 14:37:28 +00:00
if bytes . ContainsAny ( bucket , "/" ) {
return errs . New ( "bucket should not contain slash" )
}
2019-03-18 10:55:06 +00:00
return nil
}
func ( endpoint * Endpoint ) validateCommit ( req * pb . SegmentCommitRequest ) error {
err := endpoint . validatePointer ( req . Pointer )
if err != nil {
return err
}
if req . Pointer . Type == pb . Pointer_REMOTE {
remote := req . Pointer . Remote
2019-04-09 14:31:19 +01:00
if len ( req . OriginalLimits ) == 0 {
return Error . New ( "no order limits" )
}
2019-03-18 10:55:06 +00:00
if int32 ( len ( req . OriginalLimits ) ) != remote . Redundancy . Total {
return Error . New ( "invalid no order limit for piece" )
}
for _ , piece := range remote . RemotePieces {
limit := req . OriginalLimits [ piece . PieceNum ]
2019-03-28 20:09:23 +00:00
err := endpoint . orders . VerifyOrderLimitSignature ( limit )
2019-03-18 10:55:06 +00:00
if err != nil {
return err
}
if limit == nil {
return Error . New ( "invalid no order limit for piece" )
}
derivedPieceID := remote . RootPieceId . Derive ( piece . NodeId )
if limit . PieceId . IsZero ( ) || limit . PieceId != derivedPieceID {
return Error . New ( "invalid order limit piece id" )
}
if bytes . Compare ( piece . NodeId . Bytes ( ) , limit . StorageNodeId . Bytes ( ) ) != 0 {
return Error . New ( "piece NodeID != order limit NodeID" )
}
}
}
return nil
}
// validatePointer performs basic sanity checks on a pointer.
func (endpoint *Endpoint) validatePointer(pointer *pb.Pointer) error {
	if pointer == nil {
		return Error.New("no pointer specified")
	}

	// TODO does it all?
	if pointer.Type != pb.Pointer_REMOTE {
		return nil
	}

	switch {
	case pointer.Remote == nil:
		return Error.New("no remote segment specified")
	case pointer.Remote.RemotePieces == nil:
		return Error.New("no remote segment pieces specified")
	case pointer.Remote.Redundancy == nil:
		return Error.New("no redundancy scheme specified")
	}

	return nil
}
2019-04-02 15:55:58 +01:00
// CreatePath will create a Segment path. Segment index -1 means the "last"
// segment ("l"); any other non-negative index produces "s<N>".
func CreatePath(projectID uuid.UUID, segmentIndex int64, bucket, path []byte) (storj.Path, error) {
	if segmentIndex < -1 {
		return "", errors.New("invalid segment index")
	}

	segment := "l"
	if segmentIndex > -1 {
		segment = "s" + strconv.FormatInt(segmentIndex, 10)
	}

	entries := []string{projectID.String(), segment}
	if len(bucket) != 0 {
		entries = append(entries, string(bucket))
	}
	if len(path) != 0 {
		entries = append(entries, string(path))
	}
	return storj.JoinPaths(entries...), nil
}
2019-04-09 14:31:19 +01:00
// validateRedundancy performs minimal sanity checks on a redundancy scheme.
func (endpoint *Endpoint) validateRedundancy(redundancy *pb.RedundancyScheme) error {
	// TODO more validation, use validation from eestream.NewRedundancyStrategy
	if redundancy == nil {
		// Previously a nil scheme would panic on the field access below.
		return Error.New("no redundancy scheme specified")
	}
	if redundancy.ErasureShareSize <= 0 {
		// The condition rejects 0 as well, so the message must not say
		// "less than 0".
		return Error.New("erasure share size cannot be less than or equal to 0")
	}
	return nil
}