satellite/metainfo: revert combine CommitSegment and CommitObject in batch

This reverts commit 8772867855.

for uplink versions v0.25.0 through v0.30.7, there's a bug with multiplesegment upload
where the last segment is inline caused by this commit.

Change-Id: If375e186b23265586caf08991c25980e99f3cc1a
This commit is contained in:
Yingrong Zhao 2020-01-27 13:13:43 -05:00
parent 9bcb81108f
commit f3fcbe256c
3 changed files with 63 additions and 136 deletions

View File

@ -22,8 +22,7 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
var lastStreamID storj.StreamID
var lastSegmentID storj.SegmentID
var prevSegmentReq *pb.BatchRequestItem
for i, request := range req.Requests {
for _, request := range req.Requests {
switch singleRequest := request.Request.(type) {
// BUCKET
case *pb.BatchRequestItem_BucketCreate:
@ -101,42 +100,10 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
singleRequest.ObjectCommit.StreamId = lastStreamID
}
var response *pb.ObjectCommitResponse
var err error
switch {
case prevSegmentReq.GetSegmentMakeInline() != nil:
pointer, segmentResp, segmentErr := endpoint.makeInlineSegment(ctx, prevSegmentReq.GetSegmentMakeInline(), false)
prevSegmentReq = nil
if segmentErr != nil {
return resp, segmentErr
}
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
Response: &pb.BatchResponseItem_SegmentMakeInline{
SegmentMakeInline: segmentResp,
},
})
response, err = endpoint.commitObject(ctx, singleRequest.ObjectCommit, pointer)
case prevSegmentReq.GetSegmentCommit() != nil:
pointer, segmentResp, segmentErr := endpoint.commitSegment(ctx, prevSegmentReq.GetSegmentCommit(), false)
prevSegmentReq = nil
if segmentErr != nil {
return resp, segmentErr
}
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
Response: &pb.BatchResponseItem_SegmentCommit{
SegmentCommit: segmentResp,
},
})
response, err = endpoint.commitObject(ctx, singleRequest.ObjectCommit, pointer)
default:
response, err = endpoint.CommitObject(ctx, singleRequest.ObjectCommit)
}
response, err := endpoint.CommitObject(ctx, singleRequest.ObjectCommit)
if err != nil {
return resp, err
}
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
Response: &pb.BatchResponseItem_ObjectCommit{
ObjectCommit: response,
@ -218,11 +185,6 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
singleRequest.SegmentCommit.SegmentId = lastSegmentID
}
if i < len(req.Requests)-1 && req.Requests[i+1].GetObjectCommit() != nil {
prevSegmentReq = request
continue
}
response, err := endpoint.CommitSegment(ctx, singleRequest.SegmentCommit)
if err != nil {
return resp, err
@ -255,11 +217,6 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
singleRequest.SegmentMakeInline.StreamId = lastStreamID
}
if i < len(req.Requests)-1 && req.Requests[i+1].GetObjectCommit() != nil {
prevSegmentReq = request
continue
}
response, err := endpoint.MakeInlineSegment(ctx, singleRequest.SegmentMakeInline)
if err != nil {
return resp, err

View File

@ -112,14 +112,13 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
}
for i, tc := range testCases {
i := i
for _, tc := range testCases {
tc := tc
t.Run(tc.caseDescription, func(t *testing.T) {
var (
const (
bucketName = "a-bucket"
objectName = "object-filename" + strconv.Itoa(i)
objectName = "object-filename"
)
testplanet.Run(t, testplanet.Config{
@ -189,13 +188,12 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
}
for i, tc := range testCases {
i := i
for _, tc := range testCases {
tc := tc
t.Run(tc.caseDescription, func(t *testing.T) {
var (
const (
bucketName = "a-bucket"
objectName = "object-filename" + strconv.Itoa(i)
objectName = "object-filename"
)
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,

View File

@ -1134,12 +1134,6 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommitRequest) (resp *pb.ObjectCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
return endpoint.commitObject(ctx, req, nil)
}
func (endpoint *Endpoint) commitObject(ctx context.Context, req *pb.ObjectCommitRequest, pointer *pb.Pointer) (resp *pb.ObjectCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID := &pb.SatStreamID{}
err = proto.Unmarshal(req.StreamId, streamID)
if err != nil {
@ -1171,29 +1165,19 @@ func (endpoint *Endpoint) commitObject(ctx context.Context, req *pb.ObjectCommit
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "invalid metadata structure")
}
lastSegmentPointer := pointer
if pointer == nil {
lastSegmentIndex := streamMeta.NumberOfSegments - 1
lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to create segment path: %s", err.Error())
}
lastSegmentIndex := streamMeta.NumberOfSegments - 1
lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to create segment path: %s", err.Error())
}
var lastSegmentPointerBytes []byte
lastSegmentPointerBytes, lastSegmentPointer, err = endpoint.metainfo.GetWithBytes(ctx, lastSegmentPath)
if err != nil {
endpoint.log.Error("unable to get pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
if lastSegmentPointer == nil {
return nil, rpcstatus.Errorf(rpcstatus.NotFound, "unable to find object: %q/%q", streamID.Bucket, streamID.EncryptedPath)
}
err = endpoint.metainfo.Delete(ctx, lastSegmentPath, lastSegmentPointerBytes)
if err != nil {
endpoint.log.Error("unable to delete pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
lastSegmentPointerBytes, lastSegmentPointer, err := endpoint.metainfo.GetWithBytes(ctx, lastSegmentPath)
if err != nil {
endpoint.log.Error("unable to get pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
if lastSegmentPointer == nil {
return nil, rpcstatus.Errorf(rpcstatus.NotFound, "unable to find object: %q/%q", streamID.Bucket, streamID.EncryptedPath)
}
if lastSegmentPointer.Remote == nil {
@ -1203,7 +1187,14 @@ func (endpoint *Endpoint) commitObject(ctx context.Context, req *pb.ObjectCommit
lastSegmentPointer.Remote.Redundancy = streamID.Redundancy
lastSegmentPointer.Metadata = req.EncryptedMetadata
lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, int64(lastSegment), streamID.Bucket, streamID.EncryptedPath)
err = endpoint.metainfo.Delete(ctx, lastSegmentPath, lastSegmentPointerBytes)
if err != nil {
endpoint.log.Error("unable to delete pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
}
lastSegmentIndex = -1
lastSegmentPath, err = CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
if err != nil {
endpoint.log.Error("unable to create path", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
@ -1533,16 +1524,9 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin
func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentCommitRequest) (resp *pb.SegmentCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
_, resp, err = endpoint.commitSegment(ctx, req, true)
return resp, err
}
func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentCommitRequest, savePointer bool) (pointer *pb.Pointer, resp *pb.SegmentCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
streamID := segmentID.StreamId
@ -1554,7 +1538,7 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
Time: time.Now(),
})
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
}
if numResults := len(req.UploadResult); numResults < int(streamID.Redundancy.GetSuccessThreshold()) {
@ -1563,7 +1547,7 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
zap.Int32("redundancy optimal threshold", streamID.Redundancy.GetSuccessThreshold()),
zap.Stringer("Segment ID", req.SegmentId),
)
return nil, nil, rpcstatus.Errorf(rpcstatus.InvalidArgument,
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument,
"the number of results of uploaded pieces (%d) is below the optimal threshold (%d)",
numResults, streamID.Redundancy.GetSuccessThreshold(),
)
@ -1589,10 +1573,10 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
})
if err != nil {
endpoint.log.Error("unable to marshal segment metadata", zap.Error(err))
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
pointer = &pb.Pointer{
pointer := &pb.Pointer{
Type: pb.Pointer_REMOTE,
Remote: remote,
SegmentSize: req.SizeEncryptedData,
@ -1611,24 +1595,29 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
err = endpoint.validatePointer(ctx, pointer, orderLimits)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
err = endpoint.filterValidPieces(ctx, pointer, orderLimits)
if err != nil {
return nil, nil, err
return nil, err
}
path, err := CreatePath(ctx, keyInfo.ProjectID, int64(segmentID.Index), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if exceeded {
endpoint.log.Error("The project limit of storage and bandwidth has been exceeded",
zap.Int64("limit", limit.Int64()),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
}
// clear hashes so we don't store them
@ -1649,7 +1638,7 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
zap.Int32("redundancy minimum requested", pointer.Remote.Redundancy.MinReq),
zap.Int32("redundancy total", pointer.Remote.Redundancy.Total),
)
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage")
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage")
}
}
@ -1662,19 +1651,12 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
// that will be affected is our per-project bandwidth and storage limits.
}
if savePointer {
path, err := CreatePath(ctx, keyInfo.ProjectID, int64(segmentID.Index), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
err = endpoint.metainfo.Put(ctx, path, pointer)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
err = endpoint.metainfo.Put(ctx, path, pointer)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return pointer, &pb.SegmentCommitResponse{
return &pb.SegmentCommitResponse{
SuccessfulPieces: int32(len(pointer.Remote.RemotePieces)),
}, nil
}
@ -1683,17 +1665,9 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest) (resp *pb.SegmentMakeInlineResponse, err error) {
defer mon.Task()(&ctx)(&err)
_, resp, err = endpoint.makeInlineSegment(ctx, req, true)
return resp, err
}
// MakeInlineSegment makes inline segment on satellite
func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest, savePointer bool) (pointer *pb.Pointer, resp *pb.SegmentMakeInlineResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
@ -1703,22 +1677,27 @@ func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.Segment
Time: time.Now(),
})
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
}
if req.Position.Index < 0 {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0")
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0")
}
path, err := CreatePath(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if exceeded {
endpoint.log.Sugar().Errorf("monthly project limits are %s of storage and bandwidth usage. This limit has been exceeded for storage for projectID %s.",
limit, keyInfo.ProjectID,
)
return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
}
inlineUsed := int64(len(req.EncryptedInlineData))
@ -1734,7 +1713,7 @@ func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.Segment
KeyNonce: req.EncryptedKeyNonce.Bytes(),
})
pointer = &pb.Pointer{
pointer := &pb.Pointer{
Type: pb.Pointer_INLINE,
SegmentSize: inlineUsed,
CreationDate: streamID.CreationDate,
@ -1743,26 +1722,19 @@ func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.Segment
Metadata: metadata,
}
if savePointer {
path, err := CreatePath(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
err = endpoint.metainfo.Put(ctx, path, pointer)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
err = endpoint.metainfo.Put(ctx, path, pointer)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
err = endpoint.orders.UpdatePutInlineOrder(ctx, keyInfo.ProjectID, streamID.Bucket, inlineUsed)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
endpoint.log.Info("Inline Segment Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "inline"))
return pointer, &pb.SegmentMakeInlineResponse{}, nil
return &pb.SegmentMakeInlineResponse{}, nil
}
// BeginDeleteSegment begins segment deletion process