satellite/orders: filter nodes based on segment placement
this change adds code to CreateGetOrderLimits to filter out any nodes that are not in the placement specified by the segment. notably, it does not change the audit or repair order limits. the list segments code had to be changed to include getting the placement field from the database. Change-Id: Ice3e42a327811bb20928c619a72ed94e0c1464ac
This commit is contained in:
parent
337eb9be6a
commit
32f683fe9d
@ -61,7 +61,7 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
|
|||||||
position, created_at, expires_at, root_piece_id,
|
position, created_at, expires_at, root_piece_id,
|
||||||
encrypted_key_nonce, encrypted_key, encrypted_size,
|
encrypted_key_nonce, encrypted_key, encrypted_size,
|
||||||
plain_offset, plain_size, encrypted_etag, redundancy,
|
plain_offset, plain_size, encrypted_etag, redundancy,
|
||||||
inline_data, remote_alias_pieces
|
inline_data, remote_alias_pieces, placement
|
||||||
FROM segments
|
FROM segments
|
||||||
WHERE
|
WHERE
|
||||||
stream_id = $1 AND
|
stream_id = $1 AND
|
||||||
@ -75,7 +75,7 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
|
|||||||
position, created_at, expires_at, root_piece_id,
|
position, created_at, expires_at, root_piece_id,
|
||||||
encrypted_key_nonce, encrypted_key, encrypted_size,
|
encrypted_key_nonce, encrypted_key, encrypted_size,
|
||||||
plain_offset, plain_size, encrypted_etag, redundancy,
|
plain_offset, plain_size, encrypted_etag, redundancy,
|
||||||
inline_data, remote_alias_pieces
|
inline_data, remote_alias_pieces, placement
|
||||||
FROM segments
|
FROM segments
|
||||||
WHERE
|
WHERE
|
||||||
stream_id = $1 AND
|
stream_id = $1 AND
|
||||||
@ -98,6 +98,7 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
|
|||||||
&segment.EncryptedETag,
|
&segment.EncryptedETag,
|
||||||
redundancyScheme{&segment.Redundancy},
|
redundancyScheme{&segment.Redundancy},
|
||||||
&segment.InlineData, &aliasPieces,
|
&segment.InlineData, &aliasPieces,
|
||||||
|
&segment.Placement,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Error.New("failed to scan segments: %w", err)
|
return Error.New("failed to scan segments: %w", err)
|
||||||
|
@ -836,6 +836,50 @@ func TestCommitSegment_RejectRetryDuplicate(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSegmentPlacementConstraints(t *testing.T) {
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
satellite := planet.Satellites[0]
|
||||||
|
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
|
||||||
|
uplink := planet.Uplinks[0]
|
||||||
|
|
||||||
|
expectedBucketName := "some-bucket"
|
||||||
|
err := uplink.Upload(ctx, satellite, expectedBucketName, "file-object", testrand.Bytes(50*memory.KiB))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
metainfoClient, err := uplink.DialMetainfo(ctx, satellite, apiKey)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer ctx.Check(metainfoClient.Close)
|
||||||
|
|
||||||
|
items, _, err := metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{
|
||||||
|
Bucket: []byte(expectedBucketName),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, items, 1)
|
||||||
|
|
||||||
|
{ // download should succeed because placement allows any node
|
||||||
|
_, err := metainfoClient.DownloadObject(ctx, metaclient.DownloadObjectParams{
|
||||||
|
Bucket: []byte(expectedBucketName),
|
||||||
|
EncryptedObjectKey: items[0].EncryptedObjectKey,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = satellite.Metabase.DB.UnderlyingTagSQL().QueryRowContext(ctx,
|
||||||
|
`UPDATE segments SET placement = 1`).Err()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
{ // download should fail because non-zero placement and nodes have no country codes
|
||||||
|
_, err := metainfoClient.DownloadObject(ctx, metaclient.DownloadObjectParams{
|
||||||
|
Bucket: []byte(expectedBucketName),
|
||||||
|
EncryptedObjectKey: items[0].EncryptedObjectKey,
|
||||||
|
})
|
||||||
|
require.Error(t, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func createTestBucket(ctx context.Context, tb testing.TB, planet *testplanet.Planet) buckets.Bucket {
|
func createTestBucket(ctx context.Context, tb testing.TB, planet *testplanet.Planet) buckets.Bucket {
|
||||||
bucket, err := planet.Satellites[0].API.Buckets.Service.CreateBucket(ctx, buckets.Bucket{
|
bucket, err := planet.Satellites[0].API.Buckets.Service.CreateBucket(ctx, buckets.Bucket{
|
||||||
Name: "test",
|
Name: "test",
|
||||||
|
@ -144,6 +144,14 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas
|
|||||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if segment.Placement != storj.EveryCountry {
|
||||||
|
for id, node := range nodes {
|
||||||
|
if !segment.Placement.AllowedCountry(node.CountryCode) {
|
||||||
|
delete(nodes, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
signer, err := NewSignerGet(service, segment.RootPieceID, time.Now(), orderLimit, bucket)
|
signer, err := NewSignerGet(service, segment.RootPieceID, time.Now(), orderLimit, bucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||||
@ -152,8 +160,8 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas
|
|||||||
neededLimits := segment.Redundancy.DownloadNodes()
|
neededLimits := segment.Redundancy.DownloadNodes()
|
||||||
if desiredNodes > neededLimits {
|
if desiredNodes > neededLimits {
|
||||||
neededLimits = desiredNodes
|
neededLimits = desiredNodes
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pieces := segment.Pieces
|
pieces := segment.Pieces
|
||||||
for _, pieceIndex := range service.perm(len(pieces)) {
|
for _, pieceIndex := range service.perm(len(pieces)) {
|
||||||
piece := pieces[pieceIndex]
|
piece := pieces[pieceIndex]
|
||||||
|
Loading…
Reference in New Issue
Block a user