2019-03-18 10:55:06 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"context"
2019-06-24 18:15:45 +01:00
"crypto/sha256"
2019-04-02 15:55:58 +01:00
"errors"
2019-10-17 19:01:40 +01:00
"fmt"
2019-12-11 17:44:13 +00:00
"math"
2019-03-18 10:55:06 +00:00
"strconv"
2019-04-02 19:21:18 +01:00
"time"
2019-03-18 10:55:06 +00:00
2019-07-16 11:39:23 +01:00
"github.com/gogo/protobuf/proto"
2019-03-18 10:55:06 +00:00
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/zeebo/errs"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
2020-01-07 07:26:45 +00:00
"storj.io/common/errs2"
2019-12-27 11:48:47 +00:00
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/sync2"
2020-01-07 18:34:43 +00:00
"storj.io/storj/pkg/macaroon"
2019-12-16 17:59:01 +00:00
"storj.io/storj/private/dbutil"
2019-07-28 06:55:36 +01:00
"storj.io/storj/satellite/accounting"
2019-06-21 20:14:34 +01:00
"storj.io/storj/satellite/attribution"
2019-03-18 10:55:06 +00:00
"storj.io/storj/satellite/console"
2019-03-27 10:24:35 +00:00
"storj.io/storj/satellite/orders"
2019-07-28 06:55:36 +01:00
"storj.io/storj/satellite/overlay"
2019-11-26 11:12:37 +00:00
"storj.io/storj/satellite/rewards"
2020-01-08 13:40:19 +00:00
"storj.io/uplink/eestream"
"storj.io/uplink/piecestore"
"storj.io/uplink/storage/meta"
2019-03-18 10:55:06 +00:00
)
2019-07-16 11:39:23 +01:00
const (
2019-10-16 13:47:08 +01:00
pieceHashExpiration = 24 * time . Hour
2019-07-16 11:39:23 +01:00
satIDExpiration = 24 * time . Hour
2019-07-24 12:33:23 +01:00
lastSegment = - 1
listLimit = 1000
2020-01-07 18:34:43 +00:00
2019-12-20 12:13:09 +00:00
// TODO: orange/v3-3406 this value may change once it's used in production
deleteObjectPiecesConcurrencyLimit = 100
2020-01-07 18:34:43 +00:00
deleteObjectPiecesSuccessThreshold = 0.75
2019-07-16 11:39:23 +01:00
)
2019-07-03 17:14:37 +01:00
2019-03-18 10:55:06 +00:00
var (
mon = monkit . Package ( )
// Error general metainfo error
Error = errs . Class ( "metainfo error" )
2019-11-05 19:13:45 +00:00
// ErrNodeAlreadyExists pointer already has a piece for a node err
ErrNodeAlreadyExists = errs . Class ( "metainfo error: node already exists" )
2019-03-18 10:55:06 +00:00
)
// APIKeys is api keys store methods used by endpoint
2019-09-10 14:24:16 +01:00
//
// architecture: Database
2019-03-18 10:55:06 +00:00
type APIKeys interface {
2019-05-24 17:51:27 +01:00
GetByHead ( ctx context . Context , head [ ] byte ) ( * console . APIKeyInfo , error )
}
// Revocations is the revocations store methods used by the endpoint
2019-09-10 14:24:16 +01:00
//
// architecture: Database
2019-05-24 17:51:27 +01:00
type Revocations interface {
GetByProjectID ( ctx context . Context , projectID uuid . UUID ) ( [ ] [ ] byte , error )
2019-03-18 10:55:06 +00:00
}
2019-12-11 18:46:41 +00:00
// Endpoint metainfo endpoint.
2019-09-10 14:24:16 +01:00
//
// architecture: Endpoint
2019-03-18 10:55:06 +00:00
type Endpoint struct {
2019-10-01 17:55:02 +01:00
log * zap . Logger
metainfo * Service
orders * orders . Service
overlay * overlay . Service
2019-11-26 11:12:37 +00:00
attributions attribution . DB
partners * rewards . PartnersService
2019-10-01 17:55:02 +01:00
peerIdentities overlay . PeerIdentities
2019-11-15 14:27:44 +00:00
projectUsage * accounting . Service
2019-12-11 17:44:13 +00:00
dialer rpc . Dialer
2019-10-01 17:55:02 +01:00
apiKeys APIKeys
createRequests * createRequests
requiredRSConfig RSConfig
satellite signing . Signer
maxCommitInterval time . Duration
2019-03-18 10:55:06 +00:00
}
2019-12-11 17:44:13 +00:00
// NewEndpoint creates new metainfo endpoint instance.
2019-11-26 11:12:37 +00:00
func NewEndpoint ( log * zap . Logger , metainfo * Service , orders * orders . Service , cache * overlay . Service ,
attributions attribution . DB , partners * rewards . PartnersService , peerIdentities overlay . PeerIdentities ,
2019-12-11 17:44:13 +00:00
dialer rpc . Dialer , apiKeys APIKeys , projectUsage * accounting . Service , rsConfig RSConfig , satellite signing . Signer , maxCommitInterval time . Duration ) * Endpoint {
2019-03-18 10:55:06 +00:00
// TODO do something with too many params
return & Endpoint {
2019-10-01 17:55:02 +01:00
log : log ,
metainfo : metainfo ,
orders : orders ,
overlay : cache ,
2019-11-26 11:12:37 +00:00
attributions : attributions ,
partners : partners ,
2019-10-01 17:55:02 +01:00
peerIdentities : peerIdentities ,
2019-12-11 17:44:13 +00:00
dialer : dialer ,
2019-10-01 17:55:02 +01:00
apiKeys : apiKeys ,
projectUsage : projectUsage ,
createRequests : newCreateRequests ( ) ,
requiredRSConfig : rsConfig ,
satellite : satellite ,
maxCommitInterval : maxCommitInterval ,
2019-03-18 10:55:06 +00:00
}
}
// Close closes resources
func ( endpoint * Endpoint ) Close ( ) error { return nil }
2019-07-08 14:33:15 +01:00
// SegmentInfoOld returns segment metadata info
func ( endpoint * Endpoint ) SegmentInfoOld ( ctx context . Context , req * pb . SegmentInfoRequestOld ) ( resp * pb . SegmentInfoResponseOld , err error ) {
2019-03-18 10:55:06 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-05-24 17:51:27 +01:00
Op : macaroon . ActionRead ,
Bucket : req . Bucket ,
EncryptedPath : req . Path ,
Time : time . Now ( ) ,
} )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-06-04 12:55:38 +01:00
err = endpoint . validateBucket ( ctx , req . Bucket )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-08-01 10:04:31 +01:00
pointer , _ , err := endpoint . getPointer ( ctx , keyInfo . ProjectID , req . Segment , req . Bucket , req . Path )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-08-01 10:04:31 +01:00
return nil , err
2019-03-18 10:55:06 +00:00
}
2019-07-08 14:33:15 +01:00
return & pb . SegmentInfoResponseOld { Pointer : pointer } , nil
2019-03-18 10:55:06 +00:00
}
2019-07-08 14:33:15 +01:00
// CreateSegmentOld will generate requested number of OrderLimit with coresponding node addresses for them
func ( endpoint * Endpoint ) CreateSegmentOld ( ctx context . Context , req * pb . SegmentWriteRequestOld ) ( resp * pb . SegmentWriteResponseOld , err error ) {
2019-03-18 10:55:06 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-05-24 17:51:27 +01:00
Op : macaroon . ActionWrite ,
Bucket : req . Bucket ,
EncryptedPath : req . Path ,
Time : time . Now ( ) ,
} )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-06-04 12:55:38 +01:00
err = endpoint . validateBucket ( ctx , req . Bucket )
2019-04-01 21:14:58 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-04-01 21:14:58 +01:00
}
2019-07-16 20:16:41 +01:00
if ! req . Expiration . IsZero ( ) && ! req . Expiration . After ( time . Now ( ) ) {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "Invalid expiration time" )
2019-07-16 20:16:41 +01:00
}
2019-06-04 12:55:38 +01:00
err = endpoint . validateRedundancy ( ctx , req . Redundancy )
2019-04-09 14:31:19 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-04-09 14:31:19 +01:00
}
2019-05-28 16:36:52 +01:00
exceeded , limit , err := endpoint . projectUsage . ExceedsStorageUsage ( ctx , keyInfo . ProjectID )
2019-04-02 19:21:18 +01:00
if err != nil {
2019-05-10 02:39:21 +01:00
endpoint . log . Error ( "retrieving project storage totals" , zap . Error ( err ) )
2019-04-02 19:21:18 +01:00
}
if exceeded {
2019-05-28 16:36:52 +01:00
endpoint . log . Sugar ( ) . Errorf ( "monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for storage for projectID %s" ,
limit , keyInfo . ProjectID ,
2019-04-02 19:21:18 +01:00
)
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . ResourceExhausted , "Exceeded Usage Limit" )
2019-04-02 19:21:18 +01:00
}
2019-03-18 10:55:06 +00:00
redundancy , err := eestream . NewRedundancyStrategyFromProto ( req . GetRedundancy ( ) )
if err != nil {
return nil , err
}
maxPieceSize := eestream . CalcPieceSize ( req . GetMaxEncryptedSegmentSize ( ) , redundancy )
2019-03-23 08:06:11 +00:00
request := overlay . FindStorageNodesRequest {
RequestedCount : int ( req . Redundancy . Total ) ,
FreeBandwidth : maxPieceSize ,
FreeDisk : maxPieceSize ,
2019-03-18 10:55:06 +00:00
}
2019-08-06 17:35:59 +01:00
nodes , err := endpoint . overlay . FindStorageNodes ( ctx , request )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-05-10 02:39:21 +01:00
bucketID := createBucketID ( keyInfo . ProjectID , req . Bucket )
2019-07-11 21:51:40 +01:00
rootPieceID , addressedLimits , piecePrivateKey , err := endpoint . orders . CreatePutOrderLimits ( ctx , bucketID , nodes , req . Expiration , maxPieceSize )
2019-03-27 10:24:35 +00:00
if err != nil {
2019-03-28 20:09:23 +00:00
return nil , Error . Wrap ( err )
2019-03-27 10:24:35 +00:00
}
2019-06-05 17:41:02 +01:00
if len ( addressedLimits ) > 0 {
endpoint . createRequests . Put ( addressedLimits [ 0 ] . Limit . SerialNumber , & createRequest {
Expiration : req . Expiration ,
Redundancy : req . Redundancy ,
} )
}
2019-07-11 21:51:40 +01:00
return & pb . SegmentWriteResponseOld { AddressedLimits : addressedLimits , RootPieceId : rootPieceID , PrivateKey : piecePrivateKey } , nil
2019-03-18 10:55:06 +00:00
}
2019-05-10 02:39:21 +01:00
func calculateSpaceUsed ( ptr * pb . Pointer ) ( inlineSpace , remoteSpace int64 ) {
inline := ptr . GetInlineSegment ( )
if inline != nil {
return int64 ( len ( inline ) ) , 0
}
segmentSize := ptr . GetSegmentSize ( )
remote := ptr . GetRemote ( )
if remote == nil {
return 0 , 0
}
minReq := remote . GetRedundancy ( ) . GetMinReq ( )
pieceSize := segmentSize / int64 ( minReq )
pieces := remote . GetRemotePieces ( )
return 0 , pieceSize * int64 ( len ( pieces ) )
}
2019-07-08 14:33:15 +01:00
// CommitSegmentOld commits segment metadata
func ( endpoint * Endpoint ) CommitSegmentOld ( ctx context . Context , req * pb . SegmentCommitRequestOld ) ( resp * pb . SegmentCommitResponseOld , err error ) {
2019-03-18 10:55:06 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-05-24 17:51:27 +01:00
Op : macaroon . ActionWrite ,
Bucket : req . Bucket ,
EncryptedPath : req . Path ,
Time : time . Now ( ) ,
} )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-06-04 12:55:38 +01:00
err = endpoint . validateBucket ( ctx , req . Bucket )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-06-05 17:41:02 +01:00
err = endpoint . validateCommitSegment ( ctx , req )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-07-03 17:14:37 +01:00
err = endpoint . filterValidPieces ( ctx , req . Pointer , req . OriginalLimits )
2019-03-30 11:21:49 +00:00
if err != nil {
2019-10-17 19:01:40 +01:00
return nil , err
2019-03-30 11:21:49 +00:00
}
2019-03-18 10:55:06 +00:00
2019-06-04 12:55:38 +01:00
path , err := CreatePath ( ctx , keyInfo . ProjectID , req . Segment , req . Bucket , req . Path )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-07-08 23:24:38 +01:00
exceeded , limit , err := endpoint . projectUsage . ExceedsStorageUsage ( ctx , keyInfo . ProjectID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-08 23:24:38 +01:00
}
if exceeded {
endpoint . log . Sugar ( ) . Errorf ( "monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for storage for projectID %s." ,
limit , keyInfo . ProjectID ,
)
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . ResourceExhausted , "Exceeded Usage Limit" )
2019-07-08 23:24:38 +01:00
}
2019-07-09 21:36:18 +01:00
// clear hashes so we don't store them
for _ , piece := range req . GetPointer ( ) . GetRemote ( ) . GetRemotePieces ( ) {
piece . Hash = nil
}
2019-09-18 14:50:33 +01:00
req . Pointer . PieceHashesVerified = true
2019-07-09 21:36:18 +01:00
2019-05-10 02:39:21 +01:00
inlineUsed , remoteUsed := calculateSpaceUsed ( req . Pointer )
2019-07-08 23:24:38 +01:00
// ToDo: Replace with hash & signature validation
// Ensure neither uplink or storage nodes are cheating on us
if req . Pointer . Type == pb . Pointer_REMOTE {
//We cannot have more redundancy than total/min
if float64 ( remoteUsed ) > ( float64 ( req . Pointer . SegmentSize ) / float64 ( req . Pointer . Remote . Redundancy . MinReq ) ) * float64 ( req . Pointer . Remote . Redundancy . Total ) {
endpoint . log . Sugar ( ) . Debugf ( "data size mismatch, got segment: %d, pieces: %d, RS Min, Total: %d,%d" , req . Pointer . SegmentSize , remoteUsed , req . Pointer . Remote . Redundancy . MinReq , req . Pointer . Remote . Redundancy . Total )
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "mismatched segment size and piece usage" )
2019-07-08 23:24:38 +01:00
}
}
2019-05-28 16:36:52 +01:00
if err := endpoint . projectUsage . AddProjectStorageUsage ( ctx , keyInfo . ProjectID , inlineUsed , remoteUsed ) ; err != nil {
2019-08-20 14:16:51 +01:00
endpoint . log . Sugar ( ) . Errorf ( "Could not track new storage usage by project %q: %v" , keyInfo . ProjectID , err )
2019-05-10 02:39:21 +01:00
// but continue. it's most likely our own fault that we couldn't track it, and the only thing
// that will be affected is our per-project bandwidth and storage limits.
}
2019-06-05 15:23:10 +01:00
err = endpoint . metainfo . Put ( ctx , path , req . Pointer )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-04-05 08:42:56 +01:00
if req . Pointer . Type == pb . Pointer_INLINE {
// TODO or maybe use pointer.SegmentSize ??
2019-06-25 16:58:42 +01:00
err = endpoint . orders . UpdatePutInlineOrder ( ctx , keyInfo . ProjectID , req . Bucket , int64 ( len ( req . Pointer . InlineSegment ) ) )
2019-04-05 08:42:56 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-04-05 08:42:56 +01:00
}
}
2019-06-05 15:23:10 +01:00
pointer , err := endpoint . metainfo . Get ( ctx , path )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-06-05 17:41:02 +01:00
if len ( req . OriginalLimits ) > 0 {
endpoint . createRequests . Remove ( req . OriginalLimits [ 0 ] . SerialNumber )
}
2019-07-08 14:33:15 +01:00
return & pb . SegmentCommitResponseOld { Pointer : pointer } , nil
2019-03-18 10:55:06 +00:00
}
2019-07-08 14:33:15 +01:00
// DownloadSegmentOld gets Pointer incase of INLINE data or list of OrderLimit necessary to download remote data
func ( endpoint * Endpoint ) DownloadSegmentOld ( ctx context . Context , req * pb . SegmentDownloadRequestOld ) ( resp * pb . SegmentDownloadResponseOld , err error ) {
2019-03-18 10:55:06 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-05-24 17:51:27 +01:00
Op : macaroon . ActionRead ,
Bucket : req . Bucket ,
EncryptedPath : req . Path ,
Time : time . Now ( ) ,
} )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-06-04 12:55:38 +01:00
err = endpoint . validateBucket ( ctx , req . Bucket )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-04-02 19:21:18 +01:00
bucketID := createBucketID ( keyInfo . ProjectID , req . Bucket )
2019-05-28 16:36:52 +01:00
exceeded , limit , err := endpoint . projectUsage . ExceedsBandwidthUsage ( ctx , keyInfo . ProjectID , bucketID )
2019-04-02 19:21:18 +01:00
if err != nil {
2019-05-28 16:36:52 +01:00
endpoint . log . Error ( "retrieving project bandwidth total" , zap . Error ( err ) )
2019-04-02 19:21:18 +01:00
}
if exceeded {
2019-05-28 16:36:52 +01:00
endpoint . log . Sugar ( ) . Errorf ( "monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for bandwidth for projectID %s." ,
limit , keyInfo . ProjectID ,
2019-04-02 19:21:18 +01:00
)
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . ResourceExhausted , "Exceeded Usage Limit" )
2019-04-02 19:21:18 +01:00
}
2019-08-01 10:04:31 +01:00
pointer , _ , err := endpoint . getPointer ( ctx , keyInfo . ProjectID , req . Segment , req . Bucket , req . Path )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-08-01 10:04:31 +01:00
return nil , err
2019-03-18 10:55:06 +00:00
}
if pointer . Type == pb . Pointer_INLINE {
2019-04-05 08:42:56 +01:00
// TODO or maybe use pointer.SegmentSize ??
2019-06-25 16:58:42 +01:00
err := endpoint . orders . UpdateGetInlineOrder ( ctx , keyInfo . ProjectID , req . Bucket , int64 ( len ( pointer . InlineSegment ) ) )
2019-04-05 08:42:56 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-04-05 08:42:56 +01:00
}
2019-07-08 14:33:15 +01:00
return & pb . SegmentDownloadResponseOld { Pointer : pointer } , nil
2019-03-18 10:55:06 +00:00
} else if pointer . Type == pb . Pointer_REMOTE && pointer . Remote != nil {
2019-07-11 21:51:40 +01:00
limits , privateKey , err := endpoint . orders . CreateGetOrderLimits ( ctx , bucketID , pointer )
2019-03-28 20:09:23 +00:00
if err != nil {
2019-12-04 21:24:36 +00:00
if orders . ErrDownloadFailedNotEnoughPieces . Has ( err ) {
endpoint . log . Sugar ( ) . Errorf ( "unable to create order limits for project id %s from api key id %s: %v." , keyInfo . ProjectID . String ( ) , keyInfo . ID . String ( ) , zap . Error ( err ) )
}
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-03-27 10:24:35 +00:00
}
2019-07-11 21:51:40 +01:00
return & pb . SegmentDownloadResponseOld { Pointer : pointer , AddressedLimits : limits , PrivateKey : privateKey } , nil
2019-03-18 10:55:06 +00:00
}
2019-07-08 14:33:15 +01:00
return & pb . SegmentDownloadResponseOld { } , nil
2019-03-18 10:55:06 +00:00
}
2019-07-08 14:33:15 +01:00
// DeleteSegmentOld deletes segment metadata from satellite and returns OrderLimit array to remove them from storage node
func ( endpoint * Endpoint ) DeleteSegmentOld ( ctx context . Context , req * pb . SegmentDeleteRequestOld ) ( resp * pb . SegmentDeleteResponseOld , err error ) {
2019-03-18 10:55:06 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-05-24 17:51:27 +01:00
Op : macaroon . ActionDelete ,
Bucket : req . Bucket ,
EncryptedPath : req . Path ,
Time : time . Now ( ) ,
} )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-06-04 12:55:38 +01:00
err = endpoint . validateBucket ( ctx , req . Bucket )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-06-04 12:55:38 +01:00
path , err := CreatePath ( ctx , keyInfo . ProjectID , req . Segment , req . Bucket , req . Path )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
// TODO refactor to use []byte directly
2019-06-05 15:23:10 +01:00
pointer , err := endpoint . metainfo . Get ( ctx , path )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-12-10 20:21:30 +00:00
if storj . ErrObjectNotFound . Has ( err ) {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . NotFound , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-11-06 17:02:14 +00:00
err = endpoint . metainfo . UnsynchronizedDelete ( ctx , path )
2019-05-24 20:56:08 +01:00
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
if pointer . Type == pb . Pointer_REMOTE && pointer . Remote != nil {
2019-03-28 20:09:23 +00:00
bucketID := createBucketID ( keyInfo . ProjectID , req . Bucket )
2019-07-11 21:51:40 +01:00
limits , privateKey , err := endpoint . orders . CreateDeleteOrderLimits ( ctx , bucketID , pointer )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-07-11 21:51:40 +01:00
return & pb . SegmentDeleteResponseOld { AddressedLimits : limits , PrivateKey : privateKey } , nil
2019-03-18 10:55:06 +00:00
}
2019-07-08 14:33:15 +01:00
return & pb . SegmentDeleteResponseOld { } , nil
2019-03-18 10:55:06 +00:00
}
2019-07-08 14:33:15 +01:00
// ListSegmentsOld returns all Path keys in the Pointers bucket
func ( endpoint * Endpoint ) ListSegmentsOld ( ctx context . Context , req * pb . ListSegmentsRequestOld ) ( resp * pb . ListSegmentsResponseOld , err error ) {
2019-03-18 10:55:06 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-05-24 17:51:27 +01:00
Op : macaroon . ActionList ,
Bucket : req . Bucket ,
EncryptedPath : req . Prefix ,
Time : time . Now ( ) ,
} )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-06-04 12:55:38 +01:00
prefix , err := CreatePath ( ctx , keyInfo . ProjectID , - 1 , req . Bucket , req . Prefix )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-09-25 22:30:41 +01:00
items , more , err := endpoint . metainfo . List ( ctx , prefix , string ( req . StartAfter ) , req . Recursive , req . Limit , req . MetaFlags )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
2019-07-08 14:33:15 +01:00
segmentItems := make ( [ ] * pb . ListSegmentsResponseOld_Item , len ( items ) )
2019-03-18 10:55:06 +00:00
for i , item := range items {
2019-07-08 14:33:15 +01:00
segmentItems [ i ] = & pb . ListSegmentsResponseOld_Item {
2019-03-18 10:55:06 +00:00
Path : [ ] byte ( item . Path ) ,
Pointer : item . Pointer ,
IsPrefix : item . IsPrefix ,
}
}
2019-07-08 14:33:15 +01:00
return & pb . ListSegmentsResponseOld { Items : segmentItems , More : more } , nil
2019-03-18 10:55:06 +00:00
}
2019-03-28 20:09:23 +00:00
func createBucketID ( projectID uuid . UUID , bucket [ ] byte ) [ ] byte {
entries := make ( [ ] string , 0 )
entries = append ( entries , projectID . String ( ) )
entries = append ( entries , string ( bucket ) )
return [ ] byte ( storj . JoinPaths ( entries ... ) )
}
2019-10-17 19:01:40 +01:00
// filterValidPieces filter out the invalid remote pieces held by pointer.
//
2019-12-03 13:36:32 +00:00
// This method expect the pointer to be valid, so it has to be validated before
// calling it.
//
2019-10-17 19:01:40 +01:00
// The method always return a gRPC status error so the caller can directly
// return it to the client.
2019-12-03 13:36:32 +00:00
func ( endpoint * Endpoint ) filterValidPieces ( ctx context . Context , pointer * pb . Pointer , originalLimits [ ] * pb . OrderLimit ) ( err error ) {
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-11-15 13:33:09 +00:00
if pointer . Type != pb . Pointer_REMOTE {
return nil
}
2019-09-18 14:50:33 +01:00
2019-11-15 13:33:09 +00:00
remote := pointer . Remote
peerIDMap , err := endpoint . mapNodesFor ( ctx , remote . RemotePieces )
if err != nil {
return err
}
type invalidPiece struct {
NodeID storj . NodeID
PieceNum int32
Reason string
}
var (
remotePieces [ ] * pb . RemotePiece
invalidPieces [ ] invalidPiece
lastPieceSize int64
allSizesValid = true
)
for _ , piece := range remote . RemotePieces {
// Verify storagenode signature on piecehash
peerID , ok := peerIDMap [ piece . NodeId ]
if ! ok {
endpoint . log . Warn ( "Identity chain unknown for node. Piece removed from pointer" ,
zap . Stringer ( "Node ID" , piece . NodeId ) ,
zap . Int32 ( "Piece ID" , piece . PieceNum ) ,
)
invalidPieces = append ( invalidPieces , invalidPiece {
NodeID : piece . NodeId ,
PieceNum : piece . PieceNum ,
Reason : "Identity chain unknown for node" ,
} )
continue
2019-09-18 14:50:33 +01:00
}
2019-11-15 13:33:09 +00:00
signee := signing . SigneeFromPeerIdentity ( peerID )
2019-09-18 14:50:33 +01:00
2019-12-03 13:36:32 +00:00
limit := originalLimits [ piece . PieceNum ]
if limit == nil {
endpoint . log . Warn ( "There is not limit for the piece. Piece removed from pointer" ,
zap . Int32 ( "Piece ID" , piece . PieceNum ) ,
)
invalidPieces = append ( invalidPieces , invalidPiece {
NodeID : piece . NodeId ,
PieceNum : piece . PieceNum ,
Reason : "No order limit for validating the piece hash" ,
} )
continue
}
err = endpoint . validatePieceHash ( ctx , piece , limit , signee )
2019-11-15 13:33:09 +00:00
if err != nil {
endpoint . log . Warn ( "Problem validating piece hash. Pieces removed from pointer" , zap . Error ( err ) )
invalidPieces = append ( invalidPieces , invalidPiece {
NodeID : piece . NodeId ,
PieceNum : piece . PieceNum ,
Reason : err . Error ( ) ,
} )
continue
2019-10-17 19:01:40 +01:00
}
2019-03-18 10:55:06 +00:00
2019-11-15 13:33:09 +00:00
if piece . Hash . PieceSize <= 0 || ( lastPieceSize > 0 && lastPieceSize != piece . Hash . PieceSize ) {
allSizesValid = false
break
}
lastPieceSize = piece . Hash . PieceSize
2019-10-17 19:01:40 +01:00
2019-11-15 13:33:09 +00:00
remotePieces = append ( remotePieces , piece )
}
2019-07-03 17:14:37 +01:00
2019-11-15 13:33:09 +00:00
if allSizesValid {
redundancy , err := eestream . NewRedundancyStrategyFromProto ( pointer . GetRemote ( ) . GetRedundancy ( ) )
if err != nil {
endpoint . log . Debug ( "pointer contains an invalid redundancy strategy" , zap . Error ( Error . Wrap ( err ) ) )
return rpcstatus . Errorf ( rpcstatus . InvalidArgument ,
"invalid redundancy strategy; MinReq and/or Total are invalid: %s" , err ,
)
}
2019-07-03 17:14:37 +01:00
2019-11-15 13:33:09 +00:00
expectedPieceSize := eestream . CalcPieceSize ( pointer . SegmentSize , redundancy )
if expectedPieceSize != lastPieceSize {
endpoint . log . Debug ( "expected piece size is different from provided" ,
zap . Int64 ( "expectedSize" , expectedPieceSize ) ,
zap . Int64 ( "actualSize" , lastPieceSize ) ,
)
return rpcstatus . Errorf ( rpcstatus . InvalidArgument ,
"expected piece size is different from provided (%d != %d)" ,
expectedPieceSize , lastPieceSize ,
)
2019-03-18 10:55:06 +00:00
}
2019-11-15 13:33:09 +00:00
} else {
errMsg := "all pieces needs to have the same size"
endpoint . log . Debug ( errMsg )
return rpcstatus . Error ( rpcstatus . InvalidArgument , errMsg )
}
// We repair when the number of healthy files is less than or equal to the repair threshold
// except for the case when the repair and success thresholds are the same (a case usually seen during testing).
if numPieces := int32 ( len ( remotePieces ) ) ; numPieces <= remote . Redundancy . RepairThreshold && numPieces < remote . Redundancy . SuccessThreshold {
endpoint . log . Debug ( "Number of valid pieces is less than or equal to the repair threshold" ,
zap . Int ( "totalReceivedPieces" , len ( remote . RemotePieces ) ) ,
zap . Int ( "validPieces" , len ( remotePieces ) ) ,
zap . Int ( "invalidPieces" , len ( invalidPieces ) ) ,
zap . Int32 ( "repairThreshold" , remote . Redundancy . RepairThreshold ) ,
)
2019-03-18 10:55:06 +00:00
2019-11-15 13:33:09 +00:00
errMsg := fmt . Sprintf ( "Number of valid pieces (%d) is less than or equal to the repair threshold (%d). Found %d invalid pieces" ,
len ( remotePieces ) ,
remote . Redundancy . RepairThreshold ,
len ( remote . RemotePieces ) ,
)
if len ( invalidPieces ) > 0 {
errMsg = fmt . Sprintf ( "%s. Invalid Pieces:" , errMsg )
2019-07-03 17:14:37 +01:00
2019-11-15 13:33:09 +00:00
for _ , p := range invalidPieces {
errMsg = fmt . Sprintf ( "%s\nNodeID: %v, PieceNum: %d, Reason: %s" ,
errMsg , p . NodeID , p . PieceNum , p . Reason ,
2019-10-17 19:01:40 +01:00
)
2019-07-03 17:14:37 +01:00
}
}
2019-11-15 13:33:09 +00:00
return rpcstatus . Error ( rpcstatus . InvalidArgument , errMsg )
}
2019-10-17 19:01:40 +01:00
2019-11-15 13:33:09 +00:00
if int32 ( len ( remotePieces ) ) < remote . Redundancy . SuccessThreshold {
endpoint . log . Debug ( "Number of valid pieces is less than the success threshold" ,
zap . Int ( "totalReceivedPieces" , len ( remote . RemotePieces ) ) ,
zap . Int ( "validPieces" , len ( remotePieces ) ) ,
zap . Int ( "invalidPieces" , len ( invalidPieces ) ) ,
zap . Int32 ( "successThreshold" , remote . Redundancy . SuccessThreshold ) ,
)
2019-03-18 10:55:06 +00:00
2019-11-15 13:33:09 +00:00
errMsg := fmt . Sprintf ( "Number of valid pieces (%d) is less than the success threshold (%d). Found %d invalid pieces" ,
len ( remotePieces ) ,
remote . Redundancy . SuccessThreshold ,
len ( remote . RemotePieces ) ,
)
if len ( invalidPieces ) > 0 {
errMsg = fmt . Sprintf ( "%s. Invalid Pieces:" , errMsg )
2019-10-17 19:01:40 +01:00
2019-11-15 13:33:09 +00:00
for _ , p := range invalidPieces {
errMsg = fmt . Sprintf ( "%s\nNodeID: %v, PieceNum: %d, Reason: %s" ,
errMsg , p . NodeID , p . PieceNum , p . Reason ,
)
2019-10-17 19:01:40 +01:00
}
2019-07-31 19:28:43 +01:00
}
2019-11-15 13:33:09 +00:00
return rpcstatus . Error ( rpcstatus . InvalidArgument , errMsg )
2019-03-18 10:55:06 +00:00
}
2019-10-17 19:01:40 +01:00
2019-11-15 13:33:09 +00:00
remote . RemotePieces = remotePieces
2019-03-18 10:55:06 +00:00
return nil
}
2019-09-18 14:50:33 +01:00
func ( endpoint * Endpoint ) mapNodesFor ( ctx context . Context , pieces [ ] * pb . RemotePiece ) ( map [ storj . NodeID ] * identity . PeerIdentity , error ) {
nodeIDList := storj . NodeIDList { }
for _ , piece := range pieces {
nodeIDList = append ( nodeIDList , piece . NodeId )
}
peerIDList , err := endpoint . peerIdentities . BatchGet ( ctx , nodeIDList )
if err != nil {
2019-10-17 19:01:40 +01:00
endpoint . log . Error ( "retrieving batch of the peer identities of nodes" , zap . Error ( Error . Wrap ( err ) ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , "retrieving nodes peer identities" )
2019-09-18 14:50:33 +01:00
}
peerIDMap := make ( map [ storj . NodeID ] * identity . PeerIdentity , len ( peerIDList ) )
for _ , peerID := range peerIDList {
peerIDMap [ peerID . ID ] = peerID
}
return peerIDMap , nil
}
2019-12-10 11:15:35 +00:00
// CreatePath creates a Segment path.
2019-06-04 12:55:38 +01:00
func CreatePath ( ctx context . Context , projectID uuid . UUID , segmentIndex int64 , bucket , path [ ] byte ) ( _ storj . Path , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-04-02 15:55:58 +01:00
if segmentIndex < - 1 {
return "" , errors . New ( "invalid segment index" )
}
segment := "l"
if segmentIndex > - 1 {
segment = "s" + strconv . FormatInt ( segmentIndex , 10 )
}
entries := make ( [ ] string , 0 )
entries = append ( entries , projectID . String ( ) )
entries = append ( entries , segment )
if len ( bucket ) != 0 {
entries = append ( entries , string ( bucket ) )
}
if len ( path ) != 0 {
entries = append ( entries , string ( path ) )
}
return storj . JoinPaths ( entries ... ) , nil
}
2019-06-13 02:35:37 +01:00
2019-07-08 14:33:15 +01:00
// SetAttributionOld tries to add attribution to the bucket.
func ( endpoint * Endpoint ) SetAttributionOld ( ctx context . Context , req * pb . SetAttributionRequestOld ) ( _ * pb . SetAttributionResponseOld , err error ) {
2019-06-19 13:02:37 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
err = endpoint . setBucketAttribution ( ctx , req . Header , req . BucketName , req . PartnerId )
2019-06-13 02:35:37 +01:00
2019-08-05 08:07:40 +01:00
return & pb . SetAttributionResponseOld { } , err
2019-06-21 20:14:34 +01:00
}
2019-06-24 18:15:45 +01:00
// ProjectInfo returns allowed ProjectInfo for the provided API key
func ( endpoint * Endpoint ) ProjectInfo ( ctx context . Context , req * pb . ProjectInfoRequest ) ( _ * pb . ProjectInfoResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-06-24 18:15:45 +01:00
Op : macaroon . ActionProjectInfo ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-06-24 18:15:45 +01:00
}
salt := sha256 . Sum256 ( keyInfo . ProjectID [ : ] )
return & pb . ProjectInfoResponse {
ProjectSalt : salt [ : ] ,
} , nil
}
2019-07-01 23:17:30 +01:00
2019-07-08 23:32:18 +01:00
// GetBucket returns a bucket
func ( endpoint * Endpoint ) GetBucket ( ctx context . Context , req * pb . BucketGetRequest ) ( resp * pb . BucketGetResponse , err error ) {
2019-07-01 23:17:30 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-08 23:32:18 +01:00
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-08 23:32:18 +01:00
Op : macaroon . ActionRead ,
Bucket : req . Name ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
bucket , err := endpoint . metainfo . GetBucket ( ctx , req . GetName ( ) , keyInfo . ProjectID )
if err != nil {
2019-07-12 13:57:02 +01:00
if storj . ErrBucketNotFound . Has ( err ) {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . NotFound , err . Error ( ) )
2019-07-12 13:57:02 +01:00
}
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
2019-07-19 16:17:34 +01:00
convBucket , err := convertBucketToProto ( ctx , bucket )
if err != nil {
return resp , err
}
2019-07-08 23:32:18 +01:00
return & pb . BucketGetResponse {
2019-07-19 16:17:34 +01:00
Bucket : convBucket ,
2019-07-08 23:32:18 +01:00
} , nil
2019-07-01 23:17:30 +01:00
}
2019-07-08 23:32:18 +01:00
// CreateBucket creates a new bucket
func ( endpoint * Endpoint ) CreateBucket ( ctx context . Context , req * pb . BucketCreateRequest ) ( resp * pb . BucketCreateResponse , err error ) {
2019-07-01 23:17:30 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-08 23:32:18 +01:00
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-08 23:32:18 +01:00
Op : macaroon . ActionWrite ,
Bucket : req . Name ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
err = endpoint . validateBucket ( ctx , req . Name )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
2019-07-24 12:33:23 +01:00
// TODO set default Redundancy if not set
2019-07-08 23:32:18 +01:00
err = endpoint . validateRedundancy ( ctx , req . GetDefaultRedundancyScheme ( ) )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
2019-07-19 16:17:34 +01:00
// checks if bucket exists before updates it or makes a new entry
2019-11-26 11:12:37 +00:00
_ , err = endpoint . metainfo . GetBucket ( ctx , req . GetName ( ) , keyInfo . ProjectID )
2019-07-19 16:17:34 +01:00
if err == nil {
2019-11-26 11:12:37 +00:00
return nil , rpcstatus . Error ( rpcstatus . AlreadyExists , "bucket already exists" )
}
if ! storj . ErrBucketNotFound . Has ( err ) {
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
2019-11-26 11:12:37 +00:00
bucket , err := convertProtoToBucket ( req , keyInfo . ProjectID )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
2019-07-19 16:17:34 +01:00
2019-11-26 11:12:37 +00:00
bucket , err = endpoint . metainfo . CreateBucket ( ctx , bucket )
if err != nil {
endpoint . log . Error ( "error while creating bucket" , zap . String ( "bucketName" , bucket . Name ) , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to create bucket" )
}
2019-07-19 16:17:34 +01:00
2019-11-26 11:12:37 +00:00
convBucket , err := convertBucketToProto ( ctx , bucket )
if err != nil {
endpoint . log . Error ( "error while converting bucket to proto" , zap . String ( "bucketName" , bucket . Name ) , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to create bucket" )
2019-07-19 16:17:34 +01:00
}
2019-11-26 11:12:37 +00:00
return & pb . BucketCreateResponse {
Bucket : convBucket ,
} , nil
2019-07-01 23:17:30 +01:00
}
// DeleteBucket deletes a bucket
2019-07-08 23:32:18 +01:00
func ( endpoint * Endpoint ) DeleteBucket ( ctx context . Context , req * pb . BucketDeleteRequest ) ( resp * pb . BucketDeleteResponse , err error ) {
2019-07-01 23:17:30 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-08 23:32:18 +01:00
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-08 23:32:18 +01:00
Op : macaroon . ActionDelete ,
Bucket : req . Name ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
err = endpoint . validateBucket ( ctx , req . Name )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
err = endpoint . metainfo . DeleteBucket ( ctx , req . Name , keyInfo . ProjectID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
return & pb . BucketDeleteResponse { } , nil
2019-07-01 23:17:30 +01:00
}
2019-07-08 23:32:18 +01:00
// ListBuckets returns buckets in a project where the bucket name matches the request cursor
func ( endpoint * Endpoint ) ListBuckets ( ctx context . Context , req * pb . BucketListRequest ) ( resp * pb . BucketListResponse , err error ) {
2019-07-01 23:17:30 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-08 23:32:18 +01:00
action := macaroon . Action {
Op : macaroon . ActionRead ,
Time : time . Now ( ) ,
}
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , action )
2019-07-08 23:32:18 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
2019-09-19 17:19:29 +01:00
allowedBuckets , err := getAllowedBuckets ( ctx , req . Header , action )
2019-07-08 23:32:18 +01:00
if err != nil {
return nil , err
}
listOpts := storj . BucketListOptions {
2019-07-12 13:57:02 +01:00
Cursor : string ( req . Cursor ) ,
Limit : int ( req . Limit ) ,
Direction : storj . ListDirection ( req . Direction ) ,
2019-07-08 23:32:18 +01:00
}
bucketList , err := endpoint . metainfo . ListBuckets ( ctx , keyInfo . ProjectID , listOpts , allowedBuckets )
if err != nil {
return nil , err
}
bucketItems := make ( [ ] * pb . BucketListItem , len ( bucketList . Items ) )
for i , item := range bucketList . Items {
bucketItems [ i ] = & pb . BucketListItem {
Name : [ ] byte ( item . Name ) ,
CreatedAt : item . Created ,
}
}
return & pb . BucketListResponse {
Items : bucketItems ,
More : bucketList . More ,
} , nil
2019-07-01 23:17:30 +01:00
}
2019-09-19 17:19:29 +01:00
func getAllowedBuckets ( ctx context . Context , header * pb . RequestHeader , action macaroon . Action ) ( _ macaroon . AllowedBuckets , err error ) {
key , err := getAPIKey ( ctx , header )
2019-07-08 23:32:18 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return macaroon . AllowedBuckets { } , rpcstatus . Errorf ( rpcstatus . InvalidArgument , "Invalid API credentials: %v" , err )
2019-07-08 23:32:18 +01:00
}
2019-07-12 13:57:02 +01:00
allowedBuckets , err := key . GetAllowedBuckets ( ctx , action )
2019-07-08 23:32:18 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return macaroon . AllowedBuckets { } , rpcstatus . Errorf ( rpcstatus . Internal , "GetAllowedBuckets: %v" , err )
2019-07-08 23:32:18 +01:00
}
return allowedBuckets , err
}
// SetBucketAttribution sets the bucket attribution.
2019-08-05 08:07:40 +01:00
func ( endpoint * Endpoint ) SetBucketAttribution ( ctx context . Context , req * pb . BucketSetAttributionRequest ) ( resp * pb . BucketSetAttributionResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
err = endpoint . setBucketAttribution ( ctx , req . Header , req . Name , req . PartnerId )
2019-08-05 08:07:40 +01:00
return & pb . BucketSetAttributionResponse { } , err
}
2019-11-26 11:12:37 +00:00
// resolvePartnerID returns partnerIDBytes as parsed or UUID corresponding to header.UserAgent.
// returns empty uuid when neither is defined.
func ( endpoint * Endpoint ) resolvePartnerID ( ctx context . Context , header * pb . RequestHeader , partnerIDBytes [ ] byte ) ( uuid . UUID , error ) {
if len ( partnerIDBytes ) > 0 {
2019-12-16 17:59:01 +00:00
partnerID , err := dbutil . BytesToUUID ( partnerIDBytes )
2019-11-26 11:12:37 +00:00
if err != nil {
return uuid . UUID { } , rpcstatus . Errorf ( rpcstatus . InvalidArgument , "unable to parse partner ID: %v" , err )
}
return partnerID , nil
}
if len ( header . UserAgent ) == 0 {
return uuid . UUID { } , nil
}
partner , err := endpoint . partners . ByUserAgent ( ctx , string ( header . UserAgent ) )
if err != nil || partner . UUID == nil {
return uuid . UUID { } , rpcstatus . Errorf ( rpcstatus . InvalidArgument , "unable to resolve user agent %q: %v" , string ( header . UserAgent ) , err )
}
return * partner . UUID , nil
}
func ( endpoint * Endpoint ) setBucketAttribution ( ctx context . Context , header * pb . RequestHeader , bucketName [ ] byte , partnerIDBytes [ ] byte ) error {
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , header , macaroon . Action {
2019-08-05 08:07:40 +01:00
Op : macaroon . ActionList ,
Bucket : bucketName ,
EncryptedPath : [ ] byte ( "" ) ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-08-05 08:07:40 +01:00
}
2019-11-26 11:12:37 +00:00
partnerID , err := endpoint . resolvePartnerID ( ctx , header , partnerIDBytes )
2019-08-05 08:07:40 +01:00
if err != nil {
2019-11-26 11:12:37 +00:00
return rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
if partnerID . IsZero ( ) {
return rpcstatus . Error ( rpcstatus . InvalidArgument , "unknown user agent or partner id" )
2019-08-05 08:07:40 +01:00
}
// check if attribution is set for given bucket
2019-11-26 11:12:37 +00:00
_ , err = endpoint . attributions . Get ( ctx , keyInfo . ProjectID , bucketName )
2019-08-05 08:07:40 +01:00
if err == nil {
2019-11-26 11:12:37 +00:00
endpoint . log . Info ( "bucket already attributed" , zap . ByteString ( "bucketName" , bucketName ) , zap . Stringer ( "Partner ID" , partnerID ) )
2019-08-05 08:07:40 +01:00
return nil
}
if ! attribution . ErrBucketNotAttributed . Has ( err ) {
// try only to set the attribution, when it's missing
endpoint . log . Error ( "error while getting attribution from DB" , zap . Error ( err ) )
2019-09-19 05:46:39 +01:00
return rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-05 08:07:40 +01:00
}
prefix , err := CreatePath ( ctx , keyInfo . ProjectID , - 1 , bucketName , [ ] byte { } )
if err != nil {
2019-09-19 05:46:39 +01:00
return rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-08-05 08:07:40 +01:00
}
2019-09-25 22:30:41 +01:00
items , _ , err := endpoint . metainfo . List ( ctx , prefix , "" , true , 1 , 0 )
2019-08-05 08:07:40 +01:00
if err != nil {
endpoint . log . Error ( "error while listing segments" , zap . Error ( err ) )
2019-09-19 05:46:39 +01:00
return rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-05 08:07:40 +01:00
}
if len ( items ) > 0 {
2019-11-26 11:12:37 +00:00
return rpcstatus . Errorf ( rpcstatus . AlreadyExists , "bucket %q is not empty, PartnerID %q cannot be attributed" , bucketName , partnerID )
2019-08-05 08:07:40 +01:00
}
2019-11-26 11:12:37 +00:00
// checks if bucket exists before updates it or makes a new entry
bucket , err := endpoint . metainfo . GetBucket ( ctx , bucketName , keyInfo . ProjectID )
if err != nil {
if storj . ErrBucketNotFound . Has ( err ) {
return rpcstatus . Errorf ( rpcstatus . NotFound , "bucket %q does not exist" , bucketName )
}
endpoint . log . Error ( "error while getting bucket" , zap . ByteString ( "bucketName" , bucketName ) , zap . Error ( err ) )
return rpcstatus . Error ( rpcstatus . Internal , "unable to set bucket attribution" )
}
if ! bucket . PartnerID . IsZero ( ) {
endpoint . log . Info ( "bucket already attributed" , zap . ByteString ( "bucketName" , bucketName ) , zap . Stringer ( "Partner ID" , partnerID ) )
return nil
}
// update bucket information
bucket . PartnerID = partnerID
_ , err = endpoint . metainfo . UpdateBucket ( ctx , bucket )
if err != nil {
endpoint . log . Error ( "error while updating bucket" , zap . ByteString ( "bucketName" , bucketName ) , zap . Error ( err ) )
return rpcstatus . Error ( rpcstatus . Internal , "unable to set bucket attribution" )
}
// update attribution table
_ , err = endpoint . attributions . Insert ( ctx , & attribution . Info {
2019-08-05 08:07:40 +01:00
ProjectID : keyInfo . ProjectID ,
BucketName : bucketName ,
PartnerID : partnerID ,
} )
if err != nil {
endpoint . log . Error ( "error while inserting attribution to DB" , zap . Error ( err ) )
2019-09-19 05:46:39 +01:00
return rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-05 08:07:40 +01:00
}
2019-11-26 11:12:37 +00:00
2019-08-05 08:07:40 +01:00
return nil
2019-07-08 23:32:18 +01:00
}
2019-07-19 16:17:34 +01:00
func convertProtoToBucket ( req * pb . BucketCreateRequest , projectID uuid . UUID ) ( bucket storj . Bucket , err error ) {
2019-07-08 23:32:18 +01:00
bucketID , err := uuid . New ( )
if err != nil {
return storj . Bucket { } , err
}
defaultRS := req . GetDefaultRedundancyScheme ( )
defaultEP := req . GetDefaultEncryptionParameters ( )
2019-07-19 16:17:34 +01:00
2019-11-26 11:12:37 +00:00
// TODO: resolve partner id
2019-07-19 16:17:34 +01:00
var partnerID uuid . UUID
err = partnerID . UnmarshalJSON ( req . GetPartnerId ( ) )
// bucket's partnerID should never be set
// it is always read back from buckets DB
if err != nil && ! partnerID . IsZero ( ) {
return bucket , errs . New ( "Invalid uuid" )
}
2019-07-08 23:32:18 +01:00
return storj . Bucket {
ID : * bucketID ,
Name : string ( req . GetName ( ) ) ,
ProjectID : projectID ,
2019-07-19 16:17:34 +01:00
PartnerID : partnerID ,
2019-07-08 23:32:18 +01:00
PathCipher : storj . CipherSuite ( req . GetPathCipher ( ) ) ,
DefaultSegmentsSize : req . GetDefaultSegmentSize ( ) ,
DefaultRedundancyScheme : storj . RedundancyScheme {
Algorithm : storj . RedundancyAlgorithm ( defaultRS . GetType ( ) ) ,
ShareSize : defaultRS . GetErasureShareSize ( ) ,
RequiredShares : int16 ( defaultRS . GetMinReq ( ) ) ,
RepairShares : int16 ( defaultRS . GetRepairThreshold ( ) ) ,
OptimalShares : int16 ( defaultRS . GetSuccessThreshold ( ) ) ,
TotalShares : int16 ( defaultRS . GetTotal ( ) ) ,
} ,
DefaultEncryptionParameters : storj . EncryptionParameters {
CipherSuite : storj . CipherSuite ( defaultEP . CipherSuite ) ,
BlockSize : int32 ( defaultEP . BlockSize ) ,
} ,
} , nil
}
2019-07-19 16:17:34 +01:00
func convertBucketToProto ( ctx context . Context , bucket storj . Bucket ) ( pbBucket * pb . Bucket , err error ) {
2019-07-08 23:32:18 +01:00
rs := bucket . DefaultRedundancyScheme
2019-07-19 16:17:34 +01:00
partnerID , err := bucket . PartnerID . MarshalJSON ( )
if err != nil {
2019-09-19 05:46:39 +01:00
return pbBucket , rpcstatus . Error ( rpcstatus . Internal , "UUID marshal error" )
2019-07-19 16:17:34 +01:00
}
2019-07-08 23:32:18 +01:00
return & pb . Bucket {
Name : [ ] byte ( bucket . Name ) ,
PathCipher : pb . CipherSuite ( int ( bucket . PathCipher ) ) ,
2019-07-19 16:17:34 +01:00
PartnerId : partnerID ,
2019-07-08 23:32:18 +01:00
CreatedAt : bucket . Created ,
DefaultSegmentSize : bucket . DefaultSegmentsSize ,
DefaultRedundancyScheme : & pb . RedundancyScheme {
Type : pb . RedundancyScheme_RS ,
MinReq : int32 ( rs . RequiredShares ) ,
Total : int32 ( rs . TotalShares ) ,
RepairThreshold : int32 ( rs . RepairShares ) ,
SuccessThreshold : int32 ( rs . OptimalShares ) ,
ErasureShareSize : rs . ShareSize ,
} ,
DefaultEncryptionParameters : & pb . EncryptionParameters {
CipherSuite : pb . CipherSuite ( int ( bucket . DefaultEncryptionParameters . CipherSuite ) ) ,
BlockSize : int64 ( bucket . DefaultEncryptionParameters . BlockSize ) ,
} ,
2019-07-19 16:17:34 +01:00
} , nil
2019-07-01 23:17:30 +01:00
}
2019-07-16 11:39:23 +01:00
// BeginObject begins object
func ( endpoint * Endpoint ) BeginObject ( ctx context . Context , req * pb . ObjectBeginRequest ) ( resp * pb . ObjectBeginResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-16 11:39:23 +01:00
Op : macaroon . ActionWrite ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
2020-01-03 09:27:10 +00:00
if ! req . ExpiresAt . IsZero ( ) && ! req . ExpiresAt . After ( time . Now ( ) ) {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "Invalid expiration time" )
}
2019-07-16 11:39:23 +01:00
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
bucket , err := endpoint . metainfo . GetBucket ( ctx , req . Bucket , keyInfo . ProjectID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
// take bucket RS values if not set in request
pbRS := req . RedundancyScheme
2019-08-01 10:04:31 +01:00
if pbRS . Type == 0 {
pbRS . Type = pb . RedundancyScheme_SchemeType ( bucket . DefaultRedundancyScheme . Algorithm )
}
2019-07-16 11:39:23 +01:00
if pbRS . ErasureShareSize == 0 {
pbRS . ErasureShareSize = bucket . DefaultRedundancyScheme . ShareSize
}
if pbRS . MinReq == 0 {
pbRS . MinReq = int32 ( bucket . DefaultRedundancyScheme . RequiredShares )
}
if pbRS . RepairThreshold == 0 {
pbRS . RepairThreshold = int32 ( bucket . DefaultRedundancyScheme . RepairShares )
}
if pbRS . SuccessThreshold == 0 {
pbRS . SuccessThreshold = int32 ( bucket . DefaultRedundancyScheme . OptimalShares )
}
if pbRS . Total == 0 {
pbRS . Total = int32 ( bucket . DefaultRedundancyScheme . TotalShares )
}
pbEP := req . EncryptionParameters
if pbEP . CipherSuite == 0 {
pbEP . CipherSuite = pb . CipherSuite ( bucket . DefaultEncryptionParameters . CipherSuite )
}
if pbEP . BlockSize == 0 {
pbEP . BlockSize = int64 ( bucket . DefaultEncryptionParameters . BlockSize )
}
2019-07-24 12:33:23 +01:00
streamID , err := endpoint . packStreamID ( ctx , & pb . SatStreamID {
2019-08-01 10:04:31 +01:00
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Version : req . Version ,
Redundancy : pbRS ,
CreationDate : time . Now ( ) ,
ExpirationDate : req . ExpiresAt ,
2019-07-24 12:33:23 +01:00
} )
2019-07-16 11:39:23 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
return & pb . ObjectBeginResponse {
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Version : req . Version ,
StreamId : streamID ,
RedundancyScheme : pbRS ,
EncryptionParameters : pbEP ,
} , nil
}
2019-12-06 18:14:35 +00:00
// CommitObject commits an object when all its segments have already been
// committed.
2019-07-16 11:39:23 +01:00
func ( endpoint * Endpoint ) CommitObject ( ctx context . Context , req * pb . ObjectCommitRequest ) ( resp * pb . ObjectCommitResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
streamID := & pb . SatStreamID { }
err = proto . Unmarshal ( req . StreamId , streamID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
err = signing . VerifyStreamID ( ctx , endpoint . satellite , streamID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
if streamID . CreationDate . Before ( time . Now ( ) . Add ( - satIDExpiration ) ) {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "stream ID expired" )
2019-07-16 11:39:23 +01:00
}
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-16 11:39:23 +01:00
Op : macaroon . ActionWrite ,
Bucket : streamID . Bucket ,
EncryptedPath : streamID . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
2019-11-24 21:14:51 +00:00
streamMeta := pb . StreamMeta { }
err = proto . Unmarshal ( req . EncryptedMetadata , & streamMeta )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "invalid metadata structure" )
}
2019-08-01 10:04:31 +01:00
2019-11-24 21:14:51 +00:00
lastSegmentIndex := streamMeta . NumberOfSegments - 1
lastSegmentPath , err := CreatePath ( ctx , keyInfo . ProjectID , lastSegmentIndex , streamID . Bucket , streamID . EncryptedPath )
if err != nil {
return nil , rpcstatus . Errorf ( rpcstatus . InvalidArgument , "unable to create segment path: %s" , err . Error ( ) )
}
2019-08-01 10:04:31 +01:00
2019-11-24 21:14:51 +00:00
lastSegmentPointerBytes , lastSegmentPointer , err := endpoint . metainfo . GetWithBytes ( ctx , lastSegmentPath )
if err != nil {
endpoint . log . Error ( "unable to get pointer" , zap . String ( "segmentPath" , lastSegmentPath ) , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to commit object" )
2019-08-01 10:04:31 +01:00
}
if lastSegmentPointer == nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Errorf ( rpcstatus . NotFound , "unable to find object: %q/%q" , streamID . Bucket , streamID . EncryptedPath )
2019-08-01 10:04:31 +01:00
}
2019-09-19 00:18:14 +01:00
if lastSegmentPointer . Remote == nil {
lastSegmentPointer . Remote = & pb . RemoteSegment { }
}
// RS is set always for last segment to emulate RS per object
lastSegmentPointer . Remote . Redundancy = streamID . Redundancy
2019-08-01 10:04:31 +01:00
lastSegmentPointer . Metadata = req . EncryptedMetadata
2019-11-06 17:02:14 +00:00
err = endpoint . metainfo . Delete ( ctx , lastSegmentPath , lastSegmentPointerBytes )
2019-08-01 10:04:31 +01:00
if err != nil {
2019-11-24 21:14:51 +00:00
endpoint . log . Error ( "unable to delete pointer" , zap . String ( "segmentPath" , lastSegmentPath ) , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to commit object" )
2019-08-01 10:04:31 +01:00
}
2019-11-24 21:14:51 +00:00
lastSegmentIndex = - 1
lastSegmentPath , err = CreatePath ( ctx , keyInfo . ProjectID , lastSegmentIndex , streamID . Bucket , streamID . EncryptedPath )
2019-08-01 10:04:31 +01:00
if err != nil {
2019-11-24 21:14:51 +00:00
endpoint . log . Error ( "unable to create path" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to commit object" )
2019-08-01 10:04:31 +01:00
}
err = endpoint . metainfo . Put ( ctx , lastSegmentPath , lastSegmentPointer )
if err != nil {
2019-11-24 21:14:51 +00:00
endpoint . log . Error ( "unable to put pointer" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to commit object" )
2019-08-01 10:04:31 +01:00
}
2019-07-16 11:39:23 +01:00
return & pb . ObjectCommitResponse { } , nil
}
2019-07-23 12:09:12 +01:00
// GetObject gets single object
func ( endpoint * Endpoint ) GetObject ( ctx context . Context , req * pb . ObjectGetRequest ) ( resp * pb . ObjectGetResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-23 12:09:12 +01:00
Op : macaroon . ActionRead ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-23 12:09:12 +01:00
}
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-23 12:09:12 +01:00
}
2019-08-01 10:04:31 +01:00
pointer , _ , err := endpoint . getPointer ( ctx , keyInfo . ProjectID , - 1 , req . Bucket , req . EncryptedPath )
2019-07-23 12:09:12 +01:00
if err != nil {
2019-08-01 10:04:31 +01:00
return nil , err
2019-07-23 12:09:12 +01:00
}
2019-08-01 10:04:31 +01:00
streamMeta := & pb . StreamMeta { }
err = proto . Unmarshal ( pointer . Metadata , streamMeta )
2019-07-23 12:09:12 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-23 12:09:12 +01:00
}
2019-07-24 12:33:23 +01:00
streamID , err := endpoint . packStreamID ( ctx , & pb . SatStreamID {
2019-07-23 12:09:12 +01:00
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Version : req . Version ,
CreationDate : time . Now ( ) ,
2019-07-24 12:33:23 +01:00
} )
2019-07-23 12:09:12 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-23 12:09:12 +01:00
}
object := & pb . Object {
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Version : - 1 ,
StreamId : streamID ,
ExpiresAt : pointer . ExpirationDate ,
CreatedAt : pointer . CreationDate ,
EncryptedMetadata : pointer . Metadata ,
2019-08-01 10:04:31 +01:00
EncryptionParameters : & pb . EncryptionParameters {
CipherSuite : pb . CipherSuite ( streamMeta . EncryptionType ) ,
BlockSize : int64 ( streamMeta . EncryptionBlockSize ) ,
} ,
}
if pointer . Remote != nil {
object . RedundancyScheme = pointer . Remote . Redundancy
2019-09-19 00:18:14 +01:00
// NumberOfSegments == 0 - pointer with encrypted num of segments
// NumberOfSegments > 1 - pointer with unencrypted num of segments and multiple segments
} else if streamMeta . NumberOfSegments == 0 || streamMeta . NumberOfSegments > 1 {
// workaround
// The new metainfo API redundancy scheme is on object level (not per segment).
// Because of that, RS is always taken from the last segment.
// The old implementation saves RS per segment, and in some cases
// when the remote file's last segment is an inline segment, we end up
// missing an RS scheme. This loop will search for RS in segments other than the last one.
index := int64 ( 0 )
for {
2019-09-19 10:13:57 +01:00
path , err := CreatePath ( ctx , keyInfo . ProjectID , index , req . Bucket , req . EncryptedPath )
if err != nil {
endpoint . log . Error ( "unable to get pointer path" , zap . Error ( err ) )
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to get object" )
2019-09-19 10:13:57 +01:00
}
pointer , err = endpoint . metainfo . Get ( ctx , path )
2019-09-19 00:18:14 +01:00
if err != nil {
2019-12-10 20:21:30 +00:00
if storj . ErrObjectNotFound . Has ( err ) {
2019-09-19 00:18:14 +01:00
break
}
endpoint . log . Error ( "unable to get pointer" , zap . Error ( err ) )
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to get object" )
2019-09-19 00:18:14 +01:00
}
if pointer . Remote != nil {
object . RedundancyScheme = pointer . Remote . Redundancy
break
}
index ++
}
2019-07-23 12:09:12 +01:00
}
2019-12-02 14:39:19 +00:00
endpoint . log . Info ( "Get Object" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) )
2019-07-23 12:09:12 +01:00
return & pb . ObjectGetResponse {
Object : object ,
} , nil
}
2019-07-16 11:39:23 +01:00
// ListObjects list objects according to specific parameters
func ( endpoint * Endpoint ) ListObjects ( ctx context . Context , req * pb . ObjectListRequest ) ( resp * pb . ObjectListResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-16 11:39:23 +01:00
Op : macaroon . ActionList ,
Bucket : req . Bucket ,
2019-10-24 22:05:08 +01:00
EncryptedPath : req . EncryptedPrefix ,
2019-07-16 11:39:23 +01:00
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
prefix , err := CreatePath ( ctx , keyInfo . ProjectID , - 1 , req . Bucket , req . EncryptedPrefix )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
metaflags := meta . All
// TODO use flags
2019-09-25 22:30:41 +01:00
segments , more , err := endpoint . metainfo . List ( ctx , prefix , string ( req . EncryptedCursor ) , req . Recursive , req . Limit , metaflags )
2019-07-16 11:39:23 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
items := make ( [ ] * pb . ObjectListItem , len ( segments ) )
for i , segment := range segments {
items [ i ] = & pb . ObjectListItem {
2019-08-01 10:04:31 +01:00
EncryptedPath : [ ] byte ( segment . Path ) ,
}
if segment . Pointer != nil {
items [ i ] . EncryptedMetadata = segment . Pointer . Metadata
items [ i ] . CreatedAt = segment . Pointer . CreationDate
items [ i ] . ExpiresAt = segment . Pointer . ExpirationDate
2019-07-16 11:39:23 +01:00
}
}
2019-12-02 14:39:19 +00:00
endpoint . log . Info ( "List Objects" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) )
2019-07-16 11:39:23 +01:00
return & pb . ObjectListResponse {
Items : items ,
More : more ,
} , nil
}
2019-12-10 11:15:35 +00:00
// BeginDeleteObject begins object deletion process.
2019-07-16 11:39:23 +01:00
func ( endpoint * Endpoint ) BeginDeleteObject ( ctx context . Context , req * pb . ObjectBeginDeleteRequest ) ( resp * pb . ObjectBeginDeleteResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-16 11:39:23 +01:00
Op : macaroon . ActionDelete ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
satStreamID := & pb . SatStreamID {
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Version : req . Version ,
CreationDate : time . Now ( ) ,
}
satStreamID , err = signing . SignStreamID ( ctx , endpoint . satellite , satStreamID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
encodedStreamID , err := proto . Marshal ( satStreamID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
streamID , err := storj . StreamIDFromBytes ( encodedStreamID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
2019-12-16 19:03:20 +00:00
err = endpoint . DeleteObjectPieces ( ctx , keyInfo . ProjectID , satStreamID . Bucket , satStreamID . EncryptedPath )
2019-08-01 10:04:31 +01:00
if err != nil {
return nil , err
}
2019-07-16 11:39:23 +01:00
return & pb . ObjectBeginDeleteResponse {
StreamId : streamID ,
} , nil
}
// FinishDeleteObject finishes object deletion
func ( endpoint * Endpoint ) FinishDeleteObject ( ctx context . Context , req * pb . ObjectFinishDeleteRequest ) ( resp * pb . ObjectFinishDeleteResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
streamID := & pb . SatStreamID { }
err = proto . Unmarshal ( req . StreamId , streamID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
err = signing . VerifyStreamID ( ctx , endpoint . satellite , streamID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
if streamID . CreationDate . Before ( time . Now ( ) . Add ( - satIDExpiration ) ) {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "stream ID expired" )
2019-07-16 11:39:23 +01:00
}
2019-09-19 17:19:29 +01:00
_ , err = endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-16 11:39:23 +01:00
Op : macaroon . ActionDelete ,
Bucket : streamID . Bucket ,
EncryptedPath : streamID . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
// we don't need to do anything for shim implementation
return & pb . ObjectFinishDeleteResponse { } , nil
}
2019-07-22 15:45:18 +01:00
// BeginSegment begins segment uploading
func ( endpoint * Endpoint ) BeginSegment ( ctx context . Context , req * pb . SegmentBeginRequest ) ( resp * pb . SegmentBeginResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
streamID , err := endpoint . unmarshalSatStreamID ( ctx , req . StreamId )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-22 15:45:18 +01:00
Op : macaroon . ActionWrite ,
Bucket : streamID . Bucket ,
EncryptedPath : streamID . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-07-24 12:33:23 +01:00
// no need to validate streamID fields because it was validated during BeginObject
2019-08-01 10:04:31 +01:00
if req . Position . Index < 0 {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "segment index must be greater then 0" )
2019-08-01 10:04:31 +01:00
}
2019-07-24 12:33:23 +01:00
exceeded , limit , err := endpoint . projectUsage . ExceedsStorageUsage ( ctx , keyInfo . ProjectID )
if err != nil {
endpoint . log . Error ( "retrieving project storage totals" , zap . Error ( err ) )
}
if exceeded {
endpoint . log . Sugar ( ) . Errorf ( "monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for storage for projectID %s" ,
limit , keyInfo . ProjectID ,
)
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . ResourceExhausted , "Exceeded Usage Limit" )
2019-07-24 12:33:23 +01:00
}
redundancy , err := eestream . NewRedundancyStrategyFromProto ( streamID . Redundancy )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
maxPieceSize := eestream . CalcPieceSize ( req . MaxOrderLimit , redundancy )
2019-07-22 15:45:18 +01:00
2019-07-24 12:33:23 +01:00
request := overlay . FindStorageNodesRequest {
RequestedCount : redundancy . TotalCount ( ) ,
FreeBandwidth : maxPieceSize ,
FreeDisk : maxPieceSize ,
}
2019-08-06 17:35:59 +01:00
nodes , err := endpoint . overlay . FindStorageNodes ( ctx , request )
2019-07-24 12:33:23 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
bucketID := createBucketID ( keyInfo . ProjectID , streamID . Bucket )
rootPieceID , addressedLimits , piecePrivateKey , err := endpoint . orders . CreatePutOrderLimits ( ctx , bucketID , nodes , streamID . ExpirationDate , maxPieceSize )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
segmentID , err := endpoint . packSegmentID ( ctx , & pb . SatSegmentID {
StreamId : streamID ,
2019-08-01 10:04:31 +01:00
Index : req . Position . Index ,
2019-07-24 12:33:23 +01:00
OriginalOrderLimits : addressedLimits ,
RootPieceId : rootPieceID ,
CreationDate : time . Now ( ) ,
} )
2019-12-02 14:39:19 +00:00
endpoint . log . Info ( "Segment Upload" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) )
2019-07-24 12:33:23 +01:00
return & pb . SegmentBeginResponse {
SegmentId : segmentID ,
AddressedLimits : addressedLimits ,
PrivateKey : piecePrivateKey ,
} , nil
2019-07-22 15:45:18 +01:00
}
// CommitSegment commits segment after uploading
func ( endpoint * Endpoint ) CommitSegment ( ctx context . Context , req * pb . SegmentCommitRequest ) ( resp * pb . SegmentCommitResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
segmentID , err := endpoint . unmarshalSatSegmentID ( ctx , req . SegmentId )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
streamID := segmentID . StreamId
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-22 15:45:18 +01:00
Op : macaroon . ActionWrite ,
Bucket : streamID . Bucket ,
EncryptedPath : streamID . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-10-17 19:01:40 +01:00
if numResults := len ( req . UploadResult ) ; numResults < int ( streamID . Redundancy . GetSuccessThreshold ( ) ) {
endpoint . log . Debug ( "the results of uploaded pieces for the segment is below the redundancy optimal threshold" ,
zap . Int ( "upload pieces results" , numResults ) ,
zap . Int32 ( "redundancy optimal threshold" , streamID . Redundancy . GetSuccessThreshold ( ) ) ,
2019-11-05 21:04:07 +00:00
zap . Stringer ( "Segment ID" , req . SegmentId ) ,
2019-10-17 19:01:40 +01:00
)
return nil , rpcstatus . Errorf ( rpcstatus . InvalidArgument ,
"the number of results of uploaded pieces (%d) is below the optimal threshold (%d)" ,
numResults , streamID . Redundancy . GetSuccessThreshold ( ) ,
)
}
2019-07-24 12:33:23 +01:00
pieces := make ( [ ] * pb . RemotePiece , len ( req . UploadResult ) )
for i , result := range req . UploadResult {
pieces [ i ] = & pb . RemotePiece {
PieceNum : result . PieceNum ,
NodeId : result . NodeId ,
Hash : result . Hash ,
}
}
remote := & pb . RemoteSegment {
Redundancy : streamID . Redundancy ,
RootPieceId : segmentID . RootPieceId ,
RemotePieces : pieces ,
}
2019-08-15 12:45:49 +01:00
metadata , err := proto . Marshal ( & pb . SegmentMeta {
EncryptedKey : req . EncryptedKey ,
KeyNonce : req . EncryptedKeyNonce . Bytes ( ) ,
} )
if err != nil {
endpoint . log . Error ( "unable to marshal segment metadata" , zap . Error ( err ) )
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-15 12:45:49 +01:00
}
2019-07-24 12:33:23 +01:00
pointer := & pb . Pointer {
Type : pb . Pointer_REMOTE ,
Remote : remote ,
SegmentSize : req . SizeEncryptedData ,
CreationDate : streamID . CreationDate ,
ExpirationDate : streamID . ExpirationDate ,
2019-08-15 12:45:49 +01:00
Metadata : metadata ,
2019-09-18 14:50:33 +01:00
PieceHashesVerified : true ,
2019-07-24 12:33:23 +01:00
}
orderLimits := make ( [ ] * pb . OrderLimit , len ( segmentID . OriginalOrderLimits ) )
for i , orderLimit := range segmentID . OriginalOrderLimits {
orderLimits [ i ] = orderLimit . Limit
}
err = endpoint . validatePointer ( ctx , pointer , orderLimits )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
err = endpoint . filterValidPieces ( ctx , pointer , orderLimits )
if err != nil {
2019-10-17 19:01:40 +01:00
return nil , err
2019-07-24 12:33:23 +01:00
}
path , err := CreatePath ( ctx , keyInfo . ProjectID , int64 ( segmentID . Index ) , streamID . Bucket , streamID . EncryptedPath )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
exceeded , limit , err := endpoint . projectUsage . ExceedsStorageUsage ( ctx , keyInfo . ProjectID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
if exceeded {
2019-10-17 19:01:40 +01:00
endpoint . log . Error ( "The project limit of storage and bandwidth has been exceeded" ,
zap . Int64 ( "limit" , limit . Int64 ( ) ) ,
2019-11-05 21:04:07 +00:00
zap . Stringer ( "Project ID" , keyInfo . ProjectID ) ,
2019-07-24 12:33:23 +01:00
)
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . ResourceExhausted , "Exceeded Usage Limit" )
2019-07-24 12:33:23 +01:00
}
2019-08-01 10:04:31 +01:00
// clear hashes so we don't store them
for _ , piece := range pointer . GetRemote ( ) . GetRemotePieces ( ) {
piece . Hash = nil
}
2019-07-24 12:33:23 +01:00
inlineUsed , remoteUsed := calculateSpaceUsed ( pointer )
// ToDo: Replace with hash & signature validation
// Ensure neither uplink or storage nodes are cheating on us
if pointer . Type == pb . Pointer_REMOTE {
//We cannot have more redundancy than total/min
if float64 ( remoteUsed ) > ( float64 ( pointer . SegmentSize ) / float64 ( pointer . Remote . Redundancy . MinReq ) ) * float64 ( pointer . Remote . Redundancy . Total ) {
2019-10-17 19:01:40 +01:00
endpoint . log . Debug ( "data size mismatch" ,
zap . Int64 ( "segment" , pointer . SegmentSize ) ,
zap . Int64 ( "pieces" , remoteUsed ) ,
zap . Int32 ( "redundancy minimum requested" , pointer . Remote . Redundancy . MinReq ) ,
zap . Int32 ( "redundancy total" , pointer . Remote . Redundancy . Total ) ,
)
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "mismatched segment size and piece usage" )
2019-07-24 12:33:23 +01:00
}
}
if err := endpoint . projectUsage . AddProjectStorageUsage ( ctx , keyInfo . ProjectID , inlineUsed , remoteUsed ) ; err != nil {
2019-10-17 19:01:40 +01:00
endpoint . log . Error ( "Could not track new storage usage by project" ,
2019-11-05 21:04:07 +00:00
zap . Stringer ( "Project ID" , keyInfo . ProjectID ) ,
2019-10-17 19:01:40 +01:00
zap . Error ( err ) ,
)
2019-07-24 12:33:23 +01:00
// but continue. it's most likely our own fault that we couldn't track it, and the only thing
// that will be affected is our per-project bandwidth and storage limits.
}
err = endpoint . metainfo . Put ( ctx , path , pointer )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
2019-07-22 15:45:18 +01:00
2019-09-18 14:50:33 +01:00
return & pb . SegmentCommitResponse {
SuccessfulPieces : int32 ( len ( pointer . Remote . RemotePieces ) ) ,
} , nil
2019-07-22 15:45:18 +01:00
}
// MakeInlineSegment makes inline segment on satellite
func ( endpoint * Endpoint ) MakeInlineSegment ( ctx context . Context , req * pb . SegmentMakeInlineRequest ) ( resp * pb . SegmentMakeInlineResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
streamID , err := endpoint . unmarshalSatStreamID ( ctx , req . StreamId )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-22 15:45:18 +01:00
Op : macaroon . ActionWrite ,
Bucket : streamID . Bucket ,
EncryptedPath : streamID . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-08-01 10:04:31 +01:00
if req . Position . Index < 0 {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "segment index must be greater then 0" )
2019-08-01 10:04:31 +01:00
}
2019-07-24 12:33:23 +01:00
path , err := CreatePath ( ctx , keyInfo . ProjectID , int64 ( req . Position . Index ) , streamID . Bucket , streamID . EncryptedPath )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
exceeded , limit , err := endpoint . projectUsage . ExceedsStorageUsage ( ctx , keyInfo . ProjectID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
if exceeded {
endpoint . log . Sugar ( ) . Errorf ( "monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for storage for projectID %s." ,
limit , keyInfo . ProjectID ,
)
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . ResourceExhausted , "Exceeded Usage Limit" )
2019-07-24 12:33:23 +01:00
}
inlineUsed := int64 ( len ( req . EncryptedInlineData ) )
if err := endpoint . projectUsage . AddProjectStorageUsage ( ctx , keyInfo . ProjectID , inlineUsed , 0 ) ; err != nil {
endpoint . log . Sugar ( ) . Errorf ( "Could not track new storage usage by project %v: %v" , keyInfo . ProjectID , err )
// but continue. it's most likely our own fault that we couldn't track it, and the only thing
// that will be affected is our per-project bandwidth and storage limits.
}
2019-09-19 00:18:14 +01:00
metadata , err := proto . Marshal ( & pb . SegmentMeta {
EncryptedKey : req . EncryptedKey ,
KeyNonce : req . EncryptedKeyNonce . Bytes ( ) ,
} )
2019-07-24 12:33:23 +01:00
pointer := & pb . Pointer {
Type : pb . Pointer_INLINE ,
SegmentSize : inlineUsed ,
CreationDate : streamID . CreationDate ,
ExpirationDate : streamID . ExpirationDate ,
InlineSegment : req . EncryptedInlineData ,
2019-09-19 00:18:14 +01:00
Metadata : metadata ,
2019-07-24 12:33:23 +01:00
}
err = endpoint . metainfo . Put ( ctx , path , pointer )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
err = endpoint . orders . UpdatePutInlineOrder ( ctx , keyInfo . ProjectID , streamID . Bucket , inlineUsed )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
2019-07-22 15:45:18 +01:00
2019-12-02 14:39:19 +00:00
endpoint . log . Info ( "Make Inline Segment" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) )
2019-07-24 12:33:23 +01:00
return & pb . SegmentMakeInlineResponse { } , nil
2019-07-22 15:45:18 +01:00
}
// BeginDeleteSegment begins segment deletion process
func ( endpoint * Endpoint ) BeginDeleteSegment ( ctx context . Context , req * pb . SegmentBeginDeleteRequest ) ( resp * pb . SegmentBeginDeleteResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
streamID , err := endpoint . unmarshalSatStreamID ( ctx , req . StreamId )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-22 15:45:18 +01:00
Op : macaroon . ActionDelete ,
Bucket : streamID . Bucket ,
EncryptedPath : streamID . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-09-23 22:41:58 +01:00
pointer , path , err := endpoint . getPointer ( ctx , keyInfo . ProjectID , int64 ( req . Position . Index ) , streamID . Bucket , streamID . EncryptedPath )
2019-07-24 12:33:23 +01:00
if err != nil {
2019-08-01 10:04:31 +01:00
return nil , err
2019-07-24 12:33:23 +01:00
}
var limits [ ] * pb . AddressedOrderLimit
2019-08-01 10:04:31 +01:00
var privateKey storj . PiecePrivateKey
2019-07-24 12:33:23 +01:00
if pointer . Type == pb . Pointer_REMOTE && pointer . Remote != nil {
bucketID := createBucketID ( keyInfo . ProjectID , streamID . Bucket )
2019-08-01 10:04:31 +01:00
limits , privateKey , err = endpoint . orders . CreateDeleteOrderLimits ( ctx , bucketID , pointer )
2019-07-24 12:33:23 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
}
2019-09-23 22:41:58 +01:00
// moved from FinishDeleteSegment to avoid inconsistency if someone will not
// call FinishDeleteSegment on uplink side
2019-11-06 17:02:14 +00:00
err = endpoint . metainfo . UnsynchronizedDelete ( ctx , path )
2019-09-23 22:41:58 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-09-23 22:41:58 +01:00
}
2019-07-24 12:33:23 +01:00
segmentID , err := endpoint . packSegmentID ( ctx , & pb . SatSegmentID {
StreamId : streamID ,
OriginalOrderLimits : limits ,
Index : req . Position . Index ,
CreationDate : time . Now ( ) ,
} )
2019-12-02 14:39:19 +00:00
endpoint . log . Info ( "Delete Segment" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) )
2019-07-24 12:33:23 +01:00
return & pb . SegmentBeginDeleteResponse {
SegmentId : segmentID ,
AddressedLimits : limits ,
2019-08-01 10:04:31 +01:00
PrivateKey : privateKey ,
2019-07-24 12:33:23 +01:00
} , nil
2019-07-22 15:45:18 +01:00
}
// FinishDeleteSegment finishes segment deletion process
func ( endpoint * Endpoint ) FinishDeleteSegment ( ctx context . Context , req * pb . SegmentFinishDeleteRequest ) ( resp * pb . SegmentFinishDeleteResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
segmentID , err := endpoint . unmarshalSatSegmentID ( ctx , req . SegmentId )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
streamID := segmentID . StreamId
2019-09-23 22:41:58 +01:00
_ , err = endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-22 15:45:18 +01:00
Op : macaroon . ActionDelete ,
Bucket : streamID . Bucket ,
EncryptedPath : streamID . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-09-23 22:41:58 +01:00
// at the moment logic is in BeginDeleteSegment
2019-07-22 15:45:18 +01:00
2019-07-24 12:33:23 +01:00
return & pb . SegmentFinishDeleteResponse { } , nil
2019-07-22 15:45:18 +01:00
}
2019-07-24 12:33:23 +01:00
// ListSegments list object segments
2019-07-22 15:45:18 +01:00
func ( endpoint * Endpoint ) ListSegments ( ctx context . Context , req * pb . SegmentListRequest ) ( resp * pb . SegmentListResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
streamID , err := endpoint . unmarshalSatStreamID ( ctx , req . StreamId )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-22 15:45:18 +01:00
Op : macaroon . ActionList ,
Bucket : streamID . Bucket ,
EncryptedPath : streamID . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-07-24 12:33:23 +01:00
limit := req . Limit
if limit == 0 || limit > listLimit {
limit = listLimit
}
2019-12-10 11:15:35 +00:00
pointer , _ , err := endpoint . getPointer ( ctx , keyInfo . ProjectID , lastSegment , streamID . Bucket , streamID . EncryptedPath )
2019-08-30 22:30:18 +01:00
if err != nil {
2019-12-16 19:03:20 +00:00
if rpcstatus . Code ( err ) == rpcstatus . NotFound {
return & pb . SegmentListResponse { } , nil
}
2019-12-10 11:15:35 +00:00
return nil , err
2019-08-30 22:30:18 +01:00
}
streamMeta := & pb . StreamMeta { }
err = proto . Unmarshal ( pointer . Metadata , streamMeta )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-30 22:30:18 +01:00
}
2019-12-02 14:39:19 +00:00
endpoint . log . Info ( "List Segments" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) )
2019-08-30 22:30:18 +01:00
if streamMeta . NumberOfSegments > 0 {
// use unencrypted number of segments
// TODO cleanup int32 vs int64
2019-12-09 12:18:54 +00:00
return endpoint . listSegmentsFromNumberOfSegments ( ctx , int32 ( streamMeta . NumberOfSegments ) , req . CursorPosition . Index , limit )
2019-08-30 22:30:18 +01:00
}
// list segments by requesting each segment from cursor index to n until n segment is not found
return endpoint . listSegmentsManually ( ctx , keyInfo . ProjectID , streamID , req . CursorPosition . Index , limit )
}
2019-12-09 12:18:54 +00:00
func ( endpoint * Endpoint ) listSegmentsFromNumberOfSegments ( ctx context . Context , numberOfSegments , cursorIndex , limit int32 ) ( resp * pb . SegmentListResponse , err error ) {
if numberOfSegments <= 0 {
endpoint . log . Error (
"Invalid number of segments; this function requires the value to be greater than 0" ,
zap . Int32 ( "numberOfSegments" , numberOfSegments ) ,
)
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to list segments" )
}
2019-08-30 22:30:18 +01:00
2019-12-09 12:18:54 +00:00
if cursorIndex > numberOfSegments {
endpoint . log . Error (
"Invalid number cursor index; the index cannot be greater than the total number of segments" ,
zap . Int32 ( "numberOfSegments" , numberOfSegments ) ,
zap . Int32 ( "cursorIndex" , cursorIndex ) ,
)
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to list segments" )
}
2019-08-30 22:30:18 +01:00
2019-12-09 12:18:54 +00:00
numberOfSegments -= cursorIndex
var (
segmentItems = make ( [ ] * pb . SegmentListItem , 0 )
more = false
)
2019-08-30 22:30:18 +01:00
if numberOfSegments > 0 {
2019-12-09 12:18:54 +00:00
segmentItems = make ( [ ] * pb . SegmentListItem , 0 , int ( numberOfSegments ) )
2019-08-30 22:30:18 +01:00
if numberOfSegments > limit {
more = true
numberOfSegments = limit
} else {
// remove last segment to avoid if statements in loop to detect last segment,
// last segment will be added manually at the end of this block
numberOfSegments --
}
2019-12-09 12:18:54 +00:00
2019-08-30 22:30:18 +01:00
for index := int32 ( 0 ) ; index < numberOfSegments ; index ++ {
segmentItems = append ( segmentItems , & pb . SegmentListItem {
Position : & pb . SegmentPosition {
Index : index + cursorIndex ,
} ,
} )
}
2019-12-09 12:18:54 +00:00
2019-08-30 22:30:18 +01:00
if ! more {
// last segment is always the last one
segmentItems = append ( segmentItems , & pb . SegmentListItem {
Position : & pb . SegmentPosition {
Index : lastSegment ,
} ,
} )
}
}
return & pb . SegmentListResponse {
Items : segmentItems ,
More : more ,
} , nil
}
2019-12-10 14:31:46 +00:00
// listSegmentManually lists the segments that belongs to projectID and streamID
// from the cursorIndex up to the limit. It stops before the limit when
// cursorIndex + n returns a not found pointer.
//
// limit must be greater than 0 and cursorIndex greater than or equal than 0,
// otherwise an error is returned.
func ( endpoint * Endpoint ) listSegmentsManually ( ctx context . Context , projectID uuid . UUID , streamID * pb . SatStreamID , cursorIndex , limit int32 ) ( resp * pb . SegmentListResponse , err error ) {
if limit <= 0 {
return nil , rpcstatus . Errorf (
rpcstatus . InvalidArgument , "invalid limit, cannot be 0 or negative. Got %d" , limit ,
)
}
2019-08-30 22:30:18 +01:00
2019-12-10 14:31:46 +00:00
index := int64 ( cursorIndex )
2019-07-24 12:33:23 +01:00
segmentItems := make ( [ ] * pb . SegmentListItem , 0 )
2019-08-30 22:30:18 +01:00
more := false
2019-07-24 12:33:23 +01:00
for {
2019-12-10 14:31:46 +00:00
_ , _ , err := endpoint . getPointer ( ctx , projectID , index , streamID . Bucket , streamID . EncryptedPath )
2019-07-24 12:33:23 +01:00
if err != nil {
2019-12-10 14:31:46 +00:00
if rpcstatus . Code ( err ) != rpcstatus . NotFound {
return nil , err
2019-07-24 12:33:23 +01:00
}
2019-12-10 14:31:46 +00:00
break
2019-07-24 12:33:23 +01:00
}
2019-12-10 14:31:46 +00:00
2019-08-30 22:30:18 +01:00
if limit == int32 ( len ( segmentItems ) ) {
2019-07-24 12:33:23 +01:00
more = true
break
}
segmentItems = append ( segmentItems , & pb . SegmentListItem {
Position : & pb . SegmentPosition {
Index : int32 ( index ) ,
} ,
} )
index ++
2019-08-30 22:30:18 +01:00
}
if limit > int32 ( len ( segmentItems ) ) {
segmentItems = append ( segmentItems , & pb . SegmentListItem {
Position : & pb . SegmentPosition {
Index : lastSegment ,
} ,
} )
} else {
more = true
2019-07-24 12:33:23 +01:00
}
2019-07-22 15:45:18 +01:00
2019-07-24 12:33:23 +01:00
return & pb . SegmentListResponse {
Items : segmentItems ,
More : more ,
} , nil
2019-07-22 15:45:18 +01:00
}
// DownloadSegment returns data necessary to download segment
func ( endpoint * Endpoint ) DownloadSegment ( ctx context . Context , req * pb . SegmentDownloadRequest ) ( resp * pb . SegmentDownloadResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
streamID , err := endpoint . unmarshalSatStreamID ( ctx , req . StreamId )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-22 15:45:18 +01:00
Op : macaroon . ActionRead ,
Bucket : streamID . Bucket ,
EncryptedPath : streamID . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-07-24 12:33:23 +01:00
bucketID := createBucketID ( keyInfo . ProjectID , streamID . Bucket )
exceeded , limit , err := endpoint . projectUsage . ExceedsBandwidthUsage ( ctx , keyInfo . ProjectID , bucketID )
if err != nil {
endpoint . log . Error ( "retrieving project bandwidth total" , zap . Error ( err ) )
}
if exceeded {
endpoint . log . Sugar ( ) . Errorf ( "monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for bandwidth for projectID %s." ,
limit , keyInfo . ProjectID ,
)
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . ResourceExhausted , "Exceeded Usage Limit" )
2019-07-24 12:33:23 +01:00
}
2019-08-01 10:04:31 +01:00
pointer , _ , err := endpoint . getPointer ( ctx , keyInfo . ProjectID , int64 ( req . CursorPosition . Index ) , streamID . Bucket , streamID . EncryptedPath )
2019-07-24 12:33:23 +01:00
if err != nil {
2019-08-01 10:04:31 +01:00
return nil , err
2019-07-24 12:33:23 +01:00
}
2019-08-01 10:04:31 +01:00
segmentID , err := endpoint . packSegmentID ( ctx , & pb . SatSegmentID { } )
var encryptedKeyNonce storj . Nonce
var encryptedKey [ ] byte
if len ( pointer . Metadata ) != 0 {
2019-08-29 09:00:20 +01:00
var segmentMeta * pb . SegmentMeta
2019-08-01 10:04:31 +01:00
if req . CursorPosition . Index == lastSegment {
streamMeta := & pb . StreamMeta { }
err = proto . Unmarshal ( pointer . Metadata , streamMeta )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-01 10:04:31 +01:00
}
2019-08-29 09:00:20 +01:00
segmentMeta = streamMeta . LastSegmentMeta
2019-08-01 10:04:31 +01:00
} else {
2019-09-19 00:18:14 +01:00
segmentMeta = & pb . SegmentMeta { }
2019-08-29 09:00:20 +01:00
err = proto . Unmarshal ( pointer . Metadata , segmentMeta )
2019-08-01 10:04:31 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-01 10:04:31 +01:00
}
2019-07-24 12:33:23 +01:00
}
2019-08-29 09:00:20 +01:00
if segmentMeta != nil {
encryptedKeyNonce , err = storj . NonceFromBytes ( segmentMeta . KeyNonce )
if err != nil {
endpoint . log . Error ( "unable to get encryption key nonce from metadata" , zap . Error ( err ) )
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-29 09:00:20 +01:00
}
2019-07-24 12:33:23 +01:00
2019-08-29 09:00:20 +01:00
encryptedKey = segmentMeta . EncryptedKey
2019-08-01 10:04:31 +01:00
}
}
2019-07-24 12:33:23 +01:00
if pointer . Type == pb . Pointer_INLINE {
err := endpoint . orders . UpdateGetInlineOrder ( ctx , keyInfo . ProjectID , streamID . Bucket , int64 ( len ( pointer . InlineSegment ) ) )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
2019-12-02 14:39:19 +00:00
endpoint . log . Info ( "Download Segment" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) )
2019-07-24 12:33:23 +01:00
return & pb . SegmentDownloadResponse {
SegmentId : segmentID ,
2019-08-01 10:04:31 +01:00
SegmentSize : pointer . SegmentSize ,
2019-07-24 12:33:23 +01:00
EncryptedInlineData : pointer . InlineSegment ,
2019-08-01 10:04:31 +01:00
EncryptedKeyNonce : encryptedKeyNonce ,
EncryptedKey : encryptedKey ,
2019-07-24 12:33:23 +01:00
} , nil
} else if pointer . Type == pb . Pointer_REMOTE && pointer . Remote != nil {
2019-08-01 10:04:31 +01:00
limits , privateKey , err := endpoint . orders . CreateGetOrderLimits ( ctx , bucketID , pointer )
2019-07-24 12:33:23 +01:00
if err != nil {
2019-12-04 21:24:36 +00:00
if orders . ErrDownloadFailedNotEnoughPieces . Has ( err ) {
endpoint . log . Sugar ( ) . Errorf ( "unable to create order limits for project id %s from api key id %s: %v." , keyInfo . ProjectID . String ( ) , keyInfo . ID . String ( ) , zap . Error ( err ) )
}
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
2019-08-01 10:04:31 +01:00
limits = sortLimits ( limits , pointer )
// workaround to avoid sending nil values on top level
for i := range limits {
if limits [ i ] == nil {
limits [ i ] = & pb . AddressedOrderLimit { }
}
}
2019-12-02 14:39:19 +00:00
endpoint . log . Info ( "Download Segment" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) )
2019-07-24 12:33:23 +01:00
return & pb . SegmentDownloadResponse {
SegmentId : segmentID ,
AddressedLimits : limits ,
2019-08-01 10:04:31 +01:00
PrivateKey : privateKey ,
SegmentSize : pointer . SegmentSize ,
EncryptedKeyNonce : encryptedKeyNonce ,
EncryptedKey : encryptedKey ,
2019-07-24 12:33:23 +01:00
} , nil
}
2019-09-19 05:46:39 +01:00
return & pb . SegmentDownloadResponse { } , rpcstatus . Error ( rpcstatus . Internal , "invalid type of pointer" )
2019-07-24 12:33:23 +01:00
}
2019-12-10 11:15:35 +00:00
// getPointer returns the pointer and the segment path projectID, bucket and
// encryptedPath. It returns an error with a specific RPC status.
2019-12-11 18:46:41 +00:00
func ( endpoint * Endpoint ) getPointer (
ctx context . Context , projectID uuid . UUID , segmentIndex int64 , bucket , encryptedPath [ ] byte ,
) ( _ * pb . Pointer , _ string , err error ) {
defer mon . Task ( ) ( & ctx , projectID . String ( ) , segmentIndex , bucket , encryptedPath ) ( & err )
2019-08-01 10:04:31 +01:00
path , err := CreatePath ( ctx , projectID , segmentIndex , bucket , encryptedPath )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , "" , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-08-01 10:04:31 +01:00
}
pointer , err := endpoint . metainfo . Get ( ctx , path )
if err != nil {
2019-12-10 20:21:30 +00:00
if storj . ErrObjectNotFound . Has ( err ) {
2019-09-19 05:46:39 +01:00
return nil , "" , rpcstatus . Error ( rpcstatus . NotFound , err . Error ( ) )
2019-08-01 10:04:31 +01:00
}
2019-12-10 11:15:35 +00:00
endpoint . log . Error ( "error getting the pointer from metainfo service" , zap . Error ( err ) )
2019-09-19 05:46:39 +01:00
return nil , "" , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-01 10:04:31 +01:00
}
return pointer , path , nil
}
2019-12-11 17:44:13 +00:00
// getObjectNumberOfSegments returns the number of segments of the indicated
// object by projectID, bucket and encryptedPath.
//
// It returns 0 if the number is unknown.
func ( endpoint * Endpoint ) getObjectNumberOfSegments ( ctx context . Context , projectID uuid . UUID , bucket , encryptedPath [ ] byte ) ( _ int64 , err error ) {
defer mon . Task ( ) ( & ctx , projectID . String ( ) , bucket , encryptedPath ) ( & err )
pointer , _ , err := endpoint . getPointer ( ctx , projectID , lastSegment , bucket , encryptedPath )
if err != nil {
return 0 , err
}
meta := & pb . StreamMeta { }
err = proto . Unmarshal ( pointer . Metadata , meta )
if err != nil {
endpoint . log . Error ( "error unmarshaling pointer metadata" , zap . Error ( err ) )
return 0 , rpcstatus . Error ( rpcstatus . Internal , "unable to unmarshal metadata" )
}
return meta . NumberOfSegments , nil
}
2019-08-01 10:04:31 +01:00
// sortLimits sorts order limits and fill missing ones with nil values
func sortLimits ( limits [ ] * pb . AddressedOrderLimit , pointer * pb . Pointer ) [ ] * pb . AddressedOrderLimit {
sorted := make ( [ ] * pb . AddressedOrderLimit , pointer . GetRemote ( ) . GetRedundancy ( ) . GetTotal ( ) )
for _ , piece := range pointer . GetRemote ( ) . GetRemotePieces ( ) {
sorted [ piece . GetPieceNum ( ) ] = getLimitByStorageNodeID ( limits , piece . NodeId )
}
return sorted
}
func getLimitByStorageNodeID ( limits [ ] * pb . AddressedOrderLimit , storageNodeID storj . NodeID ) * pb . AddressedOrderLimit {
for _ , limit := range limits {
if limit . GetLimit ( ) . StorageNodeId == storageNodeID {
return limit
}
}
return nil
}
2019-07-24 12:33:23 +01:00
func ( endpoint * Endpoint ) packStreamID ( ctx context . Context , satStreamID * pb . SatStreamID ) ( streamID storj . StreamID , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-22 15:45:18 +01:00
2019-07-24 12:33:23 +01:00
signedStreamID , err := signing . SignStreamID ( ctx , endpoint . satellite , satStreamID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
encodedStreamID , err := proto . Marshal ( signedStreamID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
streamID , err = storj . StreamIDFromBytes ( encodedStreamID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
return streamID , nil
2019-07-22 15:45:18 +01:00
}
2019-07-24 12:33:23 +01:00
func ( endpoint * Endpoint ) packSegmentID ( ctx context . Context , satSegmentID * pb . SatSegmentID ) ( segmentID storj . SegmentID , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
signedSegmentID , err := signing . SignSegmentID ( ctx , endpoint . satellite , satSegmentID )
if err != nil {
return nil , err
}
encodedSegmentID , err := proto . Marshal ( signedSegmentID )
if err != nil {
return nil , err
}
segmentID , err = storj . SegmentIDFromBytes ( encodedSegmentID )
if err != nil {
return nil , err
}
return segmentID , nil
}
func ( endpoint * Endpoint ) unmarshalSatStreamID ( ctx context . Context , streamID storj . StreamID ) ( _ * pb . SatStreamID , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-22 15:45:18 +01:00
satStreamID := & pb . SatStreamID { }
2019-07-24 12:33:23 +01:00
err = proto . Unmarshal ( streamID , satStreamID )
2019-07-22 15:45:18 +01:00
if err != nil {
return nil , err
}
err = signing . VerifyStreamID ( ctx , endpoint . satellite , satStreamID )
if err != nil {
return nil , err
}
if satStreamID . CreationDate . Before ( time . Now ( ) . Add ( - satIDExpiration ) ) {
return nil , errs . New ( "stream ID expired" )
}
return satStreamID , nil
}
2019-07-24 12:33:23 +01:00
func ( endpoint * Endpoint ) unmarshalSatSegmentID ( ctx context . Context , segmentID storj . SegmentID ) ( _ * pb . SatSegmentID , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-22 15:45:18 +01:00
satSegmentID := & pb . SatSegmentID { }
2019-07-24 12:33:23 +01:00
err = proto . Unmarshal ( segmentID , satSegmentID )
2019-07-22 15:45:18 +01:00
if err != nil {
return nil , err
}
2019-11-15 15:44:23 +00:00
if satSegmentID . StreamId == nil {
return nil , errs . New ( "stream ID missing" )
}
2019-07-22 15:45:18 +01:00
err = signing . VerifySegmentID ( ctx , endpoint . satellite , satSegmentID )
if err != nil {
return nil , err
}
if satSegmentID . CreationDate . Before ( time . Now ( ) . Add ( - satIDExpiration ) ) {
return nil , errs . New ( "segment ID expired" )
}
return satSegmentID , nil
}
2019-12-11 17:44:13 +00:00
// DeleteObjectPieces deletes all the pieces of the storage nodes that belongs
// to the specified object.
//
// NOTE: this method is exported for being able to individually test it without
// having import cycles.
func ( endpoint * Endpoint ) DeleteObjectPieces (
ctx context . Context , projectID uuid . UUID , bucket , encryptedPath [ ] byte ,
) ( err error ) {
defer mon . Task ( ) ( & ctx , projectID . String ( ) , bucket , encryptedPath ) ( & err )
numOfSegments , err := endpoint . getObjectNumberOfSegments ( ctx , projectID , bucket , encryptedPath )
if err != nil {
return err
}
knownNumOfSegments := false
if numOfSegments == 0 {
numOfSegments = math . MaxInt64
} else {
knownNumOfSegments = true
}
2019-12-18 10:41:02 +00:00
var (
nodesPieces = make ( map [ storj . NodeID ] [ ] storj . PieceID )
nodeIDs storj . NodeIDList
)
2019-12-16 19:03:20 +00:00
for segmentIdx := int64 ( lastSegment ) ; segmentIdx < ( numOfSegments - 1 ) ; segmentIdx ++ {
pointer , err := endpoint . deletePointer ( ctx , projectID , segmentIdx , bucket , encryptedPath )
2019-12-11 17:44:13 +00:00
if err != nil {
2019-12-16 19:03:20 +00:00
// Only return the error for aborting the operation if it happens on the
// first iteration
if segmentIdx == int64 ( lastSegment ) {
if storj . ErrObjectNotFound . Has ( err ) {
return rpcstatus . Error ( rpcstatus . NotFound , "object doesn't exist" )
}
endpoint . log . Error ( "unexpected error while deleting object pieces" ,
zap . Stringer ( "project_id" , projectID ) ,
zap . ByteString ( "bucket_name" , bucket ) ,
zap . Binary ( "encrypted_path" , encryptedPath ) ,
zap . Error ( err ) ,
)
return rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-12-11 17:44:13 +00:00
}
2019-12-16 19:03:20 +00:00
if storj . ErrObjectNotFound . Has ( err ) {
if ! knownNumOfSegments {
// Because we don't know the number of segments, we assume that if the
// pointer isn't found then we reached in the previous iteration the
// segment before the last one.
break
}
segment := "s" + strconv . FormatInt ( segmentIdx , 10 )
endpoint . log . Warn (
"unexpected not found error while deleting a pointer, it may have been deleted concurrently" ,
zap . String ( "pointer_path" ,
fmt . Sprintf ( "%s/%s/%s/%q" , projectID , segment , bucket , encryptedPath ) ,
) ,
zap . String ( "segment" , segment ) ,
)
} else {
segment := "s" + strconv . FormatInt ( segmentIdx , 10 )
endpoint . log . Warn (
"unexpected error while deleting a pointer" ,
zap . String ( "pointer_path" ,
fmt . Sprintf ( "%s/%s/%s/%q" , projectID , segment , bucket , encryptedPath ) ,
) ,
zap . String ( "segment" , segment ) ,
zap . Error ( err ) ,
)
2019-12-11 17:44:13 +00:00
}
2019-12-16 19:03:20 +00:00
// We continue with the next segment for not deleting the pieces of this
// pointer and avoiding that some storage nodes fail audits due to a
// missing piece.
// If it was not found them we assume that the pieces were deleted by
// another request running concurrently.
2019-12-11 17:44:13 +00:00
continue
}
if pointer . Type != pb . Pointer_REMOTE {
continue
}
rootPieceID := pointer . GetRemote ( ) . RootPieceId
for _ , piece := range pointer . GetRemote ( ) . GetRemotePieces ( ) {
pieceID := rootPieceID . Derive ( piece . NodeId , piece . PieceNum )
pieces , ok := nodesPieces [ piece . NodeId ]
if ! ok {
nodesPieces [ piece . NodeId ] = [ ] storj . PieceID { pieceID }
2019-12-18 10:41:02 +00:00
nodeIDs = append ( nodeIDs , piece . NodeId )
2019-12-11 17:44:13 +00:00
continue
}
nodesPieces [ piece . NodeId ] = append ( pieces , pieceID )
}
}
2020-01-07 13:15:04 +00:00
if len ( nodeIDs ) == 0 {
// Pieces will be collected by garbage collector
return
}
2019-12-18 10:41:02 +00:00
nodes , err := endpoint . overlay . KnownReliable ( ctx , nodeIDs )
if err != nil {
endpoint . log . Warn ( "unable to look up nodes from overlay" ,
zap . String ( "object_path" ,
fmt . Sprintf ( "%s/%s/%q" , projectID , bucket , encryptedPath ) ,
) ,
zap . Error ( err ) ,
)
// Pieces will be collected by garbage collector
return nil
}
2019-12-11 17:44:13 +00:00
2020-01-07 18:34:43 +00:00
var successThreshold * sync2 . SuccessThreshold
{
var numPieces int
for _ , node := range nodes {
numPieces += len ( nodesPieces [ node . Id ] )
}
var err error
successThreshold , err = sync2 . NewSuccessThreshold ( numPieces , deleteObjectPiecesSuccessThreshold )
if err != nil {
endpoint . log . Error ( "error creating success threshold" ,
zap . Int ( "num_tasks" , numPieces ) ,
zap . Float32 ( "success_threshold" , deleteObjectPiecesSuccessThreshold ) ,
zap . Error ( err ) ,
)
return rpcstatus . Errorf ( rpcstatus . Internal ,
"error creating success threshold: %+v" , err . Error ( ) ,
)
}
}
// TODO: v3-3476 This timeout will go away when the service is implemented
ctx , cancel := context . WithCancel ( ctx )
defer cancel ( )
2019-12-20 12:13:09 +00:00
// TODO: v3-3406 Should we use a global limiter?
2019-12-18 10:41:02 +00:00
limiter := sync2 . NewLimiter ( deleteObjectPiecesConcurrencyLimit )
for _ , node := range nodes {
node := node
nodePieces := nodesPieces [ node . Id ]
2019-12-11 17:44:13 +00:00
limiter . Go ( ctx , func ( ) {
client , err := piecestore . Dial (
2019-12-18 10:41:02 +00:00
ctx , endpoint . dialer , node , endpoint . log , piecestore . Config { } ,
2019-12-11 17:44:13 +00:00
)
if err != nil {
endpoint . log . Warn ( "unable to dial storage node" ,
2019-12-18 10:41:02 +00:00
zap . Stringer ( "node_id" , node . Id ) ,
zap . Stringer ( "node_info" , node ) ,
2019-12-11 17:44:13 +00:00
zap . Error ( err ) ,
)
2020-01-07 18:34:43 +00:00
// Mark all the pieces of this node as failure in the success threshold
for range nodePieces {
successThreshold . Failure ( )
}
2019-12-11 17:44:13 +00:00
// Pieces will be collected by garbage collector
return
}
2020-01-08 14:01:34 +00:00
defer func ( ) {
err := client . Close ( )
2020-01-08 14:54:11 +00:00
if err != nil {
endpoint . log . Warn ( "error closing the storage node client connection" ,
zap . Stringer ( "node_id" , node . Id ) ,
zap . Stringer ( "node_info" , node ) ,
zap . Error ( err ) ,
)
}
2020-01-08 14:01:34 +00:00
} ( )
2019-12-11 17:44:13 +00:00
for _ , pieceID := range nodePieces {
err := client . DeletePiece ( ctx , pieceID )
if err != nil {
// piece will be collected by garbage collector
endpoint . log . Warn ( "unable to delete piece of a storage node" ,
2019-12-18 10:41:02 +00:00
zap . Stringer ( "node_id" , node . Id ) ,
2019-12-11 17:44:13 +00:00
zap . Stringer ( "piece_id" , pieceID ) ,
zap . Error ( err ) ,
)
2020-01-07 18:34:43 +00:00
successThreshold . Failure ( )
continue
2019-12-11 17:44:13 +00:00
}
2020-01-07 18:34:43 +00:00
successThreshold . Success ( )
2019-12-11 17:44:13 +00:00
}
} )
}
2020-01-07 18:34:43 +00:00
successThreshold . Wait ( ctx )
// return to the client after the success threshold but wait some time before
// canceling the remaining deletes
timer := time . AfterFunc ( 200 * time . Millisecond , cancel )
defer timer . Stop ( )
2019-12-11 17:44:13 +00:00
limiter . Wait ( )
return nil
}
2019-12-16 19:03:20 +00:00
// deletePointer deletes a pointer returning the deleted pointer.
//
// If the pointer isn't found when getting or deleting it, it returns
// storj.ErrObjectNotFound error.
func ( endpoint * Endpoint ) deletePointer (
ctx context . Context , projectID uuid . UUID , segmentIndex int64 , bucket , encryptedPath [ ] byte ,
) ( _ * pb . Pointer , err error ) {
defer mon . Task ( ) ( & ctx , projectID , segmentIndex , bucket , encryptedPath ) ( & err )
pointer , path , err := endpoint . getPointer ( ctx , projectID , segmentIndex , bucket , encryptedPath )
if err != nil {
2020-01-07 07:26:45 +00:00
if errs2 . IsRPC ( err , rpcstatus . NotFound ) {
2020-01-03 19:34:03 +00:00
return nil , storj . ErrObjectNotFound . New ( "%s" , err . Error ( ) )
}
2019-12-16 19:03:20 +00:00
return nil , err
}
err = endpoint . metainfo . UnsynchronizedDelete ( ctx , path )
if err != nil {
return nil , err
}
return pointer , nil
}