storagenode: fix piecestore download metrics

Storagenode download metrics are not accurate:

 * the current metric bump cancel metrics only for specific error messages, but there are cases where the error is already handled (err == nill)
 * instead of the full size of the piece we need to use the size of the downloaded bytes

Change-Id: I6ca75770e2d40bf514f5e273785c78e02968c919
This commit is contained in:
Márton Elek 2022-10-25 00:03:56 +02:00 committed by Elek, Márton
parent f13d0f7df0
commit 644aca0e42

View File

@ -604,19 +604,18 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
}
var pieceReader *pieces.Reader
downloadedBytes := make(chan int64, 1)
largestOrder := pb.Order{}
defer func() {
endTime := time.Now().UTC()
dt := endTime.Sub(startTime)
downloadSize := int64(0)
if pieceReader != nil {
downloadSize = pieceReader.Size()
}
downloadRate := float64(0)
downloadSize := <-downloadedBytes
if dt.Seconds() > 0 {
downloadRate = float64(downloadSize) / dt.Seconds()
}
downloadDuration := dt.Nanoseconds()
if errs2.IsCanceled(err) || drpc.ClosedError.Has(err) {
if errs2.IsCanceled(err) || drpc.ClosedError.Has(err) || (err == nil && chunk.ChunkSize != downloadSize) {
mon.Counter("download_cancel_count", actionSeriesTag).Inc(1)
mon.Meter("download_cancel_byte_meter", actionSeriesTag).Mark64(downloadSize)
mon.IntVal("download_cancel_size_bytes", actionSeriesTag).Observe(downloadSize)
@ -638,6 +637,10 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
mon.FloatVal("download_success_rate_bytes_per_sec", actionSeriesTag).Observe(downloadRate)
endpoint.log.Info("downloaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr))
}
mon.IntVal("download_orders_amount", actionSeriesTag).Observe(largestOrder.Amount)
}()
defer func() {
close(downloadedBytes)
}()
pieceReader, err = endpoint.store.Reader(ctx, limit.SatelliteId, limit.PieceId)
@ -693,6 +696,11 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
currentOffset := chunk.Offset
unsentAmount := chunk.ChunkSize
defer func() {
downloadedBytes <- chunk.ChunkSize - unsentAmount
}()
for unsentAmount > 0 {
tryToSend := min(unsentAmount, maximumChunkSize)
@ -700,7 +708,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
chunkSize, err := throttle.ConsumeOrWait(tryToSend)
if err != nil {
// this can happen only because uplink decided to close the connection
return nil //nolint: nilerr // We don't need to return an error when client cancels.
return nil // We don't need to return an error when client cancels.
}
chunkData := make([]byte, chunkSize)
@ -745,7 +753,6 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
if err != nil {
return err
}
largestOrder := pb.Order{}
defer commitOrderToStore(ctx, &largestOrder)
// ensure that we always terminate sending goroutine