From 87728678557232977dac9964acc614cc6c7c50dd Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Thu, 16 Jan 2020 20:12:09 +0100 Subject: [PATCH] satellite/metainfo: combine CommitSegment and CommitObject in batch This change is a special case for batch processing. If in batch request CommitSegment and CommitObject are one after another we can execute these request as one. This will avoid current logic where we are saving pointer for CommitSegment and later we are deleting this pointer and saving it once again as under last segment path for CommitObject. Change-Id: If170e78c8410f5ba5916cbff6a29b9221db9ce2e --- satellite/metainfo/batch.go | 47 +++++++++- satellite/metainfo/endpoint_test.go | 22 +++-- satellite/metainfo/metainfo.go | 138 +++++++++++++++++----------- 3 files changed, 141 insertions(+), 66 deletions(-) diff --git a/satellite/metainfo/batch.go b/satellite/metainfo/batch.go index 53811ea26..a9d46851e 100644 --- a/satellite/metainfo/batch.go +++ b/satellite/metainfo/batch.go @@ -22,7 +22,8 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp var lastStreamID storj.StreamID var lastSegmentID storj.SegmentID - for _, request := range req.Requests { + var prevSegmentReq *pb.BatchRequestItem + for i, request := range req.Requests { switch singleRequest := request.Request.(type) { // BUCKET case *pb.BatchRequestItem_BucketCreate: @@ -100,10 +101,42 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp singleRequest.ObjectCommit.StreamId = lastStreamID } - response, err := endpoint.CommitObject(ctx, singleRequest.ObjectCommit) + var response *pb.ObjectCommitResponse + var err error + switch { + case prevSegmentReq.GetSegmentMakeInline() != nil: + pointer, segmentResp, segmentErr := endpoint.makeInlineSegment(ctx, prevSegmentReq.GetSegmentMakeInline(), false) + prevSegmentReq = nil + if segmentErr != nil { + return resp, segmentErr + } + + resp.Responses = append(resp.Responses, &pb.BatchResponseItem{ + Response: &pb.BatchResponseItem_SegmentMakeInline{ + SegmentMakeInline: segmentResp, + }, + }) + response, err = endpoint.commitObject(ctx, singleRequest.ObjectCommit, pointer) + case prevSegmentReq.GetSegmentCommit() != nil: + pointer, segmentResp, segmentErr := endpoint.commitSegment(ctx, prevSegmentReq.GetSegmentCommit(), false) + prevSegmentReq = nil + if segmentErr != nil { + return resp, segmentErr + } + + resp.Responses = append(resp.Responses, &pb.BatchResponseItem{ + Response: &pb.BatchResponseItem_SegmentCommit{ + SegmentCommit: segmentResp, + }, + }) + response, err = endpoint.commitObject(ctx, singleRequest.ObjectCommit, pointer) + default: + response, err = endpoint.CommitObject(ctx, singleRequest.ObjectCommit) + } if err != nil { return resp, err } + resp.Responses = append(resp.Responses, &pb.BatchResponseItem{ Response: &pb.BatchResponseItem_ObjectCommit{ ObjectCommit: response, @@ -185,6 +218,11 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp singleRequest.SegmentCommit.SegmentId = lastSegmentID } + if i < len(req.Requests)-1 && req.Requests[i+1].GetObjectCommit() != nil { + prevSegmentReq = request + continue + } + response, err := endpoint.CommitSegment(ctx, singleRequest.SegmentCommit) if err != nil { return resp, err @@ -217,6 +255,11 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp singleRequest.SegmentMakeInline.StreamId = lastStreamID } + if i < len(req.Requests)-1 && req.Requests[i+1].GetObjectCommit() != nil { + prevSegmentReq = request + continue + } + response, err := endpoint.MakeInlineSegment(ctx, singleRequest.SegmentMakeInline) if err != nil { return resp, err diff --git a/satellite/metainfo/endpoint_test.go b/satellite/metainfo/endpoint_test.go index 8476a0897..ffcba435a 100644 --- a/satellite/metainfo/endpoint_test.go +++ b/satellite/metainfo/endpoint_test.go @@ -5,6 +5,7 @@ package metainfo_test import ( "context" + "strconv" "testing" "github.com/skyrings/skyring-common/tools/uuid" @@ -46,12 +47,13 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { {caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)}, } - for _, tc := range testCases { + for i, tc := range testCases { + i := i tc := tc t.Run(tc.caseDescription, func(t *testing.T) { - const ( + var ( bucketName = "a-bucket" - objectName = "object-filename" + objectName = "object-filename" + strconv.Itoa(i) ) // Use RSConfig for ensuring that we don't have long-tail cancellations and the @@ -115,7 +117,8 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { {caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)}, } - for _, tc := range testCases { + for i, tc := range testCases { + i := i tc := tc t.Run(tc.caseDescription, func(t *testing.T) { ctx := testcontext.New(t) @@ -131,9 +134,9 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { satelliteSys = planet.Satellites[0] ) - const ( + var ( bucketName = "a-bucket" - objectName = "object-filename" + objectName = "object-filename" + strconv.Itoa(i) ) // Use RSConfig for ensuring that we don't have long-tail cancellations and the @@ -198,12 +201,13 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { {caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)}, } - for _, tc := range testCases { + for i, tc := range testCases { + i := i tc := tc t.Run(tc.caseDescription, func(t *testing.T) { - const ( + var ( bucketName = "a-bucket" - objectName = "object-filename" + objectName = "object-filename" + strconv.Itoa(i) ) ctx := testcontext.New(t) diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go index ad165cb9c..4c44a08b3 100644 --- a/satellite/metainfo/metainfo.go +++ b/satellite/metainfo/metainfo.go @@ -1152,6 +1152,12 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommitRequest) (resp *pb.ObjectCommitResponse, err error) { defer mon.Task()(&ctx)(&err) + return endpoint.commitObject(ctx, req, nil) +} + +func (endpoint *Endpoint) commitObject(ctx context.Context, req *pb.ObjectCommitRequest, pointer *pb.Pointer) (resp *pb.ObjectCommitResponse, err error) { + defer mon.Task()(&ctx)(&err) + streamID := &pb.SatStreamID{} err = proto.Unmarshal(req.StreamId, streamID) if err != nil { @@ -1183,19 +1189,29 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "invalid metadata structure") } - lastSegmentIndex := streamMeta.NumberOfSegments - 1 - lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath) - if err != nil { - return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to create segment path: %s", err.Error()) - } + lastSegmentPointer := pointer + if pointer == nil { + lastSegmentIndex := streamMeta.NumberOfSegments - 1 + lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath) + if err != nil { + return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to create segment path: %s", err.Error()) + } - lastSegmentPointerBytes, lastSegmentPointer, err := endpoint.metainfo.GetWithBytes(ctx, lastSegmentPath) - if err != nil { - endpoint.log.Error("unable to get pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err)) - return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object") - } - if lastSegmentPointer == nil { - return nil, rpcstatus.Errorf(rpcstatus.NotFound, "unable to find object: %q/%q", streamID.Bucket, streamID.EncryptedPath) + var lastSegmentPointerBytes []byte + lastSegmentPointerBytes, lastSegmentPointer, err = endpoint.metainfo.GetWithBytes(ctx, lastSegmentPath) + if err != nil { + endpoint.log.Error("unable to get pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err)) + return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object") + } + if lastSegmentPointer == nil { + return nil, rpcstatus.Errorf(rpcstatus.NotFound, "unable to find object: %q/%q", streamID.Bucket, streamID.EncryptedPath) + } + + err = endpoint.metainfo.Delete(ctx, lastSegmentPath, lastSegmentPointerBytes) + if err != nil { + endpoint.log.Error("unable to delete pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err)) + return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object") + } } if lastSegmentPointer.Remote == nil { @@ -1205,14 +1221,7 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit lastSegmentPointer.Remote.Redundancy = streamID.Redundancy lastSegmentPointer.Metadata = req.EncryptedMetadata - err = endpoint.metainfo.Delete(ctx, lastSegmentPath, lastSegmentPointerBytes) - if err != nil { - endpoint.log.Error("unable to delete pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err)) - return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object") - } - - lastSegmentIndex = -1 - lastSegmentPath, err = CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath) + lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, int64(lastSegment), streamID.Bucket, streamID.EncryptedPath) if err != nil { endpoint.log.Error("unable to create path", zap.Error(err)) return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object") @@ -1542,9 +1551,16 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentCommitRequest) (resp *pb.SegmentCommitResponse, err error) { defer mon.Task()(&ctx)(&err) + _, resp, err = endpoint.commitSegment(ctx, req, true) + return resp, err +} + +func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentCommitRequest, savePointer bool) (pointer *pb.Pointer, resp *pb.SegmentCommitResponse, err error) { + defer mon.Task()(&ctx)(&err) + segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId) if err != nil { - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) } streamID := segmentID.StreamId @@ -1556,7 +1572,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm Time: time.Now(), }) if err != nil { - return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) } if numResults := len(req.UploadResult); numResults < int(streamID.Redundancy.GetSuccessThreshold()) { @@ -1565,7 +1581,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm zap.Int32("redundancy optimal threshold", streamID.Redundancy.GetSuccessThreshold()), zap.Stringer("Segment ID", req.SegmentId), ) - return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, + return nil, nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "the number of results of uploaded pieces (%d) is below the optimal threshold (%d)", numResults, streamID.Redundancy.GetSuccessThreshold(), ) @@ -1591,10 +1607,10 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm }) if err != nil { endpoint.log.Error("unable to marshal segment metadata", zap.Error(err)) - return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) } - pointer := &pb.Pointer{ + pointer = &pb.Pointer{ Type: pb.Pointer_REMOTE, Remote: remote, SegmentSize: req.SizeEncryptedData, @@ -1613,29 +1629,24 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm err = endpoint.validatePointer(ctx, pointer, orderLimits) if err != nil { - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) } err = endpoint.filterValidPieces(ctx, pointer, orderLimits) if err != nil { - return nil, err - } - - path, err := CreatePath(ctx, keyInfo.ProjectID, int64(segmentID.Index), streamID.Bucket, streamID.EncryptedPath) - if err != nil { - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + return nil, nil, err } exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID) if err != nil { - return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) } if exceeded { endpoint.log.Error("The project limit of storage and bandwidth has been exceeded", zap.Int64("limit", limit.Int64()), zap.Stringer("Project ID", keyInfo.ProjectID), ) - return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit") + return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit") } // clear hashes so we don't store them @@ -1656,7 +1667,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm zap.Int32("redundancy minimum requested", pointer.Remote.Redundancy.MinReq), zap.Int32("redundancy total", pointer.Remote.Redundancy.Total), ) - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage") + return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage") } } @@ -1669,12 +1680,19 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm // that will be affected is our per-project bandwidth and storage limits. } - err = endpoint.metainfo.Put(ctx, path, pointer) - if err != nil { - return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + if savePointer { + path, err := CreatePath(ctx, keyInfo.ProjectID, int64(segmentID.Index), streamID.Bucket, streamID.EncryptedPath) + if err != nil { + return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + } + + err = endpoint.metainfo.Put(ctx, path, pointer) + if err != nil { + return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + } } - return &pb.SegmentCommitResponse{ + return pointer, &pb.SegmentCommitResponse{ SuccessfulPieces: int32(len(pointer.Remote.RemotePieces)), }, nil } @@ -1683,9 +1701,17 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest) (resp *pb.SegmentMakeInlineResponse, err error) { defer mon.Task()(&ctx)(&err) + _, resp, err = endpoint.makeInlineSegment(ctx, req, true) + return resp, err +} + +// MakeInlineSegment makes inline segment on satellite +func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest, savePointer bool) (pointer *pb.Pointer, resp *pb.SegmentMakeInlineResponse, err error) { + defer mon.Task()(&ctx)(&err) + streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId) if err != nil { - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) } keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{ @@ -1695,27 +1721,22 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment Time: time.Now(), }) if err != nil { - return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) } if req.Position.Index < 0 { - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0") - } - - path, err := CreatePath(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath) - if err != nil { - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0") } exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID) if err != nil { - return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) } if exceeded { endpoint.log.Sugar().Errorf("monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for storage for projectID %s.", limit, keyInfo.ProjectID, ) - return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit") + return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit") } inlineUsed := int64(len(req.EncryptedInlineData)) @@ -1731,7 +1752,7 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment KeyNonce: req.EncryptedKeyNonce.Bytes(), }) - pointer := &pb.Pointer{ + pointer = &pb.Pointer{ Type: pb.Pointer_INLINE, SegmentSize: inlineUsed, CreationDate: streamID.CreationDate, @@ -1740,19 +1761,26 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment Metadata: metadata, } - err = endpoint.metainfo.Put(ctx, path, pointer) - if err != nil { - return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + if savePointer { + path, err := CreatePath(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath) + if err != nil { + return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + } + + err = endpoint.metainfo.Put(ctx, path, pointer) + if err != nil { + return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + } } err = endpoint.orders.UpdatePutInlineOrder(ctx, keyInfo.ProjectID, streamID.Bucket, inlineUsed) if err != nil { - return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) } endpoint.log.Info("Inline Segment Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "inline")) - return &pb.SegmentMakeInlineResponse{}, nil + return pointer, &pb.SegmentMakeInlineResponse{}, nil } // BeginDeleteSegment begins segment deletion process