logging: unify logging around satellite ID, node ID and piece ID (#3491)

* logging: unify logging around satellite ID, node ID and piece ID

* unify segment index
littleskunk 2019-11-05 22:04:07 +01:00 committed by GitHub
parent 12cead8f7b
commit 7eb6724c92
18 changed files with 126 additions and 92 deletions
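
For reference, the convention this change applies throughout the satellite and storagenode code is a capitalized, space-separated field key ("Node ID", "Piece ID", "Satellite ID", "Segment Index") passed as a zap.Stringer. A minimal, self-contained sketch of the before/after shape (the nodeID type below is only an illustrative stand-in for storj.NodeID):

package main

import (
	"encoding/hex"

	"go.uber.org/zap"
)

// nodeID stands in for storj.NodeID; any fmt.Stringer works the same way.
type nodeID [4]byte

func (id nodeID) String() string { return hex.EncodeToString(id[:]) }

func main() {
	log, _ := zap.NewDevelopment()

	id := nodeID{0xde, 0xad, 0xbe, 0xef}

	// before: key casing and spelling vary from call site to call site
	log.Info("upload started", zap.String("nodeID", id.String()))

	// after: one spelling per ID kind, passed as a zap.Stringer
	log.Info("upload started", zap.Stringer("Node ID", id))
}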

View File

@ -85,7 +85,7 @@ func (endpoint Endpoint) Sign(ctx context.Context, req *pb.SigningRequest) (_ *p
Token: *token,
}
endpoint.log.Info("certificate successfully signed",
zap.Stringer("node ID", peerIdent.ID),
zap.Stringer("Node ID", peerIdent.ID),
zap.Uint16("difficulty", difficulty),
zap.Stringer("truncated token", tokenFormatter),
)

View File

@ -136,7 +136,7 @@ func cmdGracefulExitInit(cmd *cobra.Command, args []string) error {
}
resp, err := client.initGracefulExit(ctx, req)
if err != nil {
zap.S().Debug("initializing graceful exit failed", zap.String("Satellite ID", id.String()), zap.Error(err))
zap.S().Debug("initializing graceful exit failed", zap.Stringer("Satellite ID", id), zap.Error(err))
errgroup.Add(err)
continue
}
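
The hunk above, like many below, also replaces zap.String(key, value.String()) with zap.Stringer(key, value). Besides unifying the field keys, zap.Stringer defers the String() call until the entry is actually encoded, so fields on a dropped debug entry are never formatted. A small sketch of that difference, assuming a production logger left at Info level (the expensiveID type is illustrative):

package main

import (
	"fmt"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// expensiveID counts how often its String method runs.
type expensiveID struct{ calls *int }

func (e expensiveID) String() string { *e.calls++; return "id" }

func main() {
	cfg := zap.NewProductionConfig()
	cfg.Level = zap.NewAtomicLevelAt(zapcore.InfoLevel) // debug entries are dropped

	log, _ := cfg.Build()

	calls := 0
	id := expensiveID{&calls}

	log.Debug("dropped", zap.Stringer("Piece ID", id))        // String() never runs
	log.Debug("dropped", zap.String("Piece ID", id.String())) // String() runs eagerly

	fmt.Println("String() calls:", calls) // prints 1
}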

View File

@ -54,19 +54,19 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) (
err = endpoint.service.peerIDs.Set(ctx, nodeID, peerID)
if err != nil {
endpoint.log.Info("failed to add peer identity entry for ID", zap.String("node address", req.Address), zap.Stringer("node ID", nodeID), zap.Error(err))
endpoint.log.Info("failed to add peer identity entry for ID", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.FailedPrecondition, errCheckInIdentity.New("failed to add peer identity entry for ID: %v", err).Error())
}
lastIP, err := overlay.GetNetwork(ctx, req.Address)
if err != nil {
endpoint.log.Info("failed to resolve IP from address", zap.String("node address", req.Address), zap.Stringer("node ID", nodeID), zap.Error(err))
endpoint.log.Info("failed to resolve IP from address", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, errCheckInNetwork.New("failed to resolve IP from address: %s, err: %v", req.Address, err).Error())
}
pingNodeSuccess, pingErrorMessage, err := endpoint.pingBack(ctx, req, nodeID)
if err != nil {
endpoint.log.Info("failed to ping back address", zap.String("node address", req.Address), zap.Stringer("node ID", nodeID), zap.Error(err))
endpoint.log.Info("failed to ping back address", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID), zap.Error(err))
if errPingBackDial.Has(err) {
err = errCheckInNetwork.New("failed dialing address when attempting to ping node (ID: %s): %s, err: %v", nodeID, req.Address, err)
return nil, rpcstatus.Error(rpcstatus.NotFound, err.Error())
@ -88,7 +88,7 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) (
}
err = endpoint.service.overlay.UpdateCheckIn(ctx, nodeInfo)
if err != nil {
endpoint.log.Info("failed to update check in", zap.String("node address", req.Address), zap.Stringer("node ID", nodeID), zap.Error(err))
endpoint.log.Info("failed to update check in", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID), zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
}
@ -113,7 +113,7 @@ func (endpoint *Endpoint) pingBack(ctx context.Context, req *pb.CheckInRequest,
mon.Event("failed dial")
pingNodeSuccess = false
pingErrorMessage = fmt.Sprintf("failed to dial storage node (ID: %s) at address %s: %q", peerID, req.Address, err)
endpoint.log.Info("pingBack failed to dial storage node", zap.Stringer("node ID", peerID), zap.String("node address", req.Address), zap.String("pingErrorMessage", pingErrorMessage), zap.Error(err))
endpoint.log.Info("pingBack failed to dial storage node", zap.Stringer("Node ID", peerID), zap.String("node address", req.Address), zap.String("pingErrorMessage", pingErrorMessage), zap.Error(err))
return pingNodeSuccess, pingErrorMessage, nil
}
defer func() { err = errs.Combine(err, client.Close()) }()
@ -123,7 +123,7 @@ func (endpoint *Endpoint) pingBack(ctx context.Context, req *pb.CheckInRequest,
mon.Event("failed ping node")
pingNodeSuccess = false
pingErrorMessage = fmt.Sprintf("failed to ping storage node, your node indicated error code: %d, %q", rpcstatus.Code(err), err)
endpoint.log.Info("pingBack pingNode error", zap.Stringer("node ID", peerID), zap.String("pingErrorMessage", pingErrorMessage), zap.Error(err))
endpoint.log.Info("pingBack pingNode error", zap.Stringer("Node ID", peerID), zap.String("pingErrorMessage", pingErrorMessage), zap.Error(err))
}
return pingNodeSuccess, pingErrorMessage, nil

View File

@ -126,7 +126,7 @@ func (service *Service) Run(ctx context.Context) (err error) {
limiter.Go(ctx, func() {
err := service.sendRetainRequest(ctx, id, info)
if err != nil {
service.log.Error("error sending retain info to node", zap.Stringer("node ID", id), zap.Error(err))
service.log.Error("error sending retain info to node", zap.Stringer("Node ID", id), zap.Error(err))
}
})
}

View File

@ -194,7 +194,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
// TODO should we error if the node is DQ'd?
nodeID := peer.ID
endpoint.log.Debug("graceful exit process", zap.Stringer("node ID", nodeID))
endpoint.log.Debug("graceful exit process", zap.Stringer("Node ID", nodeID))
// ensure that only one connection can be opened for a single node at a time
if !endpoint.connections.tryAdd(nodeID) {
@ -206,7 +206,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
eofHandler := func(err error) error {
if err == io.EOF {
endpoint.log.Debug("received EOF when trying to receive messages from storage node", zap.Stringer("node ID", nodeID))
endpoint.log.Debug("received EOF when trying to receive messages from storage node", zap.Stringer("Node ID", nodeID))
return nil
}
if err != nil {
@ -308,7 +308,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
}
if len(incomplete) == 0 {
endpoint.log.Debug("no more pieces to transfer for node", zap.Stringer("node ID", nodeID))
endpoint.log.Debug("no more pieces to transfer for node", zap.Stringer("Node ID", nodeID))
processMu.Lock()
morePiecesFlag = false
processMu.Unlock()
@ -521,7 +521,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS
}
if nodePiece == nil {
endpoint.log.Debug("piece no longer held by node", zap.Stringer("node ID", nodeID), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum))
endpoint.log.Debug("piece no longer held by node", zap.Stringer("Node ID", nodeID), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum))
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Path, incomplete.PieceNum)
if err != nil {
@ -537,7 +537,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS
}
if len(remote.GetRemotePieces()) > redundancy.OptimalThreshold() {
endpoint.log.Debug("pointer has more pieces than required. removing node from pointer.", zap.Stringer("node ID", nodeID), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum))
endpoint.log.Debug("pointer has more pieces than required. removing node from pointer.", zap.Stringer("Node ID", nodeID), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum))
_, err = endpoint.metainfo.UpdatePieces(ctx, string(incomplete.Path), pointer, nil, []*pb.RemotePiece{nodePiece})
if err != nil {
@ -742,12 +742,12 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStr
func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap, nodeID storj.NodeID, message *pb.StorageNodeMessage_Failed) (err error) {
defer mon.Task()(&ctx)(&err)
endpoint.log.Warn("transfer failed", zap.Stringer("piece ID", message.Failed.OriginalPieceId), zap.Stringer("transfer error", message.Failed.GetError()))
endpoint.log.Warn("transfer failed", zap.Stringer("Piece ID", message.Failed.OriginalPieceId), zap.Stringer("transfer error", message.Failed.GetError()))
mon.Meter("graceful_exit_transfer_piece_fail").Mark(1)
pieceID := message.Failed.OriginalPieceId
transfer, ok := pending.get(pieceID)
if !ok {
endpoint.log.Debug("could not find transfer message in pending queue. skipping.", zap.Stringer("piece ID", pieceID))
endpoint.log.Debug("could not find transfer message in pending queue. skipping.", zap.Stringer("Piece ID", pieceID))
// TODO we should probably error out here so we don't get stuck in a loop with a SN that is not behaving properly
}

View File

@ -82,7 +82,7 @@ func (collector *PathCollector) RemoteSegment(ctx context.Context, path metainfo
PieceNum: piece.PieceNum,
DurabilityRatio: float64(numPieces / pointer.GetRemote().GetRedundancy().GetTotal()),
}
collector.log.Debug("adding piece to transfer queue.", zap.String("node ID", piece.NodeId.String()),
collector.log.Debug("adding piece to transfer queue.", zap.Stringer("Node ID", piece.NodeId),
zap.String("path", path.Raw), zap.Int32("piece num", piece.GetPieceNum()),
zap.Int32("num pieces", numPieces), zap.Int32("total possible pieces", pointer.GetRemote().GetRedundancy().GetTotal()))

View File

@ -494,8 +494,8 @@ func (endpoint *Endpoint) filterValidPieces(ctx context.Context, pointer *pb.Poi
peerID, ok := peerIDMap[piece.NodeId]
if !ok {
endpoint.log.Warn("Identity chain unknown for node. Piece removed from pointer",
zap.Stringer("nodeID", piece.NodeId),
zap.Int32("pieceID", piece.PieceNum),
zap.Stringer("Node ID", piece.NodeId),
zap.Int32("Piece ID", piece.PieceNum),
)
invalidPieces = append(invalidPieces, invalidPiece{
@ -909,7 +909,7 @@ func (endpoint *Endpoint) setBucketAttribution(ctx context.Context, header *pb.R
// check if attribution is set for given bucket
_, err = endpoint.partnerinfo.Get(ctx, keyInfo.ProjectID, bucketName)
if err == nil {
endpoint.log.Info("Bucket already attributed", zap.ByteString("bucketName", bucketName), zap.String("partnerID", partnerID.String()))
endpoint.log.Info("Bucket already attributed", zap.ByteString("bucketName", bucketName), zap.Stringer("Partner ID", partnerID))
return nil
}
@ -1498,7 +1498,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
endpoint.log.Debug("the results of uploaded pieces for the segment is below the redundancy optimal threshold",
zap.Int("upload pieces results", numResults),
zap.Int32("redundancy optimal threshold", streamID.Redundancy.GetSuccessThreshold()),
zap.Stringer("segment ID", req.SegmentId),
zap.Stringer("Segment ID", req.SegmentId),
)
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument,
"the number of results of uploaded pieces (%d) is below the optimal threshold (%d)",
@ -1568,7 +1568,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
if exceeded {
endpoint.log.Error("The project limit of storage and bandwidth has been exceeded",
zap.Int64("limit", limit.Int64()),
zap.Stringer("project id", keyInfo.ProjectID),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
}
@ -1597,7 +1597,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, inlineUsed, remoteUsed); err != nil {
endpoint.log.Error("Could not track new storage usage by project",
zap.Stringer("projectID", keyInfo.ProjectID),
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Error(err),
)
// but continue. it's most likely our own fault that we couldn't track it, and the only thing

View File

@ -315,7 +315,7 @@ func (ec *ECRepairer) Repair(ctx context.Context, limits []*pb.AddressedOrderLim
}
ec.log.Debug("Repair to storage node failed",
zap.Binary("Segment", []byte(path)),
zap.Stringer("NodeID", limits[info.i].GetLimit().StorageNodeId),
zap.Stringer("Node ID", limits[info.i].GetLimit().StorageNodeId),
zap.Error(info.err),
)
continue
@ -380,8 +380,8 @@ func (ec *ECRepairer) putPiece(ctx, parent context.Context, limit *pb.AddressedO
if err != nil {
ec.log.Debug("Failed dialing for putting piece to node",
zap.Binary("Segment", []byte(path)),
zap.Stringer("PieceID", pieceID),
zap.Stringer("NodeID", storageNodeID),
zap.Stringer("Piece ID", pieceID),
zap.Stringer("Node ID", storageNodeID),
zap.Error(err),
)
return nil, err
@ -392,8 +392,8 @@ func (ec *ECRepairer) putPiece(ctx, parent context.Context, limit *pb.AddressedO
if err != nil {
ec.log.Debug("Failed requesting upload of pieces to node",
zap.Binary("Segment", []byte(path)),
zap.Stringer("PieceID", pieceID),
zap.Stringer("NodeID", storageNodeID),
zap.Stringer("Piece ID", pieceID),
zap.Stringer("Node ID", storageNodeID),
zap.Error(err),
)
return nil, err
@ -416,11 +416,11 @@ func (ec *ECRepairer) putPiece(ctx, parent context.Context, limit *pb.AddressedO
if parent.Err() == context.Canceled {
ec.log.Info("Upload to node canceled by user",
zap.Binary("Segment", []byte(path)),
zap.Stringer("NodeID", storageNodeID))
zap.Stringer("Node ID", storageNodeID))
} else {
ec.log.Debug("Node cut from upload due to slow connection",
zap.Binary("Segment", []byte(path)),
zap.Stringer("NodeID", storageNodeID))
zap.Stringer("Node ID", storageNodeID))
}
err = context.Canceled
} else if err != nil {
@ -431,8 +431,8 @@ func (ec *ECRepairer) putPiece(ctx, parent context.Context, limit *pb.AddressedO
ec.log.Debug("Failed uploading piece to node",
zap.Binary("Segment", []byte(path)),
zap.Stringer("PieceID", pieceID),
zap.Stringer("NodeID", storageNodeID),
zap.Stringer("Piece ID", pieceID),
zap.Stringer("Node ID", storageNodeID),
zap.String("Node Address", nodeAddress),
zap.Error(err),
)

View File

@ -95,11 +95,12 @@ func (service *Service) Collect(ctx context.Context, now time.Time) (err error)
if err != nil {
errfailed := service.pieces.DeleteFailed(ctx, expired, now)
if errfailed != nil {
service.log.Error("unable to update piece info", zap.Stringer("satellite id", expired.SatelliteID), zap.Stringer("piece id", expired.PieceID), zap.Error(errfailed))
service.log.Error("unable to update piece info", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID), zap.Error(errfailed))
}
service.log.Error("unable to delete piece", zap.Stringer("satellite id", expired.SatelliteID), zap.Stringer("piece id", expired.PieceID), zap.Error(err))
service.log.Error("unable to delete piece", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID), zap.Error(err))
continue
}
service.log.Info("delete expired", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID))
count++
}

View File

@ -73,7 +73,7 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
chore.cycles = append(chore.cycles, cycle)
cycle.Start(ctx, &group, func(ctx context.Context) error {
chore.log.Debug("starting cycle", zap.Stringer("satellite", satellite))
chore.log.Debug("starting cycle", zap.Stringer("Satellite ID", satellite))
interval := initialBackOff
attempts := 0
for {
@ -82,16 +82,16 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
if err == nil {
return nil
}
chore.log.Error("ping satellite failed ", zap.Stringer("satellite", satellite), zap.Int("attempts", attempts), zap.Error(err))
chore.log.Error("ping satellite failed ", zap.Stringer("Satellite ID", satellite), zap.Int("attempts", attempts), zap.Error(err))
// Sleeps until interval times out, then continue. Returns if context is cancelled.
if !sync2.Sleep(ctx, interval) {
chore.log.Info("context cancelled", zap.Stringer("satellite", satellite))
chore.log.Info("context cancelled", zap.Stringer("Satellite ID", satellite))
return nil
}
interval *= 2
if interval >= chore.interval {
chore.log.Info("retries timed out for this cycle", zap.Stringer("satellite", satellite))
chore.log.Info("retries timed out for this cycle", zap.Stringer("Satellite ID", satellite))
return nil
}
}

View File

@ -80,13 +80,13 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
worker := NewWorker(chore.log, chore.store, chore.satelliteDB, chore.dialer, satelliteID, addr, chore.config)
if _, ok := chore.exitingMap.LoadOrStore(satelliteID, worker); ok {
// already running a worker for this satellite
chore.log.Debug("skipping for satellite, worker already exists.", zap.Stringer("satellite ID", satelliteID))
chore.log.Debug("skipping for satellite, worker already exists.", zap.Stringer("Satellite ID", satelliteID))
continue
}
chore.limiter.Go(ctx, func() {
err := worker.Run(ctx, func() {
chore.log.Debug("finished for satellite.", zap.Stringer("satellite ID", satelliteID))
chore.log.Debug("finished for satellite.", zap.Stringer("Satellite ID", satelliteID))
chore.exitingMap.Delete(satelliteID)
})
if err != nil {

View File

@ -64,13 +64,13 @@ func (e *Endpoint) GetNonExitingSatellites(ctx context.Context, req *pb.GetNonEx
// get domain name
domain, err := e.trust.GetAddress(ctx, trusted)
if err != nil {
e.log.Debug("graceful exit: get satellite domian name", zap.Stringer("satelliteID", trusted), zap.Error(err))
e.log.Debug("graceful exit: get satellite domian name", zap.Stringer("Satellite ID", trusted), zap.Error(err))
continue
}
// get space usage by satellites
spaceUsed, err := e.usageCache.SpaceUsedBySatellite(ctx, trusted)
if err != nil {
e.log.Debug("graceful exit: get space used by satellite", zap.Stringer("satelliteID", trusted), zap.Error(err))
e.log.Debug("graceful exit: get space used by satellite", zap.Stringer("Satellite ID", trusted), zap.Error(err))
continue
}
availableSatellites = append(availableSatellites, &pb.NonExitingSatellite{
@ -87,7 +87,7 @@ func (e *Endpoint) GetNonExitingSatellites(ctx context.Context, req *pb.GetNonEx
// InitiateGracefulExit updates one or more satellites in the storagenode's database to be gracefully exiting.
func (e *Endpoint) InitiateGracefulExit(ctx context.Context, req *pb.InitiateGracefulExitRequest) (*pb.ExitProgress, error) {
e.log.Debug("initialize graceful exit: start", zap.Stringer("satellite ID", req.NodeId))
e.log.Debug("initialize graceful exit: start", zap.Stringer("Satellite ID", req.NodeId))
domain, err := e.trust.GetAddress(ctx, req.NodeId)
if err != nil {
@ -128,7 +128,7 @@ func (e *Endpoint) GetExitProgress(ctx context.Context, req *pb.GetExitProgressR
for _, progress := range exitProgress {
domain, err := e.trust.GetAddress(ctx, progress.SatelliteID)
if err != nil {
e.log.Debug("graceful exit: get satellite domain name", zap.Stringer("satelliteID", progress.SatelliteID), zap.Error(err))
e.log.Debug("graceful exit: get satellite domain name", zap.Stringer("Satellite ID", progress.SatelliteID), zap.Error(err))
continue
}

View File

@ -106,12 +106,17 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
pieceID := deletePieceMsg.OriginalPieceId
err := worker.deleteOnePieceOrAll(ctx, &pieceID)
if err != nil {
worker.log.Error("failed to delete piece.", zap.Stringer("satellite ID", worker.satelliteID), zap.Stringer("piece ID", pieceID), zap.Error(errs.Wrap(err)))
worker.log.Error("failed to delete piece.",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err)))
}
})
case *pb.SatelliteMessage_ExitFailed:
worker.log.Error("graceful exit failed.", zap.Stringer("satellite ID", worker.satelliteID), zap.Stringer("reason", msg.ExitFailed.Reason))
worker.log.Error("graceful exit failed.",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("reason", msg.ExitFailed.Reason))
err = worker.satelliteDB.CompleteGracefulExit(ctx, worker.satelliteID, time.Now(), satellites.ExitFailed, msg.ExitFailed.GetExitFailureSignature())
if err != nil {
@ -119,7 +124,7 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
}
break
case *pb.SatelliteMessage_ExitCompleted:
worker.log.Info("graceful exit completed.", zap.Stringer("satellite ID", worker.satelliteID))
worker.log.Info("graceful exit completed.", zap.Stringer("Satellite ID", worker.satelliteID))
err = worker.satelliteDB.CompleteGracefulExit(ctx, worker.satelliteID, time.Now(), satellites.ExitSucceeded, msg.ExitCompleted.GetExitCompleteSignature())
if err != nil {
@ -133,7 +138,7 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
break
default:
// TODO handle err
worker.log.Error("unknown graceful exit message.", zap.Stringer("satellite ID", worker.satelliteID))
worker.log.Error("unknown graceful exit message.", zap.Stringer("Satellite ID", worker.satelliteID))
}
}
@ -155,7 +160,10 @@ func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.Trans
if errs.Is(err, os.ErrNotExist) {
transferErr = pb.TransferFailed_NOT_FOUND
}
worker.log.Error("failed to get piece reader.", zap.Stringer("satellite ID", worker.satelliteID), zap.Stringer("piece ID", pieceID), zap.Error(errs.Wrap(err)))
worker.log.Error("failed to get piece reader.",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err)))
worker.handleFailure(ctx, transferErr, pieceID, c.Send)
return err
}
@ -165,7 +173,10 @@ func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.Trans
originalHash, originalOrderLimit, err := worker.getHashAndLimit(ctx, reader, addrLimit.GetLimit())
if err != nil {
worker.log.Error("failed to get piece hash and order limit.", zap.Stringer("satellite ID", worker.satelliteID), zap.Stringer("piece ID", pieceID), zap.Error(errs.Wrap(err)))
worker.log.Error("failed to get piece hash and order limit.",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err)))
worker.handleFailure(ctx, pb.TransferFailed_UNKNOWN, pieceID, c.Send)
return err
}
@ -184,10 +195,16 @@ func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.Trans
pieceHash, peerID, err := worker.ecclient.PutPiece(putCtx, ctx, addrLimit, pk, reader)
if err != nil {
if piecestore.ErrVerifyUntrusted.Has(err) {
worker.log.Error("failed hash verification.", zap.Stringer("satellite ID", worker.satelliteID), zap.Stringer("piece ID", pieceID), zap.Error(errs.Wrap(err)))
worker.log.Error("failed hash verification.",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err)))
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
} else {
worker.log.Error("failed to put piece.", zap.Stringer("satellite ID", worker.satelliteID), zap.Stringer("piece ID", pieceID), zap.Error(errs.Wrap(err)))
worker.log.Error("failed to put piece.",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err)))
// TODO look at error type to decide on the transfer error
worker.handleFailure(ctx, pb.TransferFailed_STORAGE_NODE_UNAVAILABLE, pieceID, c.Send)
}
@ -195,12 +212,18 @@ func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.Trans
}
if !bytes.Equal(originalHash.Hash, pieceHash.Hash) {
worker.log.Error("piece hash from new storagenode does not match", zap.Stringer("storagenode ID", addrLimit.Limit.StorageNodeId), zap.Stringer("satellite ID", worker.satelliteID), zap.Stringer("piece ID", pieceID))
worker.log.Error("piece hash from new storagenode does not match",
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID))
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
return Error.New("piece hash from new storagenode does not match")
}
if pieceHash.PieceId != addrLimit.Limit.PieceId {
worker.log.Error("piece id from new storagenode does not match order limit", zap.Stringer("storagenode ID", addrLimit.Limit.StorageNodeId), zap.Stringer("satellite ID", worker.satelliteID), zap.Stringer("piece ID", pieceID))
worker.log.Error("piece id from new storagenode does not match order limit",
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID))
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
return Error.New("piece id from new storagenode does not match order limit")
}
@ -208,7 +231,11 @@ func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.Trans
signee := signing.SigneeFromPeerIdentity(peerID)
err = signing.VerifyPieceHashSignature(ctx, signee, pieceHash)
if err != nil {
worker.log.Error("invalid piece hash signature from new storagenode", zap.Stringer("storagenode ID", addrLimit.Limit.StorageNodeId), zap.Stringer("satellite ID", worker.satelliteID), zap.Stringer("piece ID", pieceID), zap.Error(errs.Wrap(err)))
worker.log.Error("invalid piece hash signature from new storagenode",
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err)))
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
return err
}
@ -258,14 +285,20 @@ func (worker *Worker) deleteOnePieceOrAll(ctx context.Context, pieceID *storj.Pi
}
err := worker.store.Delete(ctx, worker.satelliteID, id)
if err != nil {
worker.log.Debug("failed to delete a piece", zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Piece ID", id), zap.Error(err))
worker.log.Debug("failed to delete a piece",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", id),
zap.Error(err))
err = worker.store.DeleteFailed(ctx, pieces.ExpiredInfo{
SatelliteID: worker.satelliteID,
PieceID: id,
InPieceInfo: true,
}, time.Now().UTC())
if err != nil {
worker.log.Debug("failed to mark a deletion failure for a piece", zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Piece ID", id), zap.Error(err))
worker.log.Debug("failed to mark a deletion failure for a piece",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", id),
zap.Error(err))
}
continue
}
@ -288,7 +321,7 @@ func (worker *Worker) handleFailure(ctx context.Context, transferError pb.Transf
sendErr := send(failure)
if sendErr != nil {
worker.log.Error("unable to send failure.", zap.Stringer("satellite ID", worker.satelliteID))
worker.log.Error("unable to send failure.", zap.Stringer("Satellite ID", worker.satelliteID))
}
}

View File

@ -459,7 +459,7 @@ func (store *Store) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.
err := store.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error {
contentSize, statErr := access.ContentSize(ctx)
if statErr != nil {
store.log.Error("failed to stat", zap.Error(statErr), zap.String("pieceID", access.PieceID().String()), zap.String("satellite", satelliteID.String()))
store.log.Error("failed to stat", zap.Error(statErr), zap.Stringer("Piece ID", access.PieceID()), zap.Stringer("Satellite ID", satelliteID))
// keep iterating; we want a best effort total here.
return nil
}

View File

@ -156,11 +156,11 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ
if err := endpoint.store.Delete(ctx, delete.Limit.SatelliteId, delete.Limit.PieceId); err != nil {
// explicitly ignoring error because the errors
// TODO: add more debug info
endpoint.log.Error("delete failed", zap.Stringer("Piece ID", delete.Limit.PieceId), zap.Error(err))
endpoint.log.Error("delete failed", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId), zap.Error(err))
// TODO: report rpc status of internal server error or missing error,
// e.g. missing might happen when we get a deletion request after garbage collection has deleted it
} else {
endpoint.log.Info("deleted", zap.Stringer("Piece ID", delete.Limit.PieceId))
endpoint.log.Info("deleted", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId))
}
return &pb.PieceDeleteResponse{}, nil
@ -216,7 +216,7 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
return ErrProtocol.New("expected order limit as the first message")
}
limit := message.Limit
endpoint.log.Info("upload started", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("SatelliteID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
endpoint.log.Info("upload started", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
// TODO: verify that we have have expected amount of storage before continuing
@ -247,13 +247,13 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
mon.IntVal("upload_failure_size_bytes").Observe(uploadSize)
mon.IntVal("upload_failure_duration_ns").Observe(uploadDuration)
mon.FloatVal("upload_failure_rate_bytes_per_sec").Observe(uploadRate)
endpoint.log.Info("upload failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("SatelliteID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err))
endpoint.log.Info("upload failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err))
} else {
mon.Meter("upload_success_byte_meter").Mark64(uploadSize)
mon.IntVal("upload_success_size_bytes").Observe(uploadSize)
mon.IntVal("upload_success_duration_ns").Observe(uploadDuration)
mon.FloatVal("upload_success_rate_bytes_per_sec").Observe(uploadRate)
endpoint.log.Info("uploaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("SatelliteID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
endpoint.log.Info("uploaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
}
}()
@ -431,7 +431,7 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) {
}
limit, chunk := message.Limit, message.Chunk
endpoint.log.Info("download started", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("SatelliteID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
endpoint.log.Info("download started", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
if limit.Action != pb.PieceAction_GET && limit.Action != pb.PieceAction_GET_REPAIR && limit.Action != pb.PieceAction_GET_AUDIT {
return ErrProtocol.New("expected get or get repair or audit action got %v", limit.Action) // TODO: report rpc status unauthorized or bad request
@ -463,13 +463,13 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) {
mon.IntVal("download_failure_size_bytes").Observe(downloadSize)
mon.IntVal("download_failure_duration_ns").Observe(downloadDuration)
mon.FloatVal("download_failure_rate_bytes_per_sec").Observe(downloadRate)
endpoint.log.Info("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("SatelliteID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err))
endpoint.log.Info("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err))
} else {
mon.Meter("download_success_byte_meter").Mark64(downloadSize)
mon.IntVal("download_success_size_bytes").Observe(downloadSize)
mon.IntVal("download_success_duration_ns").Observe(downloadDuration)
mon.FloatVal("download_success_rate_bytes_per_sec").Observe(downloadRate)
endpoint.log.Info("downloaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("SatelliteID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
endpoint.log.Info("downloaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
}
}()
@ -690,7 +690,7 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques
Filter: filter,
})
if !queued {
endpoint.log.Debug("Retain job not queued for satellite", zap.String("satellite ID", peer.ID.String()))
endpoint.log.Debug("Retain job not queued for satellite", zap.Stringer("Satellite ID", peer.ID))
}
return &pb.RetainResponse{}, nil

View File

@ -357,9 +357,9 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
createdBefore := req.CreatedBefore.Add(-s.config.MaxTimeSkew)
s.log.Debug("Prepared to run a Retain request.",
zap.Time("createdBefore", createdBefore),
zap.Int64("filterSize", filter.Size()),
zap.String("satellite", satelliteID.String()))
zap.Time("Created Before", createdBefore),
zap.Int64("Filter Size", filter.Size()),
zap.Stringer("Satellite ID", satelliteID))
err = s.store.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) error {
// We call Gosched() when done because the GC process is expected to be long and we want to keep it at low priority,
@ -379,16 +379,16 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
pieceID := access.PieceID()
if !filter.Contains(pieceID) {
s.log.Debug("About to delete piece id",
zap.String("satellite", satelliteID.String()),
zap.String("pieceID", pieceID.String()),
zap.String("status", s.config.Status.String()))
zap.Stringer("Satellite ID", satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.String("Status", s.config.Status.String()))
// if retain status is enabled, delete pieceid
if s.config.Status == Enabled {
if err = s.store.Delete(ctx, satelliteID, pieceID); err != nil {
s.log.Warn("failed to delete piece",
zap.String("satellite", satelliteID.String()),
zap.String("pieceID", pieceID.String()),
zap.Stringer("Satellite ID", satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(err))
return nil
}
@ -408,7 +408,7 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
return Error.Wrap(err)
}
mon.IntVal("garbage_collection_pieces_deleted").Observe(int64(numDeleted))
s.log.Debug("Deleted pieces during retain", zap.Int("num deleted", numDeleted), zap.String("retain status", s.config.Status.String()))
s.log.Debug("Deleted pieces during retain", zap.Int("num deleted", numDeleted), zap.String("Retain Status", s.config.Status.String()))
return nil
}

View File

@ -131,7 +131,7 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, p
cancellationCount++
}
ec.log.Debug("Upload to storage node failed",
zap.String("NodeID", limits[info.i].GetLimit().StorageNodeId.String()),
zap.Stringer("Node ID", limits[info.i].GetLimit().StorageNodeId),
zap.Error(info.err),
)
continue
@ -200,8 +200,8 @@ func (ec *ecClient) PutPiece(ctx, parent context.Context, limit *pb.AddressedOrd
})
if err != nil {
ec.log.Debug("Failed dialing for putting piece to node",
zap.String("PieceID", pieceID.String()),
zap.String("NodeID", storageNodeID.String()),
zap.Stringer("Piece ID", pieceID),
zap.Stringer("Node ID", storageNodeID),
zap.Error(err),
)
return nil, nil, err
@ -211,7 +211,7 @@ func (ec *ecClient) PutPiece(ctx, parent context.Context, limit *pb.AddressedOrd
peerID, err = ps.GetPeerIdentity()
if err != nil {
ec.log.Debug("Failed getting peer identity from node connection",
zap.String("NodeID", storageNodeID.String()),
zap.Stringer("Node ID", storageNodeID),
zap.Error(err),
)
return nil, nil, err
@ -220,8 +220,8 @@ func (ec *ecClient) PutPiece(ctx, parent context.Context, limit *pb.AddressedOrd
upload, err := ps.Upload(ctx, limit.GetLimit(), privateKey)
if err != nil {
ec.log.Debug("Failed requesting upload of pieces to node",
zap.String("PieceID", pieceID.String()),
zap.String("NodeID", storageNodeID.String()),
zap.Stringer("Piece ID", pieceID),
zap.Stringer("Node ID", storageNodeID),
zap.Error(err),
)
return nil, nil, err
@ -242,9 +242,9 @@ func (ec *ecClient) PutPiece(ctx, parent context.Context, limit *pb.AddressedOrd
if err != nil {
if errs2.IsCanceled(err) {
if parent.Err() == context.Canceled {
ec.log.Info("Upload to node canceled by user", zap.Stringer("NodeID", storageNodeID))
ec.log.Info("Upload to node canceled by user", zap.Stringer("Node ID", storageNodeID))
} else {
ec.log.Debug("Node cut from upload due to slow connection", zap.Stringer("NodeID", storageNodeID))
ec.log.Debug("Node cut from upload due to slow connection", zap.Stringer("Node ID", storageNodeID))
}
} else {
nodeAddress := ""
@ -253,8 +253,8 @@ func (ec *ecClient) PutPiece(ctx, parent context.Context, limit *pb.AddressedOrd
}
ec.log.Debug("Failed uploading piece to node",
zap.Stringer("PieceID", pieceID),
zap.Stringer("NodeID", storageNodeID),
zap.Stringer("Piece ID", pieceID),
zap.Stringer("Node ID", storageNodeID),
zap.String("Node Address", nodeAddress),
zap.Error(err),
)
@ -330,8 +330,8 @@ func (ec *ecClient) Delete(ctx context.Context, limits []*pb.AddressedOrderLimit
})
if err != nil {
ec.log.Debug("Failed dialing for deleting piece from node",
zap.String("PieceID", limit.PieceId.String()),
zap.String("NodeID", limit.StorageNodeId.String()),
zap.Stringer("Piece ID", limit.PieceId),
zap.Stringer("Node ID", limit.StorageNodeId),
zap.Error(err),
)
errch <- err
@ -342,8 +342,8 @@ func (ec *ecClient) Delete(ctx context.Context, limits []*pb.AddressedOrderLimit
err = errs.Combine(err, ps.Close())
if err != nil {
ec.log.Debug("Failed deleting piece from node",
zap.String("PieceID", limit.PieceId.String()),
zap.String("NodeID", limit.StorageNodeId.String()),
zap.Stringer("Piece ID", limit.PieceId),
zap.Stringer("Node ID", limit.StorageNodeId),
zap.Error(err),
)
}

View File

@ -771,7 +771,7 @@ func (s *streamStore) cancelHandler(ctx context.Context, streamID storj.StreamID
for i := int64(0); i < totalSegments; i++ {
err := s.segments.Delete(ctx, streamID, int32(i))
if err != nil {
zap.L().Warn("Failed deleting segment", zap.String("path", path.String()), zap.Int64("segmentIndex", i), zap.Error(err))
zap.L().Warn("Failed deleting segment", zap.Stringer("path", path), zap.Int64("Segment Index", i), zap.Error(err))
continue
}
}