satellite/metainfo: simplifying limits code

Its a very simple change to reduct code duplication.

Change-Id: Ia135232e3aefd094f76c6988e82e297be028e174
This commit is contained in:
Michał Niewrzał 2021-09-22 15:58:06 +02:00 committed by Michal Niewrzal
parent 118e64fa15
commit 1ed5db1467
4 changed files with 50 additions and 42 deletions

View File

@ -142,9 +142,16 @@ func TestDisqualifiedNodesGetNoDownload(t *testing.T) {
limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, segment, 0) limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, segment, 0)
require.NoError(t, err) require.NoError(t, err)
assert.Len(t, limits, len(segment.Pieces)-1)
notNilLimits := []*pb.AddressedOrderLimit{}
for _, orderLimit := range limits { for _, orderLimit := range limits {
if orderLimit.Limit != nil {
notNilLimits = append(notNilLimits, orderLimit)
}
}
assert.Len(t, notNilLimits, len(segment.Pieces)-1)
for _, orderLimit := range notNilLimits {
assert.False(t, isDisqualified(t, ctx, satellitePeer, orderLimit.Limit.StorageNodeId)) assert.False(t, isDisqualified(t, ctx, satellitePeer, orderLimit.Limit.StorageNodeId))
assert.NotEqual(t, orderLimit.Limit.StorageNodeId, disqualifiedNode) assert.NotEqual(t, orderLimit.Limit.StorageNodeId, disqualifiedNode)
} }

View File

@ -1048,15 +1048,6 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
} }
limits = sortLimits(limits, segment)
// workaround to avoid sending nil values on top level
for i := range limits {
if limits[i] == nil {
limits[i] = &pb.AddressedOrderLimit{}
}
}
endpoint.log.Info("Segment Download", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "get"), zap.String("type", "remote")) endpoint.log.Info("Segment Download", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "get"), zap.String("type", "remote"))
mon.Meter("req_get_remote").Mark(1) mon.Meter("req_get_remote").Mark(1)
@ -2293,15 +2284,6 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
} }
limits = sortLimits(limits, segment)
// workaround to avoid sending nil values on top level
for i := range limits {
if limits[i] == nil {
limits[i] = &pb.AddressedOrderLimit{}
}
}
endpoint.log.Info("Segment Download", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "get"), zap.String("type", "remote")) endpoint.log.Info("Segment Download", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "get"), zap.String("type", "remote"))
mon.Meter("req_get_remote").Mark(1) mon.Meter("req_get_remote").Mark(1)
@ -2377,28 +2359,6 @@ func (endpoint *Endpoint) DeletePart(ctx context.Context, req *pb.PartDeleteRequ
return &pb.PartDeleteResponse{}, nil return &pb.PartDeleteResponse{}, nil
} }
// sortLimits sorts order limits and fill missing ones with nil values.
func sortLimits(limits []*pb.AddressedOrderLimit, segment metabase.Segment) []*pb.AddressedOrderLimit {
sorted := make([]*pb.AddressedOrderLimit, segment.Redundancy.TotalShares)
for _, piece := range segment.Pieces {
sorted[piece.Number] = getLimitByStorageNodeID(limits, piece.StorageNode)
}
return sorted
}
func getLimitByStorageNodeID(limits []*pb.AddressedOrderLimit, storageNodeID storj.NodeID) *pb.AddressedOrderLimit {
for _, limit := range limits {
if limit == nil || limit.GetLimit() == nil {
continue
}
if limit.GetLimit().StorageNodeId == storageNodeID {
return limit
}
}
return nil
}
func (endpoint *Endpoint) packStreamID(ctx context.Context, satStreamID *internalpb.StreamID) (streamID storj.StreamID, err error) { func (endpoint *Endpoint) packStreamID(ctx context.Context, satStreamID *internalpb.StreamID) (streamID storj.StreamID, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)

View File

@ -189,6 +189,17 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas
return nil, storj.PiecePrivateKey{}, Error.Wrap(err) return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
} }
signer.AddressedLimits, err = sortLimits(signer.AddressedLimits, segment)
if err != nil {
return nil, storj.PiecePrivateKey{}, err
}
// workaround to avoid sending nil values on top level
for i := range signer.AddressedLimits {
if signer.AddressedLimits[i] == nil {
signer.AddressedLimits[i] = &pb.AddressedOrderLimit{}
}
}
return signer.AddressedLimits, signer.PrivateKey, nil return signer.AddressedLimits, signer.PrivateKey, nil
} }
@ -198,6 +209,32 @@ func (service *Service) perm(n int) []int {
return service.rng.Perm(n) return service.rng.Perm(n)
} }
// sortLimits sorts order limits and fill missing ones with nil values.
func sortLimits(limits []*pb.AddressedOrderLimit, segment metabase.Segment) ([]*pb.AddressedOrderLimit, error) {
sorted := make([]*pb.AddressedOrderLimit, segment.Redundancy.TotalShares)
for _, piece := range segment.Pieces {
if int16(piece.Number) >= segment.Redundancy.TotalShares {
return nil, Error.New("piece number is greater than redundancy total shares: got %d, max %d",
piece.Number, (segment.Redundancy.TotalShares - 1))
}
sorted[piece.Number] = getLimitByStorageNodeID(limits, piece.StorageNode)
}
return sorted, nil
}
func getLimitByStorageNodeID(limits []*pb.AddressedOrderLimit, storageNodeID storj.NodeID) *pb.AddressedOrderLimit {
for _, limit := range limits {
if limit == nil || limit.GetLimit() == nil {
continue
}
if limit.GetLimit().StorageNodeId == storageNodeID {
return limit
}
}
return nil
}
// CreatePutOrderLimits creates the order limits for uploading pieces to nodes. // CreatePutOrderLimits creates the order limits for uploading pieces to nodes.
func (service *Service) CreatePutOrderLimits(ctx context.Context, bucket metabase.BucketLocation, nodes []*overlay.SelectedNode, pieceExpiration time.Time, maxPieceSize int64) (_ storj.PieceID, _ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) { func (service *Service) CreatePutOrderLimits(ctx context.Context, bucket metabase.BucketLocation, nodes []*overlay.SelectedNode, pieceExpiration time.Time, maxPieceSize int64) (_ storj.PieceID, _ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)

View File

@ -40,11 +40,15 @@ func TestOrderLimitsEncryptedMetadata(t *testing.T) {
limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, segments[0], 0) limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, segments[0], 0)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 2, len(limits)) require.Equal(t, 3, len(limits))
// Test: get the bucket name and project ID from the encrypted metadata and // Test: get the bucket name and project ID from the encrypted metadata and
// compare with the old method of getting the data from the serial numbers table. // compare with the old method of getting the data from the serial numbers table.
orderLimit1 := limits[0].Limit orderLimit1 := limits[0].Limit
// from 3 order limits only one can be nil
if orderLimit1 == nil {
orderLimit1 = limits[1].Limit
}
require.True(t, len(orderLimit1.EncryptedMetadata) > 0) require.True(t, len(orderLimit1.EncryptedMetadata) > 0)
_, err = metabase.ParseBucketPrefix(metabase.BucketPrefix("")) _, err = metabase.ParseBucketPrefix(metabase.BucketPrefix(""))