4a79b609e9
all permissions Without read and list permissions, BeginObjectDelete won't return an error if one occurs. This was breaking Batch processing because of an assumption that, when no error is returned, the response is always non-nil. https://storjlabs.atlassian.net/browse/SM-590 Change-Id: I0fc9539e429110a660eb28725b266d5e4771d198
367 lines
12 KiB
Go
367 lines
12 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package metainfo
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/common/pb"
|
|
"storj.io/common/storj"
|
|
)
|
|
|
|
// Batch handle requests sent in batch
|
|
func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp *pb.BatchResponse, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
resp = &pb.BatchResponse{}
|
|
|
|
resp.Responses = make([]*pb.BatchResponseItem, 0, len(req.Requests))
|
|
|
|
var lastStreamID storj.StreamID
|
|
var lastSegmentID storj.SegmentID
|
|
var prevSegmentReq *pb.BatchRequestItem
|
|
for i, request := range req.Requests {
|
|
switch singleRequest := request.Request.(type) {
|
|
// BUCKET
|
|
case *pb.BatchRequestItem_BucketCreate:
|
|
singleRequest.BucketCreate.Header = req.Header
|
|
response, err := endpoint.CreateBucket(ctx, singleRequest.BucketCreate)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_BucketCreate{
|
|
BucketCreate: response,
|
|
},
|
|
})
|
|
case *pb.BatchRequestItem_BucketGet:
|
|
singleRequest.BucketGet.Header = req.Header
|
|
response, err := endpoint.GetBucket(ctx, singleRequest.BucketGet)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_BucketGet{
|
|
BucketGet: response,
|
|
},
|
|
})
|
|
case *pb.BatchRequestItem_BucketDelete:
|
|
singleRequest.BucketDelete.Header = req.Header
|
|
response, err := endpoint.DeleteBucket(ctx, singleRequest.BucketDelete)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_BucketDelete{
|
|
BucketDelete: response,
|
|
},
|
|
})
|
|
case *pb.BatchRequestItem_BucketList:
|
|
singleRequest.BucketList.Header = req.Header
|
|
response, err := endpoint.ListBuckets(ctx, singleRequest.BucketList)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_BucketList{
|
|
BucketList: response,
|
|
},
|
|
})
|
|
case *pb.BatchRequestItem_BucketSetAttribution:
|
|
singleRequest.BucketSetAttribution.Header = req.Header
|
|
response, err := endpoint.SetBucketAttribution(ctx, singleRequest.BucketSetAttribution)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_BucketSetAttribution{
|
|
BucketSetAttribution: response,
|
|
},
|
|
})
|
|
//OBJECT
|
|
case *pb.BatchRequestItem_ObjectBegin:
|
|
singleRequest.ObjectBegin.Header = req.Header
|
|
response, err := endpoint.BeginObject(ctx, singleRequest.ObjectBegin)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_ObjectBegin{
|
|
ObjectBegin: response,
|
|
},
|
|
})
|
|
if response != nil {
|
|
lastStreamID = response.StreamId
|
|
}
|
|
case *pb.BatchRequestItem_ObjectCommit:
|
|
singleRequest.ObjectCommit.Header = req.Header
|
|
|
|
if singleRequest.ObjectCommit.StreamId.IsZero() && !lastStreamID.IsZero() {
|
|
singleRequest.ObjectCommit.StreamId = lastStreamID
|
|
}
|
|
|
|
var response *pb.ObjectCommitResponse
|
|
var err error
|
|
switch {
|
|
case prevSegmentReq.GetSegmentMakeInline() != nil:
|
|
pointer, segmentResp, segmentErr := endpoint.makeInlineSegment(ctx, prevSegmentReq.GetSegmentMakeInline(), false)
|
|
prevSegmentReq = nil
|
|
if segmentErr != nil {
|
|
return resp, segmentErr
|
|
}
|
|
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_SegmentMakeInline{
|
|
SegmentMakeInline: segmentResp,
|
|
},
|
|
})
|
|
response, err = endpoint.commitObject(ctx, singleRequest.ObjectCommit, pointer)
|
|
case prevSegmentReq.GetSegmentCommit() != nil:
|
|
pointer, segmentResp, segmentErr := endpoint.commitSegment(ctx, prevSegmentReq.GetSegmentCommit(), false)
|
|
prevSegmentReq = nil
|
|
if segmentErr != nil {
|
|
return resp, segmentErr
|
|
}
|
|
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_SegmentCommit{
|
|
SegmentCommit: segmentResp,
|
|
},
|
|
})
|
|
response, err = endpoint.commitObject(ctx, singleRequest.ObjectCommit, pointer)
|
|
default:
|
|
response, err = endpoint.CommitObject(ctx, singleRequest.ObjectCommit)
|
|
}
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_ObjectCommit{
|
|
ObjectCommit: response,
|
|
},
|
|
})
|
|
case *pb.BatchRequestItem_ObjectGet:
|
|
singleRequest.ObjectGet.Header = req.Header
|
|
response, err := endpoint.GetObject(ctx, singleRequest.ObjectGet)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_ObjectGet{
|
|
ObjectGet: response,
|
|
},
|
|
})
|
|
if response != nil && response.Object != nil {
|
|
lastStreamID = response.Object.StreamId
|
|
}
|
|
case *pb.BatchRequestItem_ObjectList:
|
|
singleRequest.ObjectList.Header = req.Header
|
|
response, err := endpoint.ListObjects(ctx, singleRequest.ObjectList)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_ObjectList{
|
|
ObjectList: response,
|
|
},
|
|
})
|
|
case *pb.BatchRequestItem_ObjectBeginDelete:
|
|
singleRequest.ObjectBeginDelete.Header = req.Header
|
|
response, err := endpoint.BeginDeleteObject(ctx, singleRequest.ObjectBeginDelete)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_ObjectBeginDelete{
|
|
ObjectBeginDelete: response,
|
|
},
|
|
})
|
|
if response != nil {
|
|
lastStreamID = response.StreamId
|
|
}
|
|
case *pb.BatchRequestItem_ObjectFinishDelete:
|
|
singleRequest.ObjectFinishDelete.Header = req.Header
|
|
|
|
if singleRequest.ObjectFinishDelete.StreamId.IsZero() && !lastStreamID.IsZero() {
|
|
singleRequest.ObjectFinishDelete.StreamId = lastStreamID
|
|
}
|
|
|
|
response, err := endpoint.FinishDeleteObject(ctx, singleRequest.ObjectFinishDelete)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_ObjectFinishDelete{
|
|
ObjectFinishDelete: response,
|
|
},
|
|
})
|
|
// SEGMENT
|
|
case *pb.BatchRequestItem_SegmentBegin:
|
|
singleRequest.SegmentBegin.Header = req.Header
|
|
|
|
if singleRequest.SegmentBegin.StreamId.IsZero() && !lastStreamID.IsZero() {
|
|
singleRequest.SegmentBegin.StreamId = lastStreamID
|
|
}
|
|
|
|
response, err := endpoint.BeginSegment(ctx, singleRequest.SegmentBegin)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_SegmentBegin{
|
|
SegmentBegin: response,
|
|
},
|
|
})
|
|
if response != nil {
|
|
lastSegmentID = response.SegmentId
|
|
}
|
|
case *pb.BatchRequestItem_SegmentCommit:
|
|
singleRequest.SegmentCommit.Header = req.Header
|
|
|
|
if singleRequest.SegmentCommit.SegmentId.IsZero() && !lastSegmentID.IsZero() {
|
|
singleRequest.SegmentCommit.SegmentId = lastSegmentID
|
|
}
|
|
|
|
segmentID, err := endpoint.unmarshalSatSegmentID(ctx, singleRequest.SegmentCommit.SegmentId)
|
|
if err != nil {
|
|
endpoint.log.Error("unable to unmarshal segment id", zap.Error(err))
|
|
} else if endpoint.shouldCombine(segmentID.Index, i, req.Requests) {
|
|
prevSegmentReq = request
|
|
continue
|
|
}
|
|
|
|
response, err := endpoint.CommitSegment(ctx, singleRequest.SegmentCommit)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_SegmentCommit{
|
|
SegmentCommit: response,
|
|
},
|
|
})
|
|
case *pb.BatchRequestItem_SegmentList:
|
|
singleRequest.SegmentList.Header = req.Header
|
|
|
|
if singleRequest.SegmentList.StreamId.IsZero() && !lastStreamID.IsZero() {
|
|
singleRequest.SegmentList.StreamId = lastStreamID
|
|
}
|
|
|
|
response, err := endpoint.ListSegments(ctx, singleRequest.SegmentList)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_SegmentList{
|
|
SegmentList: response,
|
|
},
|
|
})
|
|
case *pb.BatchRequestItem_SegmentMakeInline:
|
|
singleRequest.SegmentMakeInline.Header = req.Header
|
|
|
|
if singleRequest.SegmentMakeInline.StreamId.IsZero() && !lastStreamID.IsZero() {
|
|
singleRequest.SegmentMakeInline.StreamId = lastStreamID
|
|
}
|
|
|
|
if endpoint.shouldCombine(singleRequest.SegmentMakeInline.Position.Index, i, req.Requests) {
|
|
prevSegmentReq = request
|
|
continue
|
|
}
|
|
|
|
response, err := endpoint.MakeInlineSegment(ctx, singleRequest.SegmentMakeInline)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_SegmentMakeInline{
|
|
SegmentMakeInline: response,
|
|
},
|
|
})
|
|
case *pb.BatchRequestItem_SegmentDownload:
|
|
singleRequest.SegmentDownload.Header = req.Header
|
|
|
|
if singleRequest.SegmentDownload.StreamId.IsZero() && !lastStreamID.IsZero() {
|
|
singleRequest.SegmentDownload.StreamId = lastStreamID
|
|
}
|
|
|
|
response, err := endpoint.DownloadSegment(ctx, singleRequest.SegmentDownload)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_SegmentDownload{
|
|
SegmentDownload: response,
|
|
},
|
|
})
|
|
if response != nil {
|
|
lastSegmentID = response.SegmentId
|
|
}
|
|
case *pb.BatchRequestItem_SegmentBeginDelete:
|
|
singleRequest.SegmentBeginDelete.Header = req.Header
|
|
|
|
if singleRequest.SegmentBeginDelete.StreamId.IsZero() && !lastStreamID.IsZero() {
|
|
singleRequest.SegmentBeginDelete.StreamId = lastStreamID
|
|
}
|
|
|
|
response, err := endpoint.BeginDeleteSegment(ctx, singleRequest.SegmentBeginDelete)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_SegmentBeginDelete{
|
|
SegmentBeginDelete: response,
|
|
},
|
|
})
|
|
if response != nil {
|
|
lastSegmentID = response.SegmentId
|
|
}
|
|
case *pb.BatchRequestItem_SegmentFinishDelete:
|
|
singleRequest.SegmentFinishDelete.Header = req.Header
|
|
|
|
if singleRequest.SegmentFinishDelete.SegmentId.IsZero() && !lastSegmentID.IsZero() {
|
|
singleRequest.SegmentFinishDelete.SegmentId = lastSegmentID
|
|
}
|
|
|
|
response, err := endpoint.FinishDeleteSegment(ctx, singleRequest.SegmentFinishDelete)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
|
|
Response: &pb.BatchResponseItem_SegmentFinishDelete{
|
|
SegmentFinishDelete: response,
|
|
},
|
|
})
|
|
default:
|
|
return nil, errs.New("unsupported request type")
|
|
}
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// shouldCombine returns true if we are able to combine current request with next one. Main case is
|
|
// combining CommitSegment/MakeInlineSegment with ObjectCommmit.
|
|
//
|
|
// This method has a workaround for a bug in uplink where ObjectCommit was batched with
|
|
// segment N-2 instead of N-1 if list segment was inline segment. We are checking that
|
|
// current request segment index is last one before 'l' segment and we are not combining otherwise.
|
|
func (endpoint *Endpoint) shouldCombine(segmentIndex int32, reqIndex int, requests []*pb.BatchRequestItem) bool {
|
|
if reqIndex < len(requests)-1 && requests[reqIndex+1].GetObjectCommit() != nil {
|
|
objCommitReq := requests[reqIndex+1].GetObjectCommit()
|
|
|
|
streamMeta := pb.StreamMeta{}
|
|
err := proto.Unmarshal(objCommitReq.EncryptedMetadata, &streamMeta)
|
|
if err != nil {
|
|
endpoint.log.Error("unable to unmarshal stream meta", zap.Error(err))
|
|
return false
|
|
}
|
|
|
|
return int64(segmentIndex) != streamMeta.NumberOfSegments-2
|
|
}
|
|
return false
|
|
}
|