satellite/metainfo: override bucket RS values with satellite config

The satellite now keeps the RS values for uplinks, but old uplinks were using
the default bucket settings. Because of that, we need to override the bucket
settings with the satellite settings to avoid breaking older uplinks.

Change-Id: Ia1068db70e4adbf741c5e81d27d9e39799049c22
This commit is contained in:
Michal Niewrzal 2020-01-28 14:44:47 +01:00 committed by jens
parent 3abb8c8ed7
commit 90fc1922d0
3 changed files with 25 additions and 33 deletions

View File

@ -280,9 +280,8 @@ func TestBucketAttrs(t *testing.T) {
assert.Equal(t, bucketName, got.Name)
assert.Equal(t, inBucketConfig.PathCipher, got.PathCipher)
assert.Equal(t, inBucketConfig.EncryptionParameters, got.EncryptionParameters)
assert.Equal(t, inBucketConfig.Volatile.RedundancyScheme, got.Volatile.RedundancyScheme)
assert.Equal(t, inBucketConfig.Volatile.SegmentsSize, got.Volatile.SegmentsSize)
assert.Equal(t, inBucketConfig, got.BucketConfig)
// ignore RS values because satellite will override it
err = proj.DeleteBucket(ctx, bucketName)
require.NoError(t, err)

View File

@ -36,12 +36,7 @@ void handle_project(ProjectRef project) {
require(config.encryption_parameters.cipher_suite == info.encryption_parameters.cipher_suite);
require(config.encryption_parameters.block_size == info.encryption_parameters.block_size);
require(config.redundancy_scheme.algorithm == info.redundancy_scheme.algorithm);
require(config.redundancy_scheme.share_size == info.redundancy_scheme.share_size);
require(config.redundancy_scheme.required_shares == info.redundancy_scheme.required_shares);
require(config.redundancy_scheme.repair_shares == info.redundancy_scheme.repair_shares);
require(config.redundancy_scheme.optimal_shares == info.redundancy_scheme.optimal_shares);
require(config.redundancy_scheme.total_shares == info.redundancy_scheme.total_shares);
// ignore RS values because satellite will override it
free_bucket_info(&info);
}

View File

@ -746,7 +746,8 @@ func (endpoint *Endpoint) GetBucket(ctx context.Context, req *pb.BucketGetReques
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
convBucket, err := convertBucketToProto(ctx, bucket)
// override RS to fit satellite settings
convBucket, err := convertBucketToProto(ctx, bucket, endpoint.redundancyScheme())
if err != nil {
return resp, err
}
@ -800,7 +801,8 @@ func (endpoint *Endpoint) CreateBucket(ctx context.Context, req *pb.BucketCreate
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to create bucket")
}
convBucket, err := convertBucketToProto(ctx, bucket)
// override RS to fit satellite settings
convBucket, err := convertBucketToProto(ctx, bucket, endpoint.redundancyScheme())
if err != nil {
endpoint.log.Error("error while converting bucket to proto", zap.String("bucketName", bucket.Name), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to create bucket")
@ -1046,26 +1048,18 @@ func convertProtoToBucket(req *pb.BucketCreateRequest, projectID uuid.UUID) (buc
}, nil
}
func convertBucketToProto(ctx context.Context, bucket storj.Bucket) (pbBucket *pb.Bucket, err error) {
rs := bucket.DefaultRedundancyScheme
func convertBucketToProto(ctx context.Context, bucket storj.Bucket, rs *pb.RedundancyScheme) (pbBucket *pb.Bucket, err error) {
partnerID, err := bucket.PartnerID.MarshalJSON()
if err != nil {
return pbBucket, rpcstatus.Error(rpcstatus.Internal, "UUID marshal error")
}
return &pb.Bucket{
Name: []byte(bucket.Name),
PathCipher: pb.CipherSuite(int(bucket.PathCipher)),
PartnerId: partnerID,
CreatedAt: bucket.Created,
DefaultSegmentSize: bucket.DefaultSegmentsSize,
DefaultRedundancyScheme: &pb.RedundancyScheme{
Type: pb.RedundancyScheme_RS,
MinReq: int32(rs.RequiredShares),
Total: int32(rs.TotalShares),
RepairThreshold: int32(rs.RepairShares),
SuccessThreshold: int32(rs.OptimalShares),
ErasureShareSize: rs.ShareSize,
},
Name: []byte(bucket.Name),
PathCipher: pb.CipherSuite(int(bucket.PathCipher)),
PartnerId: partnerID,
CreatedAt: bucket.Created,
DefaultSegmentSize: bucket.DefaultSegmentsSize,
DefaultRedundancyScheme: rs,
DefaultEncryptionParameters: &pb.EncryptionParameters{
CipherSuite: pb.CipherSuite(int(bucket.DefaultEncryptionParameters.CipherSuite)),
BlockSize: int64(bucket.DefaultEncryptionParameters.BlockSize),
@ -1092,14 +1086,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
}
// use only satellite values for Redundancy Scheme
pbRS := &pb.RedundancyScheme{
Type: pb.RedundancyScheme_RS,
MinReq: int32(endpoint.requiredRSConfig.MinThreshold),
RepairThreshold: int32(endpoint.requiredRSConfig.RepairThreshold),
SuccessThreshold: int32(endpoint.requiredRSConfig.SuccessThreshold),
Total: int32(endpoint.requiredRSConfig.TotalThreshold),
ErasureShareSize: endpoint.requiredRSConfig.ErasureShareSize.Int32(),
}
pbRS := endpoint.redundancyScheme()
streamID, err := endpoint.packStreamID(ctx, &pb.SatStreamID{
Bucket: req.Bucket,
@ -2473,3 +2460,14 @@ func (endpoint *Endpoint) findIndexPreviousLastSegmentWhenNotKnowingNumSegments(
return lastIdxFound, nil
}
// redundancyScheme returns the satellite-configured redundancy scheme as a
// protobuf message. It is used to override any bucket-level RS defaults so
// that all uplinks (including older ones) operate with the satellite's
// required erasure-coding parameters.
func (endpoint *Endpoint) redundancyScheme() *pb.RedundancyScheme {
	cfg := endpoint.requiredRSConfig
	scheme := &pb.RedundancyScheme{
		Type:             pb.RedundancyScheme_RS,
		MinReq:           int32(cfg.MinThreshold),
		RepairThreshold:  int32(cfg.RepairThreshold),
		SuccessThreshold: int32(cfg.SuccessThreshold),
		Total:            int32(cfg.TotalThreshold),
		ErasureShareSize: cfg.ErasureShareSize.Int32(),
	}
	return scheme
}