storj/satellite/metainfo/endpoint_segment.go
dlamarmorgan 5fc493b276 metainfo: per-project and user-agent eventkit-based usage tracking
Add eventkit events for each satellite RPC type, tagged with user agent and public project id.

Change-Id: I838401683298d4594495de18563c55031e82f881
2023-04-25 15:13:43 +00:00

780 lines
27 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"context"
"fmt"
"time"
"go.uber.org/zap"
"storj.io/common/errs2"
"storj.io/common/macaroon"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/uplink/private/eestream"
)
// calculateSpaceUsed returns the number of bytes accounted for when
// numberOfPieces pieces of a segment of segmentSize bytes are stored.
//
// The size of a single piece is segmentSize divided by rs.RequiredShares using
// integer division, so any remainder is dropped before multiplying by the
// piece count.
func calculateSpaceUsed(segmentSize int64, numberOfPieces int, rs storj.RedundancyScheme) (totalStored int64) {
	perPiece := segmentSize / int64(rs.RequiredShares)
	totalStored = perPiece * int64(numberOfPieces)
	return totalStored
}
// BeginSegment begins segment uploading.
//
// It authorizes a write to the object described by the signed stream ID,
// selects storage nodes for a new remote segment, creates put order limits for
// those nodes, records the pending segment in the metabase and returns a packed
// segment ID together with the order limits and piece private key.
//
// An InvalidArgument status is returned when the stream ID cannot be
// unmarshaled, when the segment position is missing or has a negative index,
// or when the default redundancy strategy cannot be built. Failures while
// selecting nodes, creating order limits, parsing the stream UUID or packing
// the segment ID are logged and returned with an Internal status. Errors from
// validateAuth and checkUploadLimits are returned unchanged.
func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBeginRequest) (resp *pb.SegmentBeginResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())

	streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
	}

	keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
		Op:            macaroon.ActionWrite,
		Bucket:        streamID.Bucket,
		EncryptedPath: streamID.EncryptedObjectKey,
		Time:          time.Now(),
	})
	if err != nil {
		return nil, err
	}
	endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req))

	// no need to validate streamID fields because it was validated during BeginObject

	// The position is a pointer in the request; reject a request that omits
	// it instead of dereferencing nil below.
	if req.Position == nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment position is missing")
	}
	if req.Position.Index < 0 {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index cannot be negative")
	}

	if err := endpoint.checkUploadLimits(ctx, keyInfo.ProjectID); err != nil {
		return nil, err
	}

	redundancy, err := eestream.NewRedundancyStrategyFromProto(endpoint.defaultRS)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
	}

	// The maximum piece size for the order limits is derived from the
	// client-supplied MaxOrderLimit using the configured redundancy scheme.
	config := endpoint.config
	defaultRedundancy := storj.RedundancyScheme{
		Algorithm:      storj.ReedSolomon,
		RequiredShares: int16(config.RS.Min),
		RepairShares:   int16(config.RS.Repair),
		OptimalShares:  int16(config.RS.Success),
		TotalShares:    int16(config.RS.Total),
		ShareSize:      config.RS.ErasureShareSize.Int32(),
	}
	maxPieceSize := defaultRedundancy.PieceSize(req.MaxOrderLimit)

	request := overlay.FindStorageNodesRequest{
		RequestedCount: redundancy.TotalCount(),
		Placement:      storj.PlacementConstraint(streamID.Placement),
	}
	nodes, err := endpoint.overlay.FindStorageNodesForUpload(ctx, request)
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	bucket := metabase.BucketLocation{ProjectID: keyInfo.ProjectID, BucketName: string(streamID.Bucket)}
	rootPieceID, addressedLimits, piecePrivateKey, err := endpoint.orders.CreatePutOrderLimits(ctx, bucket, nodes, streamID.ExpirationDate, maxPieceSize)
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	id, err := uuid.FromBytes(streamID.StreamId)
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	// One piece per order limit; the piece number is the limit's index.
	pieces := make(metabase.Pieces, 0, len(addressedLimits))
	for i, limit := range addressedLimits {
		pieces = append(pieces, metabase.Piece{
			Number:      uint16(i),
			StorageNode: limit.Limit.StorageNodeId,
		})
	}

	err = endpoint.metabase.BeginSegment(ctx, metabase.BeginSegment{
		ObjectStream: metabase.ObjectStream{
			ProjectID:  keyInfo.ProjectID,
			BucketName: string(streamID.Bucket),
			ObjectKey:  metabase.ObjectKey(streamID.EncryptedObjectKey),
			StreamID:   id,
			Version:    metabase.Version(streamID.Version),
		},
		Position: metabase.SegmentPosition{
			Part:  uint32(req.Position.PartNumber),
			Index: uint32(req.Position.Index),
		},
		RootPieceID: rootPieceID,
		Pieces:      pieces,
	})
	if err != nil {
		return nil, endpoint.convertMetabaseErr(err)
	}

	// The packed segment ID carries the original order limits so that later
	// calls (retry, commit) can refer back to them.
	segmentID, err := endpoint.packSegmentID(ctx, &internalpb.SegmentID{
		StreamId:            streamID,
		PartNumber:          req.Position.PartNumber,
		Index:               req.Position.Index,
		OriginalOrderLimits: addressedLimits,
		RootPieceId:         rootPieceID,
		CreationDate:        time.Now(),
	})
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	endpoint.log.Info("Segment Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "remote"))
	mon.Meter("req_put_remote").Mark(1)

	return &pb.SegmentBeginResponse{
		SegmentId:        segmentID,
		AddressedLimits:  addressedLimits,
		PrivateKey:       piecePrivateKey,
		RedundancyScheme: endpoint.defaultRS,
	}, nil
}
// RetryBeginSegmentPieces replaces put order limits for failed piece uploads.
//
// The request names the piece numbers whose uploads failed. Each number must
// index into the original order limits carried by the segment ID and must not
// be repeated. New storage nodes are selected (excluding every node already
// present in the original limits), the affected order limits are replaced, and
// the amended segment ID is returned together with the full set of limits.
func (endpoint *Endpoint) RetryBeginSegmentPieces(ctx context.Context, req *pb.RetryBeginSegmentPiecesRequest) (resp *pb.RetryBeginSegmentPiecesResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())

	segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
	}

	action := macaroon.Action{
		Op:            macaroon.ActionWrite,
		Bucket:        segmentID.StreamId.Bucket,
		EncryptedPath: segmentID.StreamId.EncryptedObjectKey,
		Time:          time.Now(),
	}
	keyInfo, err := endpoint.validateAuth(ctx, req.Header, action)
	if err != nil {
		return nil, err
	}
	endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req))

	retryCount := len(req.RetryPieceNumbers)
	if retryCount == 0 {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "piece numbers to exchange cannot be empty")
	}

	// Every requested piece number has to address one of the original order
	// limits and may appear only once.
	limitCount := len(segmentID.OriginalOrderLimits)
	seen := make(map[int32]struct{}, retryCount)
	for _, num := range req.RetryPieceNumbers {
		inRange := num >= 0 && int(num) < limitCount
		if !inRange {
			endpoint.log.Debug("piece number is out of range",
				zap.Int32("piece number", num),
				zap.Int("redundancy total", limitCount),
				zap.Stringer("Segment ID", req.SegmentId),
			)
			return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "piece number %d must be within range [0,%d]", num, limitCount-1)
		}

		if _, duplicate := seen[num]; duplicate {
			endpoint.log.Debug("piece number is duplicated",
				zap.Int32("piece number", num),
				zap.Stringer("Segment ID", req.SegmentId),
			)
			return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "piece number %d is duplicated", num)
		}
		seen[num] = struct{}{}
	}

	if err := endpoint.checkUploadLimits(ctx, keyInfo.ProjectID); err != nil {
		return nil, err
	}

	// Find a new set of storage nodes, excluding any already represented in
	// the current list of order limits.
	// TODO: It's possible that a node gets reused across multiple calls to RetryBeginSegmentPieces.
	excludedIDs := make([]storj.NodeID, limitCount)
	for i, original := range segmentID.OriginalOrderLimits {
		excludedIDs[i] = original.Limit.StorageNodeId
	}

	findRequest := overlay.FindStorageNodesRequest{
		RequestedCount: retryCount,
		Placement:      storj.PlacementConstraint(segmentID.StreamId.Placement),
		ExcludedIDs:    excludedIDs,
	}
	nodes, err := endpoint.overlay.FindStorageNodesForUpload(ctx, findRequest)
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	addressedLimits, err := endpoint.orders.ReplacePutOrderLimits(ctx, segmentID.RootPieceId, segmentID.OriginalOrderLimits, nodes, req.RetryPieceNumbers)
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	// Re-pack the segment ID so it carries the replaced order limits.
	segmentID.OriginalOrderLimits = addressedLimits
	amendedSegmentID, err := endpoint.packSegmentID(ctx, segmentID)
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	endpoint.log.Info("Segment Upload Piece Retry", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "remote"))

	resp = &pb.RetryBeginSegmentPiecesResponse{
		SegmentId:       amendedSegmentID,
		AddressedLimits: addressedLimits,
	}
	return resp, nil
}
// CommitSegment commits segment after uploading.
//
// The upload results reported by the uplink are checked against the default
// redundancy scheme and against the order limits stored in the segment ID.
// When enough valid pieces remain, the segment is committed to the metabase
// and its size is added to the project's upload accounting. The response
// reports how many pieces were accepted.
func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentCommitRequest) (resp *pb.SegmentCommitResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())

	segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
	}
	streamID := segmentID.StreamId

	keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
		Op:            macaroon.ActionWrite,
		Bucket:        streamID.Bucket,
		EncryptedPath: streamID.EncryptedObjectKey,
		Time:          time.Now(),
	})
	if err != nil {
		return nil, err
	}
	endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req))

	// cheap basic verification
	resultCount := len(req.UploadResult)
	successThreshold := endpoint.defaultRS.GetSuccessThreshold()
	if resultCount < int(successThreshold) {
		endpoint.log.Debug("the results of uploaded pieces for the segment is below the redundancy optimal threshold",
			zap.Int("upload pieces results", resultCount),
			zap.Int32("redundancy optimal threshold", successThreshold),
			zap.Stringer("Segment ID", req.SegmentId),
		)
		return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument,
			"the number of results of uploaded pieces (%d) is below the optimal threshold (%d)",
			resultCount, successThreshold,
		)
	}

	defaultRS := endpoint.defaultRS
	rs := storj.RedundancyScheme{
		Algorithm:      storj.RedundancyAlgorithm(defaultRS.Type),
		RequiredShares: int16(defaultRS.MinReq),
		RepairShares:   int16(defaultRS.RepairThreshold),
		OptimalShares:  int16(defaultRS.SuccessThreshold),
		TotalShares:    int16(defaultRS.Total),
		ShareSize:      defaultRS.ErasureShareSize,
	}

	if err = endpoint.pointerVerification.VerifySizes(ctx, rs, req.SizeEncryptedData, req.UploadResult); err != nil {
		endpoint.log.Debug("piece sizes are invalid", zap.Error(err))
		return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "piece sizes are invalid: %v", err)
	}

	// extract the original order limits
	originalLimits := make([]*pb.OrderLimit, 0, len(segmentID.OriginalOrderLimits))
	for _, addressed := range segmentID.OriginalOrderLimits {
		originalLimits = append(originalLimits, addressed.Limit)
	}

	// verify the piece upload results
	validPieces, invalidPieces, err := endpoint.pointerVerification.SelectValidPieces(ctx, req.UploadResult, originalLimits)
	if err != nil {
		endpoint.log.Debug("pointer verification failed", zap.Error(err))
		return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "pointer verification failed: %s", err)
	}

	validCount, invalidCount := len(validPieces), len(invalidPieces)
	if validCount < int(rs.OptimalShares) {
		endpoint.log.Debug("Number of valid pieces is less than the success threshold",
			zap.Int("totalReceivedPieces", len(req.UploadResult)),
			zap.Int("validPieces", validCount),
			zap.Int("invalidPieces", invalidCount),
			zap.Int("successThreshold", int(rs.OptimalShares)),
		)

		// Summary first, followed by one line per rejected piece.
		errMsg := fmt.Sprintf("Number of valid pieces (%d) is less than the success threshold (%d). Found %d invalid pieces",
			validCount,
			rs.OptimalShares,
			invalidCount,
		)
		if invalidCount > 0 {
			errMsg += ". Invalid Pieces:"
			for _, invalid := range invalidPieces {
				errMsg += fmt.Sprintf("\nNodeID: %v, PieceNum: %d, Reason: %s",
					invalid.NodeID, invalid.PieceNum, invalid.Reason,
				)
			}
		}

		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, errMsg)
	}

	pieces := make(metabase.Pieces, validCount)
	for i, accepted := range validPieces {
		pieces[i] = metabase.Piece{
			Number:      uint16(accepted.PieceNum),
			StorageNode: accepted.NodeId,
		}
	}

	id, err := uuid.FromBytes(streamID.StreamId)
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	// A zero expiration date in the stream ID is passed on as "no expiration".
	var expiresAt *time.Time
	if !streamID.ExpirationDate.IsZero() {
		expiresAt = &streamID.ExpirationDate
	}

	objectStream := metabase.ObjectStream{
		ProjectID:  keyInfo.ProjectID,
		BucketName: string(streamID.Bucket),
		ObjectKey:  metabase.ObjectKey(streamID.EncryptedObjectKey),
		StreamID:   id,
		Version:    metabase.Version(streamID.Version),
	}
	position := metabase.SegmentPosition{
		Part:  uint32(segmentID.PartNumber),
		Index: uint32(segmentID.Index),
	}
	mbCommitSegment := metabase.CommitSegment{
		ObjectStream: objectStream,

		ExpiresAt:         expiresAt,
		EncryptedKey:      req.EncryptedKey,
		EncryptedKeyNonce: req.EncryptedKeyNonce[:],

		EncryptedSize: int32(req.SizeEncryptedData), // TODO incompatible types int32 vs int64
		PlainSize:     int32(req.PlainSize),         // TODO incompatible types int32 vs int64

		EncryptedETag: req.EncryptedETag,

		Position:    position,
		RootPieceID: segmentID.RootPieceId,
		Redundancy:  rs,
		Pieces:      pieces,

		Placement: storj.PlacementConstraint(streamID.Placement),
	}

	if err = endpoint.validateRemoteSegment(ctx, mbCommitSegment, originalLimits); err != nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
	}

	if err := endpoint.checkUploadLimits(ctx, keyInfo.ProjectID); err != nil {
		return nil, err
	}

	segmentSize := req.SizeEncryptedData
	totalStored := calculateSpaceUsed(segmentSize, len(pieces), rs)

	// ToDo: Replace with hash & signature validation
	// Ensure neither uplink or storage nodes are cheating on us

	// We cannot have more redundancy than total/min
	maxAllowed := (float64(segmentSize) / float64(rs.RequiredShares)) * float64(rs.TotalShares)
	if float64(totalStored) > maxAllowed {
		endpoint.log.Debug("data size mismatch",
			zap.Int64("segment", segmentSize),
			zap.Int64("pieces", totalStored),
			zap.Int16("redundancy minimum requested", rs.RequiredShares),
			zap.Int16("redundancy total", rs.TotalShares),
		)
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage")
	}

	if err = endpoint.metabase.CommitSegment(ctx, mbCommitSegment); err != nil {
		return nil, endpoint.convertMetabaseErr(err)
	}

	if err := endpoint.addSegmentToUploadLimits(ctx, keyInfo.ProjectID, segmentSize); err != nil {
		return nil, err
	}

	// note: we collect transfer stats in CommitSegment instead because in BeginSegment
	// they would always be MaxSegmentSize (64MiB)
	endpoint.versionCollector.collectTransferStats(req.Header.UserAgent, upload, int(req.PlainSize))

	mon.Meter("req_commit_segment").Mark(1)

	resp = &pb.SegmentCommitResponse{
		SuccessfulPieces: int32(len(pieces)),
	}
	return resp, nil
}
// MakeInlineSegment makes inline segment on satellite.
//
// The encrypted data is stored directly in the metabase instead of on storage
// nodes. After the segment is committed, the inline put order is updated for
// the bucket and the data size is added to the project's upload accounting.
//
// An InvalidArgument status is returned when the stream ID cannot be
// unmarshaled, when the segment position is missing or has a negative index,
// or when the inline data exceeds endpoint.encInlineSegmentSize. Errors from
// validateAuth, checkUploadLimits and addSegmentToUploadLimits are returned
// unchanged; metabase errors go through convertMetabaseErr.
func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest) (resp *pb.SegmentMakeInlineResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())

	streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
	}

	keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
		Op:            macaroon.ActionWrite,
		Bucket:        streamID.Bucket,
		EncryptedPath: streamID.EncryptedObjectKey,
		Time:          time.Now(),
	})
	if err != nil {
		return nil, err
	}
	endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req))

	// The position is a pointer in the request; reject a request that omits
	// it instead of dereferencing nil below.
	if req.Position == nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment position is missing")
	}
	if req.Position.Index < 0 {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index cannot be negative")
	}

	inlineUsed := int64(len(req.EncryptedInlineData))
	if inlineUsed > endpoint.encInlineSegmentSize {
		return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "inline segment size cannot be larger than %s", endpoint.config.MaxInlineSegmentSize)
	}

	id, err := uuid.FromBytes(streamID.StreamId)
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	if err := endpoint.checkUploadLimits(ctx, keyInfo.ProjectID); err != nil {
		return nil, err
	}

	// A zero expiration date in the stream ID is passed on as "no expiration".
	var expiresAt *time.Time
	if !streamID.ExpirationDate.IsZero() {
		expiresAt = &streamID.ExpirationDate
	}

	err = endpoint.metabase.CommitInlineSegment(ctx, metabase.CommitInlineSegment{
		ObjectStream: metabase.ObjectStream{
			ProjectID:  keyInfo.ProjectID,
			BucketName: string(streamID.Bucket),
			ObjectKey:  metabase.ObjectKey(streamID.EncryptedObjectKey),
			StreamID:   id,
			Version:    metabase.Version(streamID.Version),
		},

		ExpiresAt:         expiresAt,
		EncryptedKey:      req.EncryptedKey,
		EncryptedKeyNonce: req.EncryptedKeyNonce.Bytes(),

		Position: metabase.SegmentPosition{
			Part:  uint32(req.Position.PartNumber),
			Index: uint32(req.Position.Index),
		},

		PlainSize:     int32(req.PlainSize), // TODO incompatible types int32 vs int64
		EncryptedETag: req.EncryptedETag,

		InlineData: req.EncryptedInlineData,
	})
	if err != nil {
		return nil, endpoint.convertMetabaseErr(err)
	}

	bucket := metabase.BucketLocation{ProjectID: keyInfo.ProjectID, BucketName: string(streamID.Bucket)}
	err = endpoint.orders.UpdatePutInlineOrder(ctx, bucket, inlineUsed)
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	if err := endpoint.addSegmentToUploadLimits(ctx, keyInfo.ProjectID, inlineUsed); err != nil {
		return nil, err
	}

	endpoint.versionCollector.collectTransferStats(req.Header.UserAgent, upload, int(req.PlainSize))

	endpoint.log.Info("Inline Segment Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "inline"))
	mon.Meter("req_put_inline").Mark(1)

	return &pb.SegmentMakeInlineResponse{}, nil
}
// ListSegments lists the segments of an object.
//
// Listing starts from req.CursorPosition; a missing cursor is treated as the
// zero position. The response also carries the encryption parameters taken
// from the stream ID.
func (endpoint *Endpoint) ListSegments(ctx context.Context, req *pb.SegmentListRequest) (resp *pb.SegmentListResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())

	streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
	}

	action := macaroon.Action{
		Op:            macaroon.ActionRead,
		Bucket:        streamID.Bucket,
		EncryptedPath: streamID.EncryptedObjectKey,
		Time:          time.Now(),
	}
	keyInfo, err := endpoint.validateAuth(ctx, req.Header, action)
	if err != nil {
		return nil, err
	}
	endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req))

	// Without a cursor the zero position is used.
	var start metabase.SegmentPosition
	if pos := req.CursorPosition; pos != nil {
		start.Part = uint32(pos.PartNumber)
		start.Index = uint32(pos.Index)
	}

	id, err := uuid.FromBytes(streamID.StreamId)
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	listing, err := endpoint.metabase.ListStreamPositions(ctx, metabase.ListStreamPositions{
		StreamID: id,
		Cursor:   start,
		Limit:    int(req.Limit),
	})
	if err != nil {
		return nil, endpoint.convertMetabaseErr(err)
	}

	response, err := convertStreamListResults(listing)
	if err != nil {
		endpoint.log.Error("unable to convert stream list", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}
	response.EncryptionParameters = streamID.EncryptionParameters

	mon.Meter("req_list_segments").Mark(1)

	return response, nil
}
// convertStreamListResults translates a metabase stream position listing into
// the protobuf list response. It fails when a segment's encrypted key nonce
// cannot be converted by storj.NonceFromBytes.
func convertStreamListResults(result metabase.ListStreamPositionsResult) (*pb.SegmentListResponse, error) {
	items := make([]*pb.SegmentListItem, 0, len(result.Segments))
	for _, segment := range result.Segments {
		nonce, err := storj.NonceFromBytes(segment.EncryptedKeyNonce)
		if err != nil {
			return nil, err
		}

		entry := &pb.SegmentListItem{
			Position: &pb.SegmentPosition{
				PartNumber: int32(segment.Position.Part),
				Index:      int32(segment.Position.Index),
			},
			PlainSize:         int64(segment.PlainSize),
			PlainOffset:       segment.PlainOffset,
			EncryptedETag:     segment.EncryptedETag,
			EncryptedKeyNonce: nonce,
			EncryptedKey:      segment.EncryptedKey,
		}
		// CreatedAt is optional in the metabase result.
		if segment.CreatedAt != nil {
			entry.CreatedAt = *segment.CreatedAt
		}

		items = append(items, entry)
	}

	response := &pb.SegmentListResponse{
		Items: items,
		More:  result.More,
	}
	return response, nil
}
// DownloadSegment returns data necessary to download segment.
//
// For an inline segment the encrypted data is returned directly. For a remote
// segment, get order limits and a piece private key are created and returned
// together with the segment's redundancy scheme.
//
// A Canceled status is returned when the context is already done, an
// InvalidArgument status when the stream ID cannot be unmarshaled or the
// cursor position is missing, and a ResourceExhausted status when the
// project's bandwidth limit is reported as exceeded. A failure to read or
// update the bandwidth usage is only logged (unless it is a cancellation) and
// does not fail the request.
func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDownloadRequest) (resp *pb.SegmentDownloadResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	if ctx.Err() != nil {
		return nil, rpcstatus.Error(rpcstatus.Canceled, "client has closed the connection")
	}

	endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())

	streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
	}

	keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
		Op:            macaroon.ActionRead,
		Bucket:        streamID.Bucket,
		EncryptedPath: streamID.EncryptedObjectKey,
		Time:          time.Now(),
	})
	if err != nil {
		return nil, err
	}
	endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req))

	// The cursor position is a pointer in the request; reject a request that
	// omits it instead of dereferencing nil below.
	if req.CursorPosition == nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "cursor position is missing")
	}

	bucket := metabase.BucketLocation{ProjectID: keyInfo.ProjectID, BucketName: string(streamID.Bucket)}

	if exceeded, limit, err := endpoint.projectUsage.ExceedsBandwidthUsage(ctx, keyInfo.ProjectID); err != nil {
		if errs2.IsCanceled(err) {
			return nil, rpcstatus.Wrap(rpcstatus.Canceled, err)
		}

		endpoint.log.Error(
			"Retrieving project bandwidth total failed; bandwidth limit won't be enforced",
			zap.Stringer("Project ID", keyInfo.ProjectID),
			zap.Error(err),
		)
	} else if exceeded {
		endpoint.log.Warn("Monthly bandwidth limit exceeded",
			zap.Stringer("Limit", limit),
			zap.Stringer("Project ID", keyInfo.ProjectID),
		)
		return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
	}

	id, err := uuid.FromBytes(streamID.StreamId)
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	// Part 0 with index -1 asks for the last segment of the latest object;
	// any other position is looked up by stream ID and position.
	var segment metabase.Segment
	if req.CursorPosition.PartNumber == 0 && req.CursorPosition.Index == -1 {
		if streamID.MultipartObject {
			return nil, rpcstatus.Error(rpcstatus.Unimplemented, "Used uplink version cannot download multipart objects.")
		}

		segment, err = endpoint.metabase.GetLatestObjectLastSegment(ctx, metabase.GetLatestObjectLastSegment{
			ObjectLocation: metabase.ObjectLocation{
				ProjectID:  keyInfo.ProjectID,
				BucketName: string(streamID.Bucket),
				ObjectKey:  metabase.ObjectKey(streamID.EncryptedObjectKey),
			},
		})
	} else {
		segment, err = endpoint.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
			StreamID: id,
			Position: metabase.SegmentPosition{
				Part:  uint32(req.CursorPosition.PartNumber),
				Index: uint32(req.CursorPosition.Index),
			},
		})
	}
	if err != nil {
		return nil, endpoint.convertMetabaseErr(err)
	}

	// Update the current bandwidth cache value incrementing the SegmentSize.
	err = endpoint.projectUsage.UpdateProjectBandwidthUsage(ctx, keyInfo.ProjectID, int64(segment.EncryptedSize))
	if err != nil {
		if errs2.IsCanceled(err) {
			return nil, rpcstatus.Wrap(rpcstatus.Canceled, err)
		}

		// log it and continue. it's most likely our own fault that we couldn't
		// track it, and the only thing that will be affected is our per-project
		// bandwidth limits.
		endpoint.log.Error("Could not track the new project's bandwidth usage when downloading a segment",
			zap.Stringer("Project ID", keyInfo.ProjectID),
			zap.Error(err),
		)
	}

	encryptedKeyNonce, err := storj.NonceFromBytes(segment.EncryptedKeyNonce)
	if err != nil {
		endpoint.log.Error("unable to get encryption key nonce from metadata", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	// Inline segment: the data itself is part of the response, no order
	// limits are created.
	if segment.Inline() {
		err := endpoint.orders.UpdateGetInlineOrder(ctx, bucket, int64(len(segment.InlineData)))
		if err != nil {
			endpoint.log.Error("internal", zap.Error(err))
			return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
		}

		endpoint.versionCollector.collectTransferStats(req.Header.UserAgent, download, len(segment.InlineData))

		endpoint.log.Info("Inline Segment Download", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "get"), zap.String("type", "inline"))
		mon.Meter("req_get_inline").Mark(1)

		return &pb.SegmentDownloadResponse{
			PlainOffset:         segment.PlainOffset,
			PlainSize:           int64(segment.PlainSize),
			SegmentSize:         int64(segment.EncryptedSize),
			EncryptedInlineData: segment.InlineData,

			EncryptedKeyNonce: encryptedKeyNonce,
			EncryptedKey:      segment.EncryptedKey,
			Position: &pb.SegmentPosition{
				PartNumber: int32(segment.Position.Part),
				Index:      int32(segment.Position.Index),
			},
		}, nil
	}

	// Remote segment
	limits, privateKey, err := endpoint.orders.CreateGetOrderLimits(ctx, bucket, segment, req.GetDesiredNodes(), 0)
	if err != nil {
		// A shortage of pieces gets an extra log entry that identifies the
		// project and API key; every failure is also logged as "internal".
		if orders.ErrDownloadFailedNotEnoughPieces.Has(err) {
			endpoint.log.Error("Unable to create order limits.",
				zap.Stringer("Project ID", keyInfo.ProjectID),
				zap.Stringer("API Key ID", keyInfo.ID),
				zap.Error(err),
			)
		}
		endpoint.log.Error("internal", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	endpoint.versionCollector.collectTransferStats(req.Header.UserAgent, download, int(segment.EncryptedSize))

	endpoint.log.Info("Segment Download", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "get"), zap.String("type", "remote"))
	mon.Meter("req_get_remote").Mark(1)

	return &pb.SegmentDownloadResponse{
		AddressedLimits: limits,
		PrivateKey:      privateKey,
		PlainOffset:     segment.PlainOffset,
		PlainSize:       int64(segment.PlainSize),
		SegmentSize:     int64(segment.EncryptedSize),

		EncryptedKeyNonce: encryptedKeyNonce,
		EncryptedKey:      segment.EncryptedKey,
		RedundancyScheme: &pb.RedundancyScheme{
			Type:             pb.RedundancyScheme_SchemeType(segment.Redundancy.Algorithm),
			ErasureShareSize: segment.Redundancy.ShareSize,

			MinReq:           int32(segment.Redundancy.RequiredShares),
			RepairThreshold:  int32(segment.Redundancy.RepairShares),
			SuccessThreshold: int32(segment.Redundancy.OptimalShares),
			Total:            int32(segment.Redundancy.TotalShares),
		},
		Position: &pb.SegmentPosition{
			PartNumber: int32(segment.Position.Part),
			Index:      int32(segment.Position.Index),
		},
	}, nil
}
// DeletePart is a no-op.
//
// It used to delete a single part from the satellite database and from the
// storage nodes. Because segments of pending objects can now be overwritten,
// that work is no longer needed. The method is kept, and always succeeds with
// an empty response, so that uplinks still calling it do not fail.
func (endpoint *Endpoint) DeletePart(ctx context.Context, req *pb.PartDeleteRequest) (resp *pb.PartDeleteResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	resp = &pb.PartDeleteResponse{}
	return resp, nil
}