satellite confirms success threshold reached (#2657)
This commit is contained in:
parent
c9b46f2fe2
commit
238e264a8f
@ -63,17 +63,17 @@ type Containment interface {
|
||||
|
||||
// Endpoint metainfo endpoint
|
||||
type Endpoint struct {
|
||||
log *zap.Logger
|
||||
metainfo *Service
|
||||
orders *orders.Service
|
||||
cache *overlay.Cache
|
||||
partnerinfo attribution.DB
|
||||
projectUsage *accounting.ProjectUsage
|
||||
containment Containment
|
||||
apiKeys APIKeys
|
||||
createRequests *createRequests
|
||||
rsConfig RSConfig
|
||||
satellite signing.Signer
|
||||
log *zap.Logger
|
||||
metainfo *Service
|
||||
orders *orders.Service
|
||||
cache *overlay.Cache
|
||||
partnerinfo attribution.DB
|
||||
projectUsage *accounting.ProjectUsage
|
||||
containment Containment
|
||||
apiKeys APIKeys
|
||||
createRequests *createRequests
|
||||
requiredRSConfig RSConfig
|
||||
satellite signing.Signer
|
||||
}
|
||||
|
||||
// NewEndpoint creates new metainfo endpoint instance
|
||||
@ -81,17 +81,17 @@ func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cac
|
||||
containment Containment, apiKeys APIKeys, projectUsage *accounting.ProjectUsage, rsConfig RSConfig, satellite signing.Signer) *Endpoint {
|
||||
// TODO do something with too many params
|
||||
return &Endpoint{
|
||||
log: log,
|
||||
metainfo: metainfo,
|
||||
orders: orders,
|
||||
cache: cache,
|
||||
partnerinfo: partnerinfo,
|
||||
containment: containment,
|
||||
apiKeys: apiKeys,
|
||||
projectUsage: projectUsage,
|
||||
createRequests: newCreateRequests(),
|
||||
rsConfig: rsConfig,
|
||||
satellite: satellite,
|
||||
log: log,
|
||||
metainfo: metainfo,
|
||||
orders: orders,
|
||||
cache: cache,
|
||||
partnerinfo: partnerinfo,
|
||||
containment: containment,
|
||||
apiKeys: apiKeys,
|
||||
projectUsage: projectUsage,
|
||||
createRequests: newCreateRequests(),
|
||||
requiredRSConfig: rsConfig,
|
||||
satellite: satellite,
|
||||
}
|
||||
}
|
||||
|
||||
@ -523,8 +523,8 @@ func (endpoint *Endpoint) filterValidPieces(ctx context.Context, pointer *pb.Poi
|
||||
return Error.New("all pieces needs to have the same size")
|
||||
}
|
||||
|
||||
// we repair when the number of healthy files is less than or equal to the repair threshold
|
||||
// except for the case when the repair and success thresholds are the same (a case usually seen during testing)
|
||||
// We repair when the number of healthy files is less than or equal to the repair threshold
|
||||
// except for the case when the repair and success thresholds are the same (a case usually seen during testing).
|
||||
if int32(len(remotePieces)) <= remote.Redundancy.RepairThreshold && int32(len(remotePieces)) < remote.Redundancy.SuccessThreshold {
|
||||
return Error.New("Number of valid pieces (%d) is less than or equal to the repair threshold (%d)",
|
||||
len(remotePieces),
|
||||
@ -532,6 +532,13 @@ func (endpoint *Endpoint) filterValidPieces(ctx context.Context, pointer *pb.Poi
|
||||
)
|
||||
}
|
||||
|
||||
if int32(len(remotePieces)) < remote.Redundancy.SuccessThreshold {
|
||||
return Error.New("Number of valid pieces (%d) is less than the success threshold (%d)",
|
||||
len(remotePieces),
|
||||
remote.Redundancy.SuccessThreshold,
|
||||
)
|
||||
}
|
||||
|
||||
remote.RemotePieces = remotePieces
|
||||
}
|
||||
return nil
|
||||
|
@ -290,7 +290,7 @@ func TestServiceList(t *testing.T) {
|
||||
|
||||
func TestCommitSegment(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
|
||||
|
||||
@ -304,7 +304,7 @@ func TestCommitSegment(t *testing.T) {
|
||||
require.Error(t, err)
|
||||
}
|
||||
{
|
||||
// error if number of remote pieces is lower then repair threshold
|
||||
// error if number of remote pieces is lower than repair threshold
|
||||
redundancy := &pb.RedundancyScheme{
|
||||
MinReq: 1,
|
||||
RepairThreshold: 2,
|
||||
@ -313,11 +313,11 @@ func TestCommitSegment(t *testing.T) {
|
||||
ErasureShareSize: 256,
|
||||
}
|
||||
expirationDate := time.Now().Add(time.Hour)
|
||||
addresedLimits, rootPieceID, _, err := metainfo.CreateSegment(ctx, "bucket", "path", -1, redundancy, 1000, expirationDate)
|
||||
addressedLimits, rootPieceID, _, err := metainfo.CreateSegment(ctx, "bucket", "path", -1, redundancy, 1000, expirationDate)
|
||||
require.NoError(t, err)
|
||||
|
||||
// create number of pieces below repair threshold
|
||||
usedForPieces := addresedLimits[:redundancy.RepairThreshold-1]
|
||||
usedForPieces := addressedLimits[:redundancy.RepairThreshold-1]
|
||||
pieces := make([]*pb.RemotePiece, len(usedForPieces))
|
||||
for i, limit := range usedForPieces {
|
||||
pieces[i] = &pb.RemotePiece{
|
||||
@ -343,13 +343,62 @@ func TestCommitSegment(t *testing.T) {
|
||||
ExpirationDate: expirationDate,
|
||||
}
|
||||
|
||||
limits := make([]*pb.OrderLimit, len(addresedLimits))
|
||||
for i, addresedLimit := range addresedLimits {
|
||||
limits[i] = addresedLimit.Limit
|
||||
limits := make([]*pb.OrderLimit, len(addressedLimits))
|
||||
for i, addressedLimit := range addressedLimits {
|
||||
limits[i] = addressedLimit.Limit
|
||||
}
|
||||
_, err = metainfo.CommitSegment(ctx, "bucket", "path", -1, pointer, limits)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "less than or equal to the repair threshold")
|
||||
require.Contains(t, err.Error(), "is less than or equal to the repair threshold")
|
||||
}
|
||||
|
||||
{
|
||||
// error if number of remote pieces is lower than success threshold
|
||||
redundancy := &pb.RedundancyScheme{
|
||||
MinReq: 1,
|
||||
RepairThreshold: 2,
|
||||
SuccessThreshold: 5,
|
||||
Total: 6,
|
||||
ErasureShareSize: 256,
|
||||
}
|
||||
expirationDate := time.Now().Add(time.Hour)
|
||||
addressedLimits, rootPieceID, _, err := metainfo.CreateSegment(ctx, "bucket", "path", -1, redundancy, 1000, expirationDate)
|
||||
require.NoError(t, err)
|
||||
|
||||
// create number of pieces below success threshold
|
||||
usedForPieces := addressedLimits[:redundancy.SuccessThreshold-1]
|
||||
pieces := make([]*pb.RemotePiece, len(usedForPieces))
|
||||
for i, limit := range usedForPieces {
|
||||
pieces[i] = &pb.RemotePiece{
|
||||
PieceNum: int32(i),
|
||||
NodeId: limit.Limit.StorageNodeId,
|
||||
Hash: &pb.PieceHash{
|
||||
PieceId: limit.Limit.PieceId,
|
||||
PieceSize: 256,
|
||||
Timestamp: time.Now(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pointer := &pb.Pointer{
|
||||
CreationDate: time.Now(),
|
||||
Type: pb.Pointer_REMOTE,
|
||||
SegmentSize: 10,
|
||||
Remote: &pb.RemoteSegment{
|
||||
RootPieceId: rootPieceID,
|
||||
Redundancy: redundancy,
|
||||
RemotePieces: pieces,
|
||||
},
|
||||
ExpirationDate: expirationDate,
|
||||
}
|
||||
|
||||
limits := make([]*pb.OrderLimit, len(addressedLimits))
|
||||
for i, addressedLimit := range addressedLimits {
|
||||
limits[i] = addressedLimit.Limit
|
||||
}
|
||||
_, err = metainfo.CommitSegment(ctx, "bucket", "path", -1, pointer, limits)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "is less than the success threshold")
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -586,21 +635,21 @@ func TestCommitSegmentPointer(t *testing.T) {
|
||||
Modify: func(pointer *pb.Pointer) {
|
||||
pointer.Remote.RemotePieces[0].Hash = nil
|
||||
},
|
||||
ErrorMessage: "Number of valid pieces (1) is less than or equal to the repair threshold (1)",
|
||||
ErrorMessage: "Number of valid pieces (2) is less than the success threshold (3)",
|
||||
},
|
||||
{
|
||||
// invalid timestamp removes piece from pointer, not enough pieces for successful upload
|
||||
Modify: func(pointer *pb.Pointer) {
|
||||
pointer.Remote.RemotePieces[0].Hash.Timestamp = time.Now().Add(-24 * time.Hour)
|
||||
},
|
||||
ErrorMessage: "Number of valid pieces (1) is less than or equal to the repair threshold (1)",
|
||||
ErrorMessage: "Number of valid pieces (2) is less than the success threshold (3)",
|
||||
},
|
||||
{
|
||||
// invalid hash PieceID removes piece from pointer, not enough pieces for successful upload
|
||||
Modify: func(pointer *pb.Pointer) {
|
||||
pointer.Remote.RemotePieces[0].Hash.PieceId = storj.PieceID{1}
|
||||
},
|
||||
ErrorMessage: "Number of valid pieces (1) is less than or equal to the repair threshold (1)",
|
||||
ErrorMessage: "Number of valid pieces (2) is less than the success threshold (3)",
|
||||
},
|
||||
{
|
||||
Modify: func(pointer *pb.Pointer) {
|
||||
@ -764,6 +813,13 @@ func createTestPointer(t *testing.T) *pb.Pointer {
|
||||
Timestamp: timestamp,
|
||||
},
|
||||
},
|
||||
{
|
||||
PieceNum: 2,
|
||||
Hash: &pb.PieceHash{
|
||||
PieceSize: pieceSize,
|
||||
Timestamp: timestamp,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
ExpirationDate: timestamp,
|
||||
@ -1002,6 +1058,7 @@ func TestBeginCommitListSegment(t *testing.T) {
|
||||
UploadResult: []*pb.SegmentPieceUploadResult{
|
||||
makeResult(0),
|
||||
makeResult(1),
|
||||
makeResult(2),
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -283,7 +283,7 @@ func (endpoint *Endpoint) validatePointer(ctx context.Context, pointer *pb.Point
|
||||
return Error.New("invalid no order limit for piece")
|
||||
}
|
||||
|
||||
maxAllowed, err := encryption.CalcEncryptedSize(endpoint.rsConfig.MaxSegmentSize.Int64(), storj.EncryptionParameters{
|
||||
maxAllowed, err := encryption.CalcEncryptedSize(endpoint.requiredRSConfig.MaxSegmentSize.Int64(), storj.EncryptionParameters{
|
||||
CipherSuite: storj.EncAESGCM,
|
||||
BlockSize: 128, // intentionally low block size to allow maximum possible encryption overhead
|
||||
})
|
||||
@ -323,18 +323,18 @@ func (endpoint *Endpoint) validatePointer(ctx context.Context, pointer *pb.Point
|
||||
func (endpoint *Endpoint) validateRedundancy(ctx context.Context, redundancy *pb.RedundancyScheme) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if endpoint.rsConfig.Validate == true {
|
||||
if endpoint.rsConfig.ErasureShareSize.Int32() != redundancy.ErasureShareSize ||
|
||||
endpoint.rsConfig.MaxThreshold != int(redundancy.Total) ||
|
||||
endpoint.rsConfig.MinThreshold != int(redundancy.MinReq) ||
|
||||
endpoint.rsConfig.RepairThreshold != int(redundancy.RepairThreshold) ||
|
||||
endpoint.rsConfig.SuccessThreshold != int(redundancy.SuccessThreshold) {
|
||||
if endpoint.requiredRSConfig.Validate == true {
|
||||
if endpoint.requiredRSConfig.ErasureShareSize.Int32() != redundancy.ErasureShareSize ||
|
||||
endpoint.requiredRSConfig.MaxThreshold != int(redundancy.Total) ||
|
||||
endpoint.requiredRSConfig.MinThreshold != int(redundancy.MinReq) ||
|
||||
endpoint.requiredRSConfig.RepairThreshold != int(redundancy.RepairThreshold) ||
|
||||
endpoint.requiredRSConfig.SuccessThreshold != int(redundancy.SuccessThreshold) {
|
||||
return Error.New("provided redundancy scheme parameters not allowed: want [%d, %d, %d, %d, %d] got [%d, %d, %d, %d, %d]",
|
||||
endpoint.rsConfig.MinThreshold,
|
||||
endpoint.rsConfig.RepairThreshold,
|
||||
endpoint.rsConfig.SuccessThreshold,
|
||||
endpoint.rsConfig.MaxThreshold,
|
||||
endpoint.rsConfig.ErasureShareSize.Int32(),
|
||||
endpoint.requiredRSConfig.MinThreshold,
|
||||
endpoint.requiredRSConfig.RepairThreshold,
|
||||
endpoint.requiredRSConfig.SuccessThreshold,
|
||||
endpoint.requiredRSConfig.MaxThreshold,
|
||||
endpoint.requiredRSConfig.ErasureShareSize.Int32(),
|
||||
|
||||
redundancy.MinReq,
|
||||
redundancy.RepairThreshold,
|
||||
|
Loading…
Reference in New Issue
Block a user