satellite/metainfo: use range for specifying download limit
Previously the object range was not used for calculating order limit. This meant that even if you were downloading only a small range it would account bandwidth based on the full segment. This doesn't fully address the accounting since the lazy segment downloads do not send their requested range nor requested limit. Change-Id: Ic811e570c889be87bac4293547d6537a255078da
This commit is contained in:
parent
aa49c8c44d
commit
10a0216af5
@ -133,7 +133,7 @@ func TestDisqualifiedNodesGetNoDownload(t *testing.T) {
|
||||
err = satellitePeer.DB.OverlayCache().DisqualifyNode(ctx, disqualifiedNode)
|
||||
require.NoError(t, err)
|
||||
|
||||
limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, segment)
|
||||
limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, segment, 0)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, limits, len(segment.Pieces)-1)
|
||||
|
||||
|
@ -939,8 +939,10 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
}
|
||||
|
||||
downloadSizes := endpoint.calculateDownloadSizes(streamRange, segment, object.Encryption)
|
||||
|
||||
// Update the current bandwidth cache value incrementing the SegmentSize.
|
||||
err = endpoint.projectUsage.UpdateProjectBandwidthUsage(ctx, keyInfo.ProjectID, int64(segment.EncryptedSize))
|
||||
err = endpoint.projectUsage.UpdateProjectBandwidthUsage(ctx, keyInfo.ProjectID, downloadSizes.encryptedSize)
|
||||
if err != nil {
|
||||
// log it and continue. it's most likely our own fault that we couldn't
|
||||
// track it, and the only thing that will be affected is our per-project
|
||||
@ -955,7 +957,7 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
|
||||
}
|
||||
|
||||
if segment.Inline() {
|
||||
err := endpoint.orders.UpdateGetInlineOrder(ctx, object.Location().Bucket(), int64(len(segment.InlineData)))
|
||||
err := endpoint.orders.UpdateGetInlineOrder(ctx, object.Location().Bucket(), downloadSizes.plainSize)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
}
|
||||
@ -978,7 +980,7 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
|
||||
}}, nil
|
||||
}
|
||||
|
||||
limits, privateKey, err := endpoint.orders.CreateGetOrderLimits(ctx, object.Location().Bucket(), segment)
|
||||
limits, privateKey, err := endpoint.orders.CreateGetOrderLimits(ctx, object.Location().Bucket(), segment, downloadSizes.orderLimit)
|
||||
if err != nil {
|
||||
if orders.ErrDownloadFailedNotEnoughPieces.Has(err) {
|
||||
endpoint.log.Error("Unable to create order limits.",
|
||||
@ -1063,6 +1065,83 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
|
||||
}, nil
|
||||
}
|
||||
|
||||
type downloadSizes struct {
|
||||
// amount of data that uplink eventually gets
|
||||
plainSize int64
|
||||
// amount of data that's present after encryption
|
||||
encryptedSize int64
|
||||
// amount of data that's read from a storage node
|
||||
orderLimit int64
|
||||
}
|
||||
|
||||
func (endpoint *Endpoint) calculateDownloadSizes(streamRange *metabase.StreamRange, segment metabase.Segment, encryptionParams storj.EncryptionParameters) downloadSizes {
|
||||
if segment.Inline() {
|
||||
return downloadSizes{
|
||||
plainSize: int64(len(segment.InlineData)),
|
||||
encryptedSize: int64(segment.EncryptedSize),
|
||||
}
|
||||
}
|
||||
|
||||
// calculate the range inside the given segment
|
||||
readStart := segment.PlainOffset
|
||||
if streamRange != nil && readStart <= streamRange.PlainStart {
|
||||
readStart = streamRange.PlainStart
|
||||
}
|
||||
readLimit := segment.PlainOffset + int64(segment.PlainSize)
|
||||
if streamRange != nil && streamRange.PlainLimit < readLimit {
|
||||
readLimit = streamRange.PlainLimit
|
||||
}
|
||||
|
||||
plainSize := readLimit - readStart
|
||||
|
||||
// calculate the read range given the segment start
|
||||
readStart -= segment.PlainOffset
|
||||
readLimit -= segment.PlainOffset
|
||||
|
||||
// align to encryption block size
|
||||
enc, err := encryption.NewEncrypter(encryptionParams.CipherSuite, &storj.Key{1}, &storj.Nonce{1}, int(encryptionParams.BlockSize))
|
||||
if err != nil {
|
||||
// We ignore the error and fallback to the max amount to download.
|
||||
// It's unlikely that we fail here, but if we do, we don't want to block downloading.
|
||||
endpoint.log.Error("unable to create encrypter", zap.Error(err))
|
||||
return downloadSizes{
|
||||
plainSize: int64(segment.PlainSize),
|
||||
encryptedSize: int64(segment.EncryptedSize),
|
||||
orderLimit: 0,
|
||||
}
|
||||
}
|
||||
|
||||
encryptedStartBlock, encryptedLimitBlock := calculateBlocks(readStart, readLimit, int64(enc.InBlockSize()))
|
||||
encryptedStart, encryptedLimit := encryptedStartBlock*int64(enc.OutBlockSize()), encryptedLimitBlock*int64(enc.OutBlockSize())
|
||||
encryptedSize := encryptedLimit - encryptedStart
|
||||
|
||||
if encryptedSize > int64(segment.EncryptedSize) {
|
||||
encryptedSize = int64(segment.EncryptedSize)
|
||||
}
|
||||
|
||||
// align to blocks
|
||||
stripeSize := int64(segment.Redundancy.StripeSize())
|
||||
stripeStart, stripeLimit := alignToBlock(encryptedStart, encryptedLimit, stripeSize)
|
||||
|
||||
// calculate how much shares we need to download from a node
|
||||
stripeCount := (stripeLimit - stripeStart) / stripeSize
|
||||
orderLimit := stripeCount * int64(segment.Redundancy.ShareSize)
|
||||
|
||||
return downloadSizes{
|
||||
plainSize: plainSize,
|
||||
encryptedSize: encryptedSize,
|
||||
orderLimit: orderLimit,
|
||||
}
|
||||
}
|
||||
|
||||
func calculateBlocks(start, limit int64, blockSize int64) (startBlock, limitBlock int64) {
|
||||
return start / blockSize, (limit + blockSize - 1) / blockSize
|
||||
}
|
||||
|
||||
func alignToBlock(start, limit int64, blockSize int64) (alignedStart, alignedLimit int64) {
|
||||
return (start / blockSize) * blockSize, ((limit + blockSize - 1) / blockSize) * blockSize
|
||||
}
|
||||
|
||||
func calculateStreamRange(object metabase.Object, req *pb.Range) (*metabase.StreamRange, error) {
|
||||
if req == nil || req.Range == nil {
|
||||
return nil, nil
|
||||
@ -2064,7 +2143,7 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
|
||||
}
|
||||
|
||||
// Remote segment
|
||||
limits, privateKey, err := endpoint.orders.CreateGetOrderLimits(ctx, bucket, segment)
|
||||
limits, privateKey, err := endpoint.orders.CreateGetOrderLimits(ctx, bucket, segment, 0)
|
||||
if err != nil {
|
||||
if orders.ErrDownloadFailedNotEnoughPieces.Has(err) {
|
||||
endpoint.log.Error("Unable to create order limits.",
|
||||
|
@ -895,7 +895,7 @@ func TestRemoteSegment(t *testing.T) {
|
||||
uplink := planet.Uplinks[0]
|
||||
|
||||
expectedBucketName := "remote-segments-bucket"
|
||||
err := uplink.Upload(ctx, planet.Satellites[0], expectedBucketName, "file-object", testrand.Bytes(10*memory.KiB))
|
||||
err := uplink.Upload(ctx, planet.Satellites[0], expectedBucketName, "file-object", testrand.Bytes(50*memory.KiB))
|
||||
require.NoError(t, err)
|
||||
|
||||
metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
|
||||
@ -928,6 +928,36 @@ func TestRemoteSegment(t *testing.T) {
|
||||
require.NotEmpty(t, limits)
|
||||
}
|
||||
|
||||
{
|
||||
// Download Object
|
||||
download, err := metainfoClient.DownloadObject(ctx, metaclient.DownloadObjectParams{
|
||||
Bucket: []byte(expectedBucketName),
|
||||
EncryptedObjectKey: items[0].EncryptedPath,
|
||||
Range: metaclient.StreamRange{
|
||||
Mode: metaclient.StreamRangeStartLimit,
|
||||
Start: 1,
|
||||
Limit: 2,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, download.DownloadedSegments, 1)
|
||||
require.NotEmpty(t, download.DownloadedSegments[0].Limits)
|
||||
for _, limit := range download.DownloadedSegments[0].Limits {
|
||||
if limit == nil {
|
||||
continue
|
||||
}
|
||||
// requested download size is
|
||||
// [1:2}
|
||||
// calculating encryption input block size (7408) indices gives us:
|
||||
// 0 and 1
|
||||
// converting these into output block size (7424), gives us:
|
||||
// [0:7424}
|
||||
// this aligned to stripe size (256), gives us:
|
||||
// [0:7424}
|
||||
require.Equal(t, int64(7424), limit.Limit.Limit)
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// Begin deleting object
|
||||
// List objects
|
||||
|
@ -125,14 +125,17 @@ func (service *Service) updateBandwidth(ctx context.Context, bucket metabase.Buc
|
||||
}
|
||||
|
||||
// CreateGetOrderLimits creates the order limits for downloading the pieces of a segment.
|
||||
func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) {
|
||||
func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, overrideLimit int64) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
|
||||
if err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
|
||||
orderLimit := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
|
||||
if overrideLimit > 0 && overrideLimit < orderLimit {
|
||||
orderLimit = overrideLimit
|
||||
}
|
||||
|
||||
nodeIDs := make([]storj.NodeID, len(segment.Pieces))
|
||||
for i, piece := range segment.Pieces {
|
||||
@ -145,7 +148,7 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas
|
||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
signer, err := NewSignerGet(service, segment.RootPieceID, time.Now(), pieceSize, bucket)
|
||||
signer, err := NewSignerGet(service, segment.RootPieceID, time.Now(), orderLimit, bucket)
|
||||
if err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ func TestOrderLimitsEncryptedMetadata(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(segments))
|
||||
|
||||
limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, segments[0])
|
||||
limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, segments[0], 0)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(limits))
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user