2019-03-18 10:55:06 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
2021-10-27 09:50:27 +01:00
"bytes"
2019-03-18 10:55:06 +00:00
"context"
2019-10-17 19:01:40 +01:00
"fmt"
2019-04-02 19:21:18 +01:00
"time"
2019-03-18 10:55:06 +00:00
"go.uber.org/zap"
2020-08-11 14:00:57 +01:00
"storj.io/common/context2"
2020-04-09 09:19:16 +01:00
"storj.io/common/encryption"
2021-11-23 17:50:29 +00:00
"storj.io/common/errs2"
2020-05-29 14:31:26 +01:00
"storj.io/common/macaroon"
2020-06-01 21:07:31 +01:00
"storj.io/common/memory"
2019-12-27 11:48:47 +00:00
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/storj"
2020-03-30 10:08:50 +01:00
"storj.io/common/uuid"
2020-10-29 16:16:25 +00:00
"storj.io/storj/satellite/internalpb"
2021-04-21 13:42:57 +01:00
"storj.io/storj/satellite/metabase"
2020-03-12 07:03:46 +00:00
"storj.io/storj/satellite/metainfo/piecedeletion"
2019-03-27 10:24:35 +00:00
"storj.io/storj/satellite/orders"
2019-03-18 10:55:06 +00:00
)
2020-06-30 22:49:29 +01:00
// BeginObject begins object.
2019-07-16 11:39:23 +01:00
func ( endpoint * Endpoint ) BeginObject ( ctx context . Context , req * pb . ObjectBeginRequest ) ( resp * pb . ObjectBeginResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2021-02-09 22:40:23 +00:00
err = endpoint . versionCollector . collect ( req . Header . UserAgent , mon . Func ( ) . ShortName ( ) )
if err != nil {
endpoint . log . Warn ( "unable to collect uplink version" , zap . Error ( err ) )
}
2021-08-31 17:15:43 +01:00
now := time . Now ( )
var canDelete bool
keyInfo , err := endpoint . validateAuthN ( ctx , req . Header ,
verifyPermission {
action : macaroon . Action {
Op : macaroon . ActionWrite ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : now ,
} ,
} ,
verifyPermission {
action : macaroon . Action {
Op : macaroon . ActionDelete ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : now ,
} ,
actionPermitted : & canDelete ,
optional : true ,
} ,
)
2019-07-16 11:39:23 +01:00
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-16 11:39:23 +01:00
}
2020-01-03 09:27:10 +00:00
if ! req . ExpiresAt . IsZero ( ) && ! req . ExpiresAt . After ( time . Now ( ) ) {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "Invalid expiration time" )
}
2020-06-15 12:49:09 +01:00
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2020-03-16 08:55:52 +00:00
}
2022-01-13 09:57:31 +00:00
objectKeyLength := len ( req . EncryptedPath )
if objectKeyLength > endpoint . config . MaxEncryptedObjectKeyLength {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , fmt . Sprintf ( "key length is too big, got %v, maximum allowed is %v" , objectKeyLength , endpoint . config . MaxEncryptedObjectKeyLength ) )
}
2021-12-14 13:49:33 +00:00
2022-01-13 09:57:31 +00:00
err = endpoint . checkUploadLimits ( ctx , keyInfo . ProjectID )
if err != nil {
return nil , err
2021-12-14 13:49:33 +00:00
}
2020-03-16 08:55:52 +00:00
// TODO this needs to be optimized to avoid DB call on each request
2021-10-27 09:50:27 +01:00
placement , err := endpoint . buckets . GetBucketPlacement ( ctx , req . Bucket , keyInfo . ProjectID )
2020-03-16 08:55:52 +00:00
if err != nil {
2021-10-27 09:50:27 +01:00
if storj . ErrBucketNotFound . Has ( err ) {
2021-11-24 17:47:36 +00:00
return nil , rpcstatus . Errorf ( rpcstatus . NotFound , "bucket not found: %s" , req . Bucket )
2021-10-27 09:50:27 +01:00
}
2020-03-16 08:55:52 +00:00
endpoint . log . Error ( "unable to check bucket" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2020-07-31 12:24:40 +01:00
if canDelete {
2021-01-11 10:08:18 +00:00
_ , err = endpoint . DeleteObjectAnyStatus ( ctx , metabase . ObjectLocation {
ProjectID : keyInfo . ProjectID ,
BucketName : string ( req . Bucket ) ,
ObjectKey : metabase . ObjectKey ( req . EncryptedPath ) ,
} )
if err != nil && ! storj . ErrObjectNotFound . Has ( err ) {
2020-07-31 12:24:40 +01:00
return nil , err
}
} else {
2022-01-28 10:57:51 +00:00
_ , err = endpoint . metabase . GetObjectExactVersion ( ctx , metabase . GetObjectExactVersion {
2020-12-22 10:44:28 +00:00
ObjectLocation : metabase . ObjectLocation {
ProjectID : keyInfo . ProjectID ,
BucketName : string ( req . Bucket ) ,
ObjectKey : metabase . ObjectKey ( req . EncryptedPath ) ,
} ,
2022-01-28 10:57:51 +00:00
Version : metabase . DefaultVersion ,
2020-12-22 10:44:28 +00:00
} )
2020-07-31 12:24:40 +01:00
if err == nil {
return nil , rpcstatus . Error ( rpcstatus . PermissionDenied , "Unauthorized API credentials" )
}
2020-01-24 13:25:38 +00:00
}
2020-11-06 11:54:52 +00:00
if err := endpoint . ensureAttribution ( ctx , req . Header , keyInfo , req . Bucket ) ; err != nil {
return nil , err
}
streamID , err := uuid . New ( )
if err != nil {
2021-07-23 16:16:49 +01:00
endpoint . log . Error ( "internal" , zap . Error ( err ) )
2020-11-06 11:54:52 +00:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
// TODO this will work only with newsest uplink
// figue out what to do with this
encryptionParameters := storj . EncryptionParameters {
CipherSuite : storj . CipherSuite ( req . EncryptionParameters . CipherSuite ) ,
BlockSize : int32 ( req . EncryptionParameters . BlockSize ) , // TODO check conversion
}
2020-11-30 12:33:06 +00:00
var expiresAt * time . Time
if req . ExpiresAt . IsZero ( ) {
expiresAt = nil
} else {
expiresAt = & req . ExpiresAt
}
2021-09-09 16:21:42 +01:00
object , err := endpoint . metabase . BeginObjectExactVersion ( ctx , metabase . BeginObjectExactVersion {
2020-11-06 11:54:52 +00:00
ObjectStream : metabase . ObjectStream {
ProjectID : keyInfo . ProjectID ,
BucketName : string ( req . Bucket ) ,
ObjectKey : metabase . ObjectKey ( req . EncryptedPath ) ,
StreamID : streamID ,
2022-01-28 10:57:51 +00:00
Version : metabase . DefaultVersion ,
2020-11-06 11:54:52 +00:00
} ,
2020-11-30 12:33:06 +00:00
ExpiresAt : expiresAt ,
2020-11-06 11:54:52 +00:00
Encryption : encryptionParameters ,
} )
if err != nil {
2021-10-22 09:42:52 +01:00
return nil , endpoint . convertMetabaseErr ( err )
2020-11-06 11:54:52 +00:00
}
satStreamID , err := endpoint . packStreamID ( ctx , & internalpb . StreamID {
2021-03-24 09:33:56 +00:00
Bucket : req . Bucket ,
2021-08-25 20:00:55 +01:00
EncryptedObjectKey : req . EncryptedPath ,
2021-03-24 09:33:56 +00:00
Version : int32 ( object . Version ) ,
CreationDate : object . CreatedAt ,
ExpirationDate : req . ExpiresAt ,
StreamId : streamID [ : ] ,
MultipartObject : object . FixedSegmentSize <= 0 ,
EncryptionParameters : req . EncryptionParameters ,
2021-10-27 09:50:27 +01:00
Placement : int32 ( placement ) ,
2020-11-06 11:54:52 +00:00
} )
if err != nil {
2021-07-23 16:16:49 +01:00
endpoint . log . Error ( "internal" , zap . Error ( err ) )
2020-11-06 11:54:52 +00:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2020-01-20 18:48:26 +00:00
endpoint . log . Info ( "Object Upload" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "put" ) , zap . String ( "type" , "object" ) )
2020-01-29 15:03:30 +00:00
mon . Meter ( "req_put_object" ) . Mark ( 1 )
2020-01-20 18:48:26 +00:00
2019-07-16 11:39:23 +01:00
return & pb . ObjectBeginResponse {
2020-04-06 12:36:34 +01:00
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Version : req . Version ,
2020-11-06 11:54:52 +00:00
StreamId : satStreamID ,
2021-03-26 11:56:40 +00:00
RedundancyScheme : endpoint . defaultRS ,
2019-07-16 11:39:23 +01:00
} , nil
}
2020-06-30 22:49:29 +01:00
// CommitObject commits an object when all its segments have already been committed.
2019-07-16 11:39:23 +01:00
func ( endpoint * Endpoint ) CommitObject ( ctx context . Context , req * pb . ObjectCommitRequest ) ( resp * pb . ObjectCommitResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2021-02-09 22:40:23 +00:00
err = endpoint . versionCollector . collect ( req . Header . UserAgent , mon . Func ( ) . ShortName ( ) )
if err != nil {
endpoint . log . Warn ( "unable to collect uplink version" , zap . Error ( err ) )
}
2020-10-30 11:22:16 +00:00
streamID , err := endpoint . unmarshalSatStreamID ( ctx , req . StreamId )
2019-07-16 11:39:23 +01:00
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-16 11:39:23 +01:00
Op : macaroon . ActionWrite ,
Bucket : streamID . Bucket ,
2021-08-25 20:00:55 +01:00
EncryptedPath : streamID . EncryptedObjectKey ,
2019-07-16 11:39:23 +01:00
Time : time . Now ( ) ,
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-16 11:39:23 +01:00
}
2020-06-01 21:07:31 +01:00
metadataSize := memory . Size ( len ( req . EncryptedMetadata ) )
if metadataSize > endpoint . config . MaxMetadataSize {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , fmt . Sprintf ( "Metadata is too large, got %v, maximum allowed is %v" , metadataSize , endpoint . config . MaxMetadataSize ) )
}
2019-08-01 10:04:31 +01:00
2020-11-06 11:54:52 +00:00
id , err := uuid . FromBytes ( streamID . StreamId )
2019-08-01 10:04:31 +01:00
if err != nil {
2020-11-06 11:54:52 +00:00
endpoint . log . Error ( "internal" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-08-01 10:04:31 +01:00
}
2021-02-10 10:13:14 +00:00
// for old uplinks get Encryption from StreamMeta
streamMeta := & pb . StreamMeta { }
encryption := storj . EncryptionParameters { }
err = pb . Unmarshal ( req . EncryptedMetadata , streamMeta )
if err == nil {
encryption . CipherSuite = storj . CipherSuite ( streamMeta . EncryptionType )
encryption . BlockSize = streamMeta . EncryptionBlockSize
}
2021-10-29 12:04:55 +01:00
request := metabase . CommitObject {
2020-11-06 11:54:52 +00:00
ObjectStream : metabase . ObjectStream {
ProjectID : keyInfo . ProjectID ,
BucketName : string ( streamID . Bucket ) ,
2021-08-25 20:00:55 +01:00
ObjectKey : metabase . ObjectKey ( streamID . EncryptedObjectKey ) ,
2020-11-06 11:54:52 +00:00
StreamID : id ,
2022-01-28 10:57:51 +00:00
Version : metabase . DefaultVersion ,
2020-11-06 11:54:52 +00:00
} ,
2021-02-10 10:13:14 +00:00
Encryption : encryption ,
2021-10-29 12:04:55 +01:00
}
// uplink can send empty metadata with not empty key/nonce
// we need to fix it on uplink side but that part will be
// needed for backward compatibility
if len ( req . EncryptedMetadata ) != 0 {
request . EncryptedMetadata = req . EncryptedMetadata
request . EncryptedMetadataNonce = req . EncryptedMetadataNonce [ : ]
request . EncryptedMetadataEncryptedKey = req . EncryptedMetadataEncryptedKey
// older uplinks might send EncryptedMetadata directly with request but
// key/nonce will be part of StreamMeta
if req . EncryptedMetadataNonce . IsZero ( ) && len ( req . EncryptedMetadataEncryptedKey ) == 0 &&
streamMeta . LastSegmentMeta != nil {
request . EncryptedMetadataNonce = streamMeta . LastSegmentMeta . KeyNonce
request . EncryptedMetadataEncryptedKey = streamMeta . LastSegmentMeta . EncryptedKey
}
}
_ , err = endpoint . metabase . CommitObject ( ctx , request )
2019-08-01 10:04:31 +01:00
if err != nil {
2021-10-22 09:42:52 +01:00
return nil , endpoint . convertMetabaseErr ( err )
2019-08-01 10:04:31 +01:00
}
2019-07-16 11:39:23 +01:00
return & pb . ObjectCommitResponse { } , nil
}
2021-02-17 09:54:04 +00:00
// GetObject gets single object metadata.
2019-07-23 12:09:12 +01:00
func ( endpoint * Endpoint ) GetObject ( ctx context . Context , req * pb . ObjectGetRequest ) ( resp * pb . ObjectGetResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2021-02-09 22:40:23 +00:00
err = endpoint . versionCollector . collect ( req . Header . UserAgent , mon . Func ( ) . ShortName ( ) )
if err != nil {
endpoint . log . Warn ( "unable to collect uplink version" , zap . Error ( err ) )
}
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-23 12:09:12 +01:00
Op : macaroon . ActionRead ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-23 12:09:12 +01:00
}
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-23 12:09:12 +01:00
}
2022-01-28 10:57:51 +00:00
mbObject , err := endpoint . metabase . GetObjectExactVersion ( ctx , metabase . GetObjectExactVersion {
2020-11-06 11:54:52 +00:00
ObjectLocation : metabase . ObjectLocation {
2021-02-17 09:54:04 +00:00
ProjectID : keyInfo . ProjectID ,
BucketName : string ( req . Bucket ) ,
ObjectKey : metabase . ObjectKey ( req . EncryptedPath ) ,
2020-11-06 11:54:52 +00:00
} ,
2022-01-28 10:57:51 +00:00
Version : metabase . DefaultVersion ,
2020-11-06 11:54:52 +00:00
} )
2019-07-23 12:09:12 +01:00
if err != nil {
2021-10-22 09:42:52 +01:00
return nil , endpoint . convertMetabaseErr ( err )
2019-07-23 12:09:12 +01:00
}
2021-02-17 09:54:04 +00:00
var segmentRS * pb . RedundancyScheme
// TODO we may try to avoid additional request for inline objects
if ! req . RedundancySchemePerSegment && mbObject . SegmentCount > 0 {
segmentRS = endpoint . defaultRS
2021-09-09 16:21:42 +01:00
segment , err := endpoint . metabase . GetSegmentByPosition ( ctx , metabase . GetSegmentByPosition {
2021-02-17 09:54:04 +00:00
StreamID : mbObject . StreamID ,
2021-02-16 15:36:09 +00:00
Position : metabase . SegmentPosition {
Index : 0 ,
} ,
} )
if err != nil {
// don't fail because its possible that its multipart object
endpoint . log . Error ( "internal" , zap . Error ( err ) )
} else {
2021-02-17 09:54:04 +00:00
segmentRS = & pb . RedundancyScheme {
2021-02-16 15:36:09 +00:00
Type : pb . RedundancyScheme_SchemeType ( segment . Redundancy . Algorithm ) ,
ErasureShareSize : segment . Redundancy . ShareSize ,
MinReq : int32 ( segment . Redundancy . RequiredShares ) ,
RepairThreshold : int32 ( segment . Redundancy . RepairShares ) ,
SuccessThreshold : int32 ( segment . Redundancy . OptimalShares ) ,
Total : int32 ( segment . Redundancy . TotalShares ) ,
}
}
2021-02-17 09:54:04 +00:00
// monitor how many uplinks is still using this additional code
mon . Meter ( "req_get_object_rs_per_object" ) . Mark ( 1 )
2021-02-16 15:36:09 +00:00
}
2021-02-17 09:54:04 +00:00
object , err := endpoint . objectToProto ( ctx , mbObject , segmentRS )
2020-11-17 14:09:04 +00:00
if err != nil {
endpoint . log . Error ( "internal" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2021-02-17 09:54:04 +00:00
endpoint . log . Info ( "Object Download" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "get" ) , zap . String ( "type" , "object" ) )
mon . Meter ( "req_get_object" ) . Mark ( 1 )
return & pb . ObjectGetResponse { Object : object } , nil
2019-07-23 12:09:12 +01:00
}
2021-03-31 12:08:22 +01:00
// DownloadObject gets object information, creates a download for segments and lists the object segments.
func ( endpoint * Endpoint ) DownloadObject ( ctx context . Context , req * pb . ObjectDownloadRequest ) ( resp * pb . ObjectDownloadResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2021-11-23 17:50:29 +00:00
if ctx . Err ( ) != nil {
return nil , rpcstatus . Error ( rpcstatus . Canceled , "client has closed the connection" )
}
2021-03-31 12:08:22 +01:00
err = endpoint . versionCollector . collect ( req . Header . UserAgent , mon . Func ( ) . ShortName ( ) )
if err != nil {
endpoint . log . Warn ( "unable to collect uplink version" , zap . Error ( err ) )
}
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
Op : macaroon . ActionRead ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedObjectKey ,
Time : time . Now ( ) ,
} )
if err != nil {
return nil , err
}
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
if exceeded , limit , err := endpoint . projectUsage . ExceedsBandwidthUsage ( ctx , keyInfo . ProjectID ) ; err != nil {
2021-11-23 17:50:29 +00:00
if errs2 . IsCanceled ( err ) {
return nil , rpcstatus . Wrap ( rpcstatus . Canceled , err )
}
2021-11-11 20:04:48 +00:00
endpoint . log . Error (
"Retrieving project bandwidth total failed; bandwidth limit won't be enforced" ,
zap . Stringer ( "Project ID" , keyInfo . ProjectID ) ,
zap . Error ( err ) ,
)
2021-03-31 12:08:22 +01:00
} else if exceeded {
2021-11-11 12:50:14 +00:00
endpoint . log . Warn ( "Monthly bandwidth limit exceeded" ,
2021-03-31 12:08:22 +01:00
zap . Stringer ( "Limit" , limit ) ,
zap . Stringer ( "Project ID" , keyInfo . ProjectID ) ,
)
return nil , rpcstatus . Error ( rpcstatus . ResourceExhausted , "Exceeded Usage Limit" )
}
// get the object information
2022-01-28 10:57:51 +00:00
object , err := endpoint . metabase . GetObjectExactVersion ( ctx , metabase . GetObjectExactVersion {
2021-03-31 12:08:22 +01:00
ObjectLocation : metabase . ObjectLocation {
ProjectID : keyInfo . ProjectID ,
BucketName : string ( req . Bucket ) ,
ObjectKey : metabase . ObjectKey ( req . EncryptedObjectKey ) ,
} ,
2022-01-28 10:57:51 +00:00
Version : metabase . DefaultVersion ,
2021-03-31 12:08:22 +01:00
} )
if err != nil {
2021-10-22 09:42:52 +01:00
return nil , endpoint . convertMetabaseErr ( err )
2021-03-31 12:08:22 +01:00
}
// get the range segments
streamRange , err := calculateStreamRange ( object , req . Range )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
2021-09-09 16:21:42 +01:00
segments , err := endpoint . metabase . ListStreamPositions ( ctx , metabase . ListStreamPositions {
2021-03-31 12:08:22 +01:00
StreamID : object . StreamID ,
Range : streamRange ,
Limit : int ( req . Limit ) ,
} )
if err != nil {
2021-10-22 09:42:52 +01:00
return nil , endpoint . convertMetabaseErr ( err )
2021-03-31 12:08:22 +01:00
}
// get the download response for the first segment
2021-04-07 07:55:01 +01:00
downloadSegments , err := func ( ) ( [ ] * pb . SegmentDownloadResponse , error ) {
2021-03-31 12:08:22 +01:00
if len ( segments . Segments ) == 0 {
return nil , nil
}
2021-04-09 09:24:18 +01:00
if object . IsMigrated ( ) && streamRange != nil && streamRange . PlainStart > 0 {
return nil , nil
}
2021-03-31 12:08:22 +01:00
2021-09-09 16:21:42 +01:00
segment , err := endpoint . metabase . GetSegmentByPosition ( ctx , metabase . GetSegmentByPosition {
2021-03-31 12:08:22 +01:00
StreamID : object . StreamID ,
Position : segments . Segments [ 0 ] . Position ,
} )
if err != nil {
2021-10-22 09:42:52 +01:00
return nil , endpoint . convertMetabaseErr ( err )
2021-03-31 12:08:22 +01:00
}
2021-05-13 14:31:55 +01:00
downloadSizes := endpoint . calculateDownloadSizes ( streamRange , segment , object . Encryption )
2021-04-07 12:17:59 +01:00
// Update the current bandwidth cache value incrementing the SegmentSize.
2021-05-13 14:31:55 +01:00
err = endpoint . projectUsage . UpdateProjectBandwidthUsage ( ctx , keyInfo . ProjectID , downloadSizes . encryptedSize )
2021-04-07 12:17:59 +01:00
if err != nil {
2021-11-23 17:50:29 +00:00
if errs2 . IsCanceled ( err ) {
return nil , rpcstatus . Wrap ( rpcstatus . Canceled , err )
}
2021-04-07 12:17:59 +01:00
// log it and continue. it's most likely our own fault that we couldn't
// track it, and the only thing that will be affected is our per-project
// bandwidth limits.
2021-11-11 20:04:48 +00:00
endpoint . log . Error (
2021-11-23 17:50:29 +00:00
"Could not track the new project's bandwidth usage when downloading an object" ,
2021-11-11 20:04:48 +00:00
zap . Stringer ( "Project ID" , keyInfo . ProjectID ) ,
zap . Error ( err ) ,
)
2021-04-07 12:17:59 +01:00
}
2021-10-12 14:37:12 +01:00
encryptedKeyNonce , err := storj . NonceFromBytes ( segment . EncryptedKeyNonce )
if err != nil {
endpoint . log . Error ( "unable to get encryption key nonce from metadata" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2021-03-31 12:08:22 +01:00
if segment . Inline ( ) {
2021-05-13 14:31:55 +01:00
err := endpoint . orders . UpdateGetInlineOrder ( ctx , object . Location ( ) . Bucket ( ) , downloadSizes . plainSize )
2021-03-31 12:08:22 +01:00
if err != nil {
2021-07-23 16:16:49 +01:00
endpoint . log . Error ( "internal" , zap . Error ( err ) )
2021-03-31 12:08:22 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
endpoint . log . Info ( "Inline Segment Download" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "get" ) , zap . String ( "type" , "inline" ) )
mon . Meter ( "req_get_inline" ) . Mark ( 1 )
2021-04-07 07:55:01 +01:00
return [ ] * pb . SegmentDownloadResponse { {
2021-03-31 12:08:22 +01:00
PlainOffset : segment . PlainOffset ,
PlainSize : int64 ( segment . PlainSize ) ,
SegmentSize : int64 ( segment . EncryptedSize ) ,
EncryptedInlineData : segment . InlineData ,
2021-10-12 14:37:12 +01:00
EncryptedKeyNonce : encryptedKeyNonce ,
2021-03-31 12:08:22 +01:00
EncryptedKey : segment . EncryptedKey ,
Position : & pb . SegmentPosition {
PartNumber : int32 ( segment . Position . Part ) ,
Index : int32 ( segment . Position . Index ) ,
} ,
2021-04-07 07:55:01 +01:00
} } , nil
2021-03-31 12:08:22 +01:00
}
2021-05-13 14:31:55 +01:00
limits , privateKey , err := endpoint . orders . CreateGetOrderLimits ( ctx , object . Location ( ) . Bucket ( ) , segment , downloadSizes . orderLimit )
2021-03-31 12:08:22 +01:00
if err != nil {
if orders . ErrDownloadFailedNotEnoughPieces . Has ( err ) {
endpoint . log . Error ( "Unable to create order limits." ,
zap . Stringer ( "Project ID" , keyInfo . ProjectID ) ,
zap . Stringer ( "API Key ID" , keyInfo . ID ) ,
zap . Error ( err ) ,
)
}
2021-07-23 16:16:49 +01:00
endpoint . log . Error ( "internal" , zap . Error ( err ) )
2021-03-31 12:08:22 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
endpoint . log . Info ( "Segment Download" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "get" ) , zap . String ( "type" , "remote" ) )
mon . Meter ( "req_get_remote" ) . Mark ( 1 )
2021-04-07 07:55:01 +01:00
return [ ] * pb . SegmentDownloadResponse { {
2021-03-31 12:08:22 +01:00
AddressedLimits : limits ,
PrivateKey : privateKey ,
PlainOffset : segment . PlainOffset ,
PlainSize : int64 ( segment . PlainSize ) ,
SegmentSize : int64 ( segment . EncryptedSize ) ,
2021-10-12 14:37:12 +01:00
EncryptedKeyNonce : encryptedKeyNonce ,
2021-03-31 12:08:22 +01:00
EncryptedKey : segment . EncryptedKey ,
RedundancyScheme : & pb . RedundancyScheme {
Type : pb . RedundancyScheme_SchemeType ( segment . Redundancy . Algorithm ) ,
ErasureShareSize : segment . Redundancy . ShareSize ,
MinReq : int32 ( segment . Redundancy . RequiredShares ) ,
RepairThreshold : int32 ( segment . Redundancy . RepairShares ) ,
SuccessThreshold : int32 ( segment . Redundancy . OptimalShares ) ,
Total : int32 ( segment . Redundancy . TotalShares ) ,
} ,
Position : & pb . SegmentPosition {
PartNumber : int32 ( segment . Position . Part ) ,
Index : int32 ( segment . Position . Index ) ,
} ,
2021-04-07 07:55:01 +01:00
} } , nil
2021-03-31 12:08:22 +01:00
} ( )
if err != nil {
return nil , err
}
// convert to response
protoObject , err := endpoint . objectToProto ( ctx , object , nil )
if err != nil {
endpoint . log . Error ( "unable to convert object to proto" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
segmentList , err := convertStreamListResults ( segments )
if err != nil {
endpoint . log . Error ( "unable to convert stream list" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
endpoint . log . Info ( "Download Object" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "download" ) , zap . String ( "type" , "object" ) )
mon . Meter ( "req_download_object" ) . Mark ( 1 )
return & pb . ObjectDownloadResponse {
Object : protoObject ,
// The RPC API allows for multiple segment download responses, but for now
// we return only one. This can be changed in the future if it seems useful
// to return more than one on the initial response.
2021-04-07 07:55:01 +01:00
SegmentDownload : downloadSegments ,
2021-03-31 12:08:22 +01:00
// In the case where the client needs the segment list, it will contain
// every segment. In the case where the segment list is not needed,
// segmentListItems will be nil.
SegmentList : segmentList ,
} , nil
}
2021-05-13 14:31:55 +01:00
// downloadSizes groups the byte counts computed for a single segment download.
type downloadSizes struct {
	// plainSize is the amount of data that the uplink eventually gets.
	plainSize int64
	// encryptedSize is the amount of data that is present after encryption.
	encryptedSize int64
	// orderLimit is the amount of data that is read from a storage node.
	orderLimit int64
}
// calculateDownloadSizes computes how much plain data, encrypted data and
// order-limit data a download of segment involves when it is restricted to
// streamRange. A nil streamRange means the whole segment is read.
//
// Inline segments report the inline data length and the stored encrypted size
// and leave orderLimit at zero. When no encrypter can be created for
// encryptionParams, the full segment sizes are returned with orderLimit zero.
func (endpoint *Endpoint) calculateDownloadSizes(streamRange *metabase.StreamRange, segment metabase.Segment, encryptionParams storj.EncryptionParameters) downloadSizes {
	if segment.Inline() {
		return downloadSizes{
			plainSize:     int64(len(segment.InlineData)),
			encryptedSize: int64(segment.EncryptedSize),
		}
	}

	// calculate the range inside the given segment
	segmentStart := segment.PlainOffset
	from := segmentStart
	to := segmentStart + int64(segment.PlainSize)
	if streamRange != nil {
		if streamRange.PlainStart >= from {
			from = streamRange.PlainStart
		}
		if streamRange.PlainLimit < to {
			to = streamRange.PlainLimit
		}
	}
	plainSize := to - from

	// calculate the read range given the segment start
	from -= segmentStart
	to -= segmentStart

	// align to encryption block size; the key and nonce are placeholders since
	// only the block sizes of the encrypter are used
	enc, err := encryption.NewEncrypter(encryptionParams.CipherSuite, &storj.Key{1}, &storj.Nonce{1}, int(encryptionParams.BlockSize))
	if err != nil {
		// We ignore the error and fallback to the max amount to download.
		// It's unlikely that we fail here, but if we do, we don't want to block downloading.
		endpoint.log.Error("unable to create encrypter", zap.Error(err))
		return downloadSizes{
			plainSize:     int64(segment.PlainSize),
			encryptedSize: int64(segment.EncryptedSize),
			orderLimit:    0,
		}
	}

	startBlock, limitBlock := calculateBlocks(from, to, int64(enc.InBlockSize()))
	outBlockSize := int64(enc.OutBlockSize())
	encryptedStart := startBlock * outBlockSize
	encryptedLimit := limitBlock * outBlockSize

	// never report more than what is actually stored for the segment
	encryptedSize := encryptedLimit - encryptedStart
	if stored := int64(segment.EncryptedSize); encryptedSize > stored {
		encryptedSize = stored
	}

	// align to blocks
	stripeSize := int64(segment.Redundancy.StripeSize())
	stripeStart, stripeLimit := alignToBlock(encryptedStart, encryptedLimit, stripeSize)

	// calculate how many shares we need to download from a node
	stripes := (stripeLimit - stripeStart) / stripeSize

	return downloadSizes{
		plainSize:     plainSize,
		encryptedSize: encryptedSize,
		orderLimit:    stripes * int64(segment.Redundancy.ShareSize),
	}
}
// calculateBlocks converts the byte range [start, limit) into block indexes for
// blocks of blockSize bytes: startBlock is the block containing start and
// limitBlock is limit divided by blockSize, rounded up.
func calculateBlocks(start, limit int64, blockSize int64) (startBlock, limitBlock int64) {
	startBlock = start / blockSize
	// adding blockSize-1 before dividing rounds the limit up
	limitBlock = (limit + blockSize - 1) / blockSize
	return startBlock, limitBlock
}
// alignToBlock widens the byte range [start, limit) to multiples of blockSize:
// start is rounded down and limit is rounded up.
func alignToBlock(start, limit int64, blockSize int64) (alignedStart, alignedLimit int64) {
	// removing the remainder rounds down to a multiple of blockSize
	alignedStart = start - start%blockSize

	// bump the limit just below the next multiple before rounding down
	bumped := limit + blockSize - 1
	alignedLimit = bumped - bumped%blockSize

	return alignedStart, alignedLimit
}
2021-03-31 12:08:22 +01:00
func calculateStreamRange ( object metabase . Object , req * pb . Range ) ( * metabase . StreamRange , error ) {
if req == nil || req . Range == nil {
return nil , nil
}
2021-04-09 09:24:18 +01:00
if object . IsMigrated ( ) {
// The object is in old format, which does not have plain_offset specified.
// We need to fallback to returning all segments.
2021-03-31 12:08:22 +01:00
return nil , nil
}
switch r := req . Range . ( type ) {
case * pb . Range_Start :
if r . Start == nil {
return nil , Error . New ( "Start missing for Range_Start" )
}
return & metabase . StreamRange {
PlainStart : r . Start . PlainStart ,
PlainLimit : object . TotalPlainSize ,
} , nil
case * pb . Range_StartLimit :
if r . StartLimit == nil {
return nil , Error . New ( "StartEnd missing for Range_StartEnd" )
}
return & metabase . StreamRange {
PlainStart : r . StartLimit . PlainStart ,
PlainLimit : r . StartLimit . PlainLimit ,
} , nil
case * pb . Range_Suffix :
if r . Suffix == nil {
return nil , Error . New ( "Suffix missing for Range_Suffix" )
}
return & metabase . StreamRange {
PlainStart : object . TotalPlainSize - r . Suffix . PlainSuffix ,
PlainLimit : object . TotalPlainSize ,
} , nil
}
// if it's a new unsupported range type, let's return all data
return nil , nil
}
2020-06-30 22:49:29 +01:00
// ListObjects list objects according to specific parameters.
2019-07-16 11:39:23 +01:00
func ( endpoint * Endpoint ) ListObjects ( ctx context . Context , req * pb . ObjectListRequest ) ( resp * pb . ObjectListResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-09-19 17:19:29 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
2019-07-16 11:39:23 +01:00
Op : macaroon . ActionList ,
Bucket : req . Bucket ,
2019-10-24 22:05:08 +01:00
EncryptedPath : req . EncryptedPrefix ,
2019-07-16 11:39:23 +01:00
Time : time . Now ( ) ,
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-16 11:39:23 +01:00
}
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
2020-03-16 08:55:52 +00:00
// TODO this needs to be optimized to avoid DB call on each request
2021-10-27 09:50:27 +01:00
placement , err := endpoint . buckets . GetBucketPlacement ( ctx , req . Bucket , keyInfo . ProjectID )
2020-03-16 08:55:52 +00:00
if err != nil {
2021-10-27 09:50:27 +01:00
if storj . ErrBucketNotFound . Has ( err ) {
2021-11-24 17:47:36 +00:00
return nil , rpcstatus . Errorf ( rpcstatus . NotFound , "bucket not found: %s" , req . Bucket )
2021-10-27 09:50:27 +01:00
}
2020-03-16 08:55:52 +00:00
endpoint . log . Error ( "unable to check bucket" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2020-11-06 12:20:54 +00:00
limit := int ( req . Limit )
if limit < 0 {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "limit is negative" )
}
2021-06-25 09:19:32 +01:00
metabase . ListLimit . Ensure ( & limit )
2019-07-16 11:39:23 +01:00
2020-11-17 12:57:53 +00:00
var prefix metabase . ObjectKey
if len ( req . EncryptedPrefix ) != 0 {
prefix = metabase . ObjectKey ( req . EncryptedPrefix )
2020-11-17 13:37:19 +00:00
if prefix [ len ( prefix ) - 1 ] != metabase . Delimiter {
prefix += metabase . ObjectKey ( metabase . Delimiter )
2020-11-17 12:57:53 +00:00
}
}
2020-11-18 11:16:00 +00:00
// Default to Commmitted status for backward-compatibility with older uplinks.
status := metabase . Committed
if req . Status != pb . Object_INVALID {
status = metabase . ObjectStatus ( req . Status )
}
2020-12-01 14:01:44 +00:00
cursor := string ( req . EncryptedCursor )
if len ( cursor ) != 0 {
cursor = string ( prefix ) + cursor
}
2021-09-28 13:36:10 +01:00
includeCustomMetadata := true
includeSystemMetadata := true
2021-08-02 19:30:02 +01:00
if req . UseObjectIncludes {
2021-09-28 13:36:10 +01:00
includeCustomMetadata = req . ObjectIncludes . Metadata
includeSystemMetadata = ! req . ObjectIncludes . ExcludeSystemMetadata
2021-08-02 19:30:02 +01:00
}
2020-11-06 12:20:54 +00:00
resp = & pb . ObjectListResponse { }
// TODO: Replace with IterateObjectsLatestVersion when ready
2021-09-09 16:21:42 +01:00
err = endpoint . metabase . IterateObjectsAllVersionsWithStatus ( ctx ,
2020-12-21 15:07:00 +00:00
metabase . IterateObjectsWithStatus {
2020-11-06 12:20:54 +00:00
ProjectID : keyInfo . ProjectID ,
BucketName : string ( req . Bucket ) ,
2020-11-17 12:57:53 +00:00
Prefix : prefix ,
2020-11-24 09:50:02 +00:00
Cursor : metabase . IterateCursor {
2020-12-01 14:01:44 +00:00
Key : metabase . ObjectKey ( cursor ) ,
2022-01-28 10:57:51 +00:00
Version : metabase . DefaultVersion , // TODO: set to a the version from the protobuf request when it supports this
2020-11-24 09:50:02 +00:00
} ,
2021-09-28 13:36:10 +01:00
Recursive : req . Recursive ,
BatchSize : limit + 1 ,
Status : status ,
IncludeCustomMetadata : includeCustomMetadata ,
IncludeSystemMetadata : includeSystemMetadata ,
2020-11-06 12:20:54 +00:00
} , func ( ctx context . Context , it metabase . ObjectsIterator ) error {
entry := metabase . ObjectEntry { }
for len ( resp . Items ) < limit && it . Next ( ctx , & entry ) {
2021-10-27 09:50:27 +01:00
item , err := endpoint . objectEntryToProtoListItem ( ctx , req . Bucket , entry , prefix , includeCustomMetadata , placement )
2020-12-02 11:34:41 +00:00
if err != nil {
return err
2020-11-19 12:21:51 +00:00
}
2020-11-06 12:20:54 +00:00
resp . Items = append ( resp . Items , item )
}
resp . More = it . Next ( ctx , & entry )
return nil
} ,
)
2019-07-16 11:39:23 +01:00
if err != nil {
2021-10-22 09:42:52 +01:00
return nil , endpoint . convertMetabaseErr ( err )
2019-07-16 11:39:23 +01:00
}
2020-01-20 18:48:26 +00:00
endpoint . log . Info ( "Object List" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "list" ) , zap . String ( "type" , "object" ) )
2020-01-29 15:03:30 +00:00
mon . Meter ( "req_list_object" ) . Mark ( 1 )
2019-07-16 11:39:23 +01:00
2020-11-06 12:20:54 +00:00
return resp , nil
2019-07-16 11:39:23 +01:00
}
2021-01-11 12:06:04 +00:00
// ListPendingObjectStreams list pending objects according to specific parameters.
func ( endpoint * Endpoint ) ListPendingObjectStreams ( ctx context . Context , req * pb . ObjectListPendingStreamsRequest ) ( resp * pb . ObjectListPendingStreamsResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2021-02-09 22:40:23 +00:00
err = endpoint . versionCollector . collect ( req . Header . UserAgent , mon . Func ( ) . ShortName ( ) )
if err != nil {
endpoint . log . Warn ( "unable to collect uplink version" , zap . Error ( err ) )
}
2021-01-11 12:06:04 +00:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
Op : macaroon . ActionList ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
return nil , err
}
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
2021-10-27 09:50:27 +01:00
placement , err := endpoint . buckets . GetBucketPlacement ( ctx , req . Bucket , keyInfo . ProjectID )
2021-01-11 12:06:04 +00:00
if err != nil {
2021-10-27 09:50:27 +01:00
if storj . ErrBucketNotFound . Has ( err ) {
2021-11-24 17:47:36 +00:00
return nil , rpcstatus . Errorf ( rpcstatus . NotFound , "bucket not found: %s" , req . Bucket )
2021-10-27 09:50:27 +01:00
}
2021-01-11 12:06:04 +00:00
endpoint . log . Error ( "unable to check bucket" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
cursor := metabase . StreamIDCursor { }
if req . StreamIdCursor != nil {
streamID , err := endpoint . unmarshalSatStreamID ( ctx , req . StreamIdCursor )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
cursor . StreamID , err = uuid . FromBytes ( streamID . StreamId )
if err != nil {
2021-07-23 16:16:49 +01:00
endpoint . log . Error ( "internal" , zap . Error ( err ) )
2021-01-11 12:06:04 +00:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
}
limit := int ( req . Limit )
if limit < 0 {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "limit is negative" )
}
2021-06-25 09:19:32 +01:00
metabase . ListLimit . Ensure ( & limit )
2021-01-11 12:06:04 +00:00
resp = & pb . ObjectListPendingStreamsResponse { }
resp . Items = [ ] * pb . ObjectListItem { }
2021-09-09 16:21:42 +01:00
err = endpoint . metabase . IteratePendingObjectsByKey ( ctx ,
2021-01-11 12:06:04 +00:00
metabase . IteratePendingObjectsByKey {
ObjectLocation : metabase . ObjectLocation {
ProjectID : keyInfo . ProjectID ,
BucketName : string ( req . Bucket ) ,
ObjectKey : metabase . ObjectKey ( req . EncryptedPath ) ,
} ,
BatchSize : limit + 1 ,
Cursor : cursor ,
} , func ( ctx context . Context , it metabase . ObjectsIterator ) error {
entry := metabase . ObjectEntry { }
for len ( resp . Items ) < limit && it . Next ( ctx , & entry ) {
2021-10-27 09:50:27 +01:00
item , err := endpoint . objectEntryToProtoListItem ( ctx , req . Bucket , entry , "" , true , placement )
2021-01-11 12:06:04 +00:00
if err != nil {
return err
}
resp . Items = append ( resp . Items , item )
}
resp . More = it . Next ( ctx , & entry )
return nil
} ,
)
if err != nil {
2021-10-22 09:42:52 +01:00
return nil , endpoint . convertMetabaseErr ( err )
2021-01-11 12:06:04 +00:00
}
endpoint . log . Info ( "List pending object streams" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "list" ) , zap . String ( "type" , "object" ) )
mon . Meter ( "req_list_pending_object_streams" ) . Mark ( 1 )
return resp , nil
}
2019-12-10 11:15:35 +00:00
// BeginDeleteObject begins object deletion process.
2019-07-16 11:39:23 +01:00
func ( endpoint * Endpoint ) BeginDeleteObject ( ctx context . Context , req * pb . ObjectBeginDeleteRequest ) ( resp * pb . ObjectBeginDeleteResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2021-02-09 22:40:23 +00:00
err = endpoint . versionCollector . collect ( req . Header . UserAgent , mon . Func ( ) . ShortName ( ) )
if err != nil {
endpoint . log . Warn ( "unable to collect uplink version" , zap . Error ( err ) )
}
2020-03-11 15:53:16 +00:00
now := time . Now ( )
2021-08-31 17:15:43 +01:00
var canRead , canList bool
keyInfo , err := endpoint . validateAuthN ( ctx , req . Header ,
verifyPermission {
action : macaroon . Action {
Op : macaroon . ActionDelete ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : now ,
} ,
} ,
verifyPermission {
action : macaroon . Action {
Op : macaroon . ActionRead ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : now ,
} ,
actionPermitted : & canRead ,
optional : true ,
} ,
verifyPermission {
action : macaroon . Action {
Op : macaroon . ActionList ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : now ,
} ,
actionPermitted : & canList ,
optional : true ,
} ,
)
2019-07-16 11:39:23 +01:00
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-16 11:39:23 +01:00
}
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
2019-09-19 05:46:39 +01:00
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
2019-07-16 11:39:23 +01:00
}
2020-12-03 18:04:01 +00:00
var deletedObjects [ ] * pb . Object
if req . GetStatus ( ) == int32 ( metabase . Pending ) {
if req . StreamId == nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "StreamID missing" )
}
var pbStreamID * internalpb . StreamID
pbStreamID , err = endpoint . unmarshalSatStreamID ( ctx , * ( req . StreamId ) )
if err == nil {
var streamID uuid . UUID
streamID , err = uuid . FromBytes ( pbStreamID . StreamId )
if err == nil {
2021-05-04 14:51:40 +01:00
deletedObjects , err = endpoint . DeletePendingObject ( ctx ,
metabase . ObjectStream {
ProjectID : keyInfo . ProjectID ,
BucketName : string ( req . Bucket ) ,
ObjectKey : metabase . ObjectKey ( req . EncryptedPath ) ,
Version : metabase . Version ( req . GetVersion ( ) ) ,
StreamID : streamID ,
} )
2020-12-03 18:04:01 +00:00
}
}
} else {
deletedObjects , err = endpoint . DeleteCommittedObject ( ctx , keyInfo . ProjectID , string ( req . Bucket ) , metabase . ObjectKey ( req . EncryptedPath ) )
}
2019-08-01 10:04:31 +01:00
if err != nil {
2020-03-11 15:53:16 +00:00
if ! canRead && ! canList {
// No error info is returned if neither Read, nor List permission is granted
2020-04-02 08:45:51 +01:00
return & pb . ObjectBeginDeleteResponse { } , nil
2020-03-11 15:53:16 +00:00
}
2021-10-22 09:42:52 +01:00
return nil , endpoint . convertMetabaseErr ( err )
2019-08-01 10:04:31 +01:00
}
2020-07-27 21:12:14 +01:00
var object * pb . Object
if canRead || canList {
// Info about deleted object is returned only if either Read, or List permission is granted
2020-08-11 14:00:57 +01:00
if err != nil {
endpoint . log . Error ( "failed to construct deleted object information" ,
zap . Stringer ( "Project ID" , keyInfo . ProjectID ) ,
zap . String ( "Bucket" , string ( req . Bucket ) ) ,
zap . String ( "Encrypted Path" , string ( req . EncryptedPath ) ) ,
zap . Error ( err ) ,
)
}
2020-07-27 21:12:14 +01:00
if len ( deletedObjects ) > 0 {
object = deletedObjects [ 0 ]
}
}
2020-01-20 18:48:26 +00:00
endpoint . log . Info ( "Object Delete" , zap . Stringer ( "Project ID" , keyInfo . ProjectID ) , zap . String ( "operation" , "delete" ) , zap . String ( "type" , "object" ) )
2020-01-29 15:03:30 +00:00
mon . Meter ( "req_delete_object" ) . Mark ( 1 )
2020-01-20 18:48:26 +00:00
2019-07-16 11:39:23 +01:00
return & pb . ObjectBeginDeleteResponse {
2020-07-17 10:17:31 +01:00
Object : object ,
2019-07-16 11:39:23 +01:00
} , nil
}
2020-08-11 18:35:23 +01:00
// GetObjectIPs returns the IP addresses of the nodes holding the pieces for
// the provided object. This is useful for knowing the locations of the pieces.
func ( endpoint * Endpoint ) GetObjectIPs ( ctx context . Context , req * pb . ObjectGetIPsRequest ) ( resp * pb . ObjectGetIPsResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2021-02-09 22:40:23 +00:00
err = endpoint . versionCollector . collect ( req . Header . UserAgent , mon . Func ( ) . ShortName ( ) )
if err != nil {
endpoint . log . Warn ( "unable to collect uplink version" , zap . Error ( err ) )
}
2020-08-13 17:43:21 +01:00
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
Op : macaroon . ActionRead ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedPath ,
Time : time . Now ( ) ,
} )
if err != nil {
return nil , err
}
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
2020-12-07 15:43:57 +00:00
// TODO we may need custom metabase request to avoid two DB calls
2022-01-28 10:57:51 +00:00
object , err := endpoint . metabase . GetObjectExactVersion ( ctx , metabase . GetObjectExactVersion {
2020-12-07 15:43:57 +00:00
ObjectLocation : metabase . ObjectLocation {
ProjectID : keyInfo . ProjectID ,
BucketName : string ( req . Bucket ) ,
ObjectKey : metabase . ObjectKey ( req . EncryptedPath ) ,
} ,
2022-01-28 10:57:51 +00:00
Version : metabase . DefaultVersion ,
2020-12-07 15:43:57 +00:00
} )
2020-08-13 17:43:21 +01:00
if err != nil {
2021-10-22 09:42:52 +01:00
return nil , endpoint . convertMetabaseErr ( err )
2020-08-13 17:43:21 +01:00
}
2021-09-09 16:21:42 +01:00
pieceCountByNodeID , err := endpoint . metabase . GetStreamPieceCountByNodeID ( ctx ,
2021-03-08 13:09:32 +00:00
metabase . GetStreamPieceCountByNodeID {
2020-12-07 15:43:57 +00:00
StreamID : object . StreamID ,
} )
2021-03-08 13:09:32 +00:00
if err != nil {
2021-10-22 09:42:52 +01:00
return nil , endpoint . convertMetabaseErr ( err )
2020-08-13 17:43:21 +01:00
}
2021-01-13 13:59:05 +00:00
nodeIDs := make ( [ ] storj . NodeID , 0 , len ( pieceCountByNodeID ) )
for nodeID := range pieceCountByNodeID {
nodeIDs = append ( nodeIDs , nodeID )
}
nodeIPMap , err := endpoint . overlay . GetNodeIPs ( ctx , nodeIDs )
2020-08-13 17:43:21 +01:00
if err != nil {
2021-07-23 16:16:49 +01:00
endpoint . log . Error ( "internal" , zap . Error ( err ) )
2020-08-13 17:43:21 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2021-01-13 13:59:05 +00:00
nodeIPs := make ( [ ] [ ] byte , 0 , len ( nodeIPMap ) )
pieceCount := int64 ( 0 )
reliablePieceCount := int64 ( 0 )
for nodeID , count := range pieceCountByNodeID {
pieceCount += count
ip , reliable := nodeIPMap [ nodeID ]
if ! reliable {
continue
2020-08-13 17:43:21 +01:00
}
2021-01-13 13:59:05 +00:00
nodeIPs = append ( nodeIPs , [ ] byte ( ip ) )
reliablePieceCount += count
2020-08-13 17:43:21 +01:00
}
2021-01-13 13:59:05 +00:00
return & pb . ObjectGetIPsResponse {
Ips : nodeIPs ,
2021-01-14 12:33:00 +00:00
SegmentCount : int64 ( object . SegmentCount ) ,
2021-01-13 13:59:05 +00:00
ReliablePieceCount : reliablePieceCount ,
PieceCount : pieceCount ,
} , nil
2020-08-11 18:35:23 +01:00
}
2021-07-02 13:39:46 +01:00
// UpdateObjectMetadata replaces object metadata.
func ( endpoint * Endpoint ) UpdateObjectMetadata ( ctx context . Context , req * pb . ObjectUpdateMetadataRequest ) ( resp * pb . ObjectUpdateMetadataResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
err = endpoint . versionCollector . collect ( req . Header . UserAgent , mon . Func ( ) . ShortName ( ) )
if err != nil {
endpoint . log . Warn ( "unable to collect uplink version" , zap . Error ( err ) )
}
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
Op : macaroon . ActionWrite ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedObjectKey ,
Time : time . Now ( ) ,
} )
if err != nil {
return nil , err
}
err = endpoint . validateBucket ( ctx , req . Bucket )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
2021-07-08 15:50:37 +01:00
streamID , err := endpoint . unmarshalSatStreamID ( ctx , req . StreamId )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
id , err := uuid . FromBytes ( streamID . StreamId )
if err != nil {
2021-07-23 16:16:49 +01:00
endpoint . log . Error ( "internal" , zap . Error ( err ) )
2021-07-08 15:50:37 +01:00
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2021-10-29 12:04:55 +01:00
var encryptedMetadataNonce [ ] byte
if ! req . EncryptedMetadataNonce . IsZero ( ) {
encryptedMetadataNonce = req . EncryptedMetadataNonce [ : ]
}
2021-09-09 16:21:42 +01:00
err = endpoint . metabase . UpdateObjectMetadata ( ctx , metabase . UpdateObjectMetadata {
2021-07-08 15:50:37 +01:00
ObjectStream : metabase . ObjectStream {
2021-07-02 13:39:46 +01:00
ProjectID : keyInfo . ProjectID ,
BucketName : string ( req . Bucket ) ,
ObjectKey : metabase . ObjectKey ( req . EncryptedObjectKey ) ,
2021-07-08 15:50:37 +01:00
Version : metabase . Version ( req . Version ) ,
StreamID : id ,
2021-07-02 13:39:46 +01:00
} ,
EncryptedMetadata : req . EncryptedMetadata ,
2021-10-29 12:04:55 +01:00
EncryptedMetadataNonce : encryptedMetadataNonce ,
2021-07-02 13:39:46 +01:00
EncryptedMetadataEncryptedKey : req . EncryptedMetadataEncryptedKey ,
} )
if err != nil {
2021-10-22 09:42:52 +01:00
return nil , endpoint . convertMetabaseErr ( err )
2021-07-02 13:39:46 +01:00
}
return & pb . ObjectUpdateMetadataResponse { } , nil
}
2022-01-24 15:17:12 +00:00
func ( endpoint * Endpoint ) objectToProto ( ctx context . Context , object metabase . Object , rs * pb . RedundancyScheme ) ( * pb . Object , error ) {
expires := time . Time { }
if object . ExpiresAt != nil {
expires = * object . ExpiresAt
2019-07-22 15:45:18 +01:00
}
2022-01-24 15:17:12 +00:00
// TotalPlainSize != 0 means object was uploaded with newer uplink
multipartObject := object . TotalPlainSize != 0 && object . FixedSegmentSize <= 0
streamID , err := endpoint . packStreamID ( ctx , & internalpb . StreamID {
Bucket : [ ] byte ( object . BucketName ) ,
EncryptedObjectKey : [ ] byte ( object . ObjectKey ) ,
Version : int32 ( object . Version ) , // TODO incomatible types
CreationDate : object . CreatedAt ,
ExpirationDate : expires ,
StreamId : object . StreamID [ : ] ,
MultipartObject : multipartObject ,
EncryptionParameters : & pb . EncryptionParameters {
CipherSuite : pb . CipherSuite ( object . Encryption . CipherSuite ) ,
BlockSize : int64 ( object . Encryption . BlockSize ) ,
} ,
// TODO: this is the only one place where placement is not added to the StreamID
// bucket info would be required to add placement here
2019-07-22 15:45:18 +01:00
} )
if err != nil {
2020-03-10 09:58:14 +00:00
return nil , err
2019-07-22 15:45:18 +01:00
}
2022-01-24 15:17:12 +00:00
var nonce storj . Nonce
if len ( object . EncryptedMetadataNonce ) > 0 {
nonce , err = storj . NonceFromBytes ( object . EncryptedMetadataNonce )
if err != nil {
return nil , err
}
2019-07-24 12:33:23 +01:00
}
2022-01-24 15:17:12 +00:00
streamMeta := & pb . StreamMeta { }
err = pb . Unmarshal ( object . EncryptedMetadata , streamMeta )
2019-07-24 12:33:23 +01:00
if err != nil {
2022-01-24 15:17:12 +00:00
return nil , err
2019-07-24 12:33:23 +01:00
}
2022-01-24 15:17:12 +00:00
// TODO is this enough to handle old uplinks
if streamMeta . EncryptionBlockSize == 0 {
streamMeta . EncryptionBlockSize = object . Encryption . BlockSize
2019-07-24 12:33:23 +01:00
}
2022-01-24 15:17:12 +00:00
if streamMeta . EncryptionType == 0 {
streamMeta . EncryptionType = int32 ( object . Encryption . CipherSuite )
2019-07-24 12:33:23 +01:00
}
2022-01-24 15:17:12 +00:00
if streamMeta . NumberOfSegments == 0 {
streamMeta . NumberOfSegments = int64 ( object . SegmentCount )
2019-07-24 12:33:23 +01:00
}
2022-01-24 15:17:12 +00:00
if streamMeta . LastSegmentMeta == nil {
streamMeta . LastSegmentMeta = & pb . SegmentMeta {
EncryptedKey : object . EncryptedMetadataEncryptedKey ,
KeyNonce : object . EncryptedMetadataNonce ,
}
2020-11-06 11:54:52 +00:00
}
2022-01-24 15:17:12 +00:00
metadataBytes , err := pb . Marshal ( streamMeta )
2020-11-06 11:54:52 +00:00
if err != nil {
2022-01-24 15:17:12 +00:00
return nil , err
2020-11-06 11:54:52 +00:00
}
2022-01-24 15:17:12 +00:00
result := & pb . Object {
Bucket : [ ] byte ( object . BucketName ) ,
EncryptedPath : [ ] byte ( object . ObjectKey ) ,
Version : int32 ( object . Version ) , // TODO incomatible types
StreamId : streamID ,
ExpiresAt : expires ,
CreatedAt : object . CreatedAt ,
2019-07-22 15:45:18 +01:00
2022-01-24 15:17:12 +00:00
TotalSize : object . TotalEncryptedSize ,
PlainSize : object . TotalPlainSize ,
2019-07-22 15:45:18 +01:00
2022-01-24 15:17:12 +00:00
EncryptedMetadata : metadataBytes ,
EncryptedMetadataNonce : nonce ,
EncryptedMetadataEncryptedKey : object . EncryptedMetadataEncryptedKey ,
EncryptionParameters : & pb . EncryptionParameters {
CipherSuite : pb . CipherSuite ( object . Encryption . CipherSuite ) ,
BlockSize : int64 ( object . Encryption . BlockSize ) ,
} ,
2021-02-09 22:40:23 +00:00
2022-01-24 15:17:12 +00:00
RedundancyScheme : rs ,
2019-07-22 15:45:18 +01:00
}
2022-01-24 15:17:12 +00:00
return result , nil
}
2019-07-22 15:45:18 +01:00
2022-01-24 15:17:12 +00:00
func ( endpoint * Endpoint ) objectEntryToProtoListItem ( ctx context . Context , bucket [ ] byte ,
entry metabase . ObjectEntry , prefixToPrependInSatStreamID metabase . ObjectKey ,
includeMetadata bool , placement storj . PlacementConstraint ) ( item * pb . ObjectListItem , err error ) {
2019-07-22 15:45:18 +01:00
2022-01-24 15:17:12 +00:00
expires := time . Time { }
if entry . ExpiresAt != nil {
expires = * entry . ExpiresAt
2019-10-17 19:01:40 +01:00
}
2022-01-24 15:17:12 +00:00
item = & pb . ObjectListItem {
EncryptedPath : [ ] byte ( entry . ObjectKey ) ,
Version : int32 ( entry . Version ) , // TODO incompatible types
Status : pb . Object_Status ( entry . Status ) ,
ExpiresAt : expires ,
CreatedAt : entry . CreatedAt ,
PlainSize : entry . TotalPlainSize ,
2020-11-06 11:54:52 +00:00
}
2022-01-24 15:17:12 +00:00
if includeMetadata {
var nonce storj . Nonce
if len ( entry . EncryptedMetadataNonce ) > 0 {
nonce , err = storj . NonceFromBytes ( entry . EncryptedMetadataNonce )
if err != nil {
return nil , err
}
}
2021-04-07 16:51:00 +01:00
2022-01-24 15:17:12 +00:00
streamMeta := & pb . StreamMeta { }
err = pb . Unmarshal ( entry . EncryptedMetadata , streamMeta )
if err != nil {
return nil , err
}
2021-04-07 16:51:00 +01:00
2022-01-24 15:17:12 +00:00
if entry . Encryption != ( storj . EncryptionParameters { } ) {
streamMeta . EncryptionType = int32 ( entry . Encryption . CipherSuite )
streamMeta . EncryptionBlockSize = entry . Encryption . BlockSize
}
2021-04-07 16:51:00 +01:00
2022-01-24 15:17:12 +00:00
if entry . SegmentCount != 0 {
streamMeta . NumberOfSegments = int64 ( entry . SegmentCount )
}
2021-04-07 16:51:00 +01:00
2022-01-24 15:17:12 +00:00
if entry . EncryptedMetadataEncryptedKey != nil {
streamMeta . LastSegmentMeta = & pb . SegmentMeta {
EncryptedKey : entry . EncryptedMetadataEncryptedKey ,
KeyNonce : entry . EncryptedMetadataNonce ,
2021-04-07 16:51:00 +01:00
}
}
2022-01-24 15:17:12 +00:00
metadataBytes , err := pb . Marshal ( streamMeta )
if err != nil {
return nil , err
}
2021-04-07 16:51:00 +01:00
2022-01-24 15:17:12 +00:00
item . EncryptedMetadata = metadataBytes
item . EncryptedMetadataNonce = nonce
item . EncryptedMetadataEncryptedKey = entry . EncryptedMetadataEncryptedKey
2020-11-06 11:54:52 +00:00
}
2020-01-27 20:25:52 +00:00
2022-01-24 15:17:12 +00:00
// Add Stream ID to list items if listing is for pending objects.
// The client requires the Stream ID to use in the MultipartInfo.
if entry . Status == metabase . Pending {
satStreamID , err := endpoint . packStreamID ( ctx , & internalpb . StreamID {
Bucket : bucket ,
EncryptedObjectKey : append ( [ ] byte ( prefixToPrependInSatStreamID ) , item . EncryptedPath ... ) ,
Version : item . Version ,
CreationDate : item . CreatedAt ,
ExpirationDate : item . ExpiresAt ,
StreamId : entry . StreamID [ : ] ,
MultipartObject : entry . FixedSegmentSize <= 0 ,
EncryptionParameters : & pb . EncryptionParameters {
CipherSuite : pb . CipherSuite ( entry . Encryption . CipherSuite ) ,
BlockSize : int64 ( entry . Encryption . BlockSize ) ,
} ,
Placement : int32 ( placement ) ,
} )
if err != nil {
return nil , err
}
item . StreamId = & satStreamID
2021-06-10 11:08:21 +01:00
}
2022-01-24 15:17:12 +00:00
return item , nil
}
2020-11-20 12:37:54 +00:00
2022-01-24 15:17:12 +00:00
// DeleteCommittedObject deletes all the pieces of the storage nodes that belongs
// to the specified object.
//
// NOTE: this method is exported for being able to individually test it without
// having import cycles.
func ( endpoint * Endpoint ) DeleteCommittedObject (
ctx context . Context , projectID uuid . UUID , bucket string , object metabase . ObjectKey ,
) ( deletedObjects [ ] * pb . Object , err error ) {
defer mon . Task ( ) ( & ctx , projectID . String ( ) , bucket , object ) ( & err )
2021-03-25 07:56:13 +00:00
2022-01-24 15:17:12 +00:00
req := metabase . ObjectLocation {
ProjectID : projectID ,
BucketName : bucket ,
ObjectKey : object ,
2021-04-07 15:20:05 +01:00
}
2022-01-24 15:17:12 +00:00
result , err := endpoint . metabase . DeleteObjectsAllVersions ( ctx , metabase . DeleteObjectsAllVersions { Locations : [ ] metabase . ObjectLocation { req } } )
2021-04-07 15:20:05 +01:00
if err != nil {
2022-01-24 15:17:12 +00:00
return nil , Error . Wrap ( err )
2021-04-07 15:20:05 +01:00
}
2022-01-24 15:17:12 +00:00
deletedObjects , err = endpoint . deleteObjectsPieces ( ctx , result )
2020-11-06 11:54:52 +00:00
if err != nil {
2022-01-24 15:17:12 +00:00
endpoint . log . Error ( "failed to delete pointers" ,
zap . Stringer ( "project" , projectID ) ,
zap . String ( "bucket" , bucket ) ,
zap . Binary ( "object" , [ ] byte ( object ) ) ,
zap . Error ( err ) ,
)
return deletedObjects , Error . Wrap ( err )
2019-12-11 17:44:13 +00:00
}
2020-11-03 12:58:27 +00:00
return deletedObjects , nil
2020-08-06 02:23:45 +01:00
}
2021-01-11 10:08:18 +00:00
// DeleteObjectAnyStatus deletes all the pieces of the storage nodes that belongs
// to the specified object.
//
// NOTE: this method is exported for being able to individually test it without
// having import cycles.
func ( endpoint * Endpoint ) DeleteObjectAnyStatus ( ctx context . Context , location metabase . ObjectLocation ,
) ( deletedObjects [ ] * pb . Object , err error ) {
defer mon . Task ( ) ( & ctx , location . ProjectID . String ( ) , location . BucketName , location . ObjectKey ) ( & err )
2021-09-09 16:21:42 +01:00
result , err := endpoint . metabase . DeleteObjectAnyStatusAllVersions ( ctx , metabase . DeleteObjectAnyStatusAllVersions {
2021-01-11 10:08:18 +00:00
ObjectLocation : location ,
} )
if err != nil {
2021-10-22 09:42:52 +01:00
return nil , Error . Wrap ( err )
2021-01-11 10:08:18 +00:00
}
deletedObjects , err = endpoint . deleteObjectsPieces ( ctx , result )
if err != nil {
endpoint . log . Error ( "failed to delete pointers" ,
zap . Stringer ( "project" , location . ProjectID ) ,
zap . String ( "bucket" , location . BucketName ) ,
zap . Binary ( "object" , [ ] byte ( location . ObjectKey ) ) ,
zap . Error ( err ) ,
)
return deletedObjects , err
}
return deletedObjects , nil
}
2020-12-03 18:04:01 +00:00
// DeletePendingObject deletes all the pieces of the storage nodes that belongs
// to the specified pending object.
//
// NOTE: this method is exported for being able to individually test it without
// having import cycles.
2021-05-04 14:51:40 +01:00
func ( endpoint * Endpoint ) DeletePendingObject ( ctx context . Context , stream metabase . ObjectStream ) ( deletedObjects [ ] * pb . Object , err error ) {
2020-12-03 18:04:01 +00:00
req := metabase . DeletePendingObject {
2021-05-04 14:51:40 +01:00
ObjectStream : stream ,
2020-12-03 18:04:01 +00:00
}
2021-09-09 16:21:42 +01:00
result , err := endpoint . metabase . DeletePendingObject ( ctx , req )
2020-08-06 02:23:45 +01:00
if err != nil {
2020-11-03 12:58:27 +00:00
return nil , err
2020-08-06 02:23:45 +01:00
}
2020-12-03 18:04:01 +00:00
return endpoint . deleteObjectsPieces ( ctx , result )
}
func ( endpoint * Endpoint ) deleteObjectsPieces ( ctx context . Context , result metabase . DeleteObjectResult ) ( deletedObjects [ ] * pb . Object , err error ) {
2021-10-21 07:47:45 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-12-03 18:04:01 +00:00
// We should ignore client cancelling and always try to delete segments.
ctx = context2 . WithoutCancellation ( ctx )
2020-11-03 12:58:27 +00:00
deletedObjects = make ( [ ] * pb . Object , len ( result . Objects ) )
for i , object := range result . Objects {
2021-02-16 15:36:09 +00:00
deletedObject , err := endpoint . objectToProto ( ctx , object , endpoint . defaultRS )
2020-12-02 11:34:41 +00:00
if err != nil {
return nil , err
}
deletedObjects [ i ] = deletedObject
2020-11-03 12:58:27 +00:00
}
2019-12-16 19:03:20 +00:00
2020-12-09 12:24:37 +00:00
endpoint . deleteSegmentPieces ( ctx , result . Segments )
return deletedObjects , nil
}
func ( endpoint * Endpoint ) deleteSegmentPieces ( ctx context . Context , segments [ ] metabase . DeletedSegmentInfo ) {
2021-10-21 07:47:45 +01:00
var err error
defer mon . Task ( ) ( & ctx ) ( & err )
2020-12-09 12:24:37 +00:00
nodesPieces := groupPiecesByNodeID ( segments )
2019-12-16 19:03:20 +00:00
2020-11-03 12:58:27 +00:00
var requests [ ] piecedeletion . Request
for node , pieces := range nodesPieces {
requests = append ( requests , piecedeletion . Request {
Node : storj . NodeURL {
ID : node ,
} ,
Pieces : pieces ,
} )
2020-08-06 02:23:45 +01:00
}
2019-12-16 19:03:20 +00:00
2020-11-03 12:58:27 +00:00
// Only return an error if we failed to delete the objects. If we failed
// to delete pieces, let garbage collector take care of it.
2021-10-21 07:47:45 +01:00
err = endpoint . deletePieces . Delete ( ctx , requests , deleteObjectPiecesSuccessThreshold )
if err != nil {
2020-08-06 02:23:45 +01:00
endpoint . log . Error ( "failed to delete pieces" , zap . Error ( err ) )
2019-12-16 19:03:20 +00:00
}
2020-11-03 12:58:27 +00:00
}
// groupPiecesByNodeID returns a map that contains pieces with node id as the key.
func groupPiecesByNodeID ( segments [ ] metabase . DeletedSegmentInfo ) map [ storj . NodeID ] [ ] storj . PieceID {
piecesToDelete := map [ storj . NodeID ] [ ] storj . PieceID { }
for _ , segment := range segments {
for _ , piece := range segment . Pieces {
pieceID := segment . RootPieceID . Derive ( piece . StorageNode , int32 ( piece . Number ) )
piecesToDelete [ piece . StorageNode ] = append ( piecesToDelete [ piece . StorageNode ] , pieceID )
}
}
return piecesToDelete
2020-01-17 18:47:37 +00:00
}
2020-01-28 13:44:47 +00:00
2021-08-16 13:04:33 +01:00
// Server side move.
// BeginMoveObject begins moving object to different key.
func ( endpoint * Endpoint ) BeginMoveObject ( ctx context . Context , req * pb . ObjectBeginMoveRequest ) ( resp * pb . ObjectBeginMoveResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
err = endpoint . versionCollector . collect ( req . Header . UserAgent , mon . Func ( ) . ShortName ( ) )
if err != nil {
endpoint . log . Warn ( "unable to collect uplink version" , zap . Error ( err ) )
}
now := time . Now ( )
keyInfo , err := endpoint . validateAuthN ( ctx , req . Header ,
verifyPermission {
action : macaroon . Action {
Op : macaroon . ActionRead ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedObjectKey ,
Time : now ,
} ,
} ,
verifyPermission {
action : macaroon . Action {
Op : macaroon . ActionDelete ,
Bucket : req . Bucket ,
EncryptedPath : req . EncryptedObjectKey ,
Time : now ,
} ,
} ,
verifyPermission {
action : macaroon . Action {
Op : macaroon . ActionWrite ,
Bucket : req . NewBucket ,
EncryptedPath : req . NewEncryptedObjectKey ,
Time : now ,
} ,
} ,
)
if err != nil {
return nil , err
}
for _ , bucket := range [ ] [ ] byte { req . Bucket , req . NewBucket } {
err = endpoint . validateBucket ( ctx , bucket )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
}
// we are verifying existence of target bucket only because source bucket
// will be checked while querying source object
// TODO this needs to be optimized to avoid DB call on each request
2021-10-27 09:50:27 +01:00
newBucketPlacement , err := endpoint . buckets . GetBucketPlacement ( ctx , req . NewBucket , keyInfo . ProjectID )
2021-08-16 13:04:33 +01:00
if err != nil {
2021-10-27 09:50:27 +01:00
if storj . ErrBucketNotFound . Has ( err ) {
2021-12-07 11:11:03 +00:00
return nil , rpcstatus . Errorf ( rpcstatus . NotFound , "bucket not found: %s" , req . NewBucket )
2021-10-27 09:50:27 +01:00
}
2021-08-16 13:04:33 +01:00
endpoint . log . Error ( "unable to check bucket" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2021-10-27 09:50:27 +01:00
// if source and target buckets are different, we need to check their geofencing configs
if ! bytes . Equal ( req . Bucket , req . NewBucket ) {
oldBucketPlacement , err := endpoint . buckets . GetBucketPlacement ( ctx , req . Bucket , keyInfo . ProjectID )
if err != nil {
if storj . ErrBucketNotFound . Has ( err ) {
2021-12-07 11:11:03 +00:00
return nil , rpcstatus . Errorf ( rpcstatus . NotFound , "bucket not found: %s" , req . Bucket )
2021-10-27 09:50:27 +01:00
}
endpoint . log . Error ( "unable to check bucket" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
if oldBucketPlacement != newBucketPlacement {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , "moving object to bucket with different placement policy is not (yet) supported" )
}
}
2021-08-16 13:04:33 +01:00
result , err := endpoint . metabase . BeginMoveObject ( ctx , metabase . BeginMoveObject {
ObjectLocation : metabase . ObjectLocation {
ProjectID : keyInfo . ProjectID ,
BucketName : string ( req . Bucket ) ,
ObjectKey : metabase . ObjectKey ( req . EncryptedObjectKey ) ,
} ,
Version : metabase . DefaultVersion ,
} )
if err != nil {
2021-10-22 09:42:52 +01:00
return nil , endpoint . convertMetabaseErr ( err )
2021-08-16 13:04:33 +01:00
}
response , err := convertBeginMoveObjectResults ( result )
if err != nil {
endpoint . log . Error ( "internal" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
satStreamID , err := endpoint . packStreamID ( ctx , & internalpb . StreamID {
2021-08-25 20:00:55 +01:00
Bucket : req . Bucket ,
EncryptedObjectKey : req . EncryptedObjectKey ,
Version : int32 ( metabase . DefaultVersion ) ,
StreamId : result . StreamID [ : ] ,
2021-08-16 13:04:33 +01:00
EncryptionParameters : & pb . EncryptionParameters {
CipherSuite : pb . CipherSuite ( result . EncryptionParameters . CipherSuite ) ,
BlockSize : int64 ( result . EncryptionParameters . BlockSize ) ,
} ,
2021-10-27 09:50:27 +01:00
Placement : int32 ( newBucketPlacement ) ,
2021-08-16 13:04:33 +01:00
} )
if err != nil {
endpoint . log . Error ( "internal" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
response . StreamId = satStreamID
return response , nil
}
func convertBeginMoveObjectResults ( result metabase . BeginMoveObjectResult ) ( * pb . ObjectBeginMoveResponse , error ) {
keys := make ( [ ] * pb . EncryptedKeyAndNonce , len ( result . EncryptedKeysNonces ) )
for i , key := range result . EncryptedKeysNonces {
2021-10-29 12:04:55 +01:00
var nonce storj . Nonce
var err error
if len ( key . EncryptedKeyNonce ) != 0 {
nonce , err = storj . NonceFromBytes ( key . EncryptedKeyNonce )
if err != nil {
return nil , err
}
2021-10-12 14:37:12 +01:00
}
2021-08-16 13:04:33 +01:00
keys [ i ] = & pb . EncryptedKeyAndNonce {
2021-09-20 09:06:36 +01:00
Position : & pb . SegmentPosition {
PartNumber : int32 ( key . Position . Part ) ,
Index : int32 ( key . Position . Index ) ,
} ,
2021-08-16 13:04:33 +01:00
EncryptedKey : key . EncryptedKey ,
2021-10-12 14:37:12 +01:00
EncryptedKeyNonce : nonce ,
2021-08-16 13:04:33 +01:00
}
}
// TODO we need this becase of an uplink issue with how we are storing key and nonce
if result . EncryptedMetadataKey == nil {
streamMeta := & pb . StreamMeta { }
err := pb . Unmarshal ( result . EncryptedMetadata , streamMeta )
if err != nil {
return nil , err
}
if streamMeta . LastSegmentMeta != nil {
result . EncryptedMetadataKey = streamMeta . LastSegmentMeta . EncryptedKey
2021-10-12 14:37:12 +01:00
result . EncryptedMetadataKeyNonce = streamMeta . LastSegmentMeta . KeyNonce
2021-08-16 13:04:33 +01:00
}
}
2021-10-29 12:04:55 +01:00
var metadataNonce storj . Nonce
var err error
if len ( result . EncryptedMetadataKeyNonce ) != 0 {
metadataNonce , err = storj . NonceFromBytes ( result . EncryptedMetadataKeyNonce )
if err != nil {
return nil , err
}
2021-10-12 14:37:12 +01:00
}
2021-10-29 12:04:55 +01:00
2021-08-16 13:04:33 +01:00
return & pb . ObjectBeginMoveResponse {
EncryptedMetadataKey : result . EncryptedMetadataKey ,
2021-10-12 14:37:12 +01:00
EncryptedMetadataKeyNonce : metadataNonce ,
2021-08-16 13:04:33 +01:00
EncryptionParameters : & pb . EncryptionParameters {
CipherSuite : pb . CipherSuite ( result . EncryptionParameters . CipherSuite ) ,
BlockSize : int64 ( result . EncryptionParameters . BlockSize ) ,
} ,
SegmentKeys : keys ,
} , nil
}
2021-08-31 12:44:18 +01:00
// FinishMoveObject accepts new encryption keys for moved object and updates the corresponding object ObjectKey and segments EncryptedKey.
func ( endpoint * Endpoint ) FinishMoveObject ( ctx context . Context , req * pb . ObjectFinishMoveRequest ) ( resp * pb . ObjectFinishMoveResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
err = endpoint . versionCollector . collect ( req . Header . UserAgent , mon . Func ( ) . ShortName ( ) )
if err != nil {
endpoint . log . Warn ( "unable to collect uplink version" , zap . Error ( err ) )
}
streamID , err := endpoint . unmarshalSatStreamID ( ctx , req . StreamId )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
keyInfo , err := endpoint . validateAuth ( ctx , req . Header , macaroon . Action {
Op : macaroon . ActionWrite ,
Time : time . Now ( ) ,
Bucket : req . NewBucket ,
EncryptedPath : req . NewEncryptedObjectKey ,
} )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . Unauthenticated , err . Error ( ) )
}
2021-09-22 08:50:24 +01:00
err = endpoint . validateBucket ( ctx , req . NewBucket )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
exists , err := endpoint . buckets . HasBucket ( ctx , req . NewBucket , keyInfo . ProjectID )
if err != nil {
endpoint . log . Error ( "unable to check bucket" , zap . Error ( err ) )
return nil , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
} else if ! exists {
2021-11-24 17:47:36 +00:00
return nil , rpcstatus . Errorf ( rpcstatus . NotFound , "target bucket not found: %s" , req . NewBucket )
2021-09-22 08:50:24 +01:00
}
2021-08-31 12:44:18 +01:00
streamUUID , err := uuid . FromBytes ( streamID . StreamId )
if err != nil {
return nil , rpcstatus . Error ( rpcstatus . InvalidArgument , err . Error ( ) )
}
err = endpoint . metabase . FinishMoveObject ( ctx , metabase . FinishMoveObject {
ObjectStream : metabase . ObjectStream {
ProjectID : keyInfo . ProjectID ,
BucketName : string ( streamID . Bucket ) ,
2021-08-25 20:00:55 +01:00
ObjectKey : metabase . ObjectKey ( streamID . EncryptedObjectKey ) ,
2021-08-31 12:44:18 +01:00
Version : metabase . DefaultVersion ,
StreamID : streamUUID ,
} ,
NewSegmentKeys : protobufkeysToMetabase ( req . NewSegmentKeys ) ,
2021-09-27 09:41:13 +01:00
NewBucket : string ( req . NewBucket ) ,
2021-08-31 12:44:18 +01:00
NewEncryptedObjectKey : req . NewEncryptedObjectKey ,
2021-10-12 14:37:12 +01:00
NewEncryptedMetadataKeyNonce : req . NewEncryptedMetadataKeyNonce [ : ] ,
2021-08-31 12:44:18 +01:00
NewEncryptedMetadataKey : req . NewEncryptedMetadataKey ,
} )
if err != nil {
2021-10-22 09:42:52 +01:00
return nil , endpoint . convertMetabaseErr ( err )
2021-08-31 12:44:18 +01:00
}
return & pb . ObjectFinishMoveResponse { } , nil
}
// protobufkeysToMetabase converts []*pb.EncryptedKeyAndNonce to []metabase.EncryptedKeyAndNonce.
func protobufkeysToMetabase ( protoKeys [ ] * pb . EncryptedKeyAndNonce ) [ ] metabase . EncryptedKeyAndNonce {
keys := make ( [ ] metabase . EncryptedKeyAndNonce , len ( protoKeys ) )
for i , key := range protoKeys {
position := metabase . SegmentPosition { }
if key . Position != nil {
position = metabase . SegmentPosition {
Part : uint32 ( key . Position . PartNumber ) ,
Index : uint32 ( key . Position . Index ) ,
}
}
keys [ i ] = metabase . EncryptedKeyAndNonce {
2021-10-12 14:37:12 +01:00
EncryptedKeyNonce : key . EncryptedKeyNonce . Bytes ( ) ,
2021-08-31 12:44:18 +01:00
EncryptedKey : key . EncryptedKey ,
Position : position ,
}
}
return keys
}