satellite/metainfo: combine CommitSegment and CommitObject in batch v2
This change adds a special case to batch processing: when a CommitSegment request is immediately followed by a CommitObject request in a batch, the two are executed as one. This avoids the current logic, where the pointer is first saved for CommitSegment and later deleted and saved again under the last-segment path for CommitObject. It also handles an issue we have with older uplinks that store pointers in an incorrect order.

Change-Id: I86514c95df169e6fbc91b52e5117472cae70cb8b
parent 1db087cfba
commit 4deab5ac6c
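
For illustration, here is a minimal sketch (not part of the change) of the batch an uplink sends when it finishes an upload, using the pb types that appear in the diff below; commitReq and objectCommitReq are hypothetical placeholders for fully populated requests:

	// Sketch: with this change, the satellite notices that a SegmentCommit item
	// is immediately followed by an ObjectCommit item and executes the pair as
	// one step, writing the pointer once under the last-segment path instead of
	// put-delete-put.
	batch := &pb.BatchRequest{
		Requests: []*pb.BatchRequestItem{
			{Request: &pb.BatchRequestItem_SegmentCommit{SegmentCommit: commitReq}},     // last segment
			{Request: &pb.BatchRequestItem_ObjectCommit{ObjectCommit: objectCommitReq}}, // combined with the above
		},
	}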
@@ -6,7 +6,9 @@ package metainfo
 import (
 	"context"
 
+	"github.com/gogo/protobuf/proto"
 	"github.com/zeebo/errs"
+	"go.uber.org/zap"
 
 	"storj.io/common/pb"
 	"storj.io/common/storj"
@@ -22,7 +24,8 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
 
 	var lastStreamID storj.StreamID
 	var lastSegmentID storj.SegmentID
-	for _, request := range req.Requests {
+	var prevSegmentReq *pb.BatchRequestItem
+	for i, request := range req.Requests {
 		switch singleRequest := request.Request.(type) {
 		// BUCKET
 		case *pb.BatchRequestItem_BucketCreate:
@@ -100,7 +103,38 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
 				singleRequest.ObjectCommit.StreamId = lastStreamID
 			}
 
-			response, err := endpoint.CommitObject(ctx, singleRequest.ObjectCommit)
+			var response *pb.ObjectCommitResponse
+			var err error
+			switch {
+			case prevSegmentReq.GetSegmentMakeInline() != nil:
+				pointer, segmentResp, segmentErr := endpoint.makeInlineSegment(ctx, prevSegmentReq.GetSegmentMakeInline(), false)
+				prevSegmentReq = nil
+				if segmentErr != nil {
+					return resp, segmentErr
+				}
+
+				resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
+					Response: &pb.BatchResponseItem_SegmentMakeInline{
+						SegmentMakeInline: segmentResp,
+					},
+				})
+				response, err = endpoint.commitObject(ctx, singleRequest.ObjectCommit, pointer)
+			case prevSegmentReq.GetSegmentCommit() != nil:
+				pointer, segmentResp, segmentErr := endpoint.commitSegment(ctx, prevSegmentReq.GetSegmentCommit(), false)
+				prevSegmentReq = nil
+				if segmentErr != nil {
+					return resp, segmentErr
+				}
+
+				resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
+					Response: &pb.BatchResponseItem_SegmentCommit{
+						SegmentCommit: segmentResp,
+					},
+				})
+				response, err = endpoint.commitObject(ctx, singleRequest.ObjectCommit, pointer)
+			default:
+				response, err = endpoint.CommitObject(ctx, singleRequest.ObjectCommit)
+			}
 			if err != nil {
 				return resp, err
 			}
@@ -185,6 +219,14 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
 				singleRequest.SegmentCommit.SegmentId = lastSegmentID
 			}
 
+			segmentID, err := endpoint.unmarshalSatSegmentID(ctx, singleRequest.SegmentCommit.SegmentId)
+			if err != nil {
+				endpoint.log.Error("unable to unmarshal segment id", zap.Error(err))
+			} else if endpoint.shouldCombine(segmentID.Index, i, req.Requests) {
+				prevSegmentReq = request
+				continue
+			}
+
 			response, err := endpoint.CommitSegment(ctx, singleRequest.SegmentCommit)
 			if err != nil {
 				return resp, err
@@ -217,6 +259,11 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
 				singleRequest.SegmentMakeInline.StreamId = lastStreamID
 			}
 
+			if endpoint.shouldCombine(singleRequest.SegmentMakeInline.Position.Index, i, req.Requests) {
+				prevSegmentReq = request
+				continue
+			}
+
 			response, err := endpoint.MakeInlineSegment(ctx, singleRequest.SegmentMakeInline)
 			if err != nil {
 				return resp, err
@@ -283,3 +330,25 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
 
 	return resp, nil
 }
+
+// shouldCombine returns true if we are able to combine the current request with the next one.
+// The main case is combining CommitSegment/MakeInlineSegment with ObjectCommit.
+//
+// This method has a workaround for a bug in uplink where ObjectCommit was batched with
+// segment N-2 instead of N-1 if the last segment was an inline segment. We check that the
+// current request's segment index is the last one before the 'l' segment and do not combine otherwise.
+func (endpoint *Endpoint) shouldCombine(segmentIndex int32, reqIndex int, requests []*pb.BatchRequestItem) bool {
+	if reqIndex < len(requests)-1 && requests[reqIndex+1].GetObjectCommit() != nil {
+		objCommitReq := requests[reqIndex+1].GetObjectCommit()
+
+		streamMeta := pb.StreamMeta{}
+		err := proto.Unmarshal(objCommitReq.EncryptedMetadata, &streamMeta)
+		if err != nil {
+			endpoint.log.Error("unable to unmarshal stream meta", zap.Error(err))
+			return false
+		}
+
+		return int64(segmentIndex) != streamMeta.NumberOfSegments-2
+	}
+	return false
+}
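
A worked example of the shouldCombine check (illustrative values, not from the change): commitObject reads the pointer saved at index NumberOfSegments-1 and re-saves it under the 'l' path, so only the segment at that index should be combined with ObjectCommit; index NumberOfSegments-2 is the buggy uplink ordering, which must fall back to the two-step commit.

	package main

	import "fmt"

	func main() {
		var numberOfSegments int64 = 4 // streamMeta.NumberOfSegments, assumed value

		// Correct uplink: ObjectCommit is batched after CommitSegment for index 3 (N-1).
		fmt.Println(int64(3) != numberOfSegments-2) // true  -> combine
		// Buggy uplink: ObjectCommit is batched after segment index 2 (N-2).
		fmt.Println(int64(2) != numberOfSegments-2) // false -> do not combine
	}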
@@ -1116,6 +1116,12 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
 func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommitRequest) (resp *pb.ObjectCommitResponse, err error) {
 	defer mon.Task()(&ctx)(&err)
 
+	return endpoint.commitObject(ctx, req, nil)
+}
+
+func (endpoint *Endpoint) commitObject(ctx context.Context, req *pb.ObjectCommitRequest, pointer *pb.Pointer) (resp *pb.ObjectCommitResponse, err error) {
+	defer mon.Task()(&ctx)(&err)
+
 	streamID := &pb.SatStreamID{}
 	err = proto.Unmarshal(req.StreamId, streamID)
 	if err != nil {
@@ -1147,19 +1153,29 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit
 		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "invalid metadata structure")
 	}
 
-	lastSegmentIndex := streamMeta.NumberOfSegments - 1
-	lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
-	if err != nil {
-		return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to create segment path: %s", err.Error())
-	}
+	lastSegmentPointer := pointer
+	if pointer == nil {
+		lastSegmentIndex := streamMeta.NumberOfSegments - 1
+		lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
+		if err != nil {
+			return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to create segment path: %s", err.Error())
+		}
 
-	lastSegmentPointerBytes, lastSegmentPointer, err := endpoint.metainfo.GetWithBytes(ctx, lastSegmentPath)
-	if err != nil {
-		endpoint.log.Error("unable to get pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err))
-		return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
-	}
-	if lastSegmentPointer == nil {
-		return nil, rpcstatus.Errorf(rpcstatus.NotFound, "unable to find object: %q/%q", streamID.Bucket, streamID.EncryptedPath)
+		var lastSegmentPointerBytes []byte
+		lastSegmentPointerBytes, lastSegmentPointer, err = endpoint.metainfo.GetWithBytes(ctx, lastSegmentPath)
+		if err != nil {
+			endpoint.log.Error("unable to get pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err))
+			return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
+		}
+		if lastSegmentPointer == nil {
+			return nil, rpcstatus.Errorf(rpcstatus.NotFound, "unable to find object: %q/%q", streamID.Bucket, streamID.EncryptedPath)
+		}
+
+		err = endpoint.metainfo.Delete(ctx, lastSegmentPath, lastSegmentPointerBytes)
+		if err != nil {
+			endpoint.log.Error("unable to delete pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err))
+			return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
+		}
 	}
 
 	if lastSegmentPointer.Remote == nil {
@@ -1169,14 +1185,7 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit
 	lastSegmentPointer.Remote.Redundancy = streamID.Redundancy
 	lastSegmentPointer.Metadata = req.EncryptedMetadata
 
-	err = endpoint.metainfo.Delete(ctx, lastSegmentPath, lastSegmentPointerBytes)
-	if err != nil {
-		endpoint.log.Error("unable to delete pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err))
-		return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
-	}
-
-	lastSegmentIndex = -1
-	lastSegmentPath, err = CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
+	lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, int64(lastSegment), streamID.Bucket, streamID.EncryptedPath)
 	if err != nil {
 		endpoint.log.Error("unable to create path", zap.Error(err))
 		return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
@@ -1509,9 +1518,16 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin
 func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentCommitRequest) (resp *pb.SegmentCommitResponse, err error) {
 	defer mon.Task()(&ctx)(&err)
 
+	_, resp, err = endpoint.commitSegment(ctx, req, true)
+	return resp, err
+}
+
+func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentCommitRequest, savePointer bool) (pointer *pb.Pointer, resp *pb.SegmentCommitResponse, err error) {
+	defer mon.Task()(&ctx)(&err)
+
 	segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId)
 	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+		return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
 	}
 
 	streamID := segmentID.StreamId
@@ -1523,7 +1539,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
 		Time: time.Now(),
 	})
 	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
+		return nil, nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
 	}
 
 	if numResults := len(req.UploadResult); numResults < int(streamID.Redundancy.GetSuccessThreshold()) {
@@ -1532,7 +1548,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
 			zap.Int32("redundancy optimal threshold", streamID.Redundancy.GetSuccessThreshold()),
 			zap.Stringer("Segment ID", req.SegmentId),
 		)
-		return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument,
+		return nil, nil, rpcstatus.Errorf(rpcstatus.InvalidArgument,
 			"the number of results of uploaded pieces (%d) is below the optimal threshold (%d)",
 			numResults, streamID.Redundancy.GetSuccessThreshold(),
 		)
@@ -1558,10 +1574,10 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
 	})
 	if err != nil {
 		endpoint.log.Error("unable to marshal segment metadata", zap.Error(err))
-		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+		return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
 	}
 
-	pointer := &pb.Pointer{
+	pointer = &pb.Pointer{
 		Type:        pb.Pointer_REMOTE,
 		Remote:      remote,
 		SegmentSize: req.SizeEncryptedData,
@@ -1580,29 +1596,24 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
 
 	err = endpoint.validatePointer(ctx, pointer, orderLimits)
 	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+		return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
 	}
 
 	err = endpoint.filterValidPieces(ctx, pointer, orderLimits)
 	if err != nil {
-		return nil, err
-	}
-
-	path, err := CreatePath(ctx, keyInfo.ProjectID, int64(segmentID.Index), streamID.Bucket, streamID.EncryptedPath)
-	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+		return nil, nil, err
 	}
 
 	exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
 	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+		return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
 	}
 	if exceeded {
 		endpoint.log.Error("The project limit of storage and bandwidth has been exceeded",
 			zap.Int64("limit", limit.Int64()),
 			zap.Stringer("Project ID", keyInfo.ProjectID),
 		)
-		return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
+		return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
 	}
 
 	// clear hashes so we don't store them
@@ -1623,7 +1634,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
 			zap.Int32("redundancy minimum requested", pointer.Remote.Redundancy.MinReq),
 			zap.Int32("redundancy total", pointer.Remote.Redundancy.Total),
 		)
-			return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage")
+			return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage")
 		}
 	}
 
@@ -1636,12 +1647,19 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
 		// that will be affected is our per-project bandwidth and storage limits.
 	}
 
-	err = endpoint.metainfo.UnsynchronizedPut(ctx, path, pointer)
-	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+	if savePointer {
+		path, err := CreatePath(ctx, keyInfo.ProjectID, int64(segmentID.Index), streamID.Bucket, streamID.EncryptedPath)
+		if err != nil {
+			return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+		}
+
+		err = endpoint.metainfo.UnsynchronizedPut(ctx, path, pointer)
+		if err != nil {
+			return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+		}
 	}
 
-	return &pb.SegmentCommitResponse{
+	return pointer, &pb.SegmentCommitResponse{
 		SuccessfulPieces: int32(len(pointer.Remote.RemotePieces)),
 	}, nil
 }
@@ -1650,9 +1668,17 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
 func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest) (resp *pb.SegmentMakeInlineResponse, err error) {
 	defer mon.Task()(&ctx)(&err)
 
+	_, resp, err = endpoint.makeInlineSegment(ctx, req, true)
+	return resp, err
+}
+
+// makeInlineSegment makes an inline segment on the satellite
+func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest, savePointer bool) (pointer *pb.Pointer, resp *pb.SegmentMakeInlineResponse, err error) {
+	defer mon.Task()(&ctx)(&err)
+
 	streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
 	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+		return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
 	}
 
 	keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
@@ -1662,27 +1688,22 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment
 		Time: time.Now(),
 	})
 	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
+		return nil, nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
 	}
 
 	if req.Position.Index < 0 {
-		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0")
-	}
-
-	path, err := CreatePath(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath)
-	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+		return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0")
 	}
 
 	exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
 	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+		return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
 	}
 	if exceeded {
 		endpoint.log.Sugar().Errorf("monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for storage for projectID %s.",
 			limit, keyInfo.ProjectID,
 		)
-		return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
+		return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
 	}
 
 	inlineUsed := int64(len(req.EncryptedInlineData))
@@ -1698,7 +1719,7 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment
 		KeyNonce: req.EncryptedKeyNonce.Bytes(),
 	})
 
-	pointer := &pb.Pointer{
+	pointer = &pb.Pointer{
 		Type:         pb.Pointer_INLINE,
 		SegmentSize:  inlineUsed,
 		CreationDate: streamID.CreationDate,
@@ -1707,20 +1728,27 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment
 		Metadata: metadata,
 	}
 
-	err = endpoint.metainfo.UnsynchronizedPut(ctx, path, pointer)
-	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+	if savePointer {
+		path, err := CreatePath(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath)
+		if err != nil {
+			return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+		}
+
+		err = endpoint.metainfo.UnsynchronizedPut(ctx, path, pointer)
+		if err != nil {
+			return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+		}
 	}
 
 	err = endpoint.orders.UpdatePutInlineOrder(ctx, keyInfo.ProjectID, streamID.Bucket, inlineUsed)
 	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
+		return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
 	}
 
 	endpoint.log.Info("Inline Segment Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "inline"))
 	mon.Meter("req_put_inline").Mark(1)
 
-	return &pb.SegmentMakeInlineResponse{}, nil
+	return pointer, &pb.SegmentMakeInlineResponse{}, nil
 }
 
 // BeginDeleteSegment begins segment deletion process