storagenode\gracefulexit: broke worker deleteOnePieceOrAll into deleteOnePiece and deleteAllPieces and deletePiece

Change-Id: Ic3bd21e89fa71e962c2bb1c4943f4696bc4f83e5
This commit is contained in:
Bill Thorp 2020-01-09 15:46:49 -05:00 committed by Bill Thorp
parent 0c660f5490
commit 6f2f97b313

View File

@ -120,7 +120,7 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
deletePieceMsg := msg.DeletePiece
worker.limiter.Go(ctx, func() {
pieceID := deletePieceMsg.OriginalPieceId
err := worker.deleteOnePieceOrAll(ctx, &pieceID)
err := worker.deleteOnePiece(ctx, pieceID)
if err != nil {
worker.log.Error("failed to delete piece.",
zap.Stringer("Satellite ID", worker.satelliteID),
@ -154,7 +154,7 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
return errs.Wrap(err)
}
// delete all remaining pieces
err = worker.deleteOnePieceOrAll(ctx, nil)
err = worker.deleteAllPieces(ctx)
return errs.Wrap(err)
default:
@ -275,62 +275,71 @@ func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.Trans
return c.Send(success)
}
// deleteOnePieceOrAll deletes pieces stored for a satellite. When no piece ID are specified, all pieces stored by a satellite will be deleted.
func (worker *Worker) deleteOnePieceOrAll(ctx context.Context, pieceID *storj.PieceID) error {
// get piece size
pieceMap := make(map[pb.PieceID]int64)
ctxWithCancel, cancel := context.WithCancel(ctx)
err := worker.store.WalkSatellitePieces(ctxWithCancel, worker.satelliteID, func(piece pieces.StoredPieceAccess) error {
_, size, err := piece.Size(ctxWithCancel)
if err != nil {
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteID), zap.Error(err))
// deleteOnePiece deletes one piece stored for a satellite
func (worker *Worker) deleteOnePiece(ctx context.Context, pieceID storj.PieceID) error {
piece, err := worker.store.Reader(ctx, worker.satelliteID, pieceID)
if err != nil {
if !errs2.IsCanceled(err) {
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID), zap.Error(err))
}
if pieceID == nil {
pieceMap[piece.PieceID()] = size
return nil
}
if piece.PieceID() == *pieceID {
pieceMap[*pieceID] = size
cancel()
}
return nil
})
return err
}
err = worker.deletePiece(ctx, pieceID)
if err != nil {
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteID), zap.Error(err))
return err
}
// update graceful exit progress
size := piece.Size()
return worker.satelliteDB.UpdateGracefulExit(ctx, worker.satelliteID, size)
}
if err != nil && !errs.Is(err, context.Canceled) {
// deletePiece deletes one piece stored for a satellite, without updating satellite Graceful Exit status
func (worker *Worker) deletePiece(ctx context.Context, pieceID storj.PieceID) error {
err := worker.store.Delete(ctx, worker.satelliteID, pieceID)
if err != nil {
worker.log.Debug("failed to delete a piece",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(err))
delErr := worker.store.DeleteFailed(ctx, pieces.ExpiredInfo{
SatelliteID: worker.satelliteID,
PieceID: pieceID,
InPieceInfo: true,
}, time.Now().UTC())
if delErr != nil {
worker.log.Debug("failed to mark a deletion failure for a piece",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID), zap.Error(err))
}
return errs.Combine(err, delErr)
}
worker.log.Debug("delete piece",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID))
return err
}
// deleteAllPieces deletes pieces stored for a satellite
func (worker *Worker) deleteAllPieces(ctx context.Context) error {
var totalDeleted int64
err := worker.store.WalkSatellitePieces(ctx, worker.satelliteID, func(piece pieces.StoredPieceAccess) error {
err := worker.deletePiece(ctx, piece.PieceID())
if err == nil {
_, size, err := piece.Size(ctx)
if err != nil {
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", piece.PieceID()), zap.Error(err))
}
totalDeleted += size
}
return err
})
if err != nil && !errs2.IsCanceled(err) {
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteID), zap.Error(err))
}
var totalDeleted int64
for id, size := range pieceMap {
if size == 0 {
continue
}
err := worker.store.Delete(ctx, worker.satelliteID, id)
if err != nil {
worker.log.Debug("failed to delete a piece",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", id),
zap.Error(err))
err = worker.store.DeleteFailed(ctx, pieces.ExpiredInfo{
SatelliteID: worker.satelliteID,
PieceID: id,
InPieceInfo: true,
}, time.Now().UTC())
if err != nil {
worker.log.Debug("failed to mark a deletion failure for a piece",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", id),
zap.Error(err))
}
continue
}
worker.log.Debug("delete piece",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", id))
totalDeleted += size
}
// update transfer progress
// update graceful exit progress
return worker.satelliteDB.UpdateGracefulExit(ctx, worker.satelliteID, totalDeleted)
}