storagenode/piecestore: notify if download piece was restored from trash

Updates https://github.com/storj/storj/issues/6146

Change-Id: Iece285eb5ecb6898b29096416ab10e43338480b0
This commit is contained in:
Clement Sam 2023-09-26 12:49:34 +00:00 committed by Storj Robot
parent e072b37a86
commit d432a7197a

View File

@ -665,6 +665,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
close(downloadedBytes)
}()
restoredFromTrash := false
pieceReader, err = endpoint.store.Reader(ctx, limit.SatelliteId, limit.PieceId)
if err != nil {
if !errs.Is(err, fs.ErrNotExist) {
@ -683,6 +684,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
endpoint.monitor.VerifyDirReadableLoop.TriggerWait()
return rpcstatus.Wrap(rpcstatus.NotFound, tryRestoreErr)
}
restoredFromTrash = true
mon.Meter("download_file_in_trash", monkit.NewSeriesTag("namespace", limit.SatelliteId.String()), monkit.NewSeriesTag("piece_id", limit.PieceId.String())).Mark(1)
endpoint.log.Warn("file found in trash", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.String("Remote Address", remoteAddr))
@ -713,12 +715,21 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
}
err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
return stream.Send(&pb.PieceDownloadResponse{Hash: &pieceHash, Limit: &orderLimit})
return stream.Send(&pb.PieceDownloadResponse{Hash: &pieceHash, Limit: &orderLimit, RestoredFromTrash: restoredFromTrash})
})
if err != nil {
endpoint.log.Error("error sending hash and order limit", zap.Error(err))
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
} else if restoredFromTrash {
// notify that the piece was restored from trash
err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
return stream.Send(&pb.PieceDownloadResponse{RestoredFromTrash: restoredFromTrash})
})
if err != nil {
endpoint.log.Error("error sending response", zap.Error(err))
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
}
// TODO: verify chunk.Size behavior logic with regards to reading all