2019-03-18 10:55:06 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
2019-05-08 12:11:59 +01:00
|
|
|
// Package collector implements expired piece deletion from storage node.
|
2019-03-18 10:55:06 +00:00
|
|
|
package collector
|
|
|
|
|
|
|
|
import (
|
2019-05-08 12:11:59 +01:00
|
|
|
"context"
|
2019-03-18 10:55:06 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"go.uber.org/zap"
|
2019-05-08 12:11:59 +01:00
|
|
|
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
2019-03-18 10:55:06 +00:00
|
|
|
|
2019-05-08 12:11:59 +01:00
|
|
|
"storj.io/storj/internal/memory"
|
|
|
|
"storj.io/storj/internal/sync2"
|
2019-03-18 10:55:06 +00:00
|
|
|
"storj.io/storj/storagenode/pieces"
|
|
|
|
)
|
|
|
|
|
2019-05-08 12:11:59 +01:00
|
|
|
var mon = monkit.Package()
|
|
|
|
|
2019-03-18 10:55:06 +00:00
|
|
|
// Config defines parameters for storage node Collector.
|
|
|
|
type Config struct {
|
2019-05-08 12:11:59 +01:00
|
|
|
Interval time.Duration `help:"how frequently expired pieces are collected" default:"1h0m0s"`
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Service implements collecting expired pieces on the storage node.
|
|
|
|
type Service struct {
|
|
|
|
log *zap.Logger
|
|
|
|
pieces *pieces.Store
|
|
|
|
pieceinfos pieces.DB
|
2019-05-08 12:11:59 +01:00
|
|
|
|
|
|
|
Loop sync2.Cycle
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewService creates a new collector service.
|
2019-05-08 12:11:59 +01:00
|
|
|
func NewService(log *zap.Logger, pieces *pieces.Store, pieceinfos pieces.DB, config Config) *Service {
|
2019-03-18 10:55:06 +00:00
|
|
|
return &Service{
|
|
|
|
log: log,
|
|
|
|
pieces: pieces,
|
|
|
|
pieceinfos: pieceinfos,
|
2019-05-08 12:11:59 +01:00
|
|
|
Loop: *sync2.NewCycle(config.Interval),
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
}
|
2019-05-08 12:11:59 +01:00
|
|
|
|
|
|
|
// Run runs monitor service
|
|
|
|
func (service *Service) Run(ctx context.Context) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
return service.Loop.Run(ctx, func(ctx context.Context) error {
|
|
|
|
err := service.Collect(ctx, time.Now())
|
|
|
|
if err != nil {
|
|
|
|
service.log.Error("error during collecting pieces: ", zap.Error(err))
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close stops the collector service.
|
|
|
|
func (service *Service) Close() (err error) {
|
|
|
|
service.Loop.Close()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Collect collects pieces that have expired by now.
|
|
|
|
func (service *Service) Collect(ctx context.Context, now time.Time) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
const maxBatches = 100
|
|
|
|
const batchSize = 1000
|
|
|
|
|
|
|
|
var count int64
|
|
|
|
var bytes int64
|
|
|
|
defer func() {
|
|
|
|
if count > 0 {
|
|
|
|
service.log.Info("collect", zap.Int64("count", count), zap.Stringer("size", memory.Size(bytes)))
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
for k := 0; k < maxBatches; k++ {
|
|
|
|
infos, err := service.pieceinfos.GetExpired(ctx, now, batchSize)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if len(infos) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, expired := range infos {
|
|
|
|
err := service.pieces.Delete(ctx, expired.SatelliteID, expired.PieceID)
|
|
|
|
if err != nil {
|
|
|
|
errfailed := service.pieceinfos.DeleteFailed(ctx, expired.SatelliteID, expired.PieceID, now)
|
|
|
|
if errfailed != nil {
|
|
|
|
service.log.Error("unable to update piece info", zap.Stringer("satellite id", expired.SatelliteID), zap.Stringer("piece id", expired.PieceID), zap.Error(errfailed))
|
|
|
|
}
|
|
|
|
service.log.Error("unable to delete piece", zap.Stringer("satellite id", expired.SatelliteID), zap.Stringer("piece id", expired.PieceID), zap.Error(err))
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
err = service.pieceinfos.Delete(ctx, expired.SatelliteID, expired.PieceID)
|
|
|
|
if err != nil {
|
|
|
|
service.log.Error("unable to delete piece info", zap.Stringer("satellite id", expired.SatelliteID), zap.Stringer("piece id", expired.PieceID), zap.Error(err))
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
count++
|
|
|
|
bytes += expired.PieceSize
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|