satellite/metainfo: return RS value with DownloadSegment response
Until now we where using single RS per object but it turns out that we need to be able to support RS per segment. We need to give uplink such information while downloading. As an addition we are using RedundancySchemePerSegment flag for GetObject request to detect if we should try to get RS from segment for this request response. Change-Id: I209dad324496ff59b521b11d2343da61dcdbe7f5
This commit is contained in:
parent
61f0fb67a9
commit
dd9ad09301
2
go.mod
2
go.mod
@ -46,7 +46,7 @@ require (
|
||||
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
|
||||
google.golang.org/api v0.20.0 // indirect
|
||||
google.golang.org/protobuf v1.25.0 // indirect
|
||||
storj.io/common v0.0.0-20210208122718-577b1f8a0a0f
|
||||
storj.io/common v0.0.0-20210217105242-970e119468ed
|
||||
storj.io/drpc v0.0.16
|
||||
storj.io/monkit-jaeger v0.0.0-20210205021559-85f08034688c
|
||||
storj.io/private v0.0.0-20210203200143-9d2ec06f0d3c
|
||||
|
2
go.sum
2
go.sum
@ -924,6 +924,8 @@ storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0
|
||||
storj.io/common v0.0.0-20201026135900-1aaeec90670b/go.mod h1:GqdmNf3fLm2UZX/7Zr0BLFCJ4gFjgm6eHrk/fnmr5jQ=
|
||||
storj.io/common v0.0.0-20210208122718-577b1f8a0a0f h1:O2/ia55Q/xhMBJ/WgeTQBEST7h8IWXZE4FEQyiM+RYc=
|
||||
storj.io/common v0.0.0-20210208122718-577b1f8a0a0f/go.mod h1:b8XP/TdW8OyTZ/J2BDFOIE9KojSUNZgImBFZI99zS04=
|
||||
storj.io/common v0.0.0-20210217105242-970e119468ed h1:hL0mXcag3pydoaRiQ8kYrj+qTSbR2Dp3nrC0penj/f0=
|
||||
storj.io/common v0.0.0-20210217105242-970e119468ed/go.mod h1:b8XP/TdW8OyTZ/J2BDFOIE9KojSUNZgImBFZI99zS04=
|
||||
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
|
||||
storj.io/drpc v0.0.14/go.mod h1:82nfl+6YwRwF6UG31cEWWUqv/FaKvP5SGqUvoqTxCMA=
|
||||
storj.io/drpc v0.0.16 h1:9sxypc5lKi/0D69cR21BR0S21+IvXfON8L5nXMVNTwQ=
|
||||
|
@ -823,7 +823,7 @@ func (endpoint *Endpoint) commitObject(ctx context.Context, req *pb.ObjectCommit
|
||||
return &pb.ObjectCommitResponse{}, nil
|
||||
}
|
||||
|
||||
// GetObject gets single object.
|
||||
// GetObject gets single object metadata.
|
||||
func (endpoint *Endpoint) GetObject(ctx context.Context, req *pb.ObjectGetRequest) (resp *pb.ObjectGetResponse, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
@ -847,25 +847,11 @@ func (endpoint *Endpoint) GetObject(ctx context.Context, req *pb.ObjectGetReques
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
object, err := endpoint.getObject(ctx, keyInfo.ProjectID, req.Bucket, req.EncryptedPath, req.Version)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
endpoint.log.Info("Object Download", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "get"), zap.String("type", "object"))
|
||||
mon.Meter("req_get_object").Mark(1)
|
||||
|
||||
return &pb.ObjectGetResponse{
|
||||
Object: object,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (endpoint *Endpoint) getObject(ctx context.Context, projectID uuid.UUID, bucket, encryptedPath []byte, version int32) (*pb.Object, error) {
|
||||
metaObject, err := endpoint.metainfo.metabaseDB.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
|
||||
mbObject, err := endpoint.metainfo.metabaseDB.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
|
||||
ObjectLocation: metabase.ObjectLocation{
|
||||
ProjectID: projectID,
|
||||
BucketName: string(bucket),
|
||||
ObjectKey: metabase.ObjectKey(encryptedPath),
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
ObjectKey: metabase.ObjectKey(req.EncryptedPath),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
@ -876,10 +862,12 @@ func (endpoint *Endpoint) getObject(ctx context.Context, projectID uuid.UUID, bu
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
}
|
||||
|
||||
rs := endpoint.defaultRS
|
||||
if metaObject.SegmentCount > 0 {
|
||||
var segmentRS *pb.RedundancyScheme
|
||||
// TODO we may try to avoid additional request for inline objects
|
||||
if !req.RedundancySchemePerSegment && mbObject.SegmentCount > 0 {
|
||||
segmentRS = endpoint.defaultRS
|
||||
segment, err := endpoint.metainfo.metabaseDB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
||||
StreamID: metaObject.StreamID,
|
||||
StreamID: mbObject.StreamID,
|
||||
Position: metabase.SegmentPosition{
|
||||
Index: 0,
|
||||
},
|
||||
@ -888,7 +876,7 @@ func (endpoint *Endpoint) getObject(ctx context.Context, projectID uuid.UUID, bu
|
||||
// don't fail because its possible that its multipart object
|
||||
endpoint.log.Error("internal", zap.Error(err))
|
||||
} else {
|
||||
rs = &pb.RedundancyScheme{
|
||||
segmentRS = &pb.RedundancyScheme{
|
||||
Type: pb.RedundancyScheme_SchemeType(segment.Redundancy.Algorithm),
|
||||
ErasureShareSize: segment.Redundancy.ShareSize,
|
||||
|
||||
@ -898,15 +886,21 @@ func (endpoint *Endpoint) getObject(ctx context.Context, projectID uuid.UUID, bu
|
||||
Total: int32(segment.Redundancy.TotalShares),
|
||||
}
|
||||
}
|
||||
|
||||
// monitor how many uplinks is still using this additional code
|
||||
mon.Meter("req_get_object_rs_per_object").Mark(1)
|
||||
}
|
||||
|
||||
object, err := endpoint.objectToProto(ctx, metaObject, rs)
|
||||
object, err := endpoint.objectToProto(ctx, mbObject, segmentRS)
|
||||
if err != nil {
|
||||
endpoint.log.Error("internal", zap.Error(err))
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
}
|
||||
|
||||
return object, nil
|
||||
endpoint.log.Info("Object Download", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "get"), zap.String("type", "object"))
|
||||
mon.Meter("req_get_object").Mark(1)
|
||||
|
||||
return &pb.ObjectGetResponse{Object: object}, nil
|
||||
}
|
||||
|
||||
// ListObjects list objects according to specific parameters.
|
||||
@ -1872,6 +1866,15 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
|
||||
|
||||
EncryptedKeyNonce: encryptedKeyNonce,
|
||||
EncryptedKey: segment.EncryptedKey,
|
||||
RedundancyScheme: &pb.RedundancyScheme{
|
||||
Type: pb.RedundancyScheme_SchemeType(segment.Redundancy.Algorithm),
|
||||
ErasureShareSize: segment.Redundancy.ShareSize,
|
||||
|
||||
MinReq: int32(segment.Redundancy.RequiredShares),
|
||||
RepairThreshold: int32(segment.Redundancy.RepairShares),
|
||||
SuccessThreshold: int32(segment.Redundancy.OptimalShares),
|
||||
Total: int32(segment.Redundancy.TotalShares),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user