storj/satellite/metainfo/metainfo.go

2127 lines
69 KiB
Go
Raw Normal View History

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/context2"
"storj.io/common/encryption"
"storj.io/common/macaroon"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/uuid"
lrucache "storj.io/storj/pkg/cache"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/satellite/metainfo/objectdeletion"
"storj.io/storj/satellite/metainfo/piecedeletion"
"storj.io/storj/satellite/metainfo/pointerverification"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/revocation"
"storj.io/storj/satellite/rewards"
"storj.io/uplink/private/eestream"
"storj.io/uplink/private/storage/meta"
)
2019-07-16 11:39:23 +01:00
const (
satIDExpiration = 48 * time.Hour
lastSegment = -1
firstSegment = 0
listLimit = 1000
deleteObjectPiecesSuccessThreshold = 0.75
2019-07-16 11:39:23 +01:00
)
var (
mon = monkit.Package()
// Error general metainfo error.
Error = errs.Class("metainfo error")
// ErrNodeAlreadyExists pointer already has a piece for a node err.
ErrNodeAlreadyExists = errs.Class("metainfo error: node already exists")
)
// APIKeys is api keys store methods used by endpoint.
2019-09-10 14:24:16 +01:00
//
// architecture: Database
type APIKeys interface {
GetByHead(ctx context.Context, head []byte) (*console.APIKeyInfo, error)
}
// Endpoint metainfo endpoint.
2019-09-10 14:24:16 +01:00
//
// architecture: Endpoint
type Endpoint struct {
log *zap.Logger
metainfo *Service
deletePieces *piecedeletion.Service
deleteObjects *objectdeletion.Service
orders *orders.Service
overlay *overlay.Service
attributions attribution.DB
partners *rewards.PartnersService
pointerVerification *pointerverification.Service
projectUsage *accounting.Service
projects console.Projects
apiKeys APIKeys
createRequests *createRequests
satellite signing.Signer
limiterCache *lrucache.ExpiringLRU
encInlineSegmentSize int64 // max inline segment size + encryption overhead
revocations revocation.DB
config Config
}
// NewEndpoint creates new metainfo endpoint instance.
func NewEndpoint(log *zap.Logger, metainfo *Service, deletePieces *piecedeletion.Service,
orders *orders.Service, cache *overlay.Service, attributions attribution.DB,
partners *rewards.PartnersService, peerIdentities overlay.PeerIdentities,
apiKeys APIKeys, projectUsage *accounting.Service, projects console.Projects,
satellite signing.Signer, revocations revocation.DB, config Config) (*Endpoint, error) {
// TODO do something with too many params
encInlineSegmentSize, err := encryption.CalcEncryptedSize(config.MaxInlineSegmentSize.Int64(), storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 128, // intentionally low block size to allow maximum possible encryption overhead
})
if err != nil {
return nil, err
}
objectDeletion, err := objectdeletion.NewService(log, metainfo, config.ObjectDeletion)
if err != nil {
return nil, err
}
return &Endpoint{
log: log,
metainfo: metainfo,
deletePieces: deletePieces,
deleteObjects: objectDeletion,
orders: orders,
overlay: cache,
attributions: attributions,
partners: partners,
pointerVerification: pointerverification.NewService(peerIdentities),
apiKeys: apiKeys,
projectUsage: projectUsage,
projects: projects,
createRequests: newCreateRequests(),
satellite: satellite,
limiterCache: lrucache.New(lrucache.Options{
Capacity: config.RateLimiter.CacheCapacity,
Expiration: config.RateLimiter.CacheExpiration,
}),
encInlineSegmentSize: encInlineSegmentSize,
revocations: revocations,
config: config,
}, nil
}
// Close closes resources.
func (endpoint *Endpoint) Close() error { return nil }
satellite/accounting: refactor live accounting to hold current estimated totals live accounting used to be a cache to store writes before they are picked up during the tally iteration, after which the cache is cleared. This created a window in which users could potentially exceed the storage limit. This PR refactors live accounting to hold current estimations of space used per project. This should also reduce DB load since we no longer need to query the satellite DB when checking space used for limiting. The mechanism by which the new live accounting system works is as follows: During the upload of any segment, the size of that segment is added to its respective project total in live accounting. At the beginning of the tally iteration we record the current values in live accounting as `initialLiveTotals`. At the end of the tally iteration we again record the current totals in live accounting as `latestLiveTotals`. The metainfo loop observer in tally allows us to get the project totals from what it observed in metainfo DB which are stored in `tallyProjectTotals`. However, for any particular segment uploaded during the metainfo loop, the observer may or may not have seen it. Thus, we take half of the difference between `latestLiveTotals` and `initialLiveTotals`, and add that to the total that was found during tally and set that as the new live accounting total. Initially, live accounting was storing the total stored amount across all nodes rather than the segment size, which is inconsistent with how we record amounts stored in the project accounting DB, so we have refactored live accounting to record segment size Change-Id: Ie48bfdef453428fcdc180b2d781a69d58fd927fb
2019-10-31 17:27:38 +00:00
func calculateSpaceUsed(ptr *pb.Pointer) (segmentSize, totalStored int64) {
inline := ptr.GetInlineSegment()
if inline != nil {
satellite/accounting: refactor live accounting to hold current estimated totals live accounting used to be a cache to store writes before they are picked up during the tally iteration, after which the cache is cleared. This created a window in which users could potentially exceed the storage limit. This PR refactors live accounting to hold current estimations of space used per project. This should also reduce DB load since we no longer need to query the satellite DB when checking space used for limiting. The mechanism by which the new live accounting system works is as follows: During the upload of any segment, the size of that segment is added to its respective project total in live accounting. At the beginning of the tally iteration we record the current values in live accounting as `initialLiveTotals`. At the end of the tally iteration we again record the current totals in live accounting as `latestLiveTotals`. The metainfo loop observer in tally allows us to get the project totals from what it observed in metainfo DB which are stored in `tallyProjectTotals`. However, for any particular segment uploaded during the metainfo loop, the observer may or may not have seen it. Thus, we take half of the difference between `latestLiveTotals` and `initialLiveTotals`, and add that to the total that was found during tally and set that as the new live accounting total. Initially, live accounting was storing the total stored amount across all nodes rather than the segment size, which is inconsistent with how we record amounts stored in the project accounting DB, so we have refactored live accounting to record segment size Change-Id: Ie48bfdef453428fcdc180b2d781a69d58fd927fb
2019-10-31 17:27:38 +00:00
inlineSize := int64(len(inline))
return inlineSize, inlineSize
}
satellite/accounting: refactor live accounting to hold current estimated totals live accounting used to be a cache to store writes before they are picked up during the tally iteration, after which the cache is cleared. This created a window in which users could potentially exceed the storage limit. This PR refactors live accounting to hold current estimations of space used per project. This should also reduce DB load since we no longer need to query the satellite DB when checking space used for limiting. The mechanism by which the new live accounting system works is as follows: During the upload of any segment, the size of that segment is added to its respective project total in live accounting. At the beginning of the tally iteration we record the current values in live accounting as `initialLiveTotals`. At the end of the tally iteration we again record the current totals in live accounting as `latestLiveTotals`. The metainfo loop observer in tally allows us to get the project totals from what it observed in metainfo DB which are stored in `tallyProjectTotals`. However, for any particular segment uploaded during the metainfo loop, the observer may or may not have seen it. Thus, we take half of the difference between `latestLiveTotals` and `initialLiveTotals`, and add that to the total that was found during tally and set that as the new live accounting total. Initially, live accounting was storing the total stored amount across all nodes rather than the segment size, which is inconsistent with how we record amounts stored in the project accounting DB, so we have refactored live accounting to record segment size Change-Id: Ie48bfdef453428fcdc180b2d781a69d58fd927fb
2019-10-31 17:27:38 +00:00
segmentSize = ptr.GetSegmentSize()
remote := ptr.GetRemote()
if remote == nil {
return 0, 0
}
minReq := remote.GetRedundancy().GetMinReq()
pieceSize := segmentSize / int64(minReq)
pieces := remote.GetRemotePieces()
satellite/accounting: refactor live accounting to hold current estimated totals live accounting used to be a cache to store writes before they are picked up during the tally iteration, after which the cache is cleared. This created a window in which users could potentially exceed the storage limit. This PR refactors live accounting to hold current estimations of space used per project. This should also reduce DB load since we no longer need to query the satellite DB when checking space used for limiting. The mechanism by which the new live accounting system works is as follows: During the upload of any segment, the size of that segment is added to its respective project total in live accounting. At the beginning of the tally iteration we record the current values in live accounting as `initialLiveTotals`. At the end of the tally iteration we again record the current totals in live accounting as `latestLiveTotals`. The metainfo loop observer in tally allows us to get the project totals from what it observed in metainfo DB which are stored in `tallyProjectTotals`. However, for any particular segment uploaded during the metainfo loop, the observer may or may not have seen it. Thus, we take half of the difference between `latestLiveTotals` and `initialLiveTotals`, and add that to the total that was found during tally and set that as the new live accounting total. Initially, live accounting was storing the total stored amount across all nodes rather than the segment size, which is inconsistent with how we record amounts stored in the project accounting DB, so we have refactored live accounting to record segment size Change-Id: Ie48bfdef453428fcdc180b2d781a69d58fd927fb
2019-10-31 17:27:38 +00:00
return segmentSize, pieceSize * int64(len(pieces))
}
// filterValidPieces filter out the invalid remote pieces held by pointer.
//
// This method expect the pointer to be valid, so it has to be validated before
// calling it.
//
// The method always return a gRPC status error so the caller can directly
// return it to the client.
func (endpoint *Endpoint) filterValidPieces(ctx context.Context, pointer *pb.Pointer, originalLimits []*pb.OrderLimit) (err error) {
defer mon.Task()(&ctx)(&err)
if pointer.Type != pb.Pointer_REMOTE {
return nil
}
// verify that the piece sizes matches what we would expect.
err = endpoint.pointerVerification.VerifySizes(ctx, pointer)
if err != nil {
endpoint.log.Debug("piece sizes are invalid", zap.Error(err))
return rpcstatus.Errorf(rpcstatus.InvalidArgument, "piece sizes are invalid: %v", err)
}
validPieces, invalidPieces, err := endpoint.pointerVerification.SelectValidPieces(ctx, pointer, originalLimits)
if err != nil {
endpoint.log.Debug("pointer verification failed", zap.Error(err))
return rpcstatus.Errorf(rpcstatus.InvalidArgument, "pointer verification failed: %s", err)
}
remote := pointer.Remote
if int32(len(validPieces)) < remote.Redundancy.SuccessThreshold {
endpoint.log.Debug("Number of valid pieces is less than the success threshold",
zap.Int("totalReceivedPieces", len(remote.RemotePieces)),
zap.Int("validPieces", len(validPieces)),
zap.Int("invalidPieces", len(invalidPieces)),
zap.Int32("successThreshold", remote.Redundancy.SuccessThreshold),
)
errMsg := fmt.Sprintf("Number of valid pieces (%d) is less than the success threshold (%d). Found %d invalid pieces",
len(validPieces),
remote.Redundancy.SuccessThreshold,
len(remote.RemotePieces),
)
if len(invalidPieces) > 0 {
errMsg = fmt.Sprintf("%s. Invalid Pieces:", errMsg)
for _, p := range invalidPieces {
errMsg = fmt.Sprintf("%s\nNodeID: %v, PieceNum: %d, Reason: %s",
errMsg, p.NodeID, p.PieceNum, p.Reason,
)
}
}
return rpcstatus.Error(rpcstatus.InvalidArgument, errMsg)
}
remote.RemotePieces = validPieces
return nil
}
// ProjectInfo returns allowed ProjectInfo for the provided API key.
func (endpoint *Endpoint) ProjectInfo(ctx context.Context, req *pb.ProjectInfoRequest) (_ *pb.ProjectInfoResponse, err error) {
defer mon.Task()(&ctx)(&err)
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionProjectInfo,
Time: time.Now(),
})
if err != nil {
return nil, err
}
salt := sha256.Sum256(keyInfo.ProjectID[:])
return &pb.ProjectInfoResponse{
ProjectSalt: salt[:],
}, nil
}
// GetBucket returns a bucket.
func (endpoint *Endpoint) GetBucket(ctx context.Context, req *pb.BucketGetRequest) (resp *pb.BucketGetResponse, err error) {
defer mon.Task()(&ctx)(&err)
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionRead,
Bucket: req.Name,
Time: time.Now(),
})
if err != nil {
return nil, err
}
bucket, err := endpoint.metainfo.GetBucket(ctx, req.GetName(), keyInfo.ProjectID)
if err != nil {
if storj.ErrBucketNotFound.Has(err) {
return nil, rpcstatus.Error(rpcstatus.NotFound, err.Error())
}
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
// override RS to fit satellite settings
convBucket, err := convertBucketToProto(bucket, endpoint.redundancyScheme())
if err != nil {
return resp, err
}
return &pb.BucketGetResponse{
Bucket: convBucket,
}, nil
}
// CreateBucket creates a new bucket.
func (endpoint *Endpoint) CreateBucket(ctx context.Context, req *pb.BucketCreateRequest) (resp *pb.BucketCreateResponse, err error) {
defer mon.Task()(&ctx)(&err)
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: req.Name,
Time: time.Now(),
})
if err != nil {
return nil, err
}
err = endpoint.validateBucket(ctx, req.Name)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
// checks if bucket exists before updates it or makes a new entry
_, err = endpoint.metainfo.GetBucket(ctx, req.GetName(), keyInfo.ProjectID)
if err == nil {
// When the bucket exists, try to set the attribution.
if err := endpoint.ensureAttribution(ctx, req.Header, keyInfo, req.GetName()); err != nil {
return nil, err
}
return nil, rpcstatus.Error(rpcstatus.AlreadyExists, "bucket already exists")
}
if !storj.ErrBucketNotFound.Has(err) {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
// check if project has exceeded its allocated bucket limit
maxBuckets, err := endpoint.projects.GetMaxBuckets(ctx, keyInfo.ProjectID)
if err != nil {
return nil, err
}
if maxBuckets == 0 {
maxBuckets = endpoint.config.ProjectLimits.MaxBuckets
}
bucketCount, err := endpoint.metainfo.CountBuckets(ctx, keyInfo.ProjectID)
if err != nil {
return nil, err
}
if bucketCount >= maxBuckets {
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, fmt.Sprintf("number of allocated buckets (%d) exceeded", endpoint.config.ProjectLimits.MaxBuckets))
}
bucketReq, err := convertProtoToBucket(req, keyInfo.ProjectID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
bucket, err := endpoint.metainfo.CreateBucket(ctx, bucketReq)
if err != nil {
endpoint.log.Error("error while creating bucket", zap.String("bucketName", bucketReq.Name), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to create bucket")
}
// Once we have created the bucket, we can try setting the attribution.
if err := endpoint.ensureAttribution(ctx, req.Header, keyInfo, req.GetName()); err != nil {
return nil, err
}
// override RS to fit satellite settings
convBucket, err := convertBucketToProto(bucket, endpoint.redundancyScheme())
if err != nil {
endpoint.log.Error("error while converting bucket to proto", zap.String("bucketName", bucket.Name), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to create bucket")
}
return &pb.BucketCreateResponse{
Bucket: convBucket,
}, nil
}
// DeleteBucket deletes a bucket.
func (endpoint *Endpoint) DeleteBucket(ctx context.Context, req *pb.BucketDeleteRequest) (resp *pb.BucketDeleteResponse, err error) {
defer mon.Task()(&ctx)(&err)
now := time.Now()
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionDelete,
Bucket: req.Name,
Time: now,
})
if err != nil {
return nil, err
}
err = endpoint.validateBucket(ctx, req.Name)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
_, err = endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionRead,
Bucket: req.Name,
Time: now,
})
canRead := err == nil
_, err = endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionList,
Bucket: req.Name,
Time: now,
})
canList := err == nil
var (
bucket storj.Bucket
convBucket *pb.Bucket
)
if canRead || canList {
// Info about deleted bucket is returned only if either Read, or List permission is granted.
bucket, err = endpoint.metainfo.GetBucket(ctx, req.Name, keyInfo.ProjectID)
if err != nil {
if storj.ErrBucketNotFound.Has(err) {
return nil, rpcstatus.Error(rpcstatus.NotFound, err.Error())
}
return nil, err
}
convBucket, err = convertBucketToProto(bucket, endpoint.redundancyScheme())
if err != nil {
return nil, err
}
}
err = endpoint.metainfo.DeleteBucket(ctx, req.Name, keyInfo.ProjectID)
if err != nil {
if !canRead && !canList {
// No error info is returned if neither Read, nor List permission is granted.
return &pb.BucketDeleteResponse{}, nil
}
if ErrBucketNotEmpty.Has(err) {
// List permission is required to delete all objects in a bucket.
if !req.GetDeleteAll() || !canList {
return nil, rpcstatus.Error(rpcstatus.FailedPrecondition, err.Error())
}
_, deletedObjCount, err := endpoint.deleteBucketNotEmpty(ctx, keyInfo.ProjectID, req.Name)
if err != nil {
return nil, err
}
return &pb.BucketDeleteResponse{Bucket: convBucket, DeletedObjectsCount: int64(deletedObjCount)}, nil
}
if storj.ErrBucketNotFound.Has(err) {
return &pb.BucketDeleteResponse{Bucket: convBucket}, nil
}
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return &pb.BucketDeleteResponse{Bucket: convBucket}, nil
}
// deleteBucketNotEmpty deletes all objects that're complete or have first segment.
// On success, it returns only the number of complete objects that has been deleted
// since from the user's perspective, objects without last segment are invisible.
func (endpoint *Endpoint) deleteBucketNotEmpty(ctx context.Context, projectID uuid.UUID, bucketName []byte) ([]byte, int, error) {
// Delete all objects that has last segment.
deletedCount, err := endpoint.deleteByPrefix(ctx, projectID, bucketName, lastSegment)
if err != nil {
return nil, 0, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
// Delete all zombie objects that have first segment.
_, err = endpoint.deleteByPrefix(ctx, projectID, bucketName, firstSegment)
if err != nil {
return nil, deletedCount, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
err = endpoint.metainfo.DeleteBucket(ctx, bucketName, projectID)
if err != nil {
if ErrBucketNotEmpty.Has(err) {
return nil, deletedCount, rpcstatus.Error(rpcstatus.FailedPrecondition, "cannot delete the bucket because it's being used by another process")
}
if storj.ErrBucketNotFound.Has(err) {
return bucketName, 0, nil
}
return nil, deletedCount, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return bucketName, deletedCount, nil
}
// deleteByPrefix deletes all objects that matches with a prefix.
func (endpoint *Endpoint) deleteByPrefix(ctx context.Context, projectID uuid.UUID, bucketName []byte, segmentIdx int64) (deletedCount int, err error) {
defer mon.Task()(&ctx)(&err)
location, err := CreatePath(ctx, projectID, segmentIdx, bucketName, []byte{})
if err != nil {
return deletedCount, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
prefix := location.Encode()
for {
segments, more, err := endpoint.metainfo.List(ctx, prefix, "", true, 0, meta.None)
if err != nil {
return deletedCount, err
}
deleteReqs := make([]*objectdeletion.ObjectIdentifier, len(segments))
for i, segment := range segments {
deleteReqs[i] = &objectdeletion.ObjectIdentifier{
ProjectID: projectID,
Bucket: bucketName,
EncryptedPath: []byte(segment.Path),
}
}
rep, err := endpoint.deleteObjectsPieces(ctx, deleteReqs...)
if err != nil {
return deletedCount, err
}
deletedCount += len(rep.Deleted)
if !more {
break
}
}
return deletedCount, nil
}
// ListBuckets returns buckets in a project where the bucket name matches the request cursor.
func (endpoint *Endpoint) ListBuckets(ctx context.Context, req *pb.BucketListRequest) (resp *pb.BucketListResponse, err error) {
defer mon.Task()(&ctx)(&err)
action := macaroon.Action{
// TODO: This has to be ActionList, but it seems to be set to
// ActionRead as a hacky workaround to make bucket listing possible.
Op: macaroon.ActionRead,
Time: time.Now(),
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, action)
if err != nil {
return nil, err
}
allowedBuckets, err := getAllowedBuckets(ctx, req.Header, action)
if err != nil {
return nil, err
}
listOpts := storj.BucketListOptions{
Cursor: string(req.Cursor),
Limit: int(req.Limit),
Direction: storj.ListDirection(req.Direction),
}
bucketList, err := endpoint.metainfo.ListBuckets(ctx, keyInfo.ProjectID, listOpts, allowedBuckets)
if err != nil {
return nil, err
}
bucketItems := make([]*pb.BucketListItem, len(bucketList.Items))
for i, item := range bucketList.Items {
bucketItems[i] = &pb.BucketListItem{
Name: []byte(item.Name),
CreatedAt: item.Created,
}
}
return &pb.BucketListResponse{
Items: bucketItems,
More: bucketList.More,
}, nil
}
// CountBuckets returns the number of buckets a project currently has.
// TODO: add this to the uplink client side.
func (endpoint *Endpoint) CountBuckets(ctx context.Context, projectID uuid.UUID) (count int, err error) {
count, err = endpoint.metainfo.CountBuckets(ctx, projectID)
if err != nil {
return 0, err
}
return count, nil
}
func getAllowedBuckets(ctx context.Context, header *pb.RequestHeader, action macaroon.Action) (_ macaroon.AllowedBuckets, err error) {
key, err := getAPIKey(ctx, header)
if err != nil {
return macaroon.AllowedBuckets{}, rpcstatus.Errorf(rpcstatus.InvalidArgument, "Invalid API credentials: %v", err)
}
allowedBuckets, err := key.GetAllowedBuckets(ctx, action)
if err != nil {
return macaroon.AllowedBuckets{}, rpcstatus.Errorf(rpcstatus.Internal, "GetAllowedBuckets: %v", err)
}
return allowedBuckets, err
}
func convertProtoToBucket(req *pb.BucketCreateRequest, projectID uuid.UUID) (bucket storj.Bucket, err error) {
bucketID, err := uuid.New()
if err != nil {
return storj.Bucket{}, err
}
defaultRS := req.GetDefaultRedundancyScheme()
defaultEP := req.GetDefaultEncryptionParameters()
// TODO: resolve partner id
var partnerID uuid.UUID
err = partnerID.UnmarshalJSON(req.GetPartnerId())
// bucket's partnerID should never be set
// it is always read back from buckets DB
if err != nil && !partnerID.IsZero() {
return bucket, errs.New("Invalid uuid")
}
return storj.Bucket{
ID: bucketID,
Name: string(req.GetName()),
ProjectID: projectID,
PartnerID: partnerID,
PathCipher: storj.CipherSuite(req.GetPathCipher()),
DefaultSegmentsSize: req.GetDefaultSegmentSize(),
DefaultRedundancyScheme: storj.RedundancyScheme{
Algorithm: storj.RedundancyAlgorithm(defaultRS.GetType()),
ShareSize: defaultRS.GetErasureShareSize(),
RequiredShares: int16(defaultRS.GetMinReq()),
RepairShares: int16(defaultRS.GetRepairThreshold()),
OptimalShares: int16(defaultRS.GetSuccessThreshold()),
TotalShares: int16(defaultRS.GetTotal()),
},
DefaultEncryptionParameters: storj.EncryptionParameters{
CipherSuite: storj.CipherSuite(defaultEP.CipherSuite),
BlockSize: int32(defaultEP.BlockSize),
},
}, nil
}
func convertBucketToProto(bucket storj.Bucket, rs *pb.RedundancyScheme) (pbBucket *pb.Bucket, err error) {
if bucket == (storj.Bucket{}) {
return nil, nil
}
partnerID, err := bucket.PartnerID.MarshalJSON()
if err != nil {
return pbBucket, rpcstatus.Error(rpcstatus.Internal, "UUID marshal error")
}
pbBucket = &pb.Bucket{
Name: []byte(bucket.Name),
PathCipher: pb.CipherSuite(bucket.PathCipher),
PartnerId: partnerID,
CreatedAt: bucket.Created,
DefaultSegmentSize: bucket.DefaultSegmentsSize,
DefaultRedundancyScheme: rs,
DefaultEncryptionParameters: &pb.EncryptionParameters{
CipherSuite: pb.CipherSuite(bucket.DefaultEncryptionParameters.CipherSuite),
BlockSize: int64(bucket.DefaultEncryptionParameters.BlockSize),
},
}
// this part is to provide default ciphers (path and encryption) for old uplinks
// new uplinks are using ciphers from encryption access
if pbBucket.PathCipher == pb.CipherSuite_ENC_UNSPECIFIED {
pbBucket.PathCipher = pb.CipherSuite_ENC_AESGCM
}
if pbBucket.DefaultEncryptionParameters.CipherSuite == pb.CipherSuite_ENC_UNSPECIFIED {
pbBucket.DefaultEncryptionParameters.CipherSuite = pb.CipherSuite_ENC_AESGCM
pbBucket.DefaultEncryptionParameters.BlockSize = int64(rs.ErasureShareSize * rs.MinReq)
}
return pbBucket, nil
}
2019-07-16 11:39:23 +01:00
// BeginObject begins object.
2019-07-16 11:39:23 +01:00
func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRequest) (resp *pb.ObjectBeginResponse, err error) {
defer mon.Task()(&ctx)(&err)
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
2019-07-16 11:39:23 +01:00
Op: macaroon.ActionWrite,
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, err
2019-07-16 11:39:23 +01:00
}
if !req.ExpiresAt.IsZero() && !req.ExpiresAt.After(time.Now()) {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "Invalid expiration time")
}
err = endpoint.validateBucket(ctx, req.Bucket)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
// TODO this needs to be optimized to avoid DB call on each request
_, err = endpoint.metainfo.GetBucket(ctx, req.Bucket, keyInfo.ProjectID)
if err != nil {
if storj.ErrBucketNotFound.Has(err) {
return nil, rpcstatus.Error(rpcstatus.NotFound, err.Error())
}
endpoint.log.Error("unable to check bucket", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if err := endpoint.ensureAttribution(ctx, req.Header, keyInfo, req.Bucket); err != nil {
return nil, err
}
// use only satellite values for Redundancy Scheme
pbRS := endpoint.redundancyScheme()
2019-07-16 11:39:23 +01:00
streamID, err := endpoint.packStreamID(ctx, &pb.SatStreamID{
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Version: req.Version,
Redundancy: pbRS,
CreationDate: time.Now(),
ExpirationDate: req.ExpiresAt,
})
2019-07-16 11:39:23 +01:00
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
2019-07-16 11:39:23 +01:00
}
_, err = endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionDelete,
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Time: time.Now(),
})
canDelete := err == nil
if canDelete {
_, err = endpoint.DeleteObjectPieces(ctx, keyInfo.ProjectID, req.Bucket, req.EncryptedPath)
if err != nil {
return nil, err
}
} else {
location, err := CreatePath(ctx, keyInfo.ProjectID, lastSegment, req.Bucket, req.EncryptedPath)
if err != nil {
endpoint.log.Error("unable to create path", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
// TODO maybe we can have different Get without pointer unmarshaling
_, _, err = endpoint.metainfo.GetWithBytes(ctx, location.Encode())
if err == nil {
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "Unauthorized API credentials")
}
}
endpoint.log.Info("Object Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "object"))
mon.Meter("req_put_object").Mark(1)
2019-07-16 11:39:23 +01:00
return &pb.ObjectBeginResponse{
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Version: req.Version,
StreamId: streamID,
RedundancyScheme: pbRS,
2019-07-16 11:39:23 +01:00
}, nil
}
// CommitObject commits an object when all its segments have already been committed.
2019-07-16 11:39:23 +01:00
func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommitRequest) (resp *pb.ObjectCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
return endpoint.commitObject(ctx, req, nil)
}
func (endpoint *Endpoint) commitObject(ctx context.Context, req *pb.ObjectCommitRequest, pointer *pb.Pointer) (resp *pb.ObjectCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
2019-07-16 11:39:23 +01:00
streamID := &pb.SatStreamID{}
err = pb.Unmarshal(req.StreamId, streamID)
2019-07-16 11:39:23 +01:00
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
2019-07-16 11:39:23 +01:00
}
err = signing.VerifyStreamID(ctx, endpoint.satellite, streamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
2019-07-16 11:39:23 +01:00
}
if streamID.CreationDate.Before(time.Now().Add(-satIDExpiration)) {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "stream ID expired")
2019-07-16 11:39:23 +01:00
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
2019-07-16 11:39:23 +01:00
Op: macaroon.ActionWrite,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, err
2019-07-16 11:39:23 +01:00
}
metadataSize := memory.Size(len(req.EncryptedMetadata))
if metadataSize > endpoint.config.MaxMetadataSize {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, fmt.Sprintf("Metadata is too large, got %v, maximum allowed is %v", metadataSize, endpoint.config.MaxMetadataSize))
}
streamMeta := pb.StreamMeta{}
err = pb.Unmarshal(req.EncryptedMetadata, &streamMeta)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "invalid metadata structure")
}
lastSegmentPointer := pointer
if pointer == nil {
lastSegmentIndex := streamMeta.NumberOfSegments - 1
lastSegmentLocation, err := CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to create segment path: %s", err.Error())
}
var lastSegmentPointerBytes []byte
lastSegmentKey := lastSegmentLocation.Encode()
lastSegmentPointerBytes, lastSegmentPointer, err = endpoint.metainfo.GetWithBytes(ctx, lastSegmentKey)
if err != nil {
endpoint.log.Error("unable to get pointer", zap.ByteString("segmentPath", lastSegmentKey), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
if lastSegmentPointer == nil {
return nil, rpcstatus.Errorf(rpcstatus.NotFound, "unable to find object: %q/%q", streamID.Bucket, streamID.EncryptedPath)
}
err = endpoint.metainfo.Delete(ctx, lastSegmentKey, lastSegmentPointerBytes)
if err != nil {
endpoint.log.Error("unable to delete pointer", zap.ByteString("segmentPath", lastSegmentKey), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
}
if lastSegmentPointer.Remote == nil {
lastSegmentPointer.Remote = &pb.RemoteSegment{}
}
// RS is set always for last segment to emulate RS per object
lastSegmentPointer.Remote.Redundancy = streamID.Redundancy
lastSegmentPointer.Metadata = req.EncryptedMetadata
lastSegmentLocation, err := CreatePath(ctx, keyInfo.ProjectID, int64(lastSegment), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
endpoint.log.Error("unable to create path", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
err = endpoint.metainfo.UnsynchronizedPut(ctx, lastSegmentLocation.Encode(), lastSegmentPointer)
if err != nil {
endpoint.log.Error("unable to put pointer", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
2019-07-16 11:39:23 +01:00
return &pb.ObjectCommitResponse{}, nil
}
// GetObject gets single object.
func (endpoint *Endpoint) GetObject(ctx context.Context, req *pb.ObjectGetRequest) (resp *pb.ObjectGetResponse, err error) {
defer mon.Task()(&ctx)(&err)
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionRead,
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, err
}
err = endpoint.validateBucket(ctx, req.Bucket)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
object, err := endpoint.getObject(ctx, keyInfo.ProjectID, req.Bucket, req.EncryptedPath, req.Version)
if err != nil {
return nil, err
}
endpoint.log.Info("Object Download", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "get"), zap.String("type", "object"))
mon.Meter("req_get_object").Mark(1)
return &pb.ObjectGetResponse{
Object: object,
}, nil
}
func (endpoint *Endpoint) getObject(ctx context.Context, projectID uuid.UUID, bucket, encryptedPath []byte, version int32) (*pb.Object, error) {
pointer, _, err := endpoint.getPointer(ctx, projectID, lastSegment, bucket, encryptedPath)
if err != nil {
return nil, err
}
streamMeta := &pb.StreamMeta{}
err = pb.Unmarshal(pointer.Metadata, streamMeta)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
streamID, err := endpoint.packStreamID(ctx, &pb.SatStreamID{
Bucket: bucket,
EncryptedPath: encryptedPath,
Version: version,
CreationDate: time.Now(),
})
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
object := &pb.Object{
Bucket: bucket,
EncryptedPath: encryptedPath,
Version: -1,
StreamId: streamID,
ExpiresAt: pointer.ExpirationDate,
CreatedAt: pointer.CreationDate,
EncryptedMetadata: pointer.Metadata,
EncryptionParameters: &pb.EncryptionParameters{
CipherSuite: pb.CipherSuite(streamMeta.EncryptionType),
BlockSize: int64(streamMeta.EncryptionBlockSize),
},
}
if pointer.Remote != nil {
object.RedundancyScheme = pointer.Remote.Redundancy
// NumberOfSegments == 0 - pointer with encrypted num of segments
// NumberOfSegments > 1 - pointer with unencrypted num of segments and multiple segments
} else if streamMeta.NumberOfSegments == 0 || streamMeta.NumberOfSegments > 1 {
// workaround
// The new metainfo API redundancy scheme is on object level (not per segment).
// Because of that, RS is always taken from the last segment.
// The old implementation saves RS per segment, and in some cases
// when the remote file's last segment is an inline segment, we end up
// missing an RS scheme. This loop will search for RS in segments other than the last one.
index := int64(0)
for {
location, err := CreatePath(ctx, projectID, index, bucket, encryptedPath)
if err != nil {
endpoint.log.Error("unable to get pointer path", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to get object")
}
pointer, err = endpoint.metainfo.Get(ctx, location.Encode())
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
break
}
endpoint.log.Error("unable to get pointer", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to get object")
}
if pointer.Remote != nil {
object.RedundancyScheme = pointer.Remote.Redundancy
break
}
index++
}
}
return object, nil
}
// ListObjects list objects according to specific parameters.
2019-07-16 11:39:23 +01:00
func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListRequest) (resp *pb.ObjectListResponse, err error) {
defer mon.Task()(&ctx)(&err)
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
2019-07-16 11:39:23 +01:00
Op: macaroon.ActionList,
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPrefix,
2019-07-16 11:39:23 +01:00
Time: time.Now(),
})
if err != nil {
return nil, err
2019-07-16 11:39:23 +01:00
}
err = endpoint.validateBucket(ctx, req.Bucket)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
2019-07-16 11:39:23 +01:00
}
// TODO this needs to be optimized to avoid DB call on each request
_, err = endpoint.metainfo.GetBucket(ctx, req.Bucket, keyInfo.ProjectID)
if err != nil {
if storj.ErrBucketNotFound.Has(err) {
return nil, rpcstatus.Error(rpcstatus.NotFound, err.Error())
}
endpoint.log.Error("unable to check bucket", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
prefix, err := CreatePath(ctx, keyInfo.ProjectID, lastSegment, req.Bucket, req.EncryptedPrefix)
2019-07-16 11:39:23 +01:00
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
2019-07-16 11:39:23 +01:00
}
metaflags := meta.All
// TODO use flags
segments, more, err := endpoint.metainfo.List(ctx, prefix.Encode(), string(req.EncryptedCursor), req.Recursive, req.Limit, metaflags)
2019-07-16 11:39:23 +01:00
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
2019-07-16 11:39:23 +01:00
}
items := make([]*pb.ObjectListItem, len(segments))
for i, segment := range segments {
items[i] = &pb.ObjectListItem{
EncryptedPath: []byte(segment.Path),
}
if segment.Pointer != nil {
items[i].EncryptedMetadata = segment.Pointer.Metadata
items[i].CreatedAt = segment.Pointer.CreationDate
items[i].ExpiresAt = segment.Pointer.ExpirationDate
2019-07-16 11:39:23 +01:00
}
}
endpoint.log.Info("Object List", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "list"), zap.String("type", "object"))
mon.Meter("req_list_object").Mark(1)
2019-07-16 11:39:23 +01:00
return &pb.ObjectListResponse{
Items: items,
More: more,
}, nil
}
// BeginDeleteObject begins object deletion process.
2019-07-16 11:39:23 +01:00
func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectBeginDeleteRequest) (resp *pb.ObjectBeginDeleteResponse, err error) {
defer mon.Task()(&ctx)(&err)
now := time.Now()
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
2019-07-16 11:39:23 +01:00
Op: macaroon.ActionDelete,
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Time: now,
2019-07-16 11:39:23 +01:00
})
if err != nil {
return nil, err
2019-07-16 11:39:23 +01:00
}
err = endpoint.validateBucket(ctx, req.Bucket)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
2019-07-16 11:39:23 +01:00
}
_, err = endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionRead,
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Time: now,
})
canRead := err == nil
_, err = endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionList,
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Time: now,
})
canList := err == nil
report, err := endpoint.DeleteObjectPieces(ctx, keyInfo.ProjectID, req.Bucket, req.EncryptedPath)
if err != nil {
if !canRead && !canList {
// No error info is returned if neither Read, nor List permission is granted
return &pb.ObjectBeginDeleteResponse{}, nil
}
return nil, err
}
var object *pb.Object
if canRead || canList {
// Info about deleted object is returned only if either Read, or List permission is granted
deletedObjects, err := report.DeletedObjects()
if err != nil {
endpoint.log.Error("failed to construct deleted object information",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.String("Bucket", string(req.Bucket)),
zap.String("Encrypted Path", string(req.EncryptedPath)),
zap.Error(err),
)
}
if len(deletedObjects) > 0 {
object = deletedObjects[0]
}
}
endpoint.log.Info("Object Delete", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "delete"), zap.String("type", "object"))
mon.Meter("req_delete_object").Mark(1)
2019-07-16 11:39:23 +01:00
return &pb.ObjectBeginDeleteResponse{
Object: object,
2019-07-16 11:39:23 +01:00
}, nil
}
// FinishDeleteObject finishes object deletion.
2019-07-16 11:39:23 +01:00
func (endpoint *Endpoint) FinishDeleteObject(ctx context.Context, req *pb.ObjectFinishDeleteRequest) (resp *pb.ObjectFinishDeleteResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID := &pb.SatStreamID{}
err = pb.Unmarshal(req.StreamId, streamID)
2019-07-16 11:39:23 +01:00
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
2019-07-16 11:39:23 +01:00
}
err = signing.VerifyStreamID(ctx, endpoint.satellite, streamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
2019-07-16 11:39:23 +01:00
}
if streamID.CreationDate.Before(time.Now().Add(-satIDExpiration)) {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "stream ID expired")
2019-07-16 11:39:23 +01:00
}
_, err = endpoint.validateAuth(ctx, req.Header, macaroon.Action{
2019-07-16 11:39:23 +01:00
Op: macaroon.ActionDelete,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, err
2019-07-16 11:39:23 +01:00
}
// we don't need to do anything for shim implementation
return &pb.ObjectFinishDeleteResponse{}, nil
}
// GetObjectIPs returns the IP addresses of the nodes holding the pieces for
// the provided object. This is useful for knowing the locations of the pieces.
func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPsRequest) (resp *pb.ObjectGetIPsResponse, err error) {
defer mon.Task()(&ctx)(&err)
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionRead,
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, err
}
err = endpoint.validateBucket(ctx, req.Bucket)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
lastPointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, lastSegment, req.Bucket, req.EncryptedPath)
if err != nil {
// endpoint.getPointer already returns valid rpcstatus errors
return nil, err
}
var nodeIDs []storj.NodeID
addPointerToNodeIDs := func(pointer *pb.Pointer) {
if pointer.Remote != nil {
for _, piece := range pointer.Remote.RemotePieces {
nodeIDs = append(nodeIDs, piece.NodeId)
}
}
}
addPointerToNodeIDs(lastPointer)
streamMeta := &pb.StreamMeta{}
err = pb.Unmarshal(lastPointer.Metadata, streamMeta)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
numSegmentsKnown := streamMeta.NumberOfSegments > 0
numberOfSegmentsToFetch := int(streamMeta.NumberOfSegments) - 1 // remove last segment since we've already fetch manually
// If we do not know the number of segments from the streamMeta, we want to
// continue to run this loop until it cannot find another segment (the
// break condition in the loop).
//
// If we do know the number of segments, we want to run the loop as long as
// the numberOfSegmentsToFetch is > 0 and until we have fetched that many
// segments.
for i := firstSegment; !numSegmentsKnown || (numSegmentsKnown && numberOfSegmentsToFetch > 0 && i < numberOfSegmentsToFetch); i++ {
location, err := CreatePath(ctx, keyInfo.ProjectID, int64(i), req.Bucket, req.EncryptedPath)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
pointer, err := endpoint.metainfo.Get(ctx, location.Encode())
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
break
}
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
addPointerToNodeIDs(pointer)
}
nodes, err := endpoint.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
resp = &pb.ObjectGetIPsResponse{}
for _, node := range nodes {
address := node.Address.GetAddress()
if address != "" {
resp.Ips = append(resp.Ips, []byte(address))
}
}
return resp, nil
}
// BeginSegment begins segment uploading.
func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBeginRequest) (resp *pb.SegmentBeginResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, err
}
// no need to validate streamID fields because it was validated during BeginObject
if req.Position.Index < 0 {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0")
}
exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
if err != nil {
endpoint.log.Error("Retrieving project storage totals failed.", zap.Error(err))
}
if exceeded {
endpoint.log.Error("Monthly storage limit exceeded.",
zap.Stringer("Limit", limit),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
}
redundancy, err := eestream.NewRedundancyStrategyFromProto(streamID.Redundancy)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
maxPieceSize := eestream.CalcPieceSize(req.MaxOrderLimit, redundancy)
request := overlay.FindStorageNodesRequest{
RequestedCount: redundancy.TotalCount(),
}
nodes, err := endpoint.overlay.FindStorageNodesForUpload(ctx, request)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
bucket := metabase.BucketLocation{ProjectID: keyInfo.ProjectID, BucketName: string(streamID.Bucket)}
rootPieceID, addressedLimits, piecePrivateKey, err := endpoint.orders.CreatePutOrderLimits(ctx, bucket, nodes, streamID.ExpirationDate, maxPieceSize)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
segmentID, err := endpoint.packSegmentID(ctx, &pb.SatSegmentID{
StreamId: streamID,
Index: req.Position.Index,
OriginalOrderLimits: addressedLimits,
RootPieceId: rootPieceID,
CreationDate: time.Now(),
})
endpoint.log.Info("Segment Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "remote"))
mon.Meter("req_put_remote").Mark(1)
return &pb.SegmentBeginResponse{
SegmentId: segmentID,
AddressedLimits: addressedLimits,
PrivateKey: piecePrivateKey,
}, nil
}
// CommitSegment commits segment after uploading.
func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentCommitRequest) (resp *pb.SegmentCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
_, resp, err = endpoint.commitSegment(ctx, req, true)
return resp, err
}
func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentCommitRequest, savePointer bool) (pointer *pb.Pointer, resp *pb.SegmentCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
streamID := segmentID.StreamId
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, nil, err
}
if numResults := len(req.UploadResult); numResults < int(streamID.Redundancy.GetSuccessThreshold()) {
endpoint.log.Debug("the results of uploaded pieces for the segment is below the redundancy optimal threshold",
zap.Int("upload pieces results", numResults),
zap.Int32("redundancy optimal threshold", streamID.Redundancy.GetSuccessThreshold()),
zap.Stringer("Segment ID", req.SegmentId),
)
return nil, nil, rpcstatus.Errorf(rpcstatus.InvalidArgument,
"the number of results of uploaded pieces (%d) is below the optimal threshold (%d)",
numResults, streamID.Redundancy.GetSuccessThreshold(),
)
}
pieces := make([]*pb.RemotePiece, len(req.UploadResult))
for i, result := range req.UploadResult {
pieces[i] = &pb.RemotePiece{
PieceNum: result.PieceNum,
NodeId: result.NodeId,
Hash: result.Hash,
}
}
remote := &pb.RemoteSegment{
Redundancy: streamID.Redundancy,
RootPieceId: segmentID.RootPieceId,
RemotePieces: pieces,
}
metadata, err := pb.Marshal(&pb.SegmentMeta{
EncryptedKey: req.EncryptedKey,
KeyNonce: req.EncryptedKeyNonce.Bytes(),
})
if err != nil {
endpoint.log.Error("unable to marshal segment metadata", zap.Error(err))
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
pointer = &pb.Pointer{
Type: pb.Pointer_REMOTE,
Remote: remote,
SegmentSize: req.SizeEncryptedData,
CreationDate: streamID.CreationDate,
ExpirationDate: streamID.ExpirationDate,
Metadata: metadata,
PieceHashesVerified: true,
}
orderLimits := make([]*pb.OrderLimit, len(segmentID.OriginalOrderLimits))
for i, orderLimit := range segmentID.OriginalOrderLimits {
orderLimits[i] = orderLimit.Limit
}
err = endpoint.validatePointer(ctx, pointer, orderLimits)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
err = endpoint.filterValidPieces(ctx, pointer, orderLimits)
if err != nil {
return nil, nil, err
}
exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if exceeded {
endpoint.log.Error("The project limit of storage and bandwidth has been exceeded",
zap.Int64("limit", limit.Int64()),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
}
// clear hashes so we don't store them
for _, piece := range pointer.GetRemote().GetRemotePieces() {
piece.Hash = nil
}
satellite/accounting: refactor live accounting to hold current estimated totals live accounting used to be a cache to store writes before they are picked up during the tally iteration, after which the cache is cleared. This created a window in which users could potentially exceed the storage limit. This PR refactors live accounting to hold current estimations of space used per project. This should also reduce DB load since we no longer need to query the satellite DB when checking space used for limiting. The mechanism by which the new live accounting system works is as follows: During the upload of any segment, the size of that segment is added to its respective project total in live accounting. At the beginning of the tally iteration we record the current values in live accounting as `initialLiveTotals`. At the end of the tally iteration we again record the current totals in live accounting as `latestLiveTotals`. The metainfo loop observer in tally allows us to get the project totals from what it observed in metainfo DB which are stored in `tallyProjectTotals`. However, for any particular segment uploaded during the metainfo loop, the observer may or may not have seen it. Thus, we take half of the difference between `latestLiveTotals` and `initialLiveTotals`, and add that to the total that was found during tally and set that as the new live accounting total. Initially, live accounting was storing the total stored amount across all nodes rather than the segment size, which is inconsistent with how we record amounts stored in the project accounting DB, so we have refactored live accounting to record segment size Change-Id: Ie48bfdef453428fcdc180b2d781a69d58fd927fb
2019-10-31 17:27:38 +00:00
segmentSize, totalStored := calculateSpaceUsed(pointer)
// ToDo: Replace with hash & signature validation
// Ensure neither uplink or storage nodes are cheating on us
if pointer.Type == pb.Pointer_REMOTE {
//We cannot have more redundancy than total/min
satellite/accounting: refactor live accounting to hold current estimated totals live accounting used to be a cache to store writes before they are picked up during the tally iteration, after which the cache is cleared. This created a window in which users could potentially exceed the storage limit. This PR refactors live accounting to hold current estimations of space used per project. This should also reduce DB load since we no longer need to query the satellite DB when checking space used for limiting. The mechanism by which the new live accounting system works is as follows: During the upload of any segment, the size of that segment is added to its respective project total in live accounting. At the beginning of the tally iteration we record the current values in live accounting as `initialLiveTotals`. At the end of the tally iteration we again record the current totals in live accounting as `latestLiveTotals`. The metainfo loop observer in tally allows us to get the project totals from what it observed in metainfo DB which are stored in `tallyProjectTotals`. However, for any particular segment uploaded during the metainfo loop, the observer may or may not have seen it. Thus, we take half of the difference between `latestLiveTotals` and `initialLiveTotals`, and add that to the total that was found during tally and set that as the new live accounting total. Initially, live accounting was storing the total stored amount across all nodes rather than the segment size, which is inconsistent with how we record amounts stored in the project accounting DB, so we have refactored live accounting to record segment size Change-Id: Ie48bfdef453428fcdc180b2d781a69d58fd927fb
2019-10-31 17:27:38 +00:00
if float64(totalStored) > (float64(pointer.SegmentSize)/float64(pointer.Remote.Redundancy.MinReq))*float64(pointer.Remote.Redundancy.Total) {
endpoint.log.Debug("data size mismatch",
zap.Int64("segment", pointer.SegmentSize),
satellite/accounting: refactor live accounting to hold current estimated totals live accounting used to be a cache to store writes before they are picked up during the tally iteration, after which the cache is cleared. This created a window in which users could potentially exceed the storage limit. This PR refactors live accounting to hold current estimations of space used per project. This should also reduce DB load since we no longer need to query the satellite DB when checking space used for limiting. The mechanism by which the new live accounting system works is as follows: During the upload of any segment, the size of that segment is added to its respective project total in live accounting. At the beginning of the tally iteration we record the current values in live accounting as `initialLiveTotals`. At the end of the tally iteration we again record the current totals in live accounting as `latestLiveTotals`. The metainfo loop observer in tally allows us to get the project totals from what it observed in metainfo DB which are stored in `tallyProjectTotals`. However, for any particular segment uploaded during the metainfo loop, the observer may or may not have seen it. Thus, we take half of the difference between `latestLiveTotals` and `initialLiveTotals`, and add that to the total that was found during tally and set that as the new live accounting total. Initially, live accounting was storing the total stored amount across all nodes rather than the segment size, which is inconsistent with how we record amounts stored in the project accounting DB, so we have refactored live accounting to record segment size Change-Id: Ie48bfdef453428fcdc180b2d781a69d58fd927fb
2019-10-31 17:27:38 +00:00
zap.Int64("pieces", totalStored),
zap.Int32("redundancy minimum requested", pointer.Remote.Redundancy.MinReq),
zap.Int32("redundancy total", pointer.Remote.Redundancy.Total),
)
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage")
}
}
satellite/accounting: refactor live accounting to hold current estimated totals live accounting used to be a cache to store writes before they are picked up during the tally iteration, after which the cache is cleared. This created a window in which users could potentially exceed the storage limit. This PR refactors live accounting to hold current estimations of space used per project. This should also reduce DB load since we no longer need to query the satellite DB when checking space used for limiting. The mechanism by which the new live accounting system works is as follows: During the upload of any segment, the size of that segment is added to its respective project total in live accounting. At the beginning of the tally iteration we record the current values in live accounting as `initialLiveTotals`. At the end of the tally iteration we again record the current totals in live accounting as `latestLiveTotals`. The metainfo loop observer in tally allows us to get the project totals from what it observed in metainfo DB which are stored in `tallyProjectTotals`. However, for any particular segment uploaded during the metainfo loop, the observer may or may not have seen it. Thus, we take half of the difference between `latestLiveTotals` and `initialLiveTotals`, and add that to the total that was found during tally and set that as the new live accounting total. Initially, live accounting was storing the total stored amount across all nodes rather than the segment size, which is inconsistent with how we record amounts stored in the project accounting DB, so we have refactored live accounting to record segment size Change-Id: Ie48bfdef453428fcdc180b2d781a69d58fd927fb
2019-10-31 17:27:38 +00:00
if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, segmentSize); err != nil {
endpoint.log.Error("Could not track new storage usage by project",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Error(err),
)
// but continue. it's most likely our own fault that we couldn't track it, and the only thing
// that will be affected is our per-project bandwidth and storage limits.
}
if savePointer {
location, err := CreatePath(ctx, keyInfo.ProjectID, int64(segmentID.Index), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
err = endpoint.metainfo.UnsynchronizedPut(ctx, location.Encode(), pointer)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}
return pointer, &pb.SegmentCommitResponse{
SuccessfulPieces: int32(len(pointer.Remote.RemotePieces)),
}, nil
}
// MakeInlineSegment makes inline segment on satellite.
func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest) (resp *pb.SegmentMakeInlineResponse, err error) {
defer mon.Task()(&ctx)(&err)
_, resp, err = endpoint.makeInlineSegment(ctx, req, true)
return resp, err
}
// MakeInlineSegment makes inline segment on satellite.
func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest, savePointer bool) (pointer *pb.Pointer, resp *pb.SegmentMakeInlineResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, nil, err
}
if req.Position.Index < 0 {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0")
}
inlineUsed := int64(len(req.EncryptedInlineData))
if inlineUsed > endpoint.encInlineSegmentSize {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, fmt.Sprintf("inline segment size cannot be larger than %s", endpoint.config.MaxInlineSegmentSize))
}
exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if exceeded {
endpoint.log.Error("Monthly storage limit exceeded.",
zap.Stringer("Limit", limit),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
}
satellite/accounting: refactor live accounting to hold current estimated totals live accounting used to be a cache to store writes before they are picked up during the tally iteration, after which the cache is cleared. This created a window in which users could potentially exceed the storage limit. This PR refactors live accounting to hold current estimations of space used per project. This should also reduce DB load since we no longer need to query the satellite DB when checking space used for limiting. The mechanism by which the new live accounting system works is as follows: During the upload of any segment, the size of that segment is added to its respective project total in live accounting. At the beginning of the tally iteration we record the current values in live accounting as `initialLiveTotals`. At the end of the tally iteration we again record the current totals in live accounting as `latestLiveTotals`. The metainfo loop observer in tally allows us to get the project totals from what it observed in metainfo DB which are stored in `tallyProjectTotals`. However, for any particular segment uploaded during the metainfo loop, the observer may or may not have seen it. Thus, we take half of the difference between `latestLiveTotals` and `initialLiveTotals`, and add that to the total that was found during tally and set that as the new live accounting total. Initially, live accounting was storing the total stored amount across all nodes rather than the segment size, which is inconsistent with how we record amounts stored in the project accounting DB, so we have refactored live accounting to record segment size Change-Id: Ie48bfdef453428fcdc180b2d781a69d58fd927fb
2019-10-31 17:27:38 +00:00
if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, inlineUsed); err != nil {
endpoint.log.Error("Could not track new storage usage.", zap.Stringer("Project ID", keyInfo.ProjectID), zap.Error(err))
// but continue. it's most likely our own fault that we couldn't track it, and the only thing
// that will be affected is our per-project bandwidth and storage limits.
}
metadata, err := pb.Marshal(&pb.SegmentMeta{
EncryptedKey: req.EncryptedKey,
KeyNonce: req.EncryptedKeyNonce.Bytes(),
})
pointer = &pb.Pointer{
Type: pb.Pointer_INLINE,
SegmentSize: inlineUsed,
CreationDate: streamID.CreationDate,
ExpirationDate: streamID.ExpirationDate,
InlineSegment: req.EncryptedInlineData,
Metadata: metadata,
}
if savePointer {
location, err := CreatePath(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
err = endpoint.metainfo.UnsynchronizedPut(ctx, location.Encode(), pointer)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}
bucket := metabase.BucketLocation{ProjectID: keyInfo.ProjectID, BucketName: string(streamID.Bucket)}
err = endpoint.orders.UpdatePutInlineOrder(ctx, bucket, inlineUsed)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
endpoint.log.Info("Inline Segment Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "inline"))
mon.Meter("req_put_inline").Mark(1)
return pointer, &pb.SegmentMakeInlineResponse{}, nil
}
// BeginDeleteSegment begins segment deletion process.
func (endpoint *Endpoint) BeginDeleteSegment(ctx context.Context, req *pb.SegmentBeginDeleteRequest) (resp *pb.SegmentBeginDeleteResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionDelete,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, err
}
pointer, location, err := endpoint.getPointer(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, err
}
var limits []*pb.AddressedOrderLimit
var privateKey storj.PiecePrivateKey
if pointer.Type == pb.Pointer_REMOTE && pointer.Remote != nil {
bucket := metabase.BucketLocation{ProjectID: keyInfo.ProjectID, BucketName: string(streamID.Bucket)}
limits, privateKey, err = endpoint.orders.CreateDeleteOrderLimits(ctx, bucket, pointer)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}
// moved from FinishDeleteSegment to avoid inconsistency if someone will not
// call FinishDeleteSegment on uplink side
err = endpoint.metainfo.UnsynchronizedDelete(ctx, location.Encode())
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
segmentID, err := endpoint.packSegmentID(ctx, &pb.SatSegmentID{
StreamId: streamID,
OriginalOrderLimits: limits,
Index: req.Position.Index,
CreationDate: time.Now(),
})
endpoint.log.Info("Segment Delete", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "delete"), zap.String("type", "segment"))
mon.Meter("req_delete_segment").Mark(1)
return &pb.SegmentBeginDeleteResponse{
SegmentId: segmentID,
AddressedLimits: limits,
PrivateKey: privateKey,
}, nil
}
// FinishDeleteSegment finishes segment deletion process.
func (endpoint *Endpoint) FinishDeleteSegment(ctx context.Context, req *pb.SegmentFinishDeleteRequest) (resp *pb.SegmentFinishDeleteResponse, err error) {
defer mon.Task()(&ctx)(&err)
segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
streamID := segmentID.StreamId
_, err = endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionDelete,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, err
}
// at the moment logic is in BeginDeleteSegment
return &pb.SegmentFinishDeleteResponse{}, nil
}
// ListSegments list object segments.
func (endpoint *Endpoint) ListSegments(ctx context.Context, req *pb.SegmentListRequest) (resp *pb.SegmentListResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionList,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, err
}
limit := req.Limit
if limit == 0 || limit > listLimit {
limit = listLimit
}
pointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, lastSegment, streamID.Bucket, streamID.EncryptedPath)
if err != nil {
if rpcstatus.Code(err) == rpcstatus.NotFound {
return &pb.SegmentListResponse{}, nil
}
return nil, err
}
streamMeta := &pb.StreamMeta{}
err = pb.Unmarshal(pointer.Metadata, streamMeta)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
endpoint.log.Info("Segment List", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "list"), zap.String("type", "segment"))
mon.Meter("req_list_segment").Mark(1)
if streamMeta.NumberOfSegments > 0 {
// use unencrypted number of segments
// TODO cleanup int32 vs int64
return endpoint.listSegmentsFromNumberOfSegments(int32(streamMeta.NumberOfSegments), req.CursorPosition.Index, limit)
}
// list segments by requesting each segment from cursor index to n until n segment is not found
return endpoint.listSegmentsManually(ctx, keyInfo.ProjectID, streamID, req.CursorPosition.Index, limit)
}
func (endpoint *Endpoint) listSegmentsFromNumberOfSegments(numberOfSegments, cursorIndex, limit int32) (resp *pb.SegmentListResponse, err error) {
if numberOfSegments <= 0 {
endpoint.log.Error(
"Invalid number of segments; this function requires the value to be greater than 0",
zap.Int32("numberOfSegments", numberOfSegments),
)
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to list segments")
}
if cursorIndex > numberOfSegments {
endpoint.log.Error(
"Invalid number cursor index; the index cannot be greater than the total number of segments",
zap.Int32("numberOfSegments", numberOfSegments),
zap.Int32("cursorIndex", cursorIndex),
)
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to list segments")
}
numberOfSegments -= cursorIndex
var (
segmentItems = make([]*pb.SegmentListItem, 0)
more = false
)
if numberOfSegments > 0 {
segmentItems = make([]*pb.SegmentListItem, 0, int(numberOfSegments))
if numberOfSegments > limit {
more = true
numberOfSegments = limit
} else {
// remove last segment to avoid if statements in loop to detect last segment,
// last segment will be added manually at the end of this block
numberOfSegments--
}
for index := int32(0); index < numberOfSegments; index++ {
segmentItems = append(segmentItems, &pb.SegmentListItem{
Position: &pb.SegmentPosition{
Index: index + cursorIndex,
},
})
}
if !more {
// last segment is always the last one
segmentItems = append(segmentItems, &pb.SegmentListItem{
Position: &pb.SegmentPosition{
Index: lastSegment,
},
})
}
}
return &pb.SegmentListResponse{
Items: segmentItems,
More: more,
}, nil
}
// listSegmentManually lists the segments that belongs to projectID and streamID
// from the cursorIndex up to the limit. It stops before the limit when
// cursorIndex + n returns a not found pointer.
//
// limit must be greater than 0 and cursorIndex greater than or equal than 0,
// otherwise an error is returned.
func (endpoint *Endpoint) listSegmentsManually(ctx context.Context, projectID uuid.UUID, streamID *pb.SatStreamID, cursorIndex, limit int32) (resp *pb.SegmentListResponse, err error) {
if limit <= 0 {
return nil, rpcstatus.Errorf(
rpcstatus.InvalidArgument, "invalid limit, cannot be 0 or negative. Got %d", limit,
)
}
index := int64(cursorIndex)
segmentItems := make([]*pb.SegmentListItem, 0)
more := false
for {
_, _, err := endpoint.getPointer(ctx, projectID, index, streamID.Bucket, streamID.EncryptedPath)
if err != nil {
if rpcstatus.Code(err) != rpcstatus.NotFound {
return nil, err
}
break
}
if limit == int32(len(segmentItems)) {
more = true
break
}
segmentItems = append(segmentItems, &pb.SegmentListItem{
Position: &pb.SegmentPosition{
Index: int32(index),
},
})
index++
}
if limit > int32(len(segmentItems)) {
segmentItems = append(segmentItems, &pb.SegmentListItem{
Position: &pb.SegmentPosition{
Index: lastSegment,
},
})
} else {
more = true
}
return &pb.SegmentListResponse{
Items: segmentItems,
More: more,
}, nil
}
// DownloadSegment returns data necessary to download segment.
func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDownloadRequest) (resp *pb.SegmentDownloadResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionRead,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, err
}
bucket := metabase.BucketLocation{ProjectID: keyInfo.ProjectID, BucketName: string(streamID.Bucket)}
exceeded, limit, err := endpoint.projectUsage.ExceedsBandwidthUsage(ctx, keyInfo.ProjectID)
if err != nil {
endpoint.log.Error("Retrieving project bandwidth total failed.", zap.Error(err))
}
if exceeded {
endpoint.log.Error("Monthly bandwidth limit exceeded.",
zap.Stringer("Limit", limit),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
}
pointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, int64(req.CursorPosition.Index), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, err
}
// Update the current bandwidth cache value incrementing the SegmentSize.
err = endpoint.projectUsage.UpdateProjectBandwidthUsage(ctx, keyInfo.ProjectID, pointer.SegmentSize)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
segmentID, err := endpoint.packSegmentID(ctx, &pb.SatSegmentID{})
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
var encryptedKeyNonce storj.Nonce
var encryptedKey []byte
if len(pointer.Metadata) != 0 {
var segmentMeta *pb.SegmentMeta
if req.CursorPosition.Index == lastSegment {
streamMeta := &pb.StreamMeta{}
err = pb.Unmarshal(pointer.Metadata, streamMeta)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
segmentMeta = streamMeta.LastSegmentMeta
} else {
segmentMeta = &pb.SegmentMeta{}
err = pb.Unmarshal(pointer.Metadata, segmentMeta)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}
if segmentMeta != nil {
encryptedKeyNonce, err = storj.NonceFromBytes(segmentMeta.KeyNonce)
if err != nil {
endpoint.log.Error("unable to get encryption key nonce from metadata", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
encryptedKey = segmentMeta.EncryptedKey
}
}
if pointer.Type == pb.Pointer_INLINE {
err := endpoint.orders.UpdateGetInlineOrder(ctx, bucket, int64(len(pointer.InlineSegment)))
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
endpoint.log.Info("Inline Segment Download", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "get"), zap.String("type", "inline"))
mon.Meter("req_get_inline").Mark(1)
return &pb.SegmentDownloadResponse{
SegmentId: segmentID,
SegmentSize: pointer.SegmentSize,
EncryptedInlineData: pointer.InlineSegment,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedKey: encryptedKey,
}, nil
} else if pointer.Type == pb.Pointer_REMOTE && pointer.Remote != nil {
limits, privateKey, err := endpoint.orders.CreateGetOrderLimits(ctx, bucket, pointer)
if err != nil {
if orders.ErrDownloadFailedNotEnoughPieces.Has(err) {
endpoint.log.Error("Unable to create order limits.",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Stringer("API Key ID", keyInfo.ID),
zap.Error(err),
)
}
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
limits = sortLimits(limits, pointer)
// workaround to avoid sending nil values on top level
for i := range limits {
if limits[i] == nil {
limits[i] = &pb.AddressedOrderLimit{}
}
}
endpoint.log.Info("Segment Download", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "get"), zap.String("type", "remote"))
mon.Meter("req_get_remote").Mark(1)
return &pb.SegmentDownloadResponse{
SegmentId: segmentID,
AddressedLimits: limits,
PrivateKey: privateKey,
SegmentSize: pointer.SegmentSize,
EncryptedKeyNonce: encryptedKeyNonce,
EncryptedKey: encryptedKey,
}, nil
}
return &pb.SegmentDownloadResponse{}, rpcstatus.Error(rpcstatus.Internal, "invalid type of pointer")
}
// getPointer returns the pointer and the segment path projectID, bucket and
// encryptedPath. It returns an error with a specific RPC status.
func (endpoint *Endpoint) getPointer(
ctx context.Context, projectID uuid.UUID, segmentIndex int64, bucket, encryptedPath []byte,
) (pointer *pb.Pointer, location metabase.SegmentLocation, err error) {
defer mon.Task()(&ctx, projectID.String(), segmentIndex, bucket, encryptedPath)(&err)
location, err = CreatePath(ctx, projectID, segmentIndex, bucket, encryptedPath)
if err != nil {
return nil, location, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
pointer, err = endpoint.metainfo.Get(ctx, location.Encode())
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
return nil, location, rpcstatus.Error(rpcstatus.NotFound, err.Error())
}
endpoint.log.Error("error getting the pointer from metainfo service", zap.Error(err))
return nil, location, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return pointer, location, nil
}
// sortLimits sorts order limits and fill missing ones with nil values.
func sortLimits(limits []*pb.AddressedOrderLimit, pointer *pb.Pointer) []*pb.AddressedOrderLimit {
sorted := make([]*pb.AddressedOrderLimit, pointer.GetRemote().GetRedundancy().GetTotal())
for _, piece := range pointer.GetRemote().GetRemotePieces() {
sorted[piece.GetPieceNum()] = getLimitByStorageNodeID(limits, piece.NodeId)
}
return sorted
}
func getLimitByStorageNodeID(limits []*pb.AddressedOrderLimit, storageNodeID storj.NodeID) *pb.AddressedOrderLimit {
for _, limit := range limits {
if limit == nil || limit.GetLimit() == nil {
continue
}
if limit.GetLimit().StorageNodeId == storageNodeID {
return limit
}
}
return nil
}
func (endpoint *Endpoint) packStreamID(ctx context.Context, satStreamID *pb.SatStreamID) (streamID storj.StreamID, err error) {
defer mon.Task()(&ctx)(&err)
signedStreamID, err := signing.SignStreamID(ctx, endpoint.satellite, satStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
encodedStreamID, err := pb.Marshal(signedStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
streamID, err = storj.StreamIDFromBytes(encodedStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return streamID, nil
}
func (endpoint *Endpoint) packSegmentID(ctx context.Context, satSegmentID *pb.SatSegmentID) (segmentID storj.SegmentID, err error) {
defer mon.Task()(&ctx)(&err)
signedSegmentID, err := signing.SignSegmentID(ctx, endpoint.satellite, satSegmentID)
if err != nil {
return nil, err
}
encodedSegmentID, err := pb.Marshal(signedSegmentID)
if err != nil {
return nil, err
}
segmentID, err = storj.SegmentIDFromBytes(encodedSegmentID)
if err != nil {
return nil, err
}
return segmentID, nil
}
func (endpoint *Endpoint) unmarshalSatStreamID(ctx context.Context, streamID storj.StreamID) (_ *pb.SatStreamID, err error) {
defer mon.Task()(&ctx)(&err)
satStreamID := &pb.SatStreamID{}
err = pb.Unmarshal(streamID, satStreamID)
if err != nil {
return nil, err
}
err = signing.VerifyStreamID(ctx, endpoint.satellite, satStreamID)
if err != nil {
return nil, err
}
if satStreamID.CreationDate.Before(time.Now().Add(-satIDExpiration)) {
return nil, errs.New("stream ID expired")
}
return satStreamID, nil
}
func (endpoint *Endpoint) unmarshalSatSegmentID(ctx context.Context, segmentID storj.SegmentID) (_ *pb.SatSegmentID, err error) {
defer mon.Task()(&ctx)(&err)
satSegmentID := &pb.SatSegmentID{}
err = pb.Unmarshal(segmentID, satSegmentID)
if err != nil {
return nil, err
}
if satSegmentID.StreamId == nil {
return nil, errs.New("stream ID missing")
}
err = signing.VerifySegmentID(ctx, endpoint.satellite, satSegmentID)
if err != nil {
return nil, err
}
if satSegmentID.CreationDate.Before(time.Now().Add(-satIDExpiration)) {
return nil, errs.New("segment ID expired")
}
return satSegmentID, nil
}
// DeleteObjectPieces deletes all the pieces of the storage nodes that belongs
// to the specified object.
//
// NOTE: this method is exported for being able to individually test it without
// having import cycles.
func (endpoint *Endpoint) DeleteObjectPieces(
ctx context.Context, projectID uuid.UUID, bucket, encryptedPath []byte,
) (report objectdeletion.Report, err error) {
defer mon.Task()(&ctx, projectID.String(), bucket, encryptedPath)(&err)
req := &objectdeletion.ObjectIdentifier{
ProjectID: projectID,
Bucket: bucket,
EncryptedPath: encryptedPath,
}
report, err = endpoint.deleteObjectsPieces(ctx, req)
if err != nil {
endpoint.log.Error("failed to delete pointers",
zap.Stringer("project_id", projectID),
zap.ByteString("bucket_name", bucket),
zap.Binary("encrypted_path", encryptedPath),
zap.Error(err),
)
// Only return an error if we failed to delete the pointers. If we failed
// to delete pieces, let garbage collector take care of it.
if objectdeletion.Error.Has(err) {
return report, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}
return report, nil
}
func (endpoint *Endpoint) deleteObjectsPieces(ctx context.Context, reqs ...*objectdeletion.ObjectIdentifier) (report objectdeletion.Report, err error) {
// We should ignore client cancelling and always try to delete segments.
ctx = context2.WithoutCancellation(ctx)
results, err := endpoint.deleteObjects.Delete(ctx, reqs...)
if err != nil {
return report, err
}
var requests []piecedeletion.Request
for _, r := range results {
pointers := r.DeletedPointers()
report.Deleted = append(report.Deleted, r.Deleted...)
report.Failed = append(report.Failed, r.Failed...)
nodesPieces := objectdeletion.GroupPiecesByNodeID(pointers)
if len(nodesPieces) == 0 {
continue
}
for node, pieces := range nodesPieces {
requests = append(requests, piecedeletion.Request{
Node: storj.NodeURL{
ID: node,
},
Pieces: pieces,
})
}
}
if err := endpoint.deletePieces.Delete(ctx, requests, deleteObjectPiecesSuccessThreshold); err != nil {
endpoint.log.Error("failed to delete pieces", zap.Error(err))
}
return report, nil
}
func (endpoint *Endpoint) redundancyScheme() *pb.RedundancyScheme {
return &pb.RedundancyScheme{
Type: pb.RedundancyScheme_RS,
MinReq: int32(endpoint.config.RS.MinThreshold),
RepairThreshold: int32(endpoint.config.RS.RepairThreshold),
SuccessThreshold: int32(endpoint.config.RS.SuccessThreshold),
Total: int32(endpoint.config.RS.TotalThreshold),
ErasureShareSize: endpoint.config.RS.ErasureShareSize.Int32(),
}
}
// RevokeAPIKey handles requests to revoke an api key.
func (endpoint *Endpoint) RevokeAPIKey(ctx context.Context, req *pb.RevokeAPIKeyRequest) (resp *pb.RevokeAPIKeyResponse, err error) {
defer mon.Task()(&ctx)(&err)
macToRevoke, err := macaroon.ParseMacaroon(req.GetApiKey())
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "API key to revoke is not a macaroon")
}
keyInfo, err := endpoint.validateRevoke(ctx, req.Header, macToRevoke)
if err != nil {
return nil, err
}
err = endpoint.revocations.Revoke(ctx, macToRevoke.Tail(), keyInfo.ID[:])
if err != nil {
endpoint.log.Error("Failed to revoke API key", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "Failed to revoke API key")
}
return &pb.RevokeAPIKeyResponse{}, nil
}
// CreatePath creates a segment key.
func CreatePath(ctx context.Context, projectID uuid.UUID, segmentIndex int64, bucket, path []byte) (_ metabase.SegmentLocation, err error) {
// TODO rename to CreateLocation
defer mon.Task()(&ctx)(&err)
if segmentIndex < lastSegment { // lastSegment = -1
return metabase.SegmentLocation{}, errors.New("invalid segment index")
}
return metabase.SegmentLocation{
ProjectID: projectID,
BucketName: string(bucket),
Index: segmentIndex,
ObjectKey: metabase.ObjectKey(path),
}, nil
}