diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index df3c37fc4..45b05fdf3 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -604,19 +604,18 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err } var pieceReader *pieces.Reader + downloadedBytes := make(chan int64, 1) + largestOrder := pb.Order{} defer func() { endTime := time.Now().UTC() dt := endTime.Sub(startTime) - downloadSize := int64(0) - if pieceReader != nil { - downloadSize = pieceReader.Size() - } downloadRate := float64(0) + downloadSize := <-downloadedBytes if dt.Seconds() > 0 { downloadRate = float64(downloadSize) / dt.Seconds() } downloadDuration := dt.Nanoseconds() - if errs2.IsCanceled(err) || drpc.ClosedError.Has(err) { + if errs2.IsCanceled(err) || drpc.ClosedError.Has(err) || (err == nil && chunk.ChunkSize != downloadSize) { mon.Counter("download_cancel_count", actionSeriesTag).Inc(1) mon.Meter("download_cancel_byte_meter", actionSeriesTag).Mark64(downloadSize) mon.IntVal("download_cancel_size_bytes", actionSeriesTag).Observe(downloadSize) @@ -638,6 +637,10 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err mon.FloatVal("download_success_rate_bytes_per_sec", actionSeriesTag).Observe(downloadRate) endpoint.log.Info("downloaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr)) } + mon.IntVal("download_orders_amount", actionSeriesTag).Observe(largestOrder.Amount) + }() + defer func() { + close(downloadedBytes) }() pieceReader, err = endpoint.store.Reader(ctx, limit.SatelliteId, limit.PieceId) @@ -693,6 +696,11 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err currentOffset := chunk.Offset unsentAmount := chunk.ChunkSize + + defer func() { + downloadedBytes <- chunk.ChunkSize - unsentAmount + }() + for unsentAmount > 0 { tryToSend := min(unsentAmount, maximumChunkSize) @@ -700,7 +708,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err chunkSize, err := throttle.ConsumeOrWait(tryToSend) if err != nil { // this can happen only because uplink decided to close the connection - return nil //nolint: nilerr // We don't need to return an error when client cancels. + return nil // We don't need to return an error when client cancels. } chunkData := make([]byte, chunkSize) @@ -745,7 +753,6 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err if err != nil { return err } - largestOrder := pb.Order{} defer commitOrderToStore(ctx, &largestOrder) // ensure that we always terminate sending goroutine