diff --git a/satellite/metainfo/batch.go b/satellite/metainfo/batch.go index 53811ea26..a9d46851e 100644 --- a/satellite/metainfo/batch.go +++ b/satellite/metainfo/batch.go @@ -22,7 +22,8 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp var lastStreamID storj.StreamID var lastSegmentID storj.SegmentID - for _, request := range req.Requests { + var prevSegmentReq *pb.BatchRequestItem + for i, request := range req.Requests { switch singleRequest := request.Request.(type) { // BUCKET case *pb.BatchRequestItem_BucketCreate: @@ -100,10 +101,42 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp singleRequest.ObjectCommit.StreamId = lastStreamID } - response, err := endpoint.CommitObject(ctx, singleRequest.ObjectCommit) + var response *pb.ObjectCommitResponse + var err error + switch { + case prevSegmentReq.GetSegmentMakeInline() != nil: + pointer, segmentResp, segmentErr := endpoint.makeInlineSegment(ctx, prevSegmentReq.GetSegmentMakeInline(), false) + prevSegmentReq = nil + if segmentErr != nil { + return resp, segmentErr + } + + resp.Responses = append(resp.Responses, &pb.BatchResponseItem{ + Response: &pb.BatchResponseItem_SegmentMakeInline{ + SegmentMakeInline: segmentResp, + }, + }) + response, err = endpoint.commitObject(ctx, singleRequest.ObjectCommit, pointer) + case prevSegmentReq.GetSegmentCommit() != nil: + pointer, segmentResp, segmentErr := endpoint.commitSegment(ctx, prevSegmentReq.GetSegmentCommit(), false) + prevSegmentReq = nil + if segmentErr != nil { + return resp, segmentErr + } + + resp.Responses = append(resp.Responses, &pb.BatchResponseItem{ + Response: &pb.BatchResponseItem_SegmentCommit{ + SegmentCommit: segmentResp, + }, + }) + response, err = endpoint.commitObject(ctx, singleRequest.ObjectCommit, pointer) + default: + response, err = endpoint.CommitObject(ctx, singleRequest.ObjectCommit) + } if err != nil { return resp, err } + resp.Responses = append(resp.Responses, &pb.BatchResponseItem{ Response: &pb.BatchResponseItem_ObjectCommit{ ObjectCommit: response, @@ -185,6 +218,11 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp singleRequest.SegmentCommit.SegmentId = lastSegmentID } + if i < len(req.Requests)-1 && req.Requests[i+1].GetObjectCommit() != nil { + prevSegmentReq = request + continue + } + response, err := endpoint.CommitSegment(ctx, singleRequest.SegmentCommit) if err != nil { return resp, err @@ -217,6 +255,11 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp singleRequest.SegmentMakeInline.StreamId = lastStreamID } + if i < len(req.Requests)-1 && req.Requests[i+1].GetObjectCommit() != nil { + prevSegmentReq = request + continue + } + response, err := endpoint.MakeInlineSegment(ctx, singleRequest.SegmentMakeInline) if err != nil { return resp, err diff --git a/satellite/metainfo/endpoint_test.go b/satellite/metainfo/endpoint_test.go index 8476a0897..ffcba435a 100644 --- a/satellite/metainfo/endpoint_test.go +++ b/satellite/metainfo/endpoint_test.go @@ -5,6 +5,7 @@ package metainfo_test import ( "context" + "strconv" "testing" "github.com/skyrings/skyring-common/tools/uuid" @@ -46,12 +47,13 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { {caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)}, } - for _, tc := range testCases { + for i, tc := range testCases { + i := i tc := tc t.Run(tc.caseDescription, func(t *testing.T) { - const ( + var ( bucketName = "a-bucket" - objectName = "object-filename" + objectName = "object-filename" + strconv.Itoa(i) ) // Use RSConfig for ensuring that we don't have long-tail cancellations and the @@ -115,7 +117,8 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { {caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)}, } - for _, tc := range testCases { + for i, tc := range testCases { + i := i tc := tc t.Run(tc.caseDescription, func(t *testing.T) { ctx := testcontext.New(t) @@ -131,9 +134,9 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { satelliteSys = planet.Satellites[0] ) - const ( + var ( bucketName = "a-bucket" - objectName = "object-filename" + objectName = "object-filename" + strconv.Itoa(i) ) // Use RSConfig for ensuring that we don't have long-tail cancellations and the @@ -198,12 +201,13 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) { {caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)}, } - for _, tc := range testCases { + for i, tc := range testCases { + i := i tc := tc t.Run(tc.caseDescription, func(t *testing.T) { - const ( + var ( bucketName = "a-bucket" - objectName = "object-filename" + objectName = "object-filename" + strconv.Itoa(i) ) ctx := testcontext.New(t) diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go index ad165cb9c..4c44a08b3 100644 --- a/satellite/metainfo/metainfo.go +++ b/satellite/metainfo/metainfo.go @@ -1152,6 +1152,12 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommitRequest) (resp *pb.ObjectCommitResponse, err error) { defer mon.Task()(&ctx)(&err) + return endpoint.commitObject(ctx, req, nil) +} + +func (endpoint *Endpoint) commitObject(ctx context.Context, req *pb.ObjectCommitRequest, pointer *pb.Pointer) (resp *pb.ObjectCommitResponse, err error) { + defer mon.Task()(&ctx)(&err) + streamID := &pb.SatStreamID{} err = proto.Unmarshal(req.StreamId, streamID) if err != nil { @@ -1183,19 +1189,29 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "invalid metadata structure") } - lastSegmentIndex := streamMeta.NumberOfSegments - 1 - lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath) - if err != nil { - return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to create segment path: %s", err.Error()) - } + lastSegmentPointer := pointer + if pointer == nil { + lastSegmentIndex := streamMeta.NumberOfSegments - 1 + lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath) + if err != nil { + return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to create segment path: %s", err.Error()) + } - lastSegmentPointerBytes, lastSegmentPointer, err := endpoint.metainfo.GetWithBytes(ctx, lastSegmentPath) - if err != nil { - endpoint.log.Error("unable to get pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err)) - return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object") - } - if lastSegmentPointer == nil { - return nil, rpcstatus.Errorf(rpcstatus.NotFound, "unable to find object: %q/%q", streamID.Bucket, streamID.EncryptedPath) + var lastSegmentPointerBytes []byte + lastSegmentPointerBytes, lastSegmentPointer, err = endpoint.metainfo.GetWithBytes(ctx, lastSegmentPath) + if err != nil { + endpoint.log.Error("unable to get pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err)) + return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object") + } + if lastSegmentPointer == nil { + return nil, rpcstatus.Errorf(rpcstatus.NotFound, "unable to find object: %q/%q", streamID.Bucket, streamID.EncryptedPath) + } + + err = endpoint.metainfo.Delete(ctx, lastSegmentPath, lastSegmentPointerBytes) + if err != nil { + endpoint.log.Error("unable to delete pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err)) + return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object") + } } if lastSegmentPointer.Remote == nil { @@ -1205,14 +1221,7 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit lastSegmentPointer.Remote.Redundancy = streamID.Redundancy lastSegmentPointer.Metadata = req.EncryptedMetadata - err = endpoint.metainfo.Delete(ctx, lastSegmentPath, lastSegmentPointerBytes) - if err != nil { - endpoint.log.Error("unable to delete pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err)) - return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object") - } - - lastSegmentIndex = -1 - lastSegmentPath, err = CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath) + lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, int64(lastSegment), streamID.Bucket, streamID.EncryptedPath) if err != nil { endpoint.log.Error("unable to create path", zap.Error(err)) return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object") @@ -1542,9 +1551,16 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentCommitRequest) (resp *pb.SegmentCommitResponse, err error) { defer mon.Task()(&ctx)(&err) + _, resp, err = endpoint.commitSegment(ctx, req, true) + return resp, err +} + +func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentCommitRequest, savePointer bool) (pointer *pb.Pointer, resp *pb.SegmentCommitResponse, err error) { + defer mon.Task()(&ctx)(&err) + segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId) if err != nil { - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) } streamID := segmentID.StreamId @@ -1556,7 +1572,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm Time: time.Now(), }) if err != nil { - return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) } if numResults := len(req.UploadResult); numResults < int(streamID.Redundancy.GetSuccessThreshold()) { @@ -1565,7 +1581,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm zap.Int32("redundancy optimal threshold", streamID.Redundancy.GetSuccessThreshold()), zap.Stringer("Segment ID", req.SegmentId), ) - return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, + return nil, nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "the number of results of uploaded pieces (%d) is below the optimal threshold (%d)", numResults, streamID.Redundancy.GetSuccessThreshold(), ) @@ -1591,10 +1607,10 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm }) if err != nil { endpoint.log.Error("unable to marshal segment metadata", zap.Error(err)) - return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) } - pointer := &pb.Pointer{ + pointer = &pb.Pointer{ Type: pb.Pointer_REMOTE, Remote: remote, SegmentSize: req.SizeEncryptedData, @@ -1613,29 +1629,24 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm err = endpoint.validatePointer(ctx, pointer, orderLimits) if err != nil { - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) } err = endpoint.filterValidPieces(ctx, pointer, orderLimits) if err != nil { - return nil, err - } - - path, err := CreatePath(ctx, keyInfo.ProjectID, int64(segmentID.Index), streamID.Bucket, streamID.EncryptedPath) - if err != nil { - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + return nil, nil, err } exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID) if err != nil { - return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) } if exceeded { endpoint.log.Error("The project limit of storage and bandwidth has been exceeded", zap.Int64("limit", limit.Int64()), zap.Stringer("Project ID", keyInfo.ProjectID), ) - return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit") + return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit") } // clear hashes so we don't store them @@ -1656,7 +1667,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm zap.Int32("redundancy minimum requested", pointer.Remote.Redundancy.MinReq), zap.Int32("redundancy total", pointer.Remote.Redundancy.Total), ) - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage") + return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage") } } @@ -1669,12 +1680,19 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm // that will be affected is our per-project bandwidth and storage limits. } - err = endpoint.metainfo.Put(ctx, path, pointer) - if err != nil { - return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + if savePointer { + path, err := CreatePath(ctx, keyInfo.ProjectID, int64(segmentID.Index), streamID.Bucket, streamID.EncryptedPath) + if err != nil { + return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + } + + err = endpoint.metainfo.Put(ctx, path, pointer) + if err != nil { + return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + } } - return &pb.SegmentCommitResponse{ + return pointer, &pb.SegmentCommitResponse{ SuccessfulPieces: int32(len(pointer.Remote.RemotePieces)), }, nil } @@ -1683,9 +1701,17 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest) (resp *pb.SegmentMakeInlineResponse, err error) { defer mon.Task()(&ctx)(&err) + _, resp, err = endpoint.makeInlineSegment(ctx, req, true) + return resp, err +} + +// MakeInlineSegment makes inline segment on satellite +func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest, savePointer bool) (pointer *pb.Pointer, resp *pb.SegmentMakeInlineResponse, err error) { + defer mon.Task()(&ctx)(&err) + streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId) if err != nil { - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) } keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{ @@ -1695,27 +1721,22 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment Time: time.Now(), }) if err != nil { - return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) } if req.Position.Index < 0 { - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0") - } - - path, err := CreatePath(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath) - if err != nil { - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0") } exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID) if err != nil { - return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) } if exceeded { endpoint.log.Sugar().Errorf("monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for storage for projectID %s.", limit, keyInfo.ProjectID, ) - return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit") + return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit") } inlineUsed := int64(len(req.EncryptedInlineData)) @@ -1731,7 +1752,7 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment KeyNonce: req.EncryptedKeyNonce.Bytes(), }) - pointer := &pb.Pointer{ + pointer = &pb.Pointer{ Type: pb.Pointer_INLINE, SegmentSize: inlineUsed, CreationDate: streamID.CreationDate, @@ -1740,19 +1761,26 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment Metadata: metadata, } - err = endpoint.metainfo.Put(ctx, path, pointer) - if err != nil { - return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + if savePointer { + path, err := CreatePath(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath) + if err != nil { + return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + } + + err = endpoint.metainfo.Put(ctx, path, pointer) + if err != nil { + return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + } } err = endpoint.orders.UpdatePutInlineOrder(ctx, keyInfo.ProjectID, streamID.Bucket, inlineUsed) if err != nil { - return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) } endpoint.log.Info("Inline Segment Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "inline")) - return &pb.SegmentMakeInlineResponse{}, nil + return pointer, &pb.SegmentMakeInlineResponse{}, nil } // BeginDeleteSegment begins segment deletion process