2019-07-24 18:26:43 +01:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gc
import (
"context"
"time"
2019-11-08 20:40:39 +00:00
"github.com/spacemonkeygo/monkit/v3"
2019-07-24 18:26:43 +01:00
"github.com/zeebo/errs"
"go.uber.org/zap"
2019-12-27 11:48:47 +00:00
"storj.io/common/bloomfilter"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/sync2"
2019-07-24 18:26:43 +01:00
"storj.io/storj/satellite/metainfo"
2019-07-28 06:55:36 +01:00
"storj.io/storj/satellite/overlay"
2020-02-21 14:07:29 +00:00
"storj.io/uplink/private/piecestore"
2019-07-24 18:26:43 +01:00
)
var (
// Error defines the gc service errors class
Error = errs . Class ( "gc service error" )
mon = monkit . Package ( )
)
2020-07-16 15:18:02 +01:00
// Config contains configurable values for garbage collection.
//
// The first group of fields controls when (and whether) garbage collection
// runs; the second group tunes how bloom filters are built and delivered to
// storage nodes.
type Config struct {
	// Scheduling.
	Interval  time.Duration ` help:"the time between each send of garbage collection filters to storage nodes" releaseDefault:"120h" devDefault:"10m" `
	Enabled   bool          ` help:"set if garbage collection is enabled or not" releaseDefault:"true" devDefault:"true" `
	SkipFirst bool          ` help:"if true, skip the first run of GC" releaseDefault:"true" devDefault:"false" `
	RunInCore bool          ` help:"if true, run garbage collection as part of the core" releaseDefault:"false" devDefault:"false" `

	// Filter construction and delivery.
	// The default for InitialPieces is currently based on the average number
	// of pieces per node.
	InitialPieces     int           ` help:"the initial number of pieces expected for a storage node to have, used for creating a filter" releaseDefault:"400000" devDefault:"10" `
	FalsePositiveRate float64       ` help:"the false positive rate used for creating a garbage collection bloom filter" releaseDefault:"0.1" devDefault:"0.1" `
	ConcurrentSends   int           ` help:"the number of nodes to concurrently send garbage collection bloom filters to" releaseDefault:"1" devDefault:"1" `
	RetainSendTimeout time.Duration ` help:"the amount of time to allow a node to handle a retain request" default:"1m" `
}
// Service implements the garbage collection service
2019-09-10 14:24:16 +01:00
//
// architecture: Chore
2019-07-24 18:26:43 +01:00
type Service struct {
log * zap . Logger
config Config
2020-01-30 13:06:43 +00:00
Loop * sync2 . Cycle
2019-07-24 18:26:43 +01:00
2019-09-19 05:46:39 +01:00
dialer rpc . Dialer
2019-07-24 18:26:43 +01:00
overlay overlay . DB
metainfoLoop * metainfo . Loop
}
2020-07-16 15:18:02 +01:00
// RetainInfo contains info needed for a storage node to retain important data and delete garbage data.
2019-07-24 18:26:43 +01:00
type RetainInfo struct {
Filter * bloomfilter . Filter
CreationDate time . Time
Count int
}
2020-07-16 15:18:02 +01:00
// NewService creates a new instance of the gc service.
2019-09-19 05:46:39 +01:00
func NewService ( log * zap . Logger , config Config , dialer rpc . Dialer , overlay overlay . DB , loop * metainfo . Loop ) * Service {
2019-07-24 18:26:43 +01:00
return & Service {
2020-02-13 11:01:39 +00:00
log : log ,
config : config ,
Loop : sync2 . NewCycle ( config . Interval ) ,
2019-09-19 05:46:39 +01:00
dialer : dialer ,
2019-07-24 18:26:43 +01:00
overlay : overlay ,
metainfoLoop : loop ,
}
}
2020-07-16 15:18:02 +01:00
// Run starts the gc loop service.
2019-07-24 18:26:43 +01:00
func ( service * Service ) Run ( ctx context . Context ) ( err error ) {
2019-08-27 19:20:27 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-24 18:26:43 +01:00
if ! service . config . Enabled {
return nil
}
2020-02-13 11:01:39 +00:00
if service . config . SkipFirst {
// make sure the metainfo loop runs once
err = service . metainfoLoop . Join ( ctx , metainfo . NullObserver { } )
if err != nil {
return err
}
}
2019-08-27 13:37:42 +01:00
// load last piece counts from overlay db
lastPieceCounts , err := service . overlay . AllPieceCounts ( ctx )
if err != nil {
service . log . Error ( "error getting last piece counts" , zap . Error ( err ) )
}
if lastPieceCounts == nil {
lastPieceCounts = make ( map [ storj . NodeID ] int )
}
2019-07-24 18:26:43 +01:00
return service . Loop . Run ( ctx , func ( ctx context . Context ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
pieceTracker := NewPieceTracker ( service . log . Named ( "gc observer" ) , service . config , lastPieceCounts )
// collect things to retain
err = service . metainfoLoop . Join ( ctx , pieceTracker )
if err != nil {
service . log . Error ( "error joining metainfoloop" , zap . Error ( err ) )
return nil
}
2019-08-27 13:37:42 +01:00
// save piece counts in memory for next iteration
2019-07-24 18:26:43 +01:00
for id := range lastPieceCounts {
delete ( lastPieceCounts , id )
}
for id , info := range pieceTracker . retainInfos {
lastPieceCounts [ id ] = info . Count
}
2019-08-27 13:37:42 +01:00
// save piece counts to db for next satellite restart
err = service . overlay . UpdatePieceCounts ( ctx , lastPieceCounts )
if err != nil {
service . log . Error ( "error updating piece counts" , zap . Error ( err ) )
}
2019-07-24 18:26:43 +01:00
// monitor information
for _ , info := range pieceTracker . retainInfos {
mon . IntVal ( "node_piece_count" ) . Observe ( int64 ( info . Count ) )
mon . IntVal ( "retain_filter_size_bytes" ) . Observe ( info . Filter . Size ( ) )
}
// send retain requests
limiter := sync2 . NewLimiter ( service . config . ConcurrentSends )
for id , info := range pieceTracker . retainInfos {
id , info := id , info
limiter . Go ( ctx , func ( ) {
err := service . sendRetainRequest ( ctx , id , info )
if err != nil {
2020-03-24 12:46:28 +00:00
service . log . Warn ( "error sending retain info to node" , zap . Stringer ( "Node ID" , id ) , zap . Error ( err ) )
2019-07-24 18:26:43 +01:00
}
} )
}
limiter . Wait ( )
return nil
} )
}
func ( service * Service ) sendRetainRequest ( ctx context . Context , id storj . NodeID , info * RetainInfo ) ( err error ) {
defer mon . Task ( ) ( & ctx , id . String ( ) ) ( & err )
log := service . log . Named ( id . String ( ) )
dossier , err := service . overlay . Get ( ctx , id )
if err != nil {
return Error . Wrap ( err )
}
2019-12-20 20:26:32 +00:00
if service . config . RetainSendTimeout > 0 {
var cancel func ( )
ctx , cancel = context . WithTimeout ( ctx , service . config . RetainSendTimeout )
defer cancel ( )
}
2020-05-19 16:49:13 +01:00
nodeurl := storj . NodeURL {
ID : id ,
Address : dossier . Address . Address ,
}
client , err := piecestore . DialNodeURL ( ctx , service . dialer , nodeurl , log , piecestore . DefaultConfig )
2019-07-24 18:26:43 +01:00
if err != nil {
return Error . Wrap ( err )
}
defer func ( ) {
err = errs . Combine ( err , Error . Wrap ( client . Close ( ) ) )
} ( )
err = client . Retain ( ctx , & pb . RetainRequest {
CreationDate : info . CreationDate ,
Filter : info . Filter . Bytes ( ) ,
} )
return Error . Wrap ( err )
}