satellite/metainfo: reduce pointerDB access for CommitObject (#3589)
This commit is contained in:
parent
48a557eb2b
commit
1aa2bc0a83
@ -1119,28 +1119,22 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit
|
||||
return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
|
||||
}
|
||||
|
||||
segmentIndex := int64(0)
|
||||
var lastSegmentPointerBytes []byte
|
||||
var lastSegmentPointer *pb.Pointer
|
||||
var lastSegmentPath string
|
||||
for {
|
||||
path, err := CreatePath(ctx, keyInfo.ProjectID, segmentIndex, streamID.Bucket, streamID.EncryptedPath)
|
||||
streamMeta := pb.StreamMeta{}
|
||||
err = proto.Unmarshal(req.EncryptedMetadata, &streamMeta)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to create segment path: %v", err)
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "invalid metadata structure")
|
||||
}
|
||||
|
||||
pointerBytes, pointer, err := endpoint.metainfo.GetWithBytes(ctx, path)
|
||||
lastSegmentIndex := streamMeta.NumberOfSegments - 1
|
||||
lastSegmentPath, err := CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
|
||||
if err != nil {
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
break
|
||||
}
|
||||
return nil, rpcstatus.Errorf(rpcstatus.Internal, "unable to create get segment: %v", err)
|
||||
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to create segment path: %s", err.Error())
|
||||
}
|
||||
|
||||
lastSegmentPointerBytes = pointerBytes
|
||||
lastSegmentPointer = pointer
|
||||
lastSegmentPath = path
|
||||
segmentIndex++
|
||||
lastSegmentPointerBytes, lastSegmentPointer, err := endpoint.metainfo.GetWithBytes(ctx, lastSegmentPath)
|
||||
if err != nil {
|
||||
endpoint.log.Error("unable to get pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err))
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
|
||||
}
|
||||
if lastSegmentPointer == nil {
|
||||
return nil, rpcstatus.Errorf(rpcstatus.NotFound, "unable to find object: %q/%q", streamID.Bucket, streamID.EncryptedPath)
|
||||
@ -1155,17 +1149,21 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit
|
||||
|
||||
err = endpoint.metainfo.Delete(ctx, lastSegmentPath, lastSegmentPointerBytes)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
endpoint.log.Error("unable to delete pointer", zap.String("segmentPath", lastSegmentPath), zap.Error(err))
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
|
||||
}
|
||||
|
||||
lastSegmentPath, err = CreatePath(ctx, keyInfo.ProjectID, -1, streamID.Bucket, streamID.EncryptedPath)
|
||||
lastSegmentIndex = -1
|
||||
lastSegmentPath, err = CreatePath(ctx, keyInfo.ProjectID, lastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||
endpoint.log.Error("unable to create path", zap.Error(err))
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
|
||||
}
|
||||
|
||||
err = endpoint.metainfo.Put(ctx, lastSegmentPath, lastSegmentPointer)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
endpoint.log.Error("unable to put pointer", zap.Error(err))
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to commit object")
|
||||
}
|
||||
|
||||
return &pb.ObjectCommitResponse{}, nil
|
||||
|
@ -1139,8 +1139,13 @@ func TestBeginCommitListSegment(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
metadata, err := proto.Marshal(&pb.StreamMeta{
|
||||
NumberOfSegments: 1,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
err = metainfoClient.CommitObject(ctx, metainfo.CommitObjectParams{
|
||||
StreamID: streamID,
|
||||
EncryptedMetadata: metadata,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -1297,8 +1302,13 @@ func TestInlineSegment(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
metadata, err := proto.Marshal(&pb.StreamMeta{
|
||||
NumberOfSegments: int64(len(segments)),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
err = metainfoClient.CommitObject(ctx, metainfo.CommitObjectParams{
|
||||
StreamID: streamID,
|
||||
EncryptedMetadata: metadata,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -1650,7 +1660,13 @@ func TestBatch(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
requests = append(requests, &metainfo.CommitObjectParams{})
|
||||
metadata, err := proto.Marshal(&pb.StreamMeta{
|
||||
NumberOfSegments: int64(numOfSegments),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
requests = append(requests, &metainfo.CommitObjectParams{
|
||||
EncryptedMetadata: metadata,
|
||||
})
|
||||
requests = append(requests, &metainfo.ListSegmentsParams{})
|
||||
|
||||
responses, err := metainfoClient.Batch(ctx, requests...)
|
||||
@ -1708,8 +1724,13 @@ func TestBatch(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
metadata, err := proto.Marshal(&pb.StreamMeta{
|
||||
NumberOfSegments: int64(numOfSegments),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
requests = append(requests, &metainfo.CommitObjectParams{
|
||||
StreamID: streamID,
|
||||
EncryptedMetadata: metadata,
|
||||
})
|
||||
|
||||
responses, err := metainfoClient.Batch(ctx, requests...)
|
||||
|
Loading…
Reference in New Issue
Block a user