storagenode/retain: add more verbose monkit monitoring
Change-Id: Ibb9804268751b4b1842eb729bc510dba83e9b28b
commit c9cfb5ed0c (parent e6fe9d209e)
@@ -735,6 +735,10 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainRequest
 	if err != nil {
 		return nil, rpcstatus.Wrap(rpcstatus.InvalidArgument, err)
 	}
+	filterHashCount, _ := filter.Parameters()
+	mon.IntVal("retain_filter_size").Observe(filter.Size())
+	mon.IntVal("retain_filter_hash_count").Observe(int64(filterHashCount))
+	mon.IntVal("retain_creation_date").Observe(retainReq.CreationDate.Unix())
 
 	// the queue function will update the created before time based on the configurable retain buffer
 	queued := endpoint.retain.Queue(retain.Request{
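The three retain_* values are monkit IntVal distributions observed once per incoming Retain request, so the bloom filter's size, its hash count, and the request's creation date become queryable stats rather than only log output. A minimal sketch of the pattern, assuming the monkit/v3 API; observeRetainRequest is a hypothetical helper, not part of the commit:

package main

import (
	"fmt"
	"time"

	"github.com/spacemonkeygo/monkit/v3"
)

// mon is the package-level scope, mirroring how the storagenode packages declare it.
var mon = monkit.Package()

// observeRetainRequest records one observation per metric; monkit keeps
// min/max/avg/recent statistics for each IntVal distribution.
func observeRetainRequest(filterSize int64, hashCount int, creationDate time.Time) {
	mon.IntVal("retain_filter_size").Observe(filterSize)
	mon.IntVal("retain_filter_hash_count").Observe(int64(hashCount))
	mon.IntVal("retain_creation_date").Observe(creationDate.Unix())
}

func main() {
	observeRetainRequest(4096, 7, time.Now())
	// The aggregated stats live in monkit's default registry and can be dumped.
	monkit.Default.Stats(func(key monkit.SeriesKey, field string, val float64) {
		fmt.Println(key, field, val)
	})
}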
@@ -138,7 +138,9 @@ func (s *Service) Queue(req Request) bool {
 }
 
 // Run listens for queued retain requests and processes them as they come in.
-func (s *Service) Run(ctx context.Context) error {
+func (s *Service) Run(ctx context.Context) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
 	// Hold the lock while we spawn the workers because a concurrent Close call
 	// can race and wait for them. We later temporarily drop the lock while we
 	// wait for the workers to exit.
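Run's return is renamed from error to (err error) so that the deferred mon.Task closure can read whatever error the function eventually returns; the task then records the call's duration and whether it failed. A short, self-contained sketch of that pattern, assuming the monkit/v3 API (doWork stands in for Service.Run and is not part of the commit):

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/spacemonkeygo/monkit/v3"
)

var mon = monkit.Package()

// doWork stands in for Service.Run. The named return is what lets the deferred
// closure observe the final error: mon.Task()(&ctx) starts a task and returns a
// func(*error) that, when deferred, records elapsed time and success/failure.
func doWork(ctx context.Context, fail bool) (err error) {
	defer mon.Task()(&ctx)(&err)
	if fail {
		return errors.New("simulated failure")
	}
	return nil
}

func main() {
	ctx := context.Background()
	fmt.Println(doWork(ctx, false), doWork(ctx, true))
}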
@@ -233,7 +235,7 @@ func (s *Service) Run(ctx context.Context) error {
 
 	// Unlock while we wait for the workers to exit.
 	s.cond.L.Unlock()
-	err := s.group.Wait()
+	err = s.group.Wait()
 	s.cond.L.Lock()
 
 	// Clear the queue after Wait has exited. We're sure no more entries
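This assignment only changes as a consequence of the new named return: with err already declared in Run's signature, err := s.group.Wait() would no longer compile at this scope (and in a nested scope it would shadow the return value, hiding the failure from the deferred mon.Task), so it becomes a plain assignment.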
@@ -347,21 +349,33 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
 		return nil
 	}
 
-	defer mon.Task()(&ctx, req.SatelliteID, req.CreatedBefore, req.Filter.Size())(&err)
+	defer mon.Task()(&ctx, req.SatelliteID, req.CreatedBefore)(&err)
 
+	var piecesCount int64
+	var piecesSkipped int64
+	var piecesToDeleteCount int64
 	numDeleted := 0
 	satelliteID := req.SatelliteID
 	filter := req.Filter
 
 	// subtract some time to leave room for clock difference between the satellite and storage node
 	createdBefore := req.CreatedBefore.Add(-s.config.MaxTimeSkew)
+	started := time.Now().UTC()
+	filterHashCount, _ := req.Filter.Parameters()
+	mon.IntVal("garbage_collection_created_before").Observe(createdBefore.Unix())
+	mon.IntVal("garbage_collection_filter_hash_count").Observe(int64(filterHashCount))
+	mon.IntVal("garbage_collection_filter_size").Observe(filter.Size())
+	mon.IntVal("garbage_collection_started").Observe(started.Unix())
 
 	s.log.Debug("Prepared to run a Retain request.",
 		zap.Time("Created Before", createdBefore),
 		zap.Int64("Filter Size", filter.Size()),
 		zap.Stringer("Satellite ID", satelliteID))
 
-	err = s.store.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) error {
+	err = s.store.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) (err error) {
+		defer mon.Task()(&ctx)(&err)
+		piecesCount++
+
 		// We call Gosched() when done because the GC process is expected to be long and we want to keep it at low priority,
 		// so other goroutines can continue serving requests.
 		defer runtime.Gosched()
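Two instrumentation choices are visible here. First, the filter size is dropped from the outer mon.Task arguments and observed as the garbage_collection_filter_size IntVal instead, so it shows up as a queryable distribution rather than only as a per-task annotation. Second, the walk callback now has its own named error and defer mon.Task()(&ctx)(&err), which makes every visited piece a separate task and exposes the per-piece latency of the walk; piecesCount is incremented before any early return so the final count reflects every piece the walk touched.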
@@ -369,10 +383,12 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
 		// of using ModTime in place of the more precise CreationTime.
 		mTime, err := access.ModTime(ctx)
 		if err != nil {
+			piecesSkipped++
 			s.log.Warn("failed to determine mtime of blob", zap.Error(err))
 			// but continue iterating.
 			return nil
 		}
 
 		if !mTime.Before(createdBefore) {
 			return nil
 		}
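piecesSkipped is incremented only on this path, so the new garbage_collection_pieces_skipped metric specifically counts blobs whose modification time could not be read; those pieces were already being left in place, but previously they only surfaced as log warnings.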
@@ -383,9 +399,11 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
 			zap.Stringer("Piece ID", pieceID),
 			zap.String("Status", s.config.Status.String()))
 
+		piecesToDeleteCount++
+
 		// if retain status is enabled, delete pieceid
 		if s.config.Status == Enabled {
-			if err = s.store.Trash(ctx, satelliteID, pieceID); err != nil {
+			if err = s.trash(ctx, satelliteID, pieceID); err != nil {
 				s.log.Warn("failed to delete piece",
 					zap.Stringer("Satellite ID", satelliteID),
 					zap.Stringer("Piece ID", pieceID),
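Note that piecesToDeleteCount is incremented before the retain status check, so the metric reports how many pieces the bloom filter marked for deletion even when the configured retain status does not actually delete anything, while garbage_collection_pieces_deleted below reflects only what was really moved to the trash.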
@@ -407,8 +425,18 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
 	if err != nil {
 		return Error.Wrap(err)
 	}
+	mon.IntVal("garbage_collection_pieces_count").Observe(piecesCount)
+	mon.IntVal("garbage_collection_pieces_skipped").Observe(piecesSkipped)
+	mon.IntVal("garbage_collection_pieces_to_delete_count").Observe(piecesToDeleteCount)
+	mon.IntVal("garbage_collection_pieces_deleted").Observe(int64(numDeleted))
+	mon.DurationVal("garbage_collection_loop_duration").Observe(time.Now().UTC().Sub(started))
 	s.log.Debug("Moved pieces to trash during retain", zap.Int("num deleted", numDeleted), zap.String("Retain Status", s.config.Status.String()))
 
 	return nil
 }
+
+// trash wraps piece deletion so that moving a retained piece to the trash is monitored separately during garbage collection.
+func (s *Service) trash(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
+	defer mon.Task()(&ctx, satelliteID)(&err)
+	return s.store.Trash(ctx, satelliteID, pieceID)
+}
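The five garbage_collection_* observations summarize a whole retain run (totals plus loop duration), while the new trash method is a thin wrapper that exists purely for instrumentation: putting the single store.Trash call behind its own mon.Task gives call counts, latency, and error rates for moving pieces to the trash, separate from the surrounding walk. A self-contained sketch of the same idea, assuming the monkit/v3 API; the fakeStore and service types here are stand-ins, not the real piecestore types:

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/spacemonkeygo/monkit/v3"
)

var mon = monkit.Package()

// fakeStore stands in for the node's piece store.
type fakeStore struct{}

func (fakeStore) Trash(ctx context.Context, pieceID string) error {
	return errors.New("simulated trash failure") // simulate a failure so the task records an error
}

// service mirrors the shape of the retain Service for this sketch.
type service struct{ store fakeStore }

// trash wraps the single store call so each move-to-trash gets its own monkit
// task; the deferred closure records duration and whether err was non-nil, and
// extra arguments (here the piece ID) are attached to the task for tracing.
func (s *service) trash(ctx context.Context, pieceID string) (err error) {
	defer mon.Task()(&ctx, pieceID)(&err)
	return s.store.Trash(ctx, pieceID)
}

func main() {
	s := &service{}
	fmt.Println(s.trash(context.Background(), "piece-1"))
}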