satellite/metainfo: RetryBeginSegmentPieces RPC implementation

Part of:
https://github.com/storj/uplink/issues/120

Change-Id: I2a2873455f7498ffd31f50ade16c173fe1d18157
Andrew Harding 2023-01-11 08:40:17 -07:00 committed by Storj Robot
parent bd8867cd09
commit abd0ad92dc
4 changed files with 325 additions and 35 deletions
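For orientation, here is a minimal sketch of how an uplink-side caller might drive the new RPC, modeled on the metaclient calls exercised by the tests added in this change. The helper name retryFailedPieces, the failedPieces parameter, the import paths, and the exact field types are illustrative assumptions, not part of this commit.

// Sketch only (assumed imports: "context", "storj.io/common/pb",
// "storj.io/common/storj", "storj.io/uplink/private/metaclient").
//
// retryFailedPieces asks the satellite to replace the put order limits for
// the piece numbers whose uploads failed. It returns the amended segment ID,
// which must be used for the eventual CommitSegment call, together with the
// updated list of addressed order limits.
func retryFailedPieces(ctx context.Context, client *metaclient.Client, segmentID storj.SegmentID, failedPieces []int) (storj.SegmentID, []*pb.AddressedOrderLimit, error) {
    resp, err := client.RetryBeginSegmentPieces(ctx, metaclient.RetryBeginSegmentPiecesParams{
        SegmentID:         segmentID,
        RetryPieceNumbers: failedPieces,
    })
    if err != nil {
        return nil, nil, err
    }
    // resp.Limits holds replacement limits for the retried piece numbers and
    // the original limits for everything else; resp.SegmentID supersedes the
    // segment ID returned by BeginSegment.
    return resp.SegmentID, resp.Limits, nil
}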

@@ -242,6 +242,10 @@ func (endpoint *Endpoint) unmarshalSatStreamID(ctx context.Context, streamID sto
func (endpoint *Endpoint) unmarshalSatSegmentID(ctx context.Context, segmentID storj.SegmentID) (_ *internalpb.SegmentID, err error) {
defer mon.Task()(&ctx)(&err)
if len(segmentID) == 0 {
return nil, errs.New("segment ID missing")
}
satSegmentID := &internalpb.SegmentID{}
err = pb.Unmarshal(segmentID, satSegmentID)
if err != nil {

@@ -123,6 +123,10 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin
RootPieceId: rootPieceID,
CreationDate: time.Now(),
})
if err != nil {
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
endpoint.log.Info("Segment Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "remote"))
mon.Meter("req_put_remote").Mark(1)
@@ -135,6 +139,94 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin
}, nil
}
// RetryBeginSegmentPieces replaces put order limits for failed piece uploads.
func (endpoint *Endpoint) RetryBeginSegmentPieces(ctx context.Context, req *pb.RetryBeginSegmentPiecesRequest) (resp *pb.RetryBeginSegmentPiecesResponse, err error) {
defer mon.Task()(&ctx)(&err)
endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: segmentID.StreamId.Bucket,
EncryptedPath: segmentID.StreamId.EncryptedObjectKey,
Time: time.Now(),
})
if err != nil {
return nil, err
}
if len(req.RetryPieceNumbers) == 0 {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "piece numbers to exchange cannot be empty")
}
pieceNumberSet := make(map[int32]struct{}, len(req.RetryPieceNumbers))
for _, pieceNumber := range req.RetryPieceNumbers {
if pieceNumber < 0 || int(pieceNumber) >= len(segmentID.OriginalOrderLimits) {
endpoint.log.Debug("piece number is out of range",
zap.Int32("piece number", pieceNumber),
zap.Int("redundancy total", len(segmentID.OriginalOrderLimits)),
zap.Stringer("Segment ID", req.SegmentId),
)
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "piece number %d must be within range [0,%d]", pieceNumber, len(segmentID.OriginalOrderLimits)-1)
}
if _, ok := pieceNumberSet[pieceNumber]; ok {
endpoint.log.Debug("piece number is duplicated",
zap.Int32("piece number", pieceNumber),
zap.Stringer("Segment ID", req.SegmentId),
)
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "piece number %d is duplicated", pieceNumber)
}
pieceNumberSet[pieceNumber] = struct{}{}
}
if err := endpoint.checkUploadLimits(ctx, keyInfo.ProjectID); err != nil {
return nil, err
}
// Find a new set of storage nodes, excluding any already represented in
// the current list of order limits.
// TODO: It's possible that a node gets reused across multiple calls to RetryBeginSegmentPieces.
excludedIDs := make([]storj.NodeID, 0, len(segmentID.OriginalOrderLimits))
for _, orderLimit := range segmentID.OriginalOrderLimits {
excludedIDs = append(excludedIDs, orderLimit.Limit.StorageNodeId)
}
nodes, err := endpoint.overlay.FindStorageNodesForUpload(ctx, overlay.FindStorageNodesRequest{
RequestedCount: len(req.RetryPieceNumbers),
Placement: storj.PlacementConstraint(segmentID.StreamId.Placement),
ExcludedIDs: excludedIDs,
})
if err != nil {
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
addressedLimits, err := endpoint.orders.ReplacePutOrderLimits(ctx, segmentID.RootPieceId, segmentID.OriginalOrderLimits, nodes, req.RetryPieceNumbers)
if err != nil {
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
segmentID.OriginalOrderLimits = addressedLimits
amendedSegmentID, err := endpoint.packSegmentID(ctx, segmentID)
if err != nil {
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
endpoint.log.Info("Segment Upload Piece Retry", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "remote"))
return &pb.RetryBeginSegmentPiecesResponse{
SegmentId: amendedSegmentID,
AddressedLimits: addressedLimits,
}, nil
}
// CommitSegment commits segment after uploading.
func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentCommitRequest) (resp *pb.SegmentCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)

@@ -4,6 +4,7 @@
package metainfo_test
import (
"context"
"strconv"
"testing"
"time"
@@ -15,6 +16,7 @@ import (
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/rpc/rpctest"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/testcontext"
@@ -28,14 +30,8 @@ func TestExpirationTimeSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "my-bucket-name")
require.NoError(t, err)
metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(metainfoClient.Close)
bucket := createTestBucket(ctx, t, planet)
metainfoClient := createMetainfoClient(ctx, t, planet)
for i, r := range []struct {
expirationDate time.Time
@@ -59,7 +55,7 @@
},
} {
_, err := metainfoClient.BeginObject(ctx, metaclient.BeginObjectParams{
Bucket: []byte("my-bucket-name"),
Bucket: []byte(bucket.Name),
EncryptedObjectKey: []byte("path" + strconv.Itoa(i)),
ExpiresAt: r.expirationDate,
EncryptionParameters: storj.EncryptionParameters{
@@ -80,10 +76,6 @@ func TestInlineSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
buckets := planet.Satellites[0].API.Buckets.Service
// TODO maybe split into separate cases
// Test:
// * create bucket
@@ -95,16 +87,8 @@
// * download segments
// * delete segments and object
bucket := storj.Bucket{
Name: "inline-segments-bucket",
ProjectID: planet.Uplinks[0].Projects[0].ID,
}
_, err := buckets.CreateBucket(ctx, bucket)
require.NoError(t, err)
metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(metainfoClient.Close)
bucket := createTestBucket(ctx, t, planet)
metainfoClient := createMetainfoClient(ctx, t, planet)
params := metaclient.BeginObjectParams{
Bucket: []byte(bucket.Name),
@@ -410,16 +394,11 @@ func TestCommitSegment_Validation(t *testing.T) {
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
client, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(client.Close)
err = planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket")
require.NoError(t, err)
bucket := createTestBucket(ctx, t, planet)
client := createMetainfoClient(ctx, t, planet)
beginObjectResponse, err := client.BeginObject(ctx, metaclient.BeginObjectParams{
Bucket: []byte("testbucket"),
Bucket: []byte(bucket.Name),
EncryptedObjectKey: []byte("a/b/testobject"),
EncryptionParameters: storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
@@ -581,3 +560,183 @@ func TestCommitSegment_Validation(t *testing.T) {
require.True(t, errs2.IsRPC(err, rpcstatus.InvalidArgument))
})
}
func TestRetryBeginSegmentPieces(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
bucket := createTestBucket(ctx, t, planet)
metainfoClient := createMetainfoClient(ctx, t, planet)
params := metaclient.BeginObjectParams{
Bucket: []byte(bucket.Name),
EncryptedObjectKey: []byte("encrypted-path"),
EncryptionParameters: storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 256,
},
}
beginObjectResp, err := metainfoClient.BeginObject(ctx, params)
require.NoError(t, err)
beginSegmentResp, err := metainfoClient.BeginSegment(ctx, metaclient.BeginSegmentParams{
StreamID: beginObjectResp.StreamID,
Position: metaclient.SegmentPosition{},
MaxOrderLimit: 1024,
})
require.NoError(t, err)
// This call should fail, since there will not be enough unique nodes
// available to replace all of the piece orders.
_, err = metainfoClient.RetryBeginSegmentPieces(ctx, metaclient.RetryBeginSegmentPiecesParams{
SegmentID: beginSegmentResp.SegmentID,
RetryPieceNumbers: []int{0, 1, 2, 3, 4, 5, 6},
})
rpctest.RequireStatus(t, err, rpcstatus.Internal, "metaclient: not enough nodes: not enough nodes: requested from cache 7, found 2")
// This exchange should succeed.
exchangeSegmentPieceOrdersResp, err := metainfoClient.RetryBeginSegmentPieces(ctx, metaclient.RetryBeginSegmentPiecesParams{
SegmentID: beginSegmentResp.SegmentID,
RetryPieceNumbers: []int{0, 2},
})
require.NoError(t, err)
makeResult := func(i int) *pb.SegmentPieceUploadResult {
limit := exchangeSegmentPieceOrdersResp.Limits[i].Limit
node := planet.FindNode(limit.StorageNodeId)
require.NotNil(t, node, "failed to locate node to sign hash for piece %d", i)
signer := signing.SignerFromFullIdentity(node.Identity)
hash, err := signing.SignPieceHash(ctx, signer, &pb.PieceHash{
PieceSize: 512,
PieceId: limit.PieceId,
Timestamp: time.Now(),
})
require.NoError(t, err)
return &pb.SegmentPieceUploadResult{
PieceNum: int32(i),
NodeId: limit.StorageNodeId,
Hash: hash,
}
}
// Commit with only 6 successful uploads, otherwise, the original
// limits will still be valid. We want to test that the exchange
// replaced the order limits.
commitSegmentParams := metaclient.CommitSegmentParams{
SegmentID: beginSegmentResp.SegmentID,
PlainSize: 512,
SizeEncryptedData: 512,
Encryption: metaclient.SegmentEncryption{
EncryptedKey: testrand.Bytes(256),
},
UploadResult: []*pb.SegmentPieceUploadResult{
makeResult(0),
makeResult(1),
makeResult(2),
makeResult(3),
makeResult(4),
makeResult(5),
},
}
// This call should fail since we are not using the segment ID
// augmented by RetryBeginSegmentPieces
err = metainfoClient.CommitSegment(ctx, commitSegmentParams)
rpctest.RequireStatusContains(t, err, rpcstatus.InvalidArgument, "metaclient: Number of valid pieces (4) is less than the success threshold (6).")
// This call should succeed.
commitSegmentParams.SegmentID = exchangeSegmentPieceOrdersResp.SegmentID
err = metainfoClient.CommitSegment(ctx, commitSegmentParams)
require.NoError(t, err)
})
}
func TestRetryBeginSegmentPieces_Validation(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
bucket := createTestBucket(ctx, t, planet)
metainfoClient := createMetainfoClient(ctx, t, planet)
params := metaclient.BeginObjectParams{
Bucket: []byte(bucket.Name),
EncryptedObjectKey: []byte("encrypted-path"),
EncryptionParameters: storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 256,
},
}
beginObjectResp, err := metainfoClient.BeginObject(ctx, params)
require.NoError(t, err)
beginSegmentResp, err := metainfoClient.BeginSegment(ctx, metaclient.BeginSegmentParams{
StreamID: beginObjectResp.StreamID,
Position: metaclient.SegmentPosition{},
MaxOrderLimit: 1024,
})
require.NoError(t, err)
t.Run("segment ID missing", func(t *testing.T) {
_, err := metainfoClient.RetryBeginSegmentPieces(ctx, metaclient.RetryBeginSegmentPiecesParams{
SegmentID: nil,
RetryPieceNumbers: []int{0, 1},
})
rpctest.RequireStatus(t, err, rpcstatus.InvalidArgument, "metaclient: segment ID missing")
})
t.Run("piece numbers is empty", func(t *testing.T) {
_, err := metainfoClient.RetryBeginSegmentPieces(ctx, metaclient.RetryBeginSegmentPiecesParams{
SegmentID: beginSegmentResp.SegmentID,
RetryPieceNumbers: nil,
})
rpctest.RequireStatus(t, err, rpcstatus.InvalidArgument, "metaclient: piece numbers to exchange cannot be empty")
})
t.Run("piece numbers are less than zero", func(t *testing.T) {
_, err := metainfoClient.RetryBeginSegmentPieces(ctx, metaclient.RetryBeginSegmentPiecesParams{
SegmentID: beginSegmentResp.SegmentID,
RetryPieceNumbers: []int{-1},
})
rpctest.RequireStatus(t, err, rpcstatus.InvalidArgument, "metaclient: piece number -1 must be within range [0,7]")
})
t.Run("piece numbers are larger than expected", func(t *testing.T) {
_, err := metainfoClient.RetryBeginSegmentPieces(ctx, metaclient.RetryBeginSegmentPiecesParams{
SegmentID: beginSegmentResp.SegmentID,
RetryPieceNumbers: []int{len(beginSegmentResp.Limits)},
})
rpctest.RequireStatus(t, err, rpcstatus.InvalidArgument, "metaclient: piece number 8 must be within range [0,7]")
})
t.Run("piece numbers are duplicate", func(t *testing.T) {
_, err := metainfoClient.RetryBeginSegmentPieces(ctx, metaclient.RetryBeginSegmentPiecesParams{
SegmentID: beginSegmentResp.SegmentID,
RetryPieceNumbers: []int{0, 0},
})
rpctest.RequireStatus(t, err, rpcstatus.InvalidArgument, "metaclient: piece number 0 is duplicated")
})
t.Run("success", func(t *testing.T) {
_, err := metainfoClient.RetryBeginSegmentPieces(ctx, metaclient.RetryBeginSegmentPiecesParams{
SegmentID: beginSegmentResp.SegmentID,
RetryPieceNumbers: []int{0, 1},
})
require.NoError(t, err)
})
})
}
func createTestBucket(ctx context.Context, tb testing.TB, planet *testplanet.Planet) storj.Bucket {
bucket, err := planet.Satellites[0].API.Buckets.Service.CreateBucket(ctx, storj.Bucket{
Name: "test",
ProjectID: planet.Uplinks[0].Projects[0].ID,
})
require.NoError(tb, err)
return bucket
}
func createMetainfoClient(ctx *testcontext.Context, tb testing.TB, planet *testplanet.Planet) *metaclient.Client {
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(tb, err)
tb.Cleanup(func() { ctx.Check(metainfoClient.Close) })
return metainfoClient
}

@@ -235,10 +235,7 @@ func (service *Service) CreatePutOrderLimits(ctx context.Context, bucket metabas
}
for pieceNum, node := range nodes {
address := node.Address.Address
if node.LastIPPort != "" {
address = node.LastIPPort
}
address := storageNodeAddress(node)
_, err := signer.Sign(ctx, storj.NodeURL{ID: node.ID, Address: address}, int32(pieceNum))
if err != nil {
return storj.PieceID{}, nil, storj.PiecePrivateKey{}, Error.Wrap(err)
@@ -248,6 +245,36 @@ func (service *Service) CreatePutOrderLimits(ctx context.Context, bucket metabas
return signer.RootPieceID, signer.AddressedLimits, signer.PrivateKey, nil
}
// ReplacePutOrderLimits replaces order limits for uploading pieces to nodes.
func (service *Service) ReplacePutOrderLimits(ctx context.Context, rootPieceID storj.PieceID, addressedLimits []*pb.AddressedOrderLimit, nodes []*overlay.SelectedNode, pieceNumbers []int32) (_ []*pb.AddressedOrderLimit, err error) {
defer mon.Task()(&ctx)(&err)
pieceIDDeriver := rootPieceID.Deriver()
newAddressedLimits := make([]*pb.AddressedOrderLimit, len(addressedLimits))
copy(newAddressedLimits, addressedLimits)
for i, pieceNumber := range pieceNumbers {
if pieceNumber < 0 || int(pieceNumber) >= len(addressedLimits) {
return nil, Error.New("invalid piece number %d", pieceNumber)
}
// TODO: clone?
newAddressedLimit := *addressedLimits[pieceNumber].Limit
newAddressedLimit.StorageNodeId = nodes[i].ID
newAddressedLimit.PieceId = pieceIDDeriver.Derive(nodes[i].ID, pieceNumber)
newAddressedLimit.SatelliteSignature = nil
newAddressedLimits[pieceNumber].Limit, err = signing.SignOrderLimit(ctx, service.satellite, &newAddressedLimit)
if err != nil {
return nil, ErrSigner.Wrap(err)
}
newAddressedLimits[pieceNumber].StorageNodeAddress.Address = storageNodeAddress(nodes[i])
}
return newAddressedLimits, nil
}
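One note on the "// TODO: clone?" in ReplacePutOrderLimits above: copy(newAddressedLimits, addressedLimits) duplicates only the slice of pointers, so assignments through newAddressedLimits[pieceNumber] are also visible through the caller's addressedLimits slice. The following stand-alone sketch (stand-in types, not Storj code) illustrates that aliasing behaviour:

package main

import "fmt"

type orderLimit struct{ storageNode string }

type addressedLimit struct{ Limit *orderLimit }

func main() {
    original := []*addressedLimit{{Limit: &orderLimit{storageNode: "old-node"}}}

    // Shallow copy: both slices still point at the same *addressedLimit values.
    replaced := make([]*addressedLimit, len(original))
    copy(replaced, original)

    // Re-assigning a field through the copy is observable via the original slice.
    replaced[0].Limit = &orderLimit{storageNode: "new-node"}

    fmt.Println(original[0].Limit.storageNode) // prints "new-node"
}

A deep clone of the affected entries before re-signing would avoid mutating the caller's slice, which appears to be the trade-off the TODO is weighing.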
// CreateAuditOrderLimits creates the order limits for auditing the pieces of a segment.
func (service *Service) CreateAuditOrderLimits(ctx context.Context, segment metabase.Segment, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, err error) {
defer mon.Task()(&ctx)(&err)
@@ -577,3 +604,11 @@ func (service *Service) DecryptOrderMetadata(ctx context.Context, order *pb.Orde
}
return key.DecryptMetadata(order.SerialNumber, order.EncryptedMetadata)
}
func storageNodeAddress(node *overlay.SelectedNode) string {
address := node.Address.Address
if node.LastIPPort != "" {
address = node.LastIPPort
}
return address
}