storj/storagenode/collector/service.go

118 lines
3.2 KiB
Go
Raw Normal View History

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
2019-05-08 12:11:59 +01:00
// Package collector implements expired piece deletion from storage node.
package collector
import (
2019-05-08 12:11:59 +01:00
"context"
"os"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/sync2"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/piecestore/usedserials"
)
2019-05-08 12:11:59 +01:00
var mon = monkit.Package()
// Config defines parameters for storage node Collector.
type Config struct {
2019-05-08 12:11:59 +01:00
Interval time.Duration `help:"how frequently expired pieces are collected" default:"1h0m0s"`
}
// Service implements collecting expired pieces on the storage node.
2019-09-10 14:24:16 +01:00
//
// architecture: Chore
type Service struct {
log *zap.Logger
pieces *pieces.Store
usedSerials *usedserials.Table
2019-05-08 12:11:59 +01:00
Loop *sync2.Cycle
}
// NewService creates a new collector service.
func NewService(log *zap.Logger, pieces *pieces.Store, usedSerials *usedserials.Table, config Config) *Service {
return &Service{
log: log,
pieces: pieces,
usedSerials: usedSerials,
Loop: sync2.NewCycle(config.Interval),
}
}
2019-05-08 12:11:59 +01:00
// Run runs collector service.
2019-05-08 12:11:59 +01:00
func (service *Service) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return service.Loop.Run(ctx, func(ctx context.Context) error {
// V3-3143 Pieces should be collected at least 24 hours after expiration
// to avoid premature deletion due to timezone issues, which may lead to
// storage node disqualification.
err := service.Collect(ctx, time.Now().Add(-24*time.Hour))
2019-05-08 12:11:59 +01:00
if err != nil {
service.log.Error("error during collecting pieces: ", zap.Error(err))
}
return nil
})
}
// Close stops the collector service.
func (service *Service) Close() (err error) {
service.Loop.Close()
return nil
}
// Collect collects pieces that have expired by now.
func (service *Service) Collect(ctx context.Context, now time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
service.usedSerials.DeleteExpired(now)
2019-05-08 12:11:59 +01:00
const maxBatches = 100
const batchSize = 1000
var count int64
defer func() {
if count > 0 {
service.log.Info("collect", zap.Int64("count", count))
2019-05-08 12:11:59 +01:00
}
}()
for k := 0; k < maxBatches; k++ {
infos, err := service.pieces.GetExpired(ctx, now, batchSize)
2019-05-08 12:11:59 +01:00
if err != nil {
return err
}
if len(infos) == 0 {
return nil
}
for _, expired := range infos {
err := service.pieces.Delete(ctx, expired.SatelliteID, expired.PieceID)
if err != nil {
if errs.Is(err, os.ErrNotExist) {
service.log.Info("file does not exist", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID))
continue
}
errfailed := service.pieces.DeleteFailed(ctx, expired, now)
2019-05-08 12:11:59 +01:00
if errfailed != nil {
service.log.Error("unable to update piece info", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID), zap.Error(errfailed))
2019-05-08 12:11:59 +01:00
}
service.log.Error("unable to delete piece", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID), zap.Error(err))
2019-05-08 12:11:59 +01:00
continue
}
service.log.Info("delete expired", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID))
2019-05-08 12:11:59 +01:00
count++
}
}
return nil
}