2019-03-18 10:55:06 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"context"
2019-06-24 18:15:45 +01:00
"crypto/sha256"
2019-04-02 15:55:58 +01:00
"errors"
2019-10-17 19:01:40 +01:00
"fmt"
2019-04-02 19:21:18 +01:00
"time"
2019-03-18 10:55:06 +00:00
2019-11-08 20:40:39 +00:00
"github.com/spacemonkeygo/monkit/v3"
2019-03-18 10:55:06 +00:00
"github.com/zeebo/errs"
"go.uber.org/zap"
2020-08-11 14:00:57 +01:00
"storj.io/common/context2"
2020-04-09 09:19:16 +01:00
"storj.io/common/encryption"
2020-05-29 14:31:26 +01:00
"storj.io/common/macaroon"
2020-06-01 21:07:31 +01:00
"storj.io/common/memory"
2019-12-27 11:48:47 +00:00
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
2020-03-30 10:08:50 +01:00
"storj.io/common/uuid"
2020-01-17 15:01:36 +00:00
lrucache "storj.io/storj/pkg/cache"
2019-07-28 06:55:36 +01:00
"storj.io/storj/satellite/accounting"
2019-06-21 20:14:34 +01:00
"storj.io/storj/satellite/attribution"
2019-03-18 10:55:06 +00:00
"storj.io/storj/satellite/console"
2020-10-29 16:16:25 +00:00
"storj.io/storj/satellite/internalpb"
2020-08-28 12:56:09 +01:00
"storj.io/storj/satellite/metainfo/metabase"
2020-07-09 16:32:35 +01:00
"storj.io/storj/satellite/metainfo/objectdeletion"
2020-03-12 07:03:46 +00:00
"storj.io/storj/satellite/metainfo/piecedeletion"
2020-03-18 13:24:31 +00:00
"storj.io/storj/satellite/metainfo/pointerverification"
2019-03-27 10:24:35 +00:00
"storj.io/storj/satellite/orders"
2019-07-28 06:55:36 +01:00
"storj.io/storj/satellite/overlay"
2020-06-03 14:51:02 +01:00
"storj.io/storj/satellite/revocation"
2019-11-26 11:12:37 +00:00
"storj.io/storj/satellite/rewards"
2020-02-21 14:07:29 +00:00
"storj.io/uplink/private/eestream"
"storj.io/uplink/private/storage/meta"
2019-03-18 10:55:06 +00:00
)
2019-07-16 11:39:23 +01:00
const (
	// satIDExpiration is a 48 hour window.
	// NOTE(review): presumably the validity period of satellite-issued
	// stream/segment IDs — confirm against the code that reads it.
	satIDExpiration = 48 * time.Hour

	// deleteObjectPiecesSuccessThreshold is a ratio of 0.75.
	// NOTE(review): presumably the fraction of piece deletions that must
	// succeed before a delete request stops waiting — confirm against its users.
	deleteObjectPiecesSuccessThreshold = 0.75
)
2019-07-03 17:14:37 +01:00
2019-03-18 10:55:06 +00:00
var (
mon = monkit . Package ( )
2020-08-11 15:50:01 +01:00
// Error general metainfo error.
2019-03-18 10:55:06 +00:00
Error = errs . Class ( "metainfo error" )
2020-08-11 15:50:01 +01:00
// ErrNodeAlreadyExists pointer already has a piece for a node err.
2019-11-05 19:13:45 +00:00
ErrNodeAlreadyExists = errs . Class ( "metainfo error: node already exists" )
2019-03-18 10:55:06 +00:00
)
2020-06-30 22:49:29 +01:00
// APIKeys is api keys store methods used by endpoint.
2019-09-10 14:24:16 +01:00
//
// architecture: Database
2019-03-18 10:55:06 +00:00
type APIKeys interface {
2019-05-24 17:51:27 +01:00
GetByHead ( ctx context . Context , head [ ] byte ) ( * console . APIKeyInfo , error )
}
2019-12-11 18:46:41 +00:00
// Endpoint metainfo endpoint.
2019-09-10 14:24:16 +01:00
//
// architecture: Endpoint
2019-03-18 10:55:06 +00:00
type Endpoint struct {
2020-04-09 09:19:16 +01:00
log * zap . Logger
metainfo * Service
deletePieces * piecedeletion . Service
2020-07-09 16:32:35 +01:00
deleteObjects * objectdeletion . Service
2020-04-09 09:19:16 +01:00
orders * orders . Service
overlay * overlay . Service
attributions attribution . DB
partners * rewards . PartnersService
pointerVerification * pointerverification . Service
projectUsage * accounting . Service
projects console . Projects
apiKeys APIKeys
satellite signing . Signer
limiterCache * lrucache . ExpiringLRU
encInlineSegmentSize int64 // max inline segment size + encryption overhead
2020-06-03 14:51:02 +01:00
revocations revocation . DB
2020-10-27 17:34:59 +00:00
defaultRS * pb . RedundancyScheme
2020-04-09 09:19:16 +01:00
config Config
2019-03-18 10:55:06 +00:00
}
2019-12-11 17:44:13 +00:00
// NewEndpoint creates new metainfo endpoint instance.
2020-03-12 07:03:46 +00:00
func NewEndpoint ( log * zap . Logger , metainfo * Service , deletePieces * piecedeletion . Service ,
2020-01-09 17:09:22 +00:00
orders * orders . Service , cache * overlay . Service , attributions attribution . DB ,
partners * rewards . PartnersService , peerIdentities overlay . PeerIdentities ,
2020-01-17 15:01:36 +00:00
apiKeys APIKeys , projectUsage * accounting . Service , projects console . Projects ,
2020-06-03 14:51:02 +01:00
satellite signing . Signer , revocations revocation . DB , config Config ) ( * Endpoint , error ) {
2019-03-18 10:55:06 +00:00
// TODO do something with too many params
2020-04-09 09:19:16 +01:00
encInlineSegmentSize , err := encryption . CalcEncryptedSize ( config . MaxInlineSegmentSize . Int64 ( ) , storj . EncryptionParameters {
CipherSuite : storj . EncAESGCM ,
BlockSize : 128 , // intentionally low block size to allow maximum possible encryption overhead
} )
if err != nil {
return nil , err
}
2020-07-09 16:32:35 +01:00
objectDeletion , err := objectdeletion . NewService ( log , metainfo , config . ObjectDeletion )
if err != nil {
return nil , err
}
2020-10-27 17:34:59 +00:00
defaultRSScheme := & pb . RedundancyScheme {
Type : pb . RedundancyScheme_RS ,
MinReq : int32 ( config . RS . Min ) ,
RepairThreshold : int32 ( config . RS . Repair ) ,
SuccessThreshold : int32 ( config . RS . Success ) ,
Total : int32 ( config . RS . Total ) ,
ErasureShareSize : config . RS . ErasureShareSize . Int32 ( ) ,
}
2019-03-18 10:55:06 +00:00
return & Endpoint {
2020-03-18 13:24:31 +00:00
log : log ,
metainfo : metainfo ,
deletePieces : deletePieces ,
2020-07-09 16:32:35 +01:00
deleteObjects : objectDeletion ,
2020-03-18 13:24:31 +00:00
orders : orders ,
overlay : cache ,
attributions : attributions ,
partners : partners ,
pointerVerification : pointerverification . NewService ( peerIdentities ) ,
apiKeys : apiKeys ,
projectUsage : projectUsage ,
projects : projects ,
satellite : satellite ,
2020-01-29 15:22:22 +00:00
limiterCache : lrucache . New ( lrucache . Options {
2020-04-01 10:15:24 +01:00
Capacity : config . RateLimiter . CacheCapacity ,
Expiration : config . RateLimiter . CacheExpiration ,
2020-01-29 15:22:22 +00:00
} ) ,
2020-04-09 09:19:16 +01:00
encInlineSegmentSize : encInlineSegmentSize ,
2020-06-03 14:51:02 +01:00
revocations : revocations ,
2020-10-27 17:34:59 +00:00
defaultRS : defaultRSScheme ,
2020-04-09 09:19:16 +01:00
config : config ,
} , nil
2019-03-18 10:55:06 +00:00
}
2020-06-30 22:49:29 +01:00
// Close closes resources.
2019-03-18 10:55:06 +00:00
func ( endpoint * Endpoint ) Close ( ) error { return nil }
2019-10-31 17:27:38 +00:00
func calculateSpaceUsed ( ptr * pb . Pointer ) ( segmentSize , totalStored int64 ) {
2019-05-10 02:39:21 +01:00
inline := ptr . GetInlineSegment ( )
if inline != nil {
2019-10-31 17:27:38 +00:00
inlineSize := int64 ( len ( inline ) )
return inlineSize , inlineSize
2019-05-10 02:39:21 +01:00
}
2019-10-31 17:27:38 +00:00
segmentSize = ptr . GetSegmentSize ( )
2019-05-10 02:39:21 +01:00
remote := ptr . GetRemote ( )
if remote == nil {
return 0 , 0
}
minReq := remote . GetRedundancy ( ) . GetMinReq ( )
pieceSize := segmentSize / int64 ( minReq )
pieces := remote . GetRemotePieces ( )
2019-10-31 17:27:38 +00:00
return segmentSize , pieceSize * int64 ( len ( pieces ) )
2019-05-10 02:39:21 +01:00
}
2019-10-17 19:01:40 +01:00
// filterValidPieces filter out the invalid remote pieces held by pointer.
//
2019-12-03 13:36:32 +00:00
// This method expect the pointer to be valid, so it has to be validated before
// calling it.
//
2019-10-17 19:01:40 +01:00
// The method always return a gRPC status error so the caller can directly
// return it to the client.
2019-12-03 13:36:32 +00:00
func ( endpoint * Endpoint ) filterValidPieces ( ctx context . Context , pointer * pb . Pointer , originalLimits [ ] * pb . OrderLimit ) ( err error ) {
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-11-15 13:33:09 +00:00
if pointer . Type != pb . Pointer_REMOTE {
return nil
}
2019-09-18 14:50:33 +01:00
2020-03-18 13:24:31 +00:00
// verify that the piece sizes matches what we would expect.
err = endpoint . pointerVerification . VerifySizes ( ctx , pointer )
if err != nil {
endpoint . log . Debug ( "piece sizes are invalid" , zap . Error ( err ) )
return rpcstatus . Errorf ( rpcstatus . InvalidArgument , "piece sizes are invalid: %v" , err )
2019-11-15 13:33:09 +00:00
}
2019-07-03 17:14:37 +01:00
2020-03-18 13:24:31 +00:00
validPieces , invalidPieces , err := endpoint . pointerVerification . SelectValidPieces ( ctx , pointer , originalLimits )
if err != nil {
endpoint . log . Debug ( "pointer verification failed" , zap . Error ( err ) )
return rpcstatus . Errorf ( rpcstatus . InvalidArgument , "pointer verification failed: %s" , err )
2019-11-15 13:33:09 +00:00
}
2020-03-18 13:24:31 +00:00
remote := pointer . Remote
if int32 ( len ( validPieces ) ) < remote . Redundancy . SuccessThreshold {
2019-11-15 13:33:09 +00:00
endpoint . log . Debug ( "Number of valid pieces is less than the success threshold" ,
zap . Int ( "totalReceivedPieces" , len ( remote . RemotePieces ) ) ,
2020-03-18 13:24:31 +00:00
zap . Int ( "validPieces" , len ( validPieces ) ) ,
2019-11-15 13:33:09 +00:00
zap . Int ( "invalidPieces" , len ( invalidPieces ) ) ,
zap . Int32 ( "successThreshold" , remote . Redundancy . SuccessThreshold ) ,
)
2019-03-18 10:55:06 +00:00
2019-11-15 13:33:09 +00:00
errMsg := fmt . Sprintf ( "Number of valid pieces (%d) is less than the success threshold (%d). Found %d invalid pieces" ,
2020-03-18 13:24:31 +00:00
len ( validPieces ) ,
2019-11-15 13:33:09 +00:00
remote . Redundancy . SuccessThreshold ,
len ( remote . RemotePieces ) ,
)
if len ( invalidPieces ) > 0 {
errMsg = fmt . Sprintf ( "%s. Invalid Pieces:" , errMsg )
2019-10-17 19:01:40 +01:00
2019-11-15 13:33:09 +00:00
for _ , p := range invalidPieces {
errMsg = fmt . Sprintf ( "%s\nNodeID: %v, PieceNum: %d, Reason: %s" ,
errMsg , p . NodeID , p . PieceNum , p . Reason ,
)
2019-10-17 19:01:40 +01:00
}
2019-07-31 19:28:43 +01:00
}
2019-11-15 13:33:09 +00:00
return rpcstatus . Error ( rpcstatus . InvalidArgument , errMsg )
2019-03-18 10:55:06 +00:00
}
2019-10-17 19:01:40 +01:00
2020-03-18 13:24:31 +00:00
remote . RemotePieces = validPieces
2019-11-15 13:33:09 +00:00
2019-03-18 10:55:06 +00:00
return nil
}
2020-06-30 22:49:29 +01:00
// ProjectInfo returns allowed ProjectInfo for the provided API key.
2019-06-24 18:15:45 +01:00
func ( endpoint * Endpoint ) ProjectInfo ( ctx context . Context , req * pb . ProjectInfoRequest ) ( _ * pb . ProjectInfoResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-06-24 18:15:45 +01:00
Op : macaroon . ActionProjectInfo ,
Time : time . Now ( ) ,
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-06-24 18:15:45 +01:00
}
salt := sha256 . Sum256 ( keyInfo . ProjectID [ : ] )
return & pb . ProjectInfoResponse {
ProjectSalt : salt [ : ] ,
} , nil
}
2019-07-01 23:17:30 +01:00
2020-06-30 22:49:29 +01:00
// GetBucket returns a bucket.
2019-07-08 23:32:18 +01:00
func ( endpoint * Endpoint ) GetBucket ( ctx context . Context , req * pb . BucketGetRequest ) ( resp * pb . BucketGetResponse , err error ) {
2019-07-01 23:17:30 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-08 23:32:18 +01:00
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-08 23:32:18 +01:00
Op : macaroon . ActionRead ,
Bucket : req . Name ,
Time : time . Now ( ) ,
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-08 23:32:18 +01:00
}
bucket , err := endpoint . metainfo . GetBucket ( ctx , req . GetName ( ) , keyInfo . ProjectID )
if err != nil {
2019-07-12 13:57:02 +01:00
if storj . ErrBucketNotFound . Has ( err ) {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . NotFound , err . Error ( ) )
2019-07-12 13:57:02 +01:00
}
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
2020-01-28 13:44:47 +00:00
// override RS to fit satellite settings
2020-10-27 17:34:59 +00:00
convBucket , err := convertBucketToProto ( bucket , endpoint . defaultRS )
2019-07-19 16:17:34 +01:00
if err != nil {
return resp , err
}
2019-07-08 23:32:18 +01:00
return & pb . BucketGetResponse {
2019-07-19 16:17:34 +01:00
Bucket : convBucket ,
2019-07-08 23:32:18 +01:00
} , nil
2019-07-01 23:17:30 +01:00
}
2020-06-30 22:49:29 +01:00
// CreateBucket creates a new bucket.
2019-07-08 23:32:18 +01:00
func ( endpoint * Endpoint ) CreateBucket ( ctx context . Context , req * pb . BucketCreateRequest ) ( resp * pb . BucketCreateResponse , err error ) {
2019-07-01 23:17:30 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-08 23:32:18 +01:00
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-08 23:32:18 +01:00
Op : macaroon . ActionWrite ,
Bucket : req . Name ,
Time : time . Now ( ) ,
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-08 23:32:18 +01:00
}
err = endpoint . validateBucket ( ctx , req . Name )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
2019-07-19 16:17:34 +01:00
// checks if bucket exists before updates it or makes a new entry
2019-11-26 11:12:37 +00:00
_ , err = endpoint . metainfo . GetBucket ( ctx , req . GetName ( ) , keyInfo . ProjectID )
2019-07-19 16:17:34 +01:00
if err == nil {
2020-04-14 12:50:50 +01:00
// When the bucket exists, try to set the attribution.
2020-07-24 10:40:17 +01:00
if err := endpoint . ensureAttribution ( ctx , req . Header , keyInfo , req . GetName ( ) ) ; err != nil {
2020-04-14 12:50:50 +01:00
return nil , err
}
2019-11-26 11:12:37 +00:00
return nil , rpcstatus . Error ( rpcstatus . AlreadyExists , "bucket already exists" )
}
if ! storj . ErrBucketNotFound . Has ( err ) {
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
2020-06-30 22:49:29 +01:00
// check if project has exceeded its allocated bucket limit
maxBuckets , err := endpoint . projects . GetMaxBuckets ( ctx , keyInfo . ProjectID )
if err != nil {
return nil , err
}
2020-09-06 00:02:12 +01:00
if maxBuckets == nil {
defaultMaxBuckets := endpoint . config . ProjectLimits . MaxBuckets
maxBuckets = & defaultMaxBuckets
2020-06-30 22:49:29 +01:00
}
bucketCount , err := endpoint . metainfo . CountBuckets ( ctx , keyInfo . ProjectID )
if err != nil {
return nil , err
}
2020-09-06 00:02:12 +01:00
if bucketCount >= * maxBuckets {
2020-06-30 22:49:29 +01:00
return nil , rpcstatus . Error ( rpcstatus . ResourceExhausted , fmt . Sprintf ( "number of allocated buckets (%d) exceeded" , endpoint . config . ProjectLimits . MaxBuckets ) )
}
bucketReq , err := convertProtoToBucket ( req , keyInfo . ProjectID )
2019-11-26 11:12:37 +00:00
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
2019-07-19 16:17:34 +01:00
2020-06-30 22:49:29 +01:00
bucket , err := endpoint . metainfo . CreateBucket ( ctx , bucketReq )
2019-11-26 11:12:37 +00:00
if err != nil {
2020-06-30 22:49:29 +01:00
endpoint . log . Error ( "error while creating bucket" , zap . String ( "bucketName" , bucketReq . Name ) , zap . Error ( err ) )
2019-11-26 11:12:37 +00:00
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to create bucket" )
}
2019-07-19 16:17:34 +01:00
2020-04-14 12:50:50 +01:00
// Once we have created the bucket, we can try setting the attribution.
2020-07-24 10:40:17 +01:00
if err := endpoint . ensureAttribution ( ctx , req . Header , keyInfo , req . GetName ( ) ) ; err != nil {
2020-04-14 12:50:50 +01:00
return nil , err
}
2020-01-28 13:44:47 +00:00
// override RS to fit satellite settings
2020-10-27 17:34:59 +00:00
convBucket , err := convertBucketToProto ( bucket , endpoint . defaultRS )
2019-11-26 11:12:37 +00:00
if err != nil {
endpoint . log . Error ( "error while converting bucket to proto" , zap . String ( "bucketName" , bucket . Name ) , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to create bucket" )
2019-07-19 16:17:34 +01:00
}
2019-11-26 11:12:37 +00:00
return & pb . BucketCreateResponse {
Bucket : convBucket ,
} , nil
2019-07-01 23:17:30 +01:00
}
2020-06-30 22:49:29 +01:00
// DeleteBucket deletes a bucket.
2019-07-08 23:32:18 +01:00
func ( endpoint * Endpoint ) DeleteBucket ( ctx context . Context , req * pb . BucketDeleteRequest ) ( resp * pb . BucketDeleteResponse , err error ) {
2019-07-01 23:17:30 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-08 23:32:18 +01:00
2020-03-11 15:53:16 +00:00
now := time . Now ( )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-08 23:32:18 +01:00
Op : macaroon . ActionDelete ,
Bucket : req . Name ,
2020-03-11 15:53:16 +00:00
Time : now ,
2019-07-08 23:32:18 +01:00
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-08 23:32:18 +01:00
}
err = endpoint . validateBucket ( ctx , req . Name )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
2020-03-11 15:53:16 +00:00
_ , err = endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
Op : macaroon . ActionRead ,
Bucket : req . Name ,
Time : now ,
} )
canRead := err == nil
_ , err = endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
Op : macaroon . ActionList ,
Bucket : req . Name ,
Time : now ,
} )
canList := err == nil
2020-08-06 02:23:45 +01:00
var (
bucket storj . Bucket
convBucket * pb . Bucket
)
2020-03-11 15:53:16 +00:00
if canRead || canList {
2020-08-06 02:23:45 +01:00
// Info about deleted bucket is returned only if either Read, or List permission is granted.
2020-03-11 15:53:16 +00:00
bucket , err = endpoint . metainfo . GetBucket ( ctx , req . Name , keyInfo . ProjectID )
if err != nil {
if storj . ErrBucketNotFound . Has ( err ) {
return nil , rpcstatus . Error ( rpcstatus . NotFound , err . Error ( ) )
}
return nil , err
}
2020-08-06 02:23:45 +01:00
2020-10-27 17:34:59 +00:00
convBucket , err = convertBucketToProto ( bucket , endpoint . defaultRS )
2020-08-06 02:23:45 +01:00
if err != nil {
return nil , err
}
2020-03-11 15:53:16 +00:00
}
2019-07-08 23:32:18 +01:00
err = endpoint . metainfo . DeleteBucket ( ctx , req . Name , keyInfo . ProjectID )
if err != nil {
2020-03-11 15:53:16 +00:00
if ! canRead && ! canList {
2020-08-06 02:23:45 +01:00
// No error info is returned if neither Read, nor List permission is granted.
2020-04-02 08:45:51 +01:00
return & pb . BucketDeleteResponse { } , nil
2020-03-11 15:53:16 +00:00
}
2020-02-14 12:52:00 +00:00
if ErrBucketNotEmpty . Has ( err ) {
2020-08-06 02:23:45 +01:00
// List permission is required to delete all objects in a bucket.
if ! req . GetDeleteAll ( ) || ! canList {
return nil , rpcstatus . Error ( rpcstatus . FailedPrecondition , err . Error ( ) )
}
_ , deletedObjCount , err := endpoint . deleteBucketNotEmpty ( ctx , keyInfo . ProjectID , req . Name )
if err != nil {
return nil , err
}
return & pb . BucketDeleteResponse { Bucket : convBucket , DeletedObjectsCount : int64 ( deletedObjCount ) } , nil
}
if storj . ErrBucketNotFound . Has ( err ) {
return & pb . BucketDeleteResponse { Bucket : convBucket } , nil
2020-02-14 12:52:00 +00:00
}
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-08 23:32:18 +01:00
}
2020-08-06 02:23:45 +01:00
return & pb . BucketDeleteResponse { Bucket : convBucket } , nil
}
// deleteBucketNotEmpty deletes all objects that're complete or have first segment.
// On success, it returns only the number of complete objects that has been deleted
// since from the user's perspective, objects without last segment are invisible.
func ( endpoint * Endpoint ) deleteBucketNotEmpty ( ctx context . Context , projectID uuid . UUID , bucketName [ ] byte ) ( [ ] byte , int , error ) {
// Delete all objects that has last segment.
2020-09-16 12:25:14 +01:00
deletedCount , err := endpoint . deleteByPrefix ( ctx , projectID , bucketName , metabase . LastSegmentIndex )
2020-03-11 15:53:16 +00:00
if err != nil {
2020-08-06 02:23:45 +01:00
return nil , 0 , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
// Delete all zombie objects that have first segment.
2020-09-16 12:25:14 +01:00
_ , err = endpoint . deleteByPrefix ( ctx , projectID , bucketName , metabase . FirstSegmentIndex )
2020-08-06 02:23:45 +01:00
if err != nil {
2020-08-28 19:27:58 +01:00
return nil , deletedCount , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2020-03-11 15:53:16 +00:00
}
2020-08-06 02:23:45 +01:00
err = endpoint . metainfo . DeleteBucket ( ctx , bucketName , projectID )
if err != nil {
if ErrBucketNotEmpty . Has ( err ) {
2020-08-28 19:27:58 +01:00
return nil , deletedCount , rpcstatus . Error ( rpcstatus . FailedPrecondition , "cannot delete the bucket because it's being used by another process" )
2020-08-06 02:23:45 +01:00
}
if storj . ErrBucketNotFound . Has ( err ) {
return bucketName , 0 , nil
}
2020-08-28 19:27:58 +01:00
return nil , deletedCount , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2020-08-06 02:23:45 +01:00
}
return bucketName , deletedCount , nil
}
// deleteByPrefix deletes all objects that matches with a prefix.
func ( endpoint * Endpoint ) deleteByPrefix ( ctx context . Context , projectID uuid . UUID , bucketName [ ] byte , segmentIdx int64 ) ( deletedCount int , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2020-09-03 14:54:56 +01:00
location , err := CreatePath ( ctx , projectID , segmentIdx , bucketName , [ ] byte { } )
2020-08-06 02:23:45 +01:00
if err != nil {
return deletedCount , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
2020-09-03 14:54:56 +01:00
prefix := location . Encode ( )
2020-08-06 02:23:45 +01:00
for {
segments , more , err := endpoint . metainfo . List ( ctx , prefix , "" , true , 0 , meta . None )
if err != nil {
return deletedCount , err
}
2020-09-04 08:46:53 +01:00
deleteReqs := make ( [ ] * metabase . ObjectLocation , len ( segments ) )
2020-08-06 02:23:45 +01:00
for i , segment := range segments {
2020-09-04 08:46:53 +01:00
deleteReqs [ i ] = & metabase . ObjectLocation {
ProjectID : projectID ,
BucketName : string ( bucketName ) ,
ObjectKey : metabase . ObjectKey ( segment . Path ) ,
2020-08-06 02:23:45 +01:00
}
}
rep , err := endpoint . deleteObjectsPieces ( ctx , deleteReqs ... )
if err != nil {
return deletedCount , err
}
deletedCount += len ( rep . Deleted )
if ! more {
break
}
}
return deletedCount , nil
2019-07-01 23:17:30 +01:00
}
2020-06-30 22:49:29 +01:00
// ListBuckets returns buckets in a project where the bucket name matches the request cursor.
2019-07-08 23:32:18 +01:00
func ( endpoint * Endpoint ) ListBuckets ( ctx context . Context , req * pb . BucketListRequest ) ( resp * pb . BucketListResponse , err error ) {
2019-07-01 23:17:30 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-08 23:32:18 +01:00
action := macaroon . Action {
2020-03-11 15:53:16 +00:00
// TODO: This has to be ActionList, but it seems to be set to
// ActionRead as a hacky workaround to make bucket listing possible.
2019-07-08 23:32:18 +01:00
Op : macaroon . ActionRead ,
Time : time . Now ( ) ,
}
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , action )
2019-07-08 23:32:18 +01:00
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-08 23:32:18 +01:00
}
2019-09-19 17:19:29 +01:00
allowedBuckets , err := getAllowedBuckets ( ctx , req . Header , action )
2019-07-08 23:32:18 +01:00
if err != nil {
return nil , err
}
listOpts := storj . BucketListOptions {
2019-07-12 13:57:02 +01:00
Cursor : string ( req . Cursor ) ,
Limit : int ( req . Limit ) ,
Direction : storj . ListDirection ( req . Direction ) ,
2019-07-08 23:32:18 +01:00
}
bucketList , err := endpoint . metainfo . ListBuckets ( ctx , keyInfo . ProjectID , listOpts , allowedBuckets )
if err != nil {
return nil , err
}
bucketItems := make ( [ ] * pb . BucketListItem , len ( bucketList . Items ) )
for i , item := range bucketList . Items {
bucketItems [ i ] = & pb . BucketListItem {
Name : [ ] byte ( item . Name ) ,
CreatedAt : item . Created ,
}
}
return & pb . BucketListResponse {
Items : bucketItems ,
More : bucketList . More ,
} , nil
2019-07-01 23:17:30 +01:00
}
2020-06-30 22:49:29 +01:00
// CountBuckets returns the number of buckets a project currently has.
// On failure the count is zero and the underlying error is passed through.
// TODO: add this to the uplink client side.
func (endpoint *Endpoint) CountBuckets(ctx context.Context, projectID uuid.UUID) (count int, err error) {
	total, countErr := endpoint.metainfo.CountBuckets(ctx, projectID)
	if countErr != nil {
		return 0, countErr
	}
	return total, nil
}
2019-09-19 17:19:29 +01:00
func getAllowedBuckets ( ctx context . Context , header * pb . RequestHeader , action macaroon . Action ) ( _ macaroon . AllowedBuckets , err error ) {
key , err := getAPIKey ( ctx , header )
2019-07-08 23:32:18 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return macaroon . AllowedBuckets { } , rpcstatus . Errorf ( rpcstatus . InvalidArgument , "Invalid API credentials: %v" , err )
2019-07-08 23:32:18 +01:00
}
2019-07-12 13:57:02 +01:00
allowedBuckets , err := key . GetAllowedBuckets ( ctx , action )
2019-07-08 23:32:18 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return macaroon . AllowedBuckets { } , rpcstatus . Errorf ( rpcstatus . Internal , "GetAllowedBuckets: %v" , err )
2019-07-08 23:32:18 +01:00
}
return allowedBuckets , err
}
2019-07-19 16:17:34 +01:00
func convertProtoToBucket ( req * pb . BucketCreateRequest , projectID uuid . UUID ) ( bucket storj . Bucket , err error ) {
2019-07-08 23:32:18 +01:00
bucketID , err := uuid . New ( )
if err != nil {
return storj . Bucket { } , err
}
defaultRS := req . GetDefaultRedundancyScheme ( )
defaultEP := req . GetDefaultEncryptionParameters ( )
2019-07-19 16:17:34 +01:00
2019-11-26 11:12:37 +00:00
// TODO: resolve partner id
2019-07-19 16:17:34 +01:00
var partnerID uuid . UUID
err = partnerID . UnmarshalJSON ( req . GetPartnerId ( ) )
// bucket's partnerID should never be set
// it is always read back from buckets DB
if err != nil && ! partnerID . IsZero ( ) {
return bucket , errs . New ( "Invalid uuid" )
}
2019-07-08 23:32:18 +01:00
return storj . Bucket {
2020-04-02 15:18:08 +01:00
ID : bucketID ,
2019-07-08 23:32:18 +01:00
Name : string ( req . GetName ( ) ) ,
2020-04-02 15:18:08 +01:00
ProjectID : projectID ,
PartnerID : partnerID ,
2019-07-08 23:32:18 +01:00
PathCipher : storj . CipherSuite ( req . GetPathCipher ( ) ) ,
DefaultSegmentsSize : req . GetDefaultSegmentSize ( ) ,
DefaultRedundancyScheme : storj . RedundancyScheme {
Algorithm : storj . RedundancyAlgorithm ( defaultRS . GetType ( ) ) ,
ShareSize : defaultRS . GetErasureShareSize ( ) ,
RequiredShares : int16 ( defaultRS . GetMinReq ( ) ) ,
RepairShares : int16 ( defaultRS . GetRepairThreshold ( ) ) ,
OptimalShares : int16 ( defaultRS . GetSuccessThreshold ( ) ) ,
TotalShares : int16 ( defaultRS . GetTotal ( ) ) ,
} ,
DefaultEncryptionParameters : storj . EncryptionParameters {
CipherSuite : storj . CipherSuite ( defaultEP . CipherSuite ) ,
BlockSize : int32 ( defaultEP . BlockSize ) ,
} ,
} , nil
}
2020-06-30 22:49:29 +01:00
func convertBucketToProto ( bucket storj . Bucket , rs * pb . RedundancyScheme ) ( pbBucket * pb . Bucket , err error ) {
2020-03-11 15:53:16 +00:00
if bucket == ( storj . Bucket { } ) {
return nil , nil
}
2019-07-19 16:17:34 +01:00
partnerID , err := bucket . PartnerID . MarshalJSON ( )
if err != nil {
2019-09-19 05:46:39 +01:00
return pbBucket , rpcstatus . Error ( rpcstatus . Internal , "UUID marshal error" )
2019-07-19 16:17:34 +01:00
}
2020-03-04 17:38:52 +00:00
pbBucket = & pb . Bucket {
2020-01-28 13:44:47 +00:00
Name : [ ] byte ( bucket . Name ) ,
2020-03-04 17:38:52 +00:00
PathCipher : pb . CipherSuite ( bucket . PathCipher ) ,
2020-01-28 13:44:47 +00:00
PartnerId : partnerID ,
CreatedAt : bucket . Created ,
DefaultSegmentSize : bucket . DefaultSegmentsSize ,
DefaultRedundancyScheme : rs ,
2019-07-08 23:32:18 +01:00
DefaultEncryptionParameters : & pb . EncryptionParameters {
2020-03-04 17:38:52 +00:00
CipherSuite : pb . CipherSuite ( bucket . DefaultEncryptionParameters . CipherSuite ) ,
2019-07-08 23:32:18 +01:00
BlockSize : int64 ( bucket . DefaultEncryptionParameters . BlockSize ) ,
} ,
2020-03-04 17:38:52 +00:00
}
// this part is to provide default ciphers (path and encryption) for old uplinks
// new uplinks are using ciphers from encryption access
if pbBucket . PathCipher == pb . CipherSuite_ENC_UNSPECIFIED {
pbBucket . PathCipher = pb . CipherSuite_ENC_AESGCM
}
if pbBucket . DefaultEncryptionParameters . CipherSuite == pb . CipherSuite_ENC_UNSPECIFIED {
pbBucket . DefaultEncryptionParameters . CipherSuite = pb . CipherSuite_ENC_AESGCM
pbBucket . DefaultEncryptionParameters . BlockSize = int64 ( rs . ErasureShareSize * rs . MinReq )
}
return pbBucket , nil
2019-07-01 23:17:30 +01:00
}
2019-07-16 11:39:23 +01:00
2020-06-30 22:49:29 +01:00
// BeginObject begins object.
2019-07-16 11:39:23 +01:00
func ( endpoint * Endpoint ) BeginObject ( ctx context . Context , req * pb . ObjectBeginRequest ) ( resp * pb . ObjectBeginResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-16 11:39:23 +01:00
Op : macaroon . ActionWrite ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-16 11:39:23 +01:00
}
2020-01-03 09:27:10 +00:00
if ! req . ExpiresAt . IsZero ( ) && ! req . ExpiresAt . After ( time . Now ( ) ) {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "Invalid expiration time" )
}
2020-06-15 12:49:09 +01:00
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2020-03-16 08:55:52 +00:00
}
// TODO this needs to be optimized to avoid DB call on each request
_ , err = endpoint . metainfo . GetBucket ( ctx , req . Bucket , keyInfo . ProjectID )
if err != nil {
if storj . ErrBucketNotFound . Has ( err ) {
return nil , rpcstatus . Error ( rpcstatus . NotFound , err . Error ( ) )
}
endpoint . log . Error ( "unable to check bucket" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2020-07-24 10:40:17 +01:00
if err := endpoint . ensureAttribution ( ctx , req . Header , keyInfo , req . Bucket ) ; err != nil {
2020-04-14 12:50:50 +01:00
return nil , err
}
2020-01-21 10:38:41 +00:00
// use only satellite values for Redundancy Scheme
2020-10-27 17:34:59 +00:00
pbRS := endpoint . defaultRS
2019-07-16 11:39:23 +01:00
2020-10-29 16:16:25 +00:00
streamID , err := endpoint . packStreamID ( ctx , & internalpb . StreamID {
2019-08-01 10:04:31 +01:00
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Version : req . Version ,
Redundancy : pbRS ,
CreationDate : time . Now ( ) ,
ExpirationDate : req . ExpiresAt ,
2019-07-24 12:33:23 +01:00
} )
2019-07-16 11:39:23 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
2020-07-31 12:24:40 +01:00
_ , err = endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
Op : macaroon . ActionDelete ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : time . Now ( ) ,
} )
canDelete := err == nil
if canDelete {
2020-07-27 21:12:14 +01:00
_ , err = endpoint . DeleteObjectPieces ( ctx , keyInfo . ProjectID , req . Bucket , req . EncryptedPath )
if err != nil {
2020-07-31 12:24:40 +01:00
return nil , err
}
} else {
2020-09-16 12:25:14 +01:00
location , err := CreatePath ( ctx , keyInfo . ProjectID , metabase . LastSegmentIndex , req . Bucket , req . EncryptedPath )
2020-07-31 12:24:40 +01:00
if err != nil {
endpoint . log . Error ( "unable to create path" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
// TODO maybe we can have different Get without pointer unmarshaling
2020-09-03 14:54:56 +01:00
_ , _ , err = endpoint . metainfo . GetWithBytes ( ctx , location . Encode ( ) )
2020-07-31 12:24:40 +01:00
if err == nil {
return nil , rpcstatus . Error ( rpcstatus . PermissionDenied , "Unauthorized API credentials" )
}
2020-01-24 13:25:38 +00:00
}
2020-01-20 18:48:26 +00:00
endpoint . log . Info ( "Object Upload" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "put" ) , zap . String ( "type" , "object" ) )
2020-01-29 15:03:30 +00:00
mon . Meter ( "req_put_object" ) . Mark ( 1 )
2020-01-20 18:48:26 +00:00
2019-07-16 11:39:23 +01:00
return & pb . ObjectBeginResponse {
2020-04-06 12:36:34 +01:00
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Version : req . Version ,
StreamId : streamID ,
RedundancyScheme : pbRS ,
2019-07-16 11:39:23 +01:00
} , nil
}
2020-06-30 22:49:29 +01:00
// CommitObject commits an object when all its segments have already been committed.
2019-07-16 11:39:23 +01:00
func ( endpoint * Endpoint ) CommitObject ( ctx context . Context , req * pb . ObjectCommitRequest ) ( resp * pb . ObjectCommitResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2020-01-27 20:25:52 +00:00
return endpoint . commitObject ( ctx , req , nil )
}
func ( endpoint * Endpoint ) commitObject ( ctx context . Context , req * pb . ObjectCommitRequest , pointer * pb . Pointer ) ( resp * pb . ObjectCommitResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2020-10-30 11:22:16 +00:00
streamID , err := endpoint . unmarshalSatStreamID ( ctx , req . StreamId )
2019-07-16 11:39:23 +01:00
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-16 11:39:23 +01:00
Op : macaroon . ActionWrite ,
Bucket : streamID . Bucket ,
EncryptedPath : streamID . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-16 11:39:23 +01:00
}
2020-06-01 21:07:31 +01:00
metadataSize := memory . Size ( len ( req . EncryptedMetadata ) )
if metadataSize > endpoint . config . MaxMetadataSize {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , fmt . Sprintf ( "Metadata is too large, got %v, maximum allowed is %v" , metadataSize , endpoint . config . MaxMetadataSize ) )
}
2019-11-24 21:14:51 +00:00
streamMeta := pb . StreamMeta { }
2020-04-08 13:08:57 +01:00
err = pb . Unmarshal ( req . EncryptedMetadata , & streamMeta )
2019-11-24 21:14:51 +00:00
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "invalid metadata structure" )
}
2019-08-01 10:04:31 +01:00
2020-01-27 20:25:52 +00:00
lastSegmentPointer := pointer
if pointer == nil {
lastSegmentIndex := streamMeta . NumberOfSegments - 1
2020-09-03 14:54:56 +01:00
lastSegmentLocation , err := CreatePath ( ctx , keyInfo . ProjectID , lastSegmentIndex , streamID . Bucket , streamID . EncryptedPath )
2020-01-27 20:25:52 +00:00
if err != nil {
return nil , rpcstatus . Errorf ( rpcstatus . InvalidArgument , "unable to create segment path: %s" , err . Error ( ) )
}
2020-01-16 19:12:09 +00:00
2020-01-27 20:25:52 +00:00
var lastSegmentPointerBytes [ ] byte
2020-09-03 14:54:56 +01:00
lastSegmentKey := lastSegmentLocation . Encode ( )
lastSegmentPointerBytes , lastSegmentPointer , err = endpoint . metainfo . GetWithBytes ( ctx , lastSegmentKey )
2020-01-27 20:25:52 +00:00
if err != nil {
2020-09-03 14:54:56 +01:00
endpoint . log . Error ( "unable to get pointer" , zap . ByteString ( "segmentPath" , lastSegmentKey ) , zap . Error ( err ) )
2020-01-27 20:25:52 +00:00
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to commit object" )
}
if lastSegmentPointer == nil {
return nil , rpcstatus . Errorf ( rpcstatus . NotFound , "unable to find object: %q/%q" , streamID . Bucket , streamID . EncryptedPath )
}
2020-09-03 14:54:56 +01:00
err = endpoint . metainfo . Delete ( ctx , lastSegmentKey , lastSegmentPointerBytes )
2020-01-27 20:25:52 +00:00
if err != nil {
2020-09-03 14:54:56 +01:00
endpoint . log . Error ( "unable to delete pointer" , zap . ByteString ( "segmentPath" , lastSegmentKey ) , zap . Error ( err ) )
2020-01-27 20:25:52 +00:00
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to commit object" )
}
2019-08-01 10:04:31 +01:00
}
2019-09-19 00:18:14 +01:00
if lastSegmentPointer . Remote == nil {
lastSegmentPointer . Remote = & pb . RemoteSegment { }
}
// RS is set always for last segment to emulate RS per object
lastSegmentPointer . Remote . Redundancy = streamID . Redundancy
2019-08-01 10:04:31 +01:00
lastSegmentPointer . Metadata = req . EncryptedMetadata
2020-09-16 12:25:14 +01:00
lastSegmentLocation , err := CreatePath ( ctx , keyInfo . ProjectID , int64 ( metabase . LastSegmentIndex ) , streamID . Bucket , streamID . EncryptedPath )
2019-08-01 10:04:31 +01:00
if err != nil {
2019-11-24 21:14:51 +00:00
endpoint . log . Error ( "unable to create path" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to commit object" )
2019-08-01 10:04:31 +01:00
}
2020-09-03 14:54:56 +01:00
err = endpoint . metainfo . UnsynchronizedPut ( ctx , lastSegmentLocation . Encode ( ) , lastSegmentPointer )
2019-08-01 10:04:31 +01:00
if err != nil {
2019-11-24 21:14:51 +00:00
endpoint . log . Error ( "unable to put pointer" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to commit object" )
2019-08-01 10:04:31 +01:00
}
2019-07-16 11:39:23 +01:00
return & pb . ObjectCommitResponse { } , nil
}
2020-06-30 22:49:29 +01:00
// GetObject gets single object.
2019-07-23 12:09:12 +01:00
func ( endpoint * Endpoint ) GetObject ( ctx context . Context , req * pb . ObjectGetRequest ) ( resp * pb . ObjectGetResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-23 12:09:12 +01:00
Op : macaroon . ActionRead ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-23 12:09:12 +01:00
}
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-23 12:09:12 +01:00
}
2020-03-11 15:53:16 +00:00
object , err := endpoint . getObject ( ctx , keyInfo . ProjectID , req . Bucket , req . EncryptedPath , req . Version )
if err != nil {
return nil , err
}
endpoint . log . Info ( "Object Download" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "get" ) , zap . String ( "type" , "object" ) )
mon . Meter ( "req_get_object" ) . Mark ( 1 )
return & pb . ObjectGetResponse {
Object : object ,
} , nil
}
func ( endpoint * Endpoint ) getObject ( ctx context . Context , projectID uuid . UUID , bucket , encryptedPath [ ] byte , version int32 ) ( * pb . Object , error ) {
2020-09-16 12:25:14 +01:00
pointer , _ , err := endpoint . getPointer ( ctx , projectID , metabase . LastSegmentIndex , bucket , encryptedPath )
2019-07-23 12:09:12 +01:00
if err != nil {
2019-08-01 10:04:31 +01:00
return nil , err
2019-07-23 12:09:12 +01:00
}
2019-08-01 10:04:31 +01:00
streamMeta := & pb . StreamMeta { }
2020-04-08 13:08:57 +01:00
err = pb . Unmarshal ( pointer . Metadata , streamMeta )
2019-07-23 12:09:12 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-23 12:09:12 +01:00
}
2020-10-29 16:16:25 +00:00
streamID , err := endpoint . packStreamID ( ctx , & internalpb . StreamID {
2020-03-11 15:53:16 +00:00
Bucket : bucket ,
EncryptedPath : encryptedPath ,
Version : version ,
2019-07-23 12:09:12 +01:00
CreationDate : time . Now ( ) ,
2019-07-24 12:33:23 +01:00
} )
2019-07-23 12:09:12 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-23 12:09:12 +01:00
}
object := & pb . Object {
2020-03-11 15:53:16 +00:00
Bucket : bucket ,
EncryptedPath : encryptedPath ,
2019-07-23 12:09:12 +01:00
Version : - 1 ,
StreamId : streamID ,
ExpiresAt : pointer . ExpirationDate ,
CreatedAt : pointer . CreationDate ,
EncryptedMetadata : pointer . Metadata ,
2019-08-01 10:04:31 +01:00
EncryptionParameters : & pb . EncryptionParameters {
CipherSuite : pb . CipherSuite ( streamMeta . EncryptionType ) ,
BlockSize : int64 ( streamMeta . EncryptionBlockSize ) ,
} ,
}
if pointer . Remote != nil {
object . RedundancyScheme = pointer . Remote . Redundancy
2019-09-19 00:18:14 +01:00
// NumberOfSegments == 0 - pointer with encrypted num of segments
// NumberOfSegments > 1 - pointer with unencrypted num of segments and multiple segments
} else if streamMeta . NumberOfSegments == 0 || streamMeta . NumberOfSegments > 1 {
// workaround
// The new metainfo API redundancy scheme is on object level (not per segment).
// Because of that, RS is always taken from the last segment.
// The old implementation saves RS per segment, and in some cases
// when the remote file's last segment is an inline segment, we end up
// missing an RS scheme. This loop will search for RS in segments other than the last one.
index := int64 ( 0 )
for {
2020-09-03 14:54:56 +01:00
location , err := CreatePath ( ctx , projectID , index , bucket , encryptedPath )
2019-09-19 10:13:57 +01:00
if err != nil {
endpoint . log . Error ( "unable to get pointer path" , zap . Error ( err ) )
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to get object" )
2019-09-19 10:13:57 +01:00
}
2020-09-03 14:54:56 +01:00
pointer , err = endpoint . metainfo . Get ( ctx , location . Encode ( ) )
2019-09-19 00:18:14 +01:00
if err != nil {
2019-12-10 20:21:30 +00:00
if storj . ErrObjectNotFound . Has ( err ) {
2019-09-19 00:18:14 +01:00
break
}
endpoint . log . Error ( "unable to get pointer" , zap . Error ( err ) )
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , "unable to get object" )
2019-09-19 00:18:14 +01:00
}
if pointer . Remote != nil {
object . RedundancyScheme = pointer . Remote . Redundancy
break
}
index ++
}
2019-07-23 12:09:12 +01:00
}
2020-03-11 15:53:16 +00:00
return object , nil
2019-07-23 12:09:12 +01:00
}
2020-06-30 22:49:29 +01:00
// ListObjects list objects according to specific parameters.
2019-07-16 11:39:23 +01:00
func ( endpoint * Endpoint ) ListObjects ( ctx context . Context , req * pb . ObjectListRequest ) ( resp * pb . ObjectListResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-16 11:39:23 +01:00
Op : macaroon . ActionList ,
Bucket : req . Bucket ,
2019-10-24 22:05:08 +01:00
EncryptedPath : req . EncryptedPrefix ,
2019-07-16 11:39:23 +01:00
Time : time . Now ( ) ,
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-16 11:39:23 +01:00
}
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
2020-03-16 08:55:52 +00:00
// TODO this needs to be optimized to avoid DB call on each request
_ , err = endpoint . metainfo . GetBucket ( ctx , req . Bucket , keyInfo . ProjectID )
if err != nil {
if storj . ErrBucketNotFound . Has ( err ) {
return nil , rpcstatus . Error ( rpcstatus . NotFound , err . Error ( ) )
}
endpoint . log . Error ( "unable to check bucket" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2020-09-16 12:25:14 +01:00
prefix , err := CreatePath ( ctx , keyInfo . ProjectID , metabase . LastSegmentIndex , req . Bucket , req . EncryptedPrefix )
2019-07-16 11:39:23 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
metaflags := meta . All
// TODO use flags
2020-09-03 14:54:56 +01:00
segments , more , err := endpoint . metainfo . List ( ctx , prefix . Encode ( ) , string ( req . EncryptedCursor ) , req . Recursive , req . Limit , metaflags )
2019-07-16 11:39:23 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
items := make ( [ ] * pb . ObjectListItem , len ( segments ) )
for i , segment := range segments {
items [ i ] = & pb . ObjectListItem {
2019-08-01 10:04:31 +01:00
EncryptedPath : [ ] byte ( segment . Path ) ,
}
if segment . Pointer != nil {
items [ i ] . EncryptedMetadata = segment . Pointer . Metadata
items [ i ] . CreatedAt = segment . Pointer . CreationDate
items [ i ] . ExpiresAt = segment . Pointer . ExpirationDate
2019-07-16 11:39:23 +01:00
}
}
2020-01-20 18:48:26 +00:00
endpoint . log . Info ( "Object List" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "list" ) , zap . String ( "type" , "object" ) )
2020-01-29 15:03:30 +00:00
mon . Meter ( "req_list_object" ) . Mark ( 1 )
2019-07-16 11:39:23 +01:00
return & pb . ObjectListResponse {
Items : items ,
More : more ,
} , nil
}
2019-12-10 11:15:35 +00:00
// BeginDeleteObject begins object deletion process.
2019-07-16 11:39:23 +01:00
func ( endpoint * Endpoint ) BeginDeleteObject ( ctx context . Context , req * pb . ObjectBeginDeleteRequest ) ( resp * pb . ObjectBeginDeleteResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2020-03-11 15:53:16 +00:00
now := time . Now ( )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-16 11:39:23 +01:00
Op : macaroon . ActionDelete ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
2020-03-11 15:53:16 +00:00
Time : now ,
2019-07-16 11:39:23 +01:00
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-16 11:39:23 +01:00
}
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
2020-03-11 15:53:16 +00:00
_ , err = endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
Op : macaroon . ActionRead ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : now ,
} )
canRead := err == nil
_ , err = endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
Op : macaroon . ActionList ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : now ,
} )
canList := err == nil
2020-07-27 21:12:14 +01:00
report , err := endpoint . DeleteObjectPieces ( ctx , keyInfo . ProjectID , req . Bucket , req . EncryptedPath )
2019-08-01 10:04:31 +01:00
if err != nil {
2020-03-11 15:53:16 +00:00
if ! canRead && ! canList {
// No error info is returned if neither Read, nor List permission is granted
2020-04-02 08:45:51 +01:00
return & pb . ObjectBeginDeleteResponse { } , nil
2020-03-11 15:53:16 +00:00
}
2019-08-01 10:04:31 +01:00
return nil , err
}
2020-07-27 21:12:14 +01:00
var object * pb . Object
if canRead || canList {
// Info about deleted object is returned only if either Read, or List permission is granted
2020-08-11 14:00:57 +01:00
deletedObjects , err := report . DeletedObjects ( )
if err != nil {
endpoint . log . Error ( "failed to construct deleted object information" ,
zap . Stringer ( "Project ID" , keyInfo . ProjectID ) ,
zap . String ( "Bucket" , string ( req . Bucket ) ) ,
zap . String ( "Encrypted Path" , string ( req . EncryptedPath ) ) ,
zap . Error ( err ) ,
)
}
2020-07-27 21:12:14 +01:00
if len ( deletedObjects ) > 0 {
object = deletedObjects [ 0 ]
}
}
2020-01-20 18:48:26 +00:00
endpoint . log . Info ( "Object Delete" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "delete" ) , zap . String ( "type" , "object" ) )
2020-01-29 15:03:30 +00:00
mon . Meter ( "req_delete_object" ) . Mark ( 1 )
2020-01-20 18:48:26 +00:00
2019-07-16 11:39:23 +01:00
return & pb . ObjectBeginDeleteResponse {
2020-07-17 10:17:31 +01:00
Object : object ,
2019-07-16 11:39:23 +01:00
} , nil
}
2020-06-30 22:49:29 +01:00
// FinishDeleteObject finishes object deletion.
2019-07-16 11:39:23 +01:00
func ( endpoint * Endpoint ) FinishDeleteObject ( ctx context . Context , req * pb . ObjectFinishDeleteRequest ) ( resp * pb . ObjectFinishDeleteResponse , err error ) {
2020-10-08 12:00:32 +01:00
// all logic for deleting is now in BeginDeleteObject
return nil , rpcstatus . Error ( rpcstatus . Unimplemented , "not implemented" )
2019-07-16 11:39:23 +01:00
}
2019-07-22 15:45:18 +01:00
2020-08-11 18:35:23 +01:00
// GetObjectIPs returns the IP addresses of the nodes holding the pieces for
// the provided object. This is useful for knowing the locations of the pieces.
func ( endpoint * Endpoint ) GetObjectIPs ( ctx context . Context , req * pb . ObjectGetIPsRequest ) ( resp * pb . ObjectGetIPsResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2020-08-13 17:43:21 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
Op : macaroon . ActionRead ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
return nil , err
}
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
2020-09-16 12:25:14 +01:00
lastPointer , _ , err := endpoint . getPointer ( ctx , keyInfo . ProjectID , metabase . LastSegmentIndex , req . Bucket , req . EncryptedPath )
2020-08-13 17:43:21 +01:00
if err != nil {
// endpoint.getPointer already returns valid rpcstatus errors
return nil , err
}
var nodeIDs [ ] storj . NodeID
addPointerToNodeIDs := func ( pointer * pb . Pointer ) {
if pointer . Remote != nil {
for _ , piece := range pointer . Remote . RemotePieces {
nodeIDs = append ( nodeIDs , piece . NodeId )
}
}
}
addPointerToNodeIDs ( lastPointer )
streamMeta := & pb . StreamMeta { }
err = pb . Unmarshal ( lastPointer . Metadata , streamMeta )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
numSegmentsKnown := streamMeta . NumberOfSegments > 0
numberOfSegmentsToFetch := int ( streamMeta . NumberOfSegments ) - 1 // remove last segment since we've already fetch manually
// If we do not know the number of segments from the streamMeta, we want to
// continue to run this loop until it cannot find another segment (the
// break condition in the loop).
//
// If we do know the number of segments, we want to run the loop as long as
// the numberOfSegmentsToFetch is > 0 and until we have fetched that many
// segments.
2020-09-16 12:25:14 +01:00
for i := metabase . FirstSegmentIndex ; ! numSegmentsKnown || ( numSegmentsKnown && numberOfSegmentsToFetch > 0 && i < numberOfSegmentsToFetch ) ; i ++ {
2020-09-03 14:54:56 +01:00
location , err := CreatePath ( ctx , keyInfo . ProjectID , int64 ( i ) , req . Bucket , req . EncryptedPath )
2020-08-13 17:43:21 +01:00
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
2020-09-03 14:54:56 +01:00
pointer , err := endpoint . metainfo . Get ( ctx , location . Encode ( ) )
2020-08-13 17:43:21 +01:00
if err != nil {
if storj . ErrObjectNotFound . Has ( err ) {
break
}
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
addPointerToNodeIDs ( pointer )
}
nodes , err := endpoint . overlay . GetOnlineNodesForGetDelete ( ctx , nodeIDs )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
resp = & pb . ObjectGetIPsResponse { }
for _ , node := range nodes {
address := node . Address . GetAddress ( )
if address != "" {
resp . Ips = append ( resp . Ips , [ ] byte ( address ) )
}
}
return resp , nil
2020-08-11 18:35:23 +01:00
}
2020-06-30 22:49:29 +01:00
// BeginSegment begins segment uploading.
2019-07-22 15:45:18 +01:00
func ( endpoint * Endpoint ) BeginSegment ( ctx context . Context , req * pb . SegmentBeginRequest ) ( resp * pb . SegmentBeginResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
streamID , err := endpoint . unmarshalSatStreamID ( ctx , req . StreamId )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-22 15:45:18 +01:00
Op : macaroon . ActionWrite ,
Bucket : streamID . Bucket ,
EncryptedPath : streamID . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-22 15:45:18 +01:00
}
2019-07-24 12:33:23 +01:00
// no need to validate streamID fields because it was validated during BeginObject
2019-08-01 10:04:31 +01:00
if req . Position . Index < 0 {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "segment index must be greater then 0" )
2019-08-01 10:04:31 +01:00
}
2020-12-22 11:12:07 +00:00
if err := endpoint . checkExceedsStorageUsage ( ctx , keyInfo . ProjectID ) ; err != nil {
return nil , err
2019-07-24 12:33:23 +01:00
}
redundancy , err := eestream . NewRedundancyStrategyFromProto ( streamID . Redundancy )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
maxPieceSize := eestream . CalcPieceSize ( req . MaxOrderLimit , redundancy )
2019-07-22 15:45:18 +01:00
2019-07-24 12:33:23 +01:00
request := overlay . FindStorageNodesRequest {
RequestedCount : redundancy . TotalCount ( ) ,
}
2020-05-06 14:05:31 +01:00
nodes , err := endpoint . overlay . FindStorageNodesForUpload ( ctx , request )
2019-07-24 12:33:23 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
2020-08-28 12:56:09 +01:00
bucket := metabase . BucketLocation { ProjectID : keyInfo . ProjectID , BucketName : string ( streamID . Bucket ) }
rootPieceID , addressedLimits , piecePrivateKey , err := endpoint . orders . CreatePutOrderLimits ( ctx , bucket , nodes , streamID . ExpirationDate , maxPieceSize )
2019-07-24 12:33:23 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
2020-10-29 16:16:25 +00:00
segmentID , err := endpoint . packSegmentID ( ctx , & internalpb . SegmentID {
2019-07-24 12:33:23 +01:00
StreamId : streamID ,
2019-08-01 10:04:31 +01:00
Index : req . Position . Index ,
2019-07-24 12:33:23 +01:00
OriginalOrderLimits : addressedLimits ,
RootPieceId : rootPieceID ,
CreationDate : time . Now ( ) ,
} )
2020-01-20 18:48:26 +00:00
endpoint . log . Info ( "Segment Upload" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "put" ) , zap . String ( "type" , "remote" ) )
2020-01-29 15:03:30 +00:00
mon . Meter ( "req_put_remote" ) . Mark ( 1 )
2019-12-02 14:39:19 +00:00
2019-07-24 12:33:23 +01:00
return & pb . SegmentBeginResponse {
SegmentId : segmentID ,
AddressedLimits : addressedLimits ,
PrivateKey : piecePrivateKey ,
} , nil
2019-07-22 15:45:18 +01:00
}
2020-06-30 22:49:29 +01:00
// CommitSegment commits segment after uploading.
2019-07-22 15:45:18 +01:00
func ( endpoint * Endpoint ) CommitSegment ( ctx context . Context , req * pb . SegmentCommitRequest ) ( resp * pb . SegmentCommitResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2020-01-27 20:25:52 +00:00
_ , resp , err = endpoint . commitSegment ( ctx , req , true )
return resp , err
}
func ( endpoint * Endpoint ) commitSegment ( ctx context . Context , req * pb . SegmentCommitRequest , savePointer bool ) ( pointer * pb . Pointer , resp * pb . SegmentCommitResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-22 15:45:18 +01:00
segmentID , err := endpoint . unmarshalSatSegmentID ( ctx , req . SegmentId )
if err != nil {
2020-01-27 20:25:52 +00:00
return nil , nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
streamID := segmentID . StreamId
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-22 15:45:18 +01:00
Op : macaroon . ActionWrite ,
Bucket : streamID . Bucket ,
EncryptedPath : streamID . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , nil , err
2019-07-22 15:45:18 +01:00
}
2019-10-17 19:01:40 +01:00
if numResults := len ( req . UploadResult ) ; numResults < int ( streamID . Redundancy . GetSuccessThreshold ( ) ) {
endpoint . log . Debug ( "the results of uploaded pieces for the segment is below the redundancy optimal threshold" ,
zap . Int ( "upload pieces results" , numResults ) ,
zap . Int32 ( "redundancy optimal threshold" , streamID . Redundancy . GetSuccessThreshold ( ) ) ,
2019-11-05 21:04:07 +00:00
zap . Stringer ( "Segment ID" , req . SegmentId ) ,
2019-10-17 19:01:40 +01:00
)
2020-01-27 20:25:52 +00:00
return nil , nil , rpcstatus . Errorf ( rpcstatus . InvalidArgument ,
2019-10-17 19:01:40 +01:00
"the number of results of uploaded pieces (%d) is below the optimal threshold (%d)" ,
numResults , streamID . Redundancy . GetSuccessThreshold ( ) ,
)
}
2019-07-24 12:33:23 +01:00
pieces := make ( [ ] * pb . RemotePiece , len ( req . UploadResult ) )
for i , result := range req . UploadResult {
pieces [ i ] = & pb . RemotePiece {
PieceNum : result . PieceNum ,
NodeId : result . NodeId ,
Hash : result . Hash ,
}
}
remote := & pb . RemoteSegment {
Redundancy : streamID . Redundancy ,
RootPieceId : segmentID . RootPieceId ,
RemotePieces : pieces ,
}
2020-04-08 13:08:57 +01:00
metadata , err := pb . Marshal ( & pb . SegmentMeta {
2019-08-15 12:45:49 +01:00
EncryptedKey : req . EncryptedKey ,
KeyNonce : req . EncryptedKeyNonce . Bytes ( ) ,
} )
if err != nil {
endpoint . log . Error ( "unable to marshal segment metadata" , zap . Error ( err ) )
2020-01-27 20:25:52 +00:00
return nil , nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-15 12:45:49 +01:00
}
2020-01-27 20:25:52 +00:00
pointer = & pb . Pointer {
2019-07-24 12:33:23 +01:00
Type : pb . Pointer_REMOTE ,
Remote : remote ,
SegmentSize : req . SizeEncryptedData ,
CreationDate : streamID . CreationDate ,
ExpirationDate : streamID . ExpirationDate ,
2019-08-15 12:45:49 +01:00
Metadata : metadata ,
2019-09-18 14:50:33 +01:00
PieceHashesVerified : true ,
2019-07-24 12:33:23 +01:00
}
orderLimits := make ( [ ] * pb . OrderLimit , len ( segmentID . OriginalOrderLimits ) )
for i , orderLimit := range segmentID . OriginalOrderLimits {
orderLimits [ i ] = orderLimit . Limit
}
err = endpoint . validatePointer ( ctx , pointer , orderLimits )
if err != nil {
2020-01-27 20:25:52 +00:00
return nil , nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
err = endpoint . filterValidPieces ( ctx , pointer , orderLimits )
if err != nil {
2020-01-27 20:25:52 +00:00
return nil , nil , err
2019-07-24 12:33:23 +01:00
}
2020-12-22 11:12:07 +00:00
if err := endpoint . checkExceedsStorageUsage ( ctx , keyInfo . ProjectID ) ; err != nil {
return nil , nil , err
2019-07-24 12:33:23 +01:00
}
2019-08-01 10:04:31 +01:00
// clear hashes so we don't store them
for _ , piece := range pointer . GetRemote ( ) . GetRemotePieces ( ) {
piece . Hash = nil
}
2019-10-31 17:27:38 +00:00
segmentSize , totalStored := calculateSpaceUsed ( pointer )
2019-07-24 12:33:23 +01:00
// ToDo: Replace with hash & signature validation
// Ensure neither uplink or storage nodes are cheating on us
if pointer . Type == pb . Pointer_REMOTE {
2020-10-13 13:47:55 +01:00
// We cannot have more redundancy than total/min
2019-10-31 17:27:38 +00:00
if float64 ( totalStored ) > ( float64 ( pointer . SegmentSize ) / float64 ( pointer . Remote . Redundancy . MinReq ) ) * float64 ( pointer . Remote . Redundancy . Total ) {
2019-10-17 19:01:40 +01:00
endpoint . log . Debug ( "data size mismatch" ,
zap . Int64 ( "segment" , pointer . SegmentSize ) ,
2019-10-31 17:27:38 +00:00
zap . Int64 ( "pieces" , totalStored ) ,
2019-10-17 19:01:40 +01:00
zap . Int32 ( "redundancy minimum requested" , pointer . Remote . Redundancy . MinReq ) ,
zap . Int32 ( "redundancy total" , pointer . Remote . Redundancy . Total ) ,
)
2020-01-27 20:25:52 +00:00
return nil , nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "mismatched segment size and piece usage" )
2019-07-24 12:33:23 +01:00
}
}
2019-10-31 17:27:38 +00:00
if err := endpoint . projectUsage . AddProjectStorageUsage ( ctx , keyInfo . ProjectID , segmentSize ) ; err != nil {
2020-12-22 11:12:07 +00:00
// log it and continue. it's most likely our own fault that we couldn't
// track it, and the only thing that will be affected is our per-project
// storage limits.
endpoint . log . Error ( "Could not track new project's storage usage" ,
2019-11-05 21:04:07 +00:00
zap . Stringer ( "Project ID" , keyInfo . ProjectID ) ,
2019-10-17 19:01:40 +01:00
zap . Error ( err ) ,
)
2019-07-24 12:33:23 +01:00
}
2020-01-27 20:25:52 +00:00
if savePointer {
2020-09-03 14:54:56 +01:00
location , err := CreatePath ( ctx , keyInfo . ProjectID , int64 ( segmentID . Index ) , streamID . Bucket , streamID . EncryptedPath )
2020-01-27 20:25:52 +00:00
if err != nil {
return nil , nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
2020-09-03 14:54:56 +01:00
err = endpoint . metainfo . UnsynchronizedPut ( ctx , location . Encode ( ) , pointer )
2020-01-27 20:25:52 +00:00
if err != nil {
return nil , nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2019-07-24 12:33:23 +01:00
}
2019-07-22 15:45:18 +01:00
2020-01-27 20:25:52 +00:00
return pointer , & pb . SegmentCommitResponse {
2019-09-18 14:50:33 +01:00
SuccessfulPieces : int32 ( len ( pointer . Remote . RemotePieces ) ) ,
} , nil
2019-07-22 15:45:18 +01:00
}
2020-06-30 22:49:29 +01:00
// MakeInlineSegment makes inline segment on satellite.
2019-07-22 15:45:18 +01:00
func ( endpoint * Endpoint ) MakeInlineSegment ( ctx context . Context , req * pb . SegmentMakeInlineRequest ) ( resp * pb . SegmentMakeInlineResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2020-01-27 20:25:52 +00:00
_ , resp , err = endpoint . makeInlineSegment ( ctx , req , true )
return resp , err
}
2020-07-16 15:18:02 +01:00
// makeInlineSegment makes inline segment on satellite; the pointer is stored only when savePointer is true.
2020-01-27 20:25:52 +00:00
func ( endpoint * Endpoint ) makeInlineSegment ( ctx context . Context , req * pb . SegmentMakeInlineRequest , savePointer bool ) ( pointer * pb . Pointer , resp * pb . SegmentMakeInlineResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-22 15:45:18 +01:00
streamID , err := endpoint . unmarshalSatStreamID ( ctx , req . StreamId )
if err != nil {
2020-01-27 20:25:52 +00:00
return nil , nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-22 15:45:18 +01:00
Op : macaroon . ActionWrite ,
Bucket : streamID . Bucket ,
EncryptedPath : streamID . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , nil , err
2019-07-22 15:45:18 +01:00
}
2019-08-01 10:04:31 +01:00
if req . Position . Index < 0 {
2020-01-27 20:25:52 +00:00
return nil , nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "segment index must be greater then 0" )
2019-07-24 12:33:23 +01:00
}
2020-04-01 10:15:24 +01:00
inlineUsed := int64 ( len ( req . EncryptedInlineData ) )
2020-04-09 09:19:16 +01:00
if inlineUsed > endpoint . encInlineSegmentSize {
2020-04-01 10:15:24 +01:00
return nil , nil , rpcstatus . Error ( rpcstatus . InvalidArgument , fmt . Sprintf ( "inline segment size cannot be larger than %s" , endpoint . config . MaxInlineSegmentSize ) )
}
2020-12-22 11:12:07 +00:00
if err := endpoint . checkExceedsStorageUsage ( ctx , keyInfo . ProjectID ) ; err != nil {
return nil , nil , err
2019-07-24 12:33:23 +01:00
}
2019-10-31 17:27:38 +00:00
if err := endpoint . projectUsage . AddProjectStorageUsage ( ctx , keyInfo . ProjectID , inlineUsed ) ; err != nil {
2020-12-22 11:12:07 +00:00
// log it and continue. it's most likely our own fault that we couldn't
// track it, and the only thing that will be affected is our per-project
// bandwidth and storage limits.
endpoint . log . Error ( "Could not track new project's storage usage" ,
zap . Stringer ( "Project ID" , keyInfo . ProjectID ) ,
zap . Error ( err ) ,
)
2019-07-24 12:33:23 +01:00
}
2020-04-08 13:08:57 +01:00
metadata , err := pb . Marshal ( & pb . SegmentMeta {
2019-09-19 00:18:14 +01:00
EncryptedKey : req . EncryptedKey ,
KeyNonce : req . EncryptedKeyNonce . Bytes ( ) ,
} )
2020-01-27 20:25:52 +00:00
pointer = & pb . Pointer {
2019-07-24 12:33:23 +01:00
Type : pb . Pointer_INLINE ,
SegmentSize : inlineUsed ,
CreationDate : streamID . CreationDate ,
ExpirationDate : streamID . ExpirationDate ,
InlineSegment : req . EncryptedInlineData ,
2019-09-19 00:18:14 +01:00
Metadata : metadata ,
2019-07-24 12:33:23 +01:00
}
2020-01-27 20:25:52 +00:00
if savePointer {
2020-09-03 14:54:56 +01:00
location , err := CreatePath ( ctx , keyInfo . ProjectID , int64 ( req . Position . Index ) , streamID . Bucket , streamID . EncryptedPath )
2020-01-27 20:25:52 +00:00
if err != nil {
return nil , nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
2020-09-03 14:54:56 +01:00
err = endpoint . metainfo . UnsynchronizedPut ( ctx , location . Encode ( ) , pointer )
2020-01-27 20:25:52 +00:00
if err != nil {
return nil , nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2019-07-24 12:33:23 +01:00
}
2020-08-28 12:56:09 +01:00
bucket := metabase . BucketLocation { ProjectID : keyInfo . ProjectID , BucketName : string ( streamID . Bucket ) }
err = endpoint . orders . UpdatePutInlineOrder ( ctx , bucket , inlineUsed )
2019-07-24 12:33:23 +01:00
if err != nil {
2020-01-27 20:25:52 +00:00
return nil , nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
2019-07-22 15:45:18 +01:00
2020-01-20 18:48:26 +00:00
endpoint . log . Info ( "Inline Segment Upload" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "put" ) , zap . String ( "type" , "inline" ) )
2020-01-29 15:03:30 +00:00
mon . Meter ( "req_put_inline" ) . Mark ( 1 )
2019-12-02 14:39:19 +00:00
2020-01-27 20:25:52 +00:00
return pointer , & pb . SegmentMakeInlineResponse { } , nil
2019-07-22 15:45:18 +01:00
}
2020-06-30 22:49:29 +01:00
// BeginDeleteSegment begins segment deletion process.
2019-07-22 15:45:18 +01:00
func ( endpoint * Endpoint ) BeginDeleteSegment ( ctx context . Context , req * pb . SegmentBeginDeleteRequest ) ( resp * pb . SegmentBeginDeleteResponse , err error ) {
2020-10-08 12:00:32 +01:00
// all logic for deleting is now in BeginDeleteObject
return nil , rpcstatus . Error ( rpcstatus . Unimplemented , "not implemented" )
2019-07-22 15:45:18 +01:00
}
2020-06-30 22:49:29 +01:00
// FinishDeleteSegment finishes segment deletion process.
2019-07-22 15:45:18 +01:00
func ( endpoint * Endpoint ) FinishDeleteSegment ( ctx context . Context , req * pb . SegmentFinishDeleteRequest ) ( resp * pb . SegmentFinishDeleteResponse , err error ) {
2020-10-08 12:00:32 +01:00
// all logic for deleting is now in BeginDeleteObject
return nil , rpcstatus . Error ( rpcstatus . Unimplemented , "not implemented" )
2019-07-22 15:45:18 +01:00
}
2020-06-30 22:49:29 +01:00
// ListSegments list object segments.
2019-07-22 15:45:18 +01:00
func ( endpoint * Endpoint ) ListSegments ( ctx context . Context , req * pb . SegmentListRequest ) ( resp * pb . SegmentListResponse , err error ) {
2020-10-08 12:00:32 +01:00
// nothing is using this method
return nil , rpcstatus . Error ( rpcstatus . Unimplemented , "not implemented" )
2019-07-22 15:45:18 +01:00
}
2021-01-13 14:08:38 +00:00
// GetPendingObjects is not implemented; every call is answered with an
// Unimplemented status.
func (endpoint *Endpoint) GetPendingObjects(ctx context.Context, req *pb.GetPendingObjectsRequest) (resp *pb.GetPendingObjectsResponse, err error) {
	err = rpcstatus.Error(rpcstatus.Unimplemented, "not implemented")
	return nil, err
}
2020-06-30 22:49:29 +01:00
// DownloadSegment returns data necessary to download segment.
2019-07-22 15:45:18 +01:00
func ( endpoint * Endpoint ) DownloadSegment ( ctx context . Context , req * pb . SegmentDownloadRequest ) ( resp * pb . SegmentDownloadResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
streamID , err := endpoint . unmarshalSatStreamID ( ctx , req . StreamId )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-22 15:45:18 +01:00
}
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-22 15:45:18 +01:00
Op : macaroon . ActionRead ,
Bucket : streamID . Bucket ,
EncryptedPath : streamID . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-22 15:45:18 +01:00
}
2020-08-28 12:56:09 +01:00
bucket := metabase . BucketLocation { ProjectID : keyInfo . ProjectID , BucketName : string ( streamID . Bucket ) }
2019-07-24 12:33:23 +01:00
2020-12-22 11:12:07 +00:00
if exceeded , limit , err := endpoint . projectUsage . ExceedsBandwidthUsage ( ctx , keyInfo . ProjectID ) ; err != nil {
endpoint . log . Error ( "Retrieving project bandwidth total failed; bandwidth limit won't be enforced" , zap . Error ( err ) )
} else if exceeded {
endpoint . log . Error ( "Monthly bandwidth limit exceeded" ,
2020-04-13 10:31:17 +01:00
zap . Stringer ( "Limit" , limit ) ,
zap . Stringer ( "Project ID" , keyInfo . ProjectID ) ,
2019-07-24 12:33:23 +01:00
)
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . ResourceExhausted , "Exceeded Usage Limit" )
2019-07-24 12:33:23 +01:00
}
2019-08-01 10:04:31 +01:00
pointer , _ , err := endpoint . getPointer ( ctx , keyInfo . ProjectID , int64 ( req . CursorPosition . Index ) , streamID . Bucket , streamID . EncryptedPath )
2019-07-24 12:33:23 +01:00
if err != nil {
2019-08-01 10:04:31 +01:00
return nil , err
2019-07-24 12:33:23 +01:00
}
2020-06-02 00:19:10 +01:00
// Update the current bandwidth cache value incrementing the SegmentSize.
2020-12-22 11:12:07 +00:00
if err = endpoint . projectUsage . UpdateProjectBandwidthUsage ( ctx , keyInfo . ProjectID , pointer . SegmentSize ) ; err != nil {
// log it and continue. it's most likely our own fault that we couldn't
// track it, and the only thing that will be affected is our per-project
// bandwidth limits.
endpoint . log . Error ( "Could not track the new project's bandwidth usage" ,
zap . Stringer ( "Project ID" , keyInfo . ProjectID ) ,
zap . Error ( err ) ,
)
2020-06-02 00:19:10 +01:00
}
2020-10-29 16:16:25 +00:00
segmentID , err := endpoint . packSegmentID ( ctx , & internalpb . SegmentID { } )
2020-06-02 09:39:41 +01:00
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2019-08-01 10:04:31 +01:00
var encryptedKeyNonce storj . Nonce
var encryptedKey [ ] byte
if len ( pointer . Metadata ) != 0 {
2019-08-29 09:00:20 +01:00
var segmentMeta * pb . SegmentMeta
2020-09-16 12:25:14 +01:00
if req . CursorPosition . Index == metabase . LastSegmentIndex {
2019-08-01 10:04:31 +01:00
streamMeta := & pb . StreamMeta { }
2020-04-08 13:08:57 +01:00
err = pb . Unmarshal ( pointer . Metadata , streamMeta )
2019-08-01 10:04:31 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-01 10:04:31 +01:00
}
2019-08-29 09:00:20 +01:00
segmentMeta = streamMeta . LastSegmentMeta
2019-08-01 10:04:31 +01:00
} else {
2019-09-19 00:18:14 +01:00
segmentMeta = & pb . SegmentMeta { }
2020-04-08 13:08:57 +01:00
err = pb . Unmarshal ( pointer . Metadata , segmentMeta )
2019-08-01 10:04:31 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-01 10:04:31 +01:00
}
2019-07-24 12:33:23 +01:00
}
2019-08-29 09:00:20 +01:00
if segmentMeta != nil {
encryptedKeyNonce , err = storj . NonceFromBytes ( segmentMeta . KeyNonce )
if err != nil {
endpoint . log . Error ( "unable to get encryption key nonce from metadata" , zap . Error ( err ) )
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-29 09:00:20 +01:00
}
2019-07-24 12:33:23 +01:00
2019-08-29 09:00:20 +01:00
encryptedKey = segmentMeta . EncryptedKey
2019-08-01 10:04:31 +01:00
}
}
2019-07-24 12:33:23 +01:00
if pointer . Type == pb . Pointer_INLINE {
2020-08-28 12:56:09 +01:00
err := endpoint . orders . UpdateGetInlineOrder ( ctx , bucket , int64 ( len ( pointer . InlineSegment ) ) )
2019-07-24 12:33:23 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
2020-01-29 15:03:30 +00:00
endpoint . log . Info ( "Inline Segment Download" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "get" ) , zap . String ( "type" , "inline" ) )
mon . Meter ( "req_get_inline" ) . Mark ( 1 )
2019-07-24 12:33:23 +01:00
return & pb . SegmentDownloadResponse {
SegmentId : segmentID ,
2019-08-01 10:04:31 +01:00
SegmentSize : pointer . SegmentSize ,
2019-07-24 12:33:23 +01:00
EncryptedInlineData : pointer . InlineSegment ,
2019-08-01 10:04:31 +01:00
EncryptedKeyNonce : encryptedKeyNonce ,
EncryptedKey : encryptedKey ,
2019-07-24 12:33:23 +01:00
} , nil
} else if pointer . Type == pb . Pointer_REMOTE && pointer . Remote != nil {
2020-08-28 12:56:09 +01:00
limits , privateKey , err := endpoint . orders . CreateGetOrderLimits ( ctx , bucket , pointer )
2019-07-24 12:33:23 +01:00
if err != nil {
2019-12-04 21:24:36 +00:00
if orders . ErrDownloadFailedNotEnoughPieces . Has ( err ) {
2020-04-13 10:31:17 +01:00
endpoint . log . Error ( "Unable to create order limits." ,
zap . Stringer ( "Project ID" , keyInfo . ProjectID ) ,
zap . Stringer ( "API Key ID" , keyInfo . ID ) ,
zap . Error ( err ) ,
)
2019-12-04 21:24:36 +00:00
}
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
2019-08-01 10:04:31 +01:00
limits = sortLimits ( limits , pointer )
// workaround to avoid sending nil values on top level
for i := range limits {
if limits [ i ] == nil {
limits [ i ] = & pb . AddressedOrderLimit { }
}
}
2020-01-29 15:03:30 +00:00
endpoint . log . Info ( "Segment Download" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "get" ) , zap . String ( "type" , "remote" ) )
mon . Meter ( "req_get_remote" ) . Mark ( 1 )
2019-12-02 14:39:19 +00:00
2019-07-24 12:33:23 +01:00
return & pb . SegmentDownloadResponse {
SegmentId : segmentID ,
AddressedLimits : limits ,
2019-08-01 10:04:31 +01:00
PrivateKey : privateKey ,
SegmentSize : pointer . SegmentSize ,
EncryptedKeyNonce : encryptedKeyNonce ,
EncryptedKey : encryptedKey ,
2019-07-24 12:33:23 +01:00
} , nil
}
2019-09-19 05:46:39 +01:00
return & pb . SegmentDownloadResponse { } , rpcstatus . Error ( rpcstatus . Internal , "invalid type of pointer" )
2019-07-24 12:33:23 +01:00
}
2019-12-10 11:15:35 +00:00
// getPointer returns the pointer and the segment path projectID, bucket and
// encryptedPath. It returns an error with a specific RPC status.
2019-12-11 18:46:41 +00:00
func ( endpoint * Endpoint ) getPointer (
ctx context . Context , projectID uuid . UUID , segmentIndex int64 , bucket , encryptedPath [ ] byte ,
2020-09-03 14:54:56 +01:00
) ( pointer * pb . Pointer , location metabase . SegmentLocation , err error ) {
2019-12-11 18:46:41 +00:00
defer mon . Task ( ) ( & ctx , projectID . String ( ) , segmentIndex , bucket , encryptedPath ) ( & err )
2020-09-03 14:54:56 +01:00
location , err = CreatePath ( ctx , projectID , segmentIndex , bucket , encryptedPath )
2019-08-01 10:04:31 +01:00
if err != nil {
2020-09-03 14:54:56 +01:00
return nil , location , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-08-01 10:04:31 +01:00
}
2020-09-03 14:54:56 +01:00
pointer , err = endpoint . metainfo . Get ( ctx , location . Encode ( ) )
2019-08-01 10:04:31 +01:00
if err != nil {
2019-12-10 20:21:30 +00:00
if storj . ErrObjectNotFound . Has ( err ) {
2020-09-03 14:54:56 +01:00
return nil , location , rpcstatus . Error ( rpcstatus . NotFound , err . Error ( ) )
2019-08-01 10:04:31 +01:00
}
2019-12-10 11:15:35 +00:00
endpoint . log . Error ( "error getting the pointer from metainfo service" , zap . Error ( err ) )
2020-09-03 14:54:56 +01:00
return nil , location , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-01 10:04:31 +01:00
}
2020-09-03 14:54:56 +01:00
return pointer , location , nil
2019-08-01 10:04:31 +01:00
}
2020-06-30 22:49:29 +01:00
// sortLimits sorts order limits and fill missing ones with nil values.
2019-08-01 10:04:31 +01:00
func sortLimits ( limits [ ] * pb . AddressedOrderLimit , pointer * pb . Pointer ) [ ] * pb . AddressedOrderLimit {
sorted := make ( [ ] * pb . AddressedOrderLimit , pointer . GetRemote ( ) . GetRedundancy ( ) . GetTotal ( ) )
for _ , piece := range pointer . GetRemote ( ) . GetRemotePieces ( ) {
sorted [ piece . GetPieceNum ( ) ] = getLimitByStorageNodeID ( limits , piece . NodeId )
}
return sorted
}
func getLimitByStorageNodeID ( limits [ ] * pb . AddressedOrderLimit , storageNodeID storj . NodeID ) * pb . AddressedOrderLimit {
for _ , limit := range limits {
2020-01-27 20:01:37 +00:00
if limit == nil || limit . GetLimit ( ) == nil {
continue
}
2019-08-01 10:04:31 +01:00
if limit . GetLimit ( ) . StorageNodeId == storageNodeID {
return limit
}
}
return nil
}
2020-10-29 16:16:25 +00:00
func ( endpoint * Endpoint ) packStreamID ( ctx context . Context , satStreamID * internalpb . StreamID ) ( streamID storj . StreamID , err error ) {
2019-07-24 12:33:23 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-22 15:45:18 +01:00
2020-10-29 16:16:25 +00:00
signedStreamID , err := SignStreamID ( ctx , endpoint . satellite , satStreamID )
2019-07-24 12:33:23 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
2020-04-08 13:08:57 +01:00
encodedStreamID , err := pb . Marshal ( signedStreamID )
2019-07-24 12:33:23 +01:00
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
streamID , err = storj . StreamIDFromBytes ( encodedStreamID )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-07-24 12:33:23 +01:00
}
return streamID , nil
2019-07-22 15:45:18 +01:00
}
2020-10-29 16:16:25 +00:00
func ( endpoint * Endpoint ) packSegmentID ( ctx context . Context , satSegmentID * internalpb . SegmentID ) ( segmentID storj . SegmentID , err error ) {
2019-07-24 12:33:23 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-10-29 16:16:25 +00:00
signedSegmentID , err := SignSegmentID ( ctx , endpoint . satellite , satSegmentID )
2019-07-24 12:33:23 +01:00
if err != nil {
return nil , err
}
2020-04-08 13:08:57 +01:00
encodedSegmentID , err := pb . Marshal ( signedSegmentID )
2019-07-24 12:33:23 +01:00
if err != nil {
return nil , err
}
segmentID , err = storj . SegmentIDFromBytes ( encodedSegmentID )
if err != nil {
return nil , err
}
return segmentID , nil
}
2020-10-29 16:16:25 +00:00
func ( endpoint * Endpoint ) unmarshalSatStreamID ( ctx context . Context , streamID storj . StreamID ) ( _ * internalpb . StreamID , err error ) {
2019-07-24 12:33:23 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-10-29 16:16:25 +00:00
satStreamID := & internalpb . StreamID { }
2020-04-08 13:08:57 +01:00
err = pb . Unmarshal ( streamID , satStreamID )
2019-07-22 15:45:18 +01:00
if err != nil {
return nil , err
}
2020-10-29 16:16:25 +00:00
err = VerifyStreamID ( ctx , endpoint . satellite , satStreamID )
2019-07-22 15:45:18 +01:00
if err != nil {
return nil , err
}
if satStreamID . CreationDate . Before ( time . Now ( ) . Add ( - satIDExpiration ) ) {
return nil , errs . New ( "stream ID expired" )
}
return satStreamID , nil
}
2020-10-30 11:22:16 +00:00
func ( endpoint * Endpoint ) unmarshalSatSegmentID ( ctx context . Context , segmentID storj . SegmentID ) ( _ * internalpb . SegmentID , err error ) {
2019-07-24 12:33:23 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-10-30 11:22:16 +00:00
satSegmentID := & internalpb . SegmentID { }
2020-04-08 13:08:57 +01:00
err = pb . Unmarshal ( segmentID , satSegmentID )
2019-07-22 15:45:18 +01:00
if err != nil {
return nil , err
}
2019-11-15 15:44:23 +00:00
if satSegmentID . StreamId == nil {
return nil , errs . New ( "stream ID missing" )
}
2019-07-22 15:45:18 +01:00
2020-10-30 11:22:16 +00:00
err = VerifySegmentID ( ctx , endpoint . satellite , satSegmentID )
2019-07-22 15:45:18 +01:00
if err != nil {
return nil , err
}
if satSegmentID . CreationDate . Before ( time . Now ( ) . Add ( - satIDExpiration ) ) {
return nil , errs . New ( "segment ID expired" )
}
return satSegmentID , nil
}
2019-12-11 17:44:13 +00:00
// DeleteObjectPieces deletes all the pieces of the storage nodes that belongs
// to the specified object.
//
// NOTE: this method is exported for being able to individually test it without
// having import cycles.
func ( endpoint * Endpoint ) DeleteObjectPieces (
ctx context . Context , projectID uuid . UUID , bucket , encryptedPath [ ] byte ,
2020-07-27 21:12:14 +01:00
) ( report objectdeletion . Report , err error ) {
2019-12-11 17:44:13 +00:00
defer mon . Task ( ) ( & ctx , projectID . String ( ) , bucket , encryptedPath ) ( & err )
2020-09-04 08:46:53 +01:00
req := & metabase . ObjectLocation {
ProjectID : projectID ,
BucketName : string ( bucket ) ,
ObjectKey : metabase . ObjectKey ( encryptedPath ) ,
2020-07-27 21:12:14 +01:00
}
2020-08-06 02:23:45 +01:00
report , err = endpoint . deleteObjectsPieces ( ctx , req )
2020-07-27 21:12:14 +01:00
if err != nil {
endpoint . log . Error ( "failed to delete pointers" ,
zap . Stringer ( "project_id" , projectID ) ,
zap . ByteString ( "bucket_name" , bucket ) ,
zap . Binary ( "encrypted_path" , encryptedPath ) ,
zap . Error ( err ) ,
)
2020-08-06 02:23:45 +01:00
// Only return an error if we failed to delete the pointers. If we failed
// to delete pieces, let garbage collector take care of it.
if objectdeletion . Error . Has ( err ) {
return report , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2019-12-11 17:44:13 +00:00
}
2020-08-06 02:23:45 +01:00
return report , nil
}
2020-09-04 08:46:53 +01:00
func ( endpoint * Endpoint ) deleteObjectsPieces ( ctx context . Context , reqs ... * metabase . ObjectLocation ) ( report objectdeletion . Report , err error ) {
2020-08-06 02:23:45 +01:00
// We should ignore client cancelling and always try to delete segments.
ctx = context2 . WithoutCancellation ( ctx )
results , err := endpoint . deleteObjects . Delete ( ctx , reqs ... )
if err != nil {
return report , err
}
var requests [ ] piecedeletion . Request
2020-07-27 21:12:14 +01:00
for _ , r := range results {
pointers := r . DeletedPointers ( )
report . Deleted = append ( report . Deleted , r . Deleted ... )
report . Failed = append ( report . Failed , r . Failed ... )
2019-12-11 17:44:13 +00:00
2020-07-27 21:12:14 +01:00
nodesPieces := objectdeletion . GroupPiecesByNodeID ( pointers )
2019-12-16 19:03:20 +00:00
2020-07-27 21:12:14 +01:00
if len ( nodesPieces ) == 0 {
continue
2020-01-17 18:47:37 +00:00
}
2019-12-16 19:03:20 +00:00
2020-08-07 19:13:55 +01:00
for node , pieces := range nodesPieces {
2020-07-27 21:12:14 +01:00
requests = append ( requests , piecedeletion . Request {
Node : storj . NodeURL {
2020-08-07 19:13:55 +01:00
ID : node ,
2020-07-27 21:12:14 +01:00
} ,
2020-08-07 19:13:55 +01:00
Pieces : pieces ,
2020-07-27 21:12:14 +01:00
} )
2019-12-11 17:44:13 +00:00
}
2020-08-06 02:23:45 +01:00
}
2019-12-16 19:03:20 +00:00
2020-08-06 02:23:45 +01:00
if err := endpoint . deletePieces . Delete ( ctx , requests , deleteObjectPiecesSuccessThreshold ) ; err != nil {
endpoint . log . Error ( "failed to delete pieces" , zap . Error ( err ) )
2019-12-16 19:03:20 +00:00
}
2020-07-27 21:12:14 +01:00
return report , nil
2020-01-17 18:47:37 +00:00
}
2020-01-28 13:44:47 +00:00
2020-06-10 15:10:44 +01:00
// RevokeAPIKey handles requests to revoke an api key.
func ( endpoint * Endpoint ) RevokeAPIKey ( ctx context . Context , req * pb . RevokeAPIKeyRequest ) ( resp * pb . RevokeAPIKeyResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
macToRevoke , err := macaroon . ParseMacaroon ( req . GetApiKey ( ) )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "API key to revoke is not a macaroon" )
}
keyInfo , err := endpoint . validateRevoke ( ctx , req . Header , macToRevoke )
if err != nil {
return nil , err
}
err = endpoint . revocations . Revoke ( ctx , macToRevoke . Tail ( ) , keyInfo . ID [ : ] )
if err != nil {
endpoint . log . Error ( "Failed to revoke API key" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , "Failed to revoke API key" )
}
return & pb . RevokeAPIKeyResponse { } , nil
2020-06-16 14:03:02 +01:00
}
2020-09-03 14:54:56 +01:00
2020-12-22 11:12:07 +00:00
// checkExceedsStorageUsage returns a ResourceExhausted RPC error when the
// project has exceeded its storage usage limit. A failure to retrieve the
// usage totals is only logged, in which case the limit is not enforced and nil
// is returned.
func (endpoint *Endpoint) checkExceedsStorageUsage(ctx context.Context, projectID uuid.UUID) (err error) {
	defer mon.Task()(&ctx)(&err)

	exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, projectID)
	switch {
	case err != nil:
		endpoint.log.Error(
			"Retrieving project storage totals failed; storage usage limit won't be enforced",
			zap.Error(err),
		)
	case exceeded:
		endpoint.log.Error("Monthly storage limit exceeded",
			zap.Stringer("Limit", limit),
			zap.Stringer("Project ID", projectID),
		)
		return rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
	}

	return nil
}
2020-09-03 14:54:56 +01:00
// CreatePath creates a segment key.
func CreatePath ( ctx context . Context , projectID uuid . UUID , segmentIndex int64 , bucket , path [ ] byte ) ( _ metabase . SegmentLocation , err error ) {
// TODO rename to CreateLocation
defer mon . Task ( ) ( & ctx ) ( & err )
2020-09-16 12:25:14 +01:00
if segmentIndex < metabase . LastSegmentIndex {
2020-09-03 14:54:56 +01:00
return metabase . SegmentLocation { } , errors . New ( "invalid segment index" )
}
return metabase . SegmentLocation {
ProjectID : projectID ,
BucketName : string ( bucket ) ,
Index : segmentIndex ,
ObjectKey : metabase . ObjectKey ( path ) ,
} , nil
}