diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index 05d3cd7d6..507de81ea 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "io" + "net" "os" "sync" "sync/atomic" @@ -29,6 +30,7 @@ import ( "storj.io/common/storj" "storj.io/common/sync2" "storj.io/drpc" + "storj.io/drpc/drpcctx" "storj.io/storj/storagenode/bandwidth" "storj.io/storj/storagenode/monitor" "storj.io/storj/storagenode/orders" @@ -153,6 +155,8 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ return nil, rpcstatus.Wrap(rpcstatus.Unauthenticated, err) } + remoteAddrLogField := zap.String("Remote Address", getRemoteAddr(ctx)) + if err := endpoint.store.Delete(ctx, delete.Limit.SatelliteId, delete.Limit.PieceId); err != nil { // explicitly ignoring error because the errors @@ -160,9 +164,9 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ // report rpc status of internal server error or not found error, // e.g. not found might happen when we get a deletion request after garbage // collection has deleted it - endpoint.log.Error("delete failed", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId), zap.Error(err)) + endpoint.log.Error("delete failed", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId), remoteAddrLogField, zap.Error(err)) } else { - endpoint.log.Info("deleted", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId)) + endpoint.log.Info("deleted", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId), remoteAddrLogField) } return &pb.PieceDeleteResponse{}, nil @@ -321,6 +325,8 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err return rpcstatus.Errorf(rpcstatus.Aborted, "not enough available disk space, have: %v, need: %v", availableSpace, limit.Limit) } + remoteAddrLogField := zap.String("Remote Address", getRemoteAddr(ctx)) + var pieceWriter *pieces.Writer // committed is set to true when the piece is committed. // It is used to distinguish successful pieces where the uplink cancels the connections, @@ -345,21 +351,21 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err mon.IntVal("upload_failure_size_bytes").Observe(uploadSize) mon.IntVal("upload_failure_duration_ns").Observe(uploadDuration) mon.FloatVal("upload_failure_rate_bytes_per_sec").Observe(uploadRate) - endpoint.log.Error("upload failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err), zap.Int64("Size", uploadSize)) + endpoint.log.Error("upload failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err), zap.Int64("Size", uploadSize), remoteAddrLogField) } else if (errs2.IsCanceled(err) || drpc.ClosedError.Has(err)) && !committed { mon.Counter("upload_cancel_count").Inc(1) mon.Meter("upload_cancel_byte_meter").Mark64(uploadSize) mon.IntVal("upload_cancel_size_bytes").Observe(uploadSize) mon.IntVal("upload_cancel_duration_ns").Observe(uploadDuration) mon.FloatVal("upload_cancel_rate_bytes_per_sec").Observe(uploadRate) - endpoint.log.Info("upload canceled", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Size", uploadSize)) + endpoint.log.Info("upload canceled", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Size", uploadSize), remoteAddrLogField) } else { mon.Counter("upload_success_count").Inc(1) mon.Meter("upload_success_byte_meter").Mark64(uploadSize) mon.IntVal("upload_success_size_bytes").Observe(uploadSize) mon.IntVal("upload_success_duration_ns").Observe(uploadDuration) mon.FloatVal("upload_success_rate_bytes_per_sec").Observe(uploadRate) - endpoint.log.Info("uploaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Size", uploadSize)) + endpoint.log.Info("uploaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Size", uploadSize), remoteAddrLogField) } }() @@ -367,7 +373,8 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), - zap.Int64("Available Space", availableSpace)) + zap.Int64("Available Space", availableSpace), + remoteAddrLogField) mon.Counter("upload_started_count").Inc(1) pieceWriter, err = endpoint.store.Writer(ctx, limit.SatelliteId, limit.PieceId, hashAlgorithm) @@ -569,13 +576,21 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err actionSeriesTag := monkit.NewSeriesTag("action", limit.Action.String()) - endpoint.log.Info("download started", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action)) + remoteAddr := getRemoteAddr(ctx) + endpoint.log.Info("download started", + zap.Stringer("Piece ID", limit.PieceId), + zap.Stringer("Satellite ID", limit.SatelliteId), + zap.Stringer("Action", limit.Action), + zap.Int64("Offset", chunk.Offset), + zap.Int64("Size", chunk.ChunkSize), + zap.String("Remote Address", remoteAddr)) + mon.Counter("download_started_count", actionSeriesTag).Inc(1) if err := endpoint.verifyOrderLimit(ctx, limit); err != nil { mon.Counter("download_failure_count", actionSeriesTag).Inc(1) mon.Meter("download_verify_orderlimit_failed", actionSeriesTag).Mark(1) - endpoint.log.Error("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err)) + endpoint.log.Error("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.String("Remote Address", remoteAddr), zap.Error(err)) return err } @@ -598,21 +613,21 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err mon.IntVal("download_cancel_size_bytes", actionSeriesTag).Observe(downloadSize) mon.IntVal("download_cancel_duration_ns", actionSeriesTag).Observe(downloadDuration) mon.FloatVal("download_cancel_rate_bytes_per_sec", actionSeriesTag).Observe(downloadRate) - endpoint.log.Info("download canceled", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action)) + endpoint.log.Info("download canceled", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr)) } else if err != nil { mon.Counter("download_failure_count", actionSeriesTag).Inc(1) mon.Meter("download_failure_byte_meter", actionSeriesTag).Mark64(downloadSize) mon.IntVal("download_failure_size_bytes", actionSeriesTag).Observe(downloadSize) mon.IntVal("download_failure_duration_ns", actionSeriesTag).Observe(downloadDuration) mon.FloatVal("download_failure_rate_bytes_per_sec", actionSeriesTag).Observe(downloadRate) - endpoint.log.Error("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err)) + endpoint.log.Error("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr), zap.Error(err)) } else { mon.Counter("download_success_count", actionSeriesTag).Inc(1) mon.Meter("download_success_byte_meter", actionSeriesTag).Mark64(downloadSize) mon.IntVal("download_success_size_bytes", actionSeriesTag).Observe(downloadSize) mon.IntVal("download_success_duration_ns", actionSeriesTag).Observe(downloadDuration) mon.FloatVal("download_success_rate_bytes_per_sec", actionSeriesTag).Observe(downloadRate) - endpoint.log.Info("downloaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action)) + endpoint.log.Info("downloaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr)) } }() @@ -934,3 +949,13 @@ func (estimate *speedEstimation) EnsureLimit(transferred memory.Size, congested return nil } + +// getRemoteAddr returns the remote address from the request context. +func getRemoteAddr(ctx context.Context) string { + if transport, ok := drpcctx.Transport(ctx); ok { + if conn, ok := transport.(net.Conn); ok { + return conn.RemoteAddr().String() + } + } + return "" +}