satellite/metainfo: combine CommitSegment and CommitObject in batch

This change adds a special case to batch processing. If a batch request
contains a CommitSegment immediately followed by a CommitObject, we can
execute both requests as one. This avoids the current logic where we save
the pointer for CommitSegment and later delete that pointer and save it
again under the last segment path for CommitObject.

Change-Id: If170e78c8410f5ba5916cbff6a29b9221db9ce2e
Michal Niewrzal 2020-01-16 20:12:09 +01:00
parent 86f194769f
commit 8772867855
3 changed files with 141 additions and 66 deletions
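
A rough sketch of the combining described in the commit message, using placeholder types instead of the real pb messages (this illustrates the peek-ahead pattern only, it is not the actual endpoint code):

package main

import "fmt"

// item is a stand-in for *pb.BatchRequestItem.
type item struct {
	kind string // "SegmentCommit", "ObjectCommit", ...
}

func processBatch(items []item) {
	var prevSegment *item // segment commit held back for merging
	for i := range items {
		switch items[i].kind {
		case "SegmentCommit":
			// Peek ahead: if the next item commits the object, defer this
			// segment commit so both can be executed as one operation.
			if i < len(items)-1 && items[i+1].kind == "ObjectCommit" {
				prevSegment = &items[i]
				continue
			}
			fmt.Println("commit segment and save its pointer")
		case "ObjectCommit":
			if prevSegment != nil {
				prevSegment = nil
				fmt.Println("commit segment and object together; write the pointer once")
				continue
			}
			fmt.Println("commit object; read back, delete, and re-save the previously stored pointer")
		}
	}
}

func main() {
	processBatch([]item{{"SegmentCommit"}, {"ObjectCommit"}})
}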


@@ -22,7 +22,8 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
var lastStreamID storj.StreamID
var lastSegmentID storj.SegmentID
for _, request := range req.Requests {
var prevSegmentReq *pb.BatchRequestItem
for i, request := range req.Requests {
switch singleRequest := request.Request.(type) {
// BUCKET
case *pb.BatchRequestItem_BucketCreate:
@@ -100,10 +101,42 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
singleRequest.ObjectCommit.StreamId = lastStreamID
}
response, err := endpoint.CommitObject(ctx, singleRequest.ObjectCommit)
var response *pb.ObjectCommitResponse
var err error
switch {
case prevSegmentReq.GetSegmentMakeInline() != nil:
pointer, segmentResp, segmentErr := endpoint.makeInlineSegment(ctx, prevSegmentReq.GetSegmentMakeInline(), false)
prevSegmentReq = nil
if segmentErr != nil {
return resp, segmentErr
}
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
Response: &pb.BatchResponseItem_SegmentMakeInline{
SegmentMakeInline: segmentResp,
},
})
response, err = endpoint.commitObject(ctx, singleRequest.ObjectCommit, pointer)
case prevSegmentReq.GetSegmentCommit() != nil:
pointer, segmentResp, segmentErr := endpoint.commitSegment(ctx, prevSegmentReq.GetSegmentCommit(), false)
prevSegmentReq = nil
if segmentErr != nil {
return resp, segmentErr
}
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
Response: &pb.BatchResponseItem_SegmentCommit{
SegmentCommit: segmentResp,
},
})
response, err = endpoint.commitObject(ctx, singleRequest.ObjectCommit, pointer)
default:
response, err = endpoint.CommitObject(ctx, singleRequest.ObjectCommit)
}
if err != nil {
return resp, err
}
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
Response: &pb.BatchResponseItem_ObjectCommit{
ObjectCommit: response,
@@ -185,6 +218,11 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
singleRequest.SegmentCommit.SegmentId = lastSegmentID
}
if i < len(req.Requests)-1 && req.Requests[i+1].GetObjectCommit() != nil {
prevSegmentReq = request
continue
}
response, err := endpoint.CommitSegment(ctx, singleRequest.SegmentCommit)
if err != nil {
return resp, err
@@ -217,6 +255,11 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
singleRequest.SegmentMakeInline.StreamId = lastStreamID
}
if i < len(req.Requests)-1 && req.Requests[i+1].GetObjectCommit() != nil {
prevSegmentReq = request
continue
}
response, err := endpoint.MakeInlineSegment(ctx, singleRequest.SegmentMakeInline)
if err != nil {
return resp, err

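For reference, the fast path above only triggers when a SegmentCommit (or SegmentMakeInline) item is immediately followed by an ObjectCommit item in the same batch. A hedged sketch of building such a request on the client side; the oneof wrapper names mirror those used in the switch above, and the import path is an assumption for this point in the repository's history:

package metainfo_example

import (
	"storj.io/storj/pkg/pb" // assumed import path; the pb package may live elsewhere at this commit
)

// buildCombinedBatch places a segment commit directly before the object
// commit so the satellite can execute them as one operation.
func buildCombinedBatch(segReq *pb.SegmentCommitRequest, objReq *pb.ObjectCommitRequest) *pb.BatchRequest {
	return &pb.BatchRequest{
		Requests: []*pb.BatchRequestItem{
			{Request: &pb.BatchRequestItem_SegmentCommit{SegmentCommit: segReq}},
			// Because this ObjectCommit directly follows the SegmentCommit,
			// the endpoint commits both in one step and writes the pointer
			// only once, under the last segment path.
			{Request: &pb.BatchRequestItem_ObjectCommit{ObjectCommit: objReq}},
		},
	}
}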

@@ -5,6 +5,7 @@ package metainfo_test
import (
"context"
"strconv"
"testing"
"github.com/skyrings/skyring-common/tools/uuid"
@@ -46,12 +47,13 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
}
for _, tc := range testCases {
for i, tc := range testCases {
i := i
tc := tc
t.Run(tc.caseDescription, func(t *testing.T) {
const (
var (
bucketName = "a-bucket"
objectName = "object-filename"
objectName = "object-filename" + strconv.Itoa(i)
)
// Use RSConfig for ensuring that we don't have long-tail cancellations and the
@@ -115,7 +117,8 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
}
for _, tc := range testCases {
for i, tc := range testCases {
i := i
tc := tc
t.Run(tc.caseDescription, func(t *testing.T) {
ctx := testcontext.New(t)
@@ -131,9 +134,9 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
satelliteSys = planet.Satellites[0]
)
const (
var (
bucketName = "a-bucket"
objectName = "object-filename"
objectName = "object-filename" + strconv.Itoa(i)
)
// Use RSConfig for ensuring that we don't have long-tail cancellations and the
@@ -198,12 +201,13 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
}
for _, tc := range testCases {
for i, tc := range testCases {
i := i
tc := tc
t.Run(tc.caseDescription, func(t *testing.T) {
const (
var (
bucketName = "a-bucket"
objectName = "object-filename"
objectName = "object-filename" + strconv.Itoa(i)
)
ctx := testcontext.New(t)

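The test changes above re-declare the loop variables and append the index to each object name so every subtest operates on its own object. A minimal, hypothetical illustration of the loop-variable capture idiom used here:

package example_test

import (
	"strconv"
	"testing"
)

func TestCaptureExample(t *testing.T) {
	cases := []struct{ name string }{{"inline"}, {"remote"}}
	for i, tc := range cases {
		i, tc := i, tc // shadow the loop variables so each t.Run closure gets its own copy
		t.Run(tc.name, func(t *testing.T) {
			objectName := "object-filename" + strconv.Itoa(i) // unique name per case
			t.Log(objectName, tc.name)
		})
	}
}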

@@ -1152,6 +1152,12 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommitRequest) (resp *pb.ObjectCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
return endpoint.commitObject(ctx, req, nil)
}
func (endpoint *Endpoint) commitObject(ctx context.Context, req *pb.ObjectCommitRequest, pointer *pb.Pointer) (resp *pb.ObjectCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID := &pb.SatStreamID{}
err = proto.Unmarshal(req.StreamId, streamID)
if err != nil {
@@ -1183,19 +1189,29 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "invalid metadata structure")
}
lastSegmentIndex := streamMeta.NumberOfSegments - 1
lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to create segment path: %s", err.Error())
}
lastSegmentPointer := pointer
if pointer == nil {
lastSegmentIndex := streamMeta.NumberOfSegments - 1
lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to create segment path: %s", err.Error())
}
lastSegmentPointerBytes, lastSegmentPointer, err := endpoint.metainfo.GetWithBytes(ctx, lastSegmentPath)
if err != nil {
endpoint.log.Error("unable to get pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
if lastSegmentPointer == nil {
return nil, rpcstatus.Errorf(rpcstatus.NotFound, "unable to find object: %q/%q", streamID.Bucket, streamID.EncryptedPath)
var lastSegmentPointerBytes []byte
lastSegmentPointerBytes, lastSegmentPointer, err = endpoint.metainfo.GetWithBytes(ctx, lastSegmentPath)
if err != nil {
endpoint.log.Error("unable to get pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
if lastSegmentPointer == nil {
return nil, rpcstatus.Errorf(rpcstatus.NotFound, "unable to find object: %q/%q", streamID.Bucket, streamID.EncryptedPath)
}
err = endpoint.metainfo.Delete(ctx, lastSegmentPath, lastSegmentPointerBytes)
if err != nil {
endpoint.log.Error("unable to delete pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
}
if lastSegmentPointer.Remote == nil {
@@ -1205,14 +1221,7 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit
lastSegmentPointer.Remote.Redundancy = streamID.Redundancy
lastSegmentPointer.Metadata = req.EncryptedMetadata
err = endpoint.metainfo.Delete(ctx, lastSegmentPath, lastSegmentPointerBytes)
if err != nil {
endpoint.log.Error("unable to delete pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
lastSegmentIndex = -1
lastSegmentPath, err = CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, int64(lastSegment), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
endpoint.log.Error("unable to create path", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
@@ -1542,9 +1551,16 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin
func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentCommitRequest) (resp *pb.SegmentCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
_, resp, err = endpoint.commitSegment(ctx, req, true)
return resp, err
}
func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentCommitRequest, savePointer bool) (pointer *pb.Pointer, resp *pb.SegmentCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
streamID := segmentID.StreamId
@@ -1556,7 +1572,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
Time: time.Now(),
})
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
return nil, nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
}
if numResults := len(req.UploadResult); numResults < int(streamID.Redundancy.GetSuccessThreshold()) {
@@ -1565,7 +1581,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
zap.Int32("redundancy optimal threshold", streamID.Redundancy.GetSuccessThreshold()),
zap.Stringer("Segment ID", req.SegmentId),
)
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument,
return nil, nil, rpcstatus.Errorf(rpcstatus.InvalidArgument,
"the number of results of uploaded pieces (%d) is below the optimal threshold (%d)",
numResults, streamID.Redundancy.GetSuccessThreshold(),
)
@@ -1591,10 +1607,10 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
})
if err != nil {
endpoint.log.Error("unable to marshal segment metadata", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
pointer := &pb.Pointer{
pointer = &pb.Pointer{
Type: pb.Pointer_REMOTE,
Remote: remote,
SegmentSize: req.SizeEncryptedData,
@@ -1613,29 +1629,24 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
err = endpoint.validatePointer(ctx, pointer, orderLimits)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
err = endpoint.filterValidPieces(ctx, pointer, orderLimits)
if err != nil {
return nil, err
}
path, err := CreatePath(ctx, keyInfo.ProjectID, int64(segmentID.Index), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
return nil, nil, err
}
exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if exceeded {
endpoint.log.Error("The project limit of storage and bandwidth has been exceeded",
zap.Int64("limit", limit.Int64()),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
}
// clear hashes so we don't store them
@@ -1656,7 +1667,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
zap.Int32("redundancy minimum requested", pointer.Remote.Redundancy.MinReq),
zap.Int32("redundancy total", pointer.Remote.Redundancy.Total),
)
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage")
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage")
}
}
@@ -1669,12 +1680,19 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
// that will be affected is our per-project bandwidth and storage limits.
}
err = endpoint.metainfo.Put(ctx, path, pointer)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
if savePointer {
path, err := CreatePath(ctx, keyInfo.ProjectID, int64(segmentID.Index), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
err = endpoint.metainfo.Put(ctx, path, pointer)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}
return &pb.SegmentCommitResponse{
return pointer, &pb.SegmentCommitResponse{
SuccessfulPieces: int32(len(pointer.Remote.RemotePieces)),
}, nil
}
@@ -1683,9 +1701,17 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest) (resp *pb.SegmentMakeInlineResponse, err error) {
defer mon.Task()(&ctx)(&err)
_, resp, err = endpoint.makeInlineSegment(ctx, req, true)
return resp, err
}
// MakeInlineSegment makes inline segment on satellite
func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest, savePointer bool) (pointer *pb.Pointer, resp *pb.SegmentMakeInlineResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
@@ -1695,27 +1721,22 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment
Time: time.Now(),
})
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
return nil, nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
}
if req.Position.Index < 0 {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0")
}
path, err := CreatePath(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0")
}
exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if exceeded {
endpoint.log.Sugar().Errorf("monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for storage for projectID %s.",
limit, keyInfo.ProjectID,
)
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
}
inlineUsed := int64(len(req.EncryptedInlineData))
@@ -1731,7 +1752,7 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment
KeyNonce: req.EncryptedKeyNonce.Bytes(),
})
pointer := &pb.Pointer{
pointer = &pb.Pointer{
Type: pb.Pointer_INLINE,
SegmentSize: inlineUsed,
CreationDate: streamID.CreationDate,
@@ -1740,19 +1761,26 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment
Metadata: metadata,
}
err = endpoint.metainfo.Put(ctx, path, pointer)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
if savePointer {
path, err := CreatePath(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
err = endpoint.metainfo.Put(ctx, path, pointer)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}
err = endpoint.orders.UpdatePutInlineOrder(ctx, keyInfo.ProjectID, streamID.Bucket, inlineUsed)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
endpoint.log.Info("Inline Segment Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "inline"))
return &pb.SegmentMakeInlineResponse{}, nil
return pointer, &pb.SegmentMakeInlineResponse{}, nil
}
// BeginDeleteSegment begins segment deletion process
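
Taken together, the refactor splits each public handler into a thin wrapper plus an unexported helper: CommitSegment calls commitSegment(ctx, req, true) and MakeInlineSegment calls makeInlineSegment(ctx, req, true), while the batch fast path calls them with savePointer set to false and hands the returned pointer straight to commitObject. A simplified, self-contained sketch of that savePointer / pointer-reuse pattern (placeholder types, print statements standing in for the metainfo service calls):

package main

import "fmt"

type pointer struct{ segmentSize int64 } // stand-in for *pb.Pointer

// commitSegment builds the pointer and, unless the caller wants to reuse it
// directly (the batch fast path), also persists it under the segment path.
func commitSegment(savePointer bool) *pointer {
	p := &pointer{segmentSize: 42}
	if savePointer {
		fmt.Println("Put(segmentPath, pointer)")
	}
	return p
}

// commitObject reuses a pointer prepared by commitSegment when one is passed
// in; otherwise it reads the last segment's pointer back and deletes it first.
func commitObject(p *pointer) {
	if p == nil {
		fmt.Println("GetWithBytes(lastSegmentPath)")
		fmt.Println("Delete(lastSegmentPath, pointerBytes)")
	}
	fmt.Println("Put(lastSegmentPath, pointer)")
}

func main() {
	// Standalone path: CommitSegment saves the pointer; CommitObject later
	// reads it back, deletes it, and re-saves it under the last segment path.
	_ = commitSegment(true)
	commitObject(nil)

	// Batch fast path: build the pointer once and write it only under the
	// last segment path.
	commitObject(commitSegment(false))
}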