storagenode/gracefulexit: remove unused interface
Change-Id: Ie6c3d69f5177872d8f4308ac476bc87655da9e4b
This commit is contained in:
parent
cf92220c20
commit
e9692c5681
@ -23,7 +23,7 @@ type Chore struct {
|
|||||||
dialer rpc.Dialer
|
dialer rpc.Dialer
|
||||||
config Config
|
config Config
|
||||||
|
|
||||||
service Service
|
service *Service
|
||||||
transferService piecetransfer.Service
|
transferService piecetransfer.Service
|
||||||
|
|
||||||
exitingMap sync.Map
|
exitingMap sync.Map
|
||||||
@ -32,7 +32,7 @@ type Chore struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewChore instantiates Chore.
|
// NewChore instantiates Chore.
|
||||||
func NewChore(log *zap.Logger, service Service, transferService piecetransfer.Service, dialer rpc.Dialer, config Config) *Chore {
|
func NewChore(log *zap.Logger, service *Service, transferService piecetransfer.Service, dialer rpc.Dialer, config Config) *Chore {
|
||||||
return &Chore{
|
return &Chore{
|
||||||
log: log,
|
log: log,
|
||||||
dialer: dialer,
|
dialer: dialer,
|
||||||
|
@ -19,49 +19,10 @@ import (
|
|||||||
"storj.io/storj/storagenode/trust"
|
"storj.io/storj/storagenode/trust"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Service acts as the gateway to the `satellites` db for graceful exit
|
// Service exposes methods to manage GE progress.
|
||||||
// code (querying and updating that db as necessary).
|
|
||||||
//
|
//
|
||||||
// architecture: Service
|
// architecture: Service
|
||||||
type Service interface {
|
type Service struct {
|
||||||
// ListPendingExits returns a slice with one record for every satellite
|
|
||||||
// from which this node is gracefully exiting. Each record includes the
|
|
||||||
// satellite's ID/address and information about the graceful exit status
|
|
||||||
// and progress.
|
|
||||||
ListPendingExits(ctx context.Context) ([]ExitingSatellite, error)
|
|
||||||
|
|
||||||
// DeletePiece deletes one piece stored for a satellite, and updates
|
|
||||||
// the deleted byte count for the corresponding graceful exit operation.
|
|
||||||
DeletePiece(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error
|
|
||||||
|
|
||||||
// DeleteSatellitePieces deletes all pieces stored for a satellite, and updates
|
|
||||||
// the deleted byte count for the corresponding graceful exit operation.
|
|
||||||
DeleteSatellitePieces(ctx context.Context, satelliteID storj.NodeID) error
|
|
||||||
|
|
||||||
// ExitFailed updates the database when a graceful exit has failed.
|
|
||||||
ExitFailed(ctx context.Context, satelliteID storj.NodeID, reason pb.ExitFailed_Reason, exitFailedBytes []byte) error
|
|
||||||
|
|
||||||
// ExitCompleted updates the database when a graceful exit is completed. It also
|
|
||||||
// deletes all pieces and blobs for that satellite.
|
|
||||||
ExitCompleted(ctx context.Context, satelliteID storj.NodeID, completionReceipt []byte) error
|
|
||||||
|
|
||||||
// ExitNotPossible deletes the entry for the corresponding graceful exit operation.
|
|
||||||
// This is intended to be called when a graceful exit operation was initiated but
|
|
||||||
// the satellite rejected it.
|
|
||||||
ExitNotPossible(ctx context.Context, satelliteID storj.NodeID) error
|
|
||||||
|
|
||||||
// DeleteSatelliteData deletes all pieces and blobs stored for a satellite.
|
|
||||||
// Note: this should only ever be called after exit has finished.
|
|
||||||
DeleteSatelliteData(ctx context.Context, satelliteID storj.NodeID) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// ensures that service implements Service.
|
|
||||||
var _ Service = (*service)(nil)
|
|
||||||
|
|
||||||
// service exposes methods to manage GE progress.
|
|
||||||
//
|
|
||||||
// architecture: Service
|
|
||||||
type service struct {
|
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
store *pieces.Store
|
store *pieces.Store
|
||||||
trust *trust.Pool
|
trust *trust.Pool
|
||||||
@ -71,8 +32,8 @@ type service struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewService is a constructor for a GE service.
|
// NewService is a constructor for a GE service.
|
||||||
func NewService(log *zap.Logger, store *pieces.Store, trust *trust.Pool, satelliteDB satellites.DB, dialer rpc.Dialer, config Config) Service {
|
func NewService(log *zap.Logger, store *pieces.Store, trust *trust.Pool, satelliteDB satellites.DB, dialer rpc.Dialer, config Config) *Service {
|
||||||
return &service{
|
return &Service{
|
||||||
log: log,
|
log: log,
|
||||||
store: store,
|
store: store,
|
||||||
trust: trust,
|
trust: trust,
|
||||||
@ -87,7 +48,11 @@ type ExitingSatellite struct {
|
|||||||
NodeURL storj.NodeURL
|
NodeURL storj.NodeURL
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *service) ListPendingExits(ctx context.Context) (_ []ExitingSatellite, err error) {
|
// ListPendingExits returns a slice with one record for every satellite
|
||||||
|
// from which this node is gracefully exiting. Each record includes the
|
||||||
|
// satellite's ID/address and information about the graceful exit status
|
||||||
|
// and progress.
|
||||||
|
func (c *Service) ListPendingExits(ctx context.Context) (_ []ExitingSatellite, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
exitProgress, err := c.satelliteDB.ListGracefulExits(ctx)
|
exitProgress, err := c.satelliteDB.ListGracefulExits(ctx)
|
||||||
@ -111,7 +76,7 @@ func (c *service) ListPendingExits(ctx context.Context) (_ []ExitingSatellite, e
|
|||||||
|
|
||||||
// DeletePiece deletes one piece stored for a satellite, and updates
|
// DeletePiece deletes one piece stored for a satellite, and updates
|
||||||
// the deleted byte count for the corresponding graceful exit operation.
|
// the deleted byte count for the corresponding graceful exit operation.
|
||||||
func (c *service) DeletePiece(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
|
func (c *Service) DeletePiece(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
piece, err := c.store.Reader(ctx, satelliteID, pieceID)
|
piece, err := c.store.Reader(ctx, satelliteID, pieceID)
|
||||||
@ -127,9 +92,25 @@ func (c *service) DeletePiece(ctx context.Context, satelliteID storj.NodeID, pie
|
|||||||
return c.satelliteDB.UpdateGracefulExit(ctx, satelliteID, size)
|
return c.satelliteDB.UpdateGracefulExit(ctx, satelliteID, size)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteSatellitePieces deletes all pieces stored for a satellite, and updates
|
// DeleteSatelliteData deletes all pieces and blobs stored for a satellite.
|
||||||
|
//
|
||||||
|
// Note: this should only ever be called after exit has finished.
|
||||||
|
func (c *Service) DeleteSatelliteData(ctx context.Context, satelliteID storj.NodeID) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
// delete all remaining pieces
|
||||||
|
err = c.deleteSatellitePieces(ctx, satelliteID)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete everything left in blobs folder of specific satellites
|
||||||
|
return c.store.DeleteSatelliteBlobs(ctx, satelliteID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteSatellitePieces deletes all pieces stored for a satellite, and updates
|
||||||
// the deleted byte count for the corresponding graceful exit operation.
|
// the deleted byte count for the corresponding graceful exit operation.
|
||||||
func (c *service) DeleteSatellitePieces(ctx context.Context, satelliteID storj.NodeID) (err error) {
|
func (c *Service) deleteSatellitePieces(ctx context.Context, satelliteID storj.NodeID) (err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
var totalDeleted int64
|
var totalDeleted int64
|
||||||
@ -158,34 +139,21 @@ func (c *service) DeleteSatellitePieces(ctx context.Context, satelliteID storj.N
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ExitFailed updates the database when a graceful exit has failed.
|
// ExitFailed updates the database when a graceful exit has failed.
|
||||||
func (c *service) ExitFailed(ctx context.Context, satelliteID storj.NodeID, reason pb.ExitFailed_Reason, exitFailedBytes []byte) (err error) {
|
func (c *Service) ExitFailed(ctx context.Context, satelliteID storj.NodeID, reason pb.ExitFailed_Reason, exitFailedBytes []byte) (err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
return errs.Wrap(c.satelliteDB.CompleteGracefulExit(ctx, satelliteID, c.nowFunc(), satellites.ExitFailed, exitFailedBytes))
|
return errs.Wrap(c.satelliteDB.CompleteGracefulExit(ctx, satelliteID, c.nowFunc(), satellites.ExitFailed, exitFailedBytes))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExitCompleted updates the database when a graceful exit is completed.
|
// ExitCompleted updates the database when a graceful exit is completed.
|
||||||
func (c *service) ExitCompleted(ctx context.Context, satelliteID storj.NodeID, completionReceipt []byte) (err error) {
|
func (c *Service) ExitCompleted(ctx context.Context, satelliteID storj.NodeID, completionReceipt []byte) (err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
return errs.Wrap(c.satelliteDB.CompleteGracefulExit(ctx, satelliteID, c.nowFunc(), satellites.ExitSucceeded, completionReceipt))
|
return errs.Wrap(c.satelliteDB.CompleteGracefulExit(ctx, satelliteID, c.nowFunc(), satellites.ExitSucceeded, completionReceipt))
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteSatelliteData deletes all pieces and blobs for that satellite.
|
// ExitNotPossible deletes the entry for the corresponding graceful exit operation.
|
||||||
func (c *service) DeleteSatelliteData(ctx context.Context, satelliteID storj.NodeID) (err error) {
|
// This is intended to be called when a graceful exit operation was initiated but
|
||||||
defer mon.Task()(&ctx)(&err)
|
// the satellite rejected it.
|
||||||
|
func (c *Service) ExitNotPossible(ctx context.Context, satelliteID storj.NodeID) (err error) {
|
||||||
// delete all remaining pieces
|
|
||||||
err = c.DeleteSatellitePieces(ctx, satelliteID)
|
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// delete everything left in blobs folder of specific satellites
|
|
||||||
return c.store.DeleteSatelliteBlobs(ctx, satelliteID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExitNotPossible deletes the entry from satellite table and inform graceful exit
|
|
||||||
// has failed to start.
|
|
||||||
func (c *service) ExitNotPossible(ctx context.Context, satelliteID storj.NodeID) (err error) {
|
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
return c.satelliteDB.CancelGracefulExit(ctx, satelliteID)
|
return c.satelliteDB.CancelGracefulExit(ctx, satelliteID)
|
||||||
|
@ -23,7 +23,7 @@ import (
|
|||||||
type Worker struct {
|
type Worker struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
|
|
||||||
service Service
|
service *Service
|
||||||
transferService piecetransfer.Service
|
transferService piecetransfer.Service
|
||||||
|
|
||||||
dialer rpc.Dialer
|
dialer rpc.Dialer
|
||||||
@ -32,7 +32,7 @@ type Worker struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewWorker instantiates Worker.
|
// NewWorker instantiates Worker.
|
||||||
func NewWorker(log *zap.Logger, service Service, transferService piecetransfer.Service, dialer rpc.Dialer, satelliteURL storj.NodeURL, config Config) *Worker {
|
func NewWorker(log *zap.Logger, service *Service, transferService piecetransfer.Service, dialer rpc.Dialer, satelliteURL storj.NodeURL, config Config) *Worker {
|
||||||
return &Worker{
|
return &Worker{
|
||||||
log: log.Named(satelliteURL.String()),
|
log: log.Named(satelliteURL.String()),
|
||||||
service: service,
|
service: service,
|
||||||
@ -48,7 +48,7 @@ func NewWorker(log *zap.Logger, service Service, transferService piecetransfer.S
|
|||||||
func (worker *Worker) Run(ctx context.Context) (err error) {
|
func (worker *Worker) Run(ctx context.Context) (err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
worker.log.Debug("worker")
|
worker.log.Debug("started")
|
||||||
defer worker.log.Debug("finished")
|
defer worker.log.Debug("finished")
|
||||||
|
|
||||||
limiter := sync2.NewLimiter(worker.concurrentTransfers)
|
limiter := sync2.NewLimiter(worker.concurrentTransfers)
|
||||||
|
@ -265,7 +265,7 @@ type Peer struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
GracefulExit struct {
|
GracefulExit struct {
|
||||||
Service gracefulexit.Service
|
Service *gracefulexit.Service
|
||||||
Endpoint *gracefulexit.Endpoint
|
Endpoint *gracefulexit.Endpoint
|
||||||
Chore *gracefulexit.Chore
|
Chore *gracefulexit.Chore
|
||||||
BlobsCleaner *gracefulexit.BlobsCleaner
|
BlobsCleaner *gracefulexit.BlobsCleaner
|
||||||
|
Loading…
Reference in New Issue
Block a user