storj/satellite/gracefulexit/pending.go
Fadila Khadar c00ecae75c satellite/gracefulexit: stop using gracefulexit_transfer_queue
Remove the logic associated to the old transfer queue.
A new transfer queue (gracefulexit_segment_transfer_queue) has been created for migration to segmentLoop.
Transfers from the old queue were not moved to the new queue.
Instead, it was still used for nodes which have initiated graceful exit before migration.
There is no such node left, so we can remove all this logic.
In a next step, we will drop the table.

Change-Id: I3aa9bc29b76065d34b57a73f6e9c9d0297587f54
2021-09-14 11:52:34 +00:00

176 lines
4.8 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gracefulexit
import (
"context"
"sync"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
)
// PendingFinishedPromise for waiting for information about finished state.
type PendingFinishedPromise struct {
addedWorkChan chan struct{}
finishCalledChan chan struct{}
finishErr error
}
func newFinishedPromise() *PendingFinishedPromise {
return &PendingFinishedPromise{
addedWorkChan: make(chan struct{}),
finishCalledChan: make(chan struct{}),
}
}
// Wait should be called (once) after acquiring the finished promise and will return whether the pending map is finished.
func (promise *PendingFinishedPromise) Wait(ctx context.Context) (bool, error) {
select {
case <-ctx.Done():
return true, ctx.Err()
case <-promise.addedWorkChan:
return false, nil
case <-promise.finishCalledChan:
return true, promise.finishErr
}
}
func (promise *PendingFinishedPromise) addedWork() {
close(promise.addedWorkChan)
}
func (promise *PendingFinishedPromise) finishCalled(err error) {
promise.finishErr = err
close(promise.finishCalledChan)
}
// PendingTransfer is the representation of work on the pending map.
// It contains information about a transfer request that has been sent to a storagenode by the satellite.
type PendingTransfer struct {
StreamID uuid.UUID
Position metabase.SegmentPosition
PieceSize int64
SatelliteMessage *pb.SatelliteMessage
OriginalRootPieceID storj.PieceID
PieceNum uint16
}
// PendingMap for managing concurrent access to the pending transfer map.
type PendingMap struct {
mu sync.RWMutex
data map[storj.PieceID]*PendingTransfer
doneSending bool
doneSendingErr error
finishedPromise *PendingFinishedPromise
}
// NewPendingMap creates a new PendingMap.
func NewPendingMap() *PendingMap {
newData := make(map[storj.PieceID]*PendingTransfer)
return &PendingMap{
data: newData,
}
}
// Put adds work to the map. If there is already work associated with this piece ID it returns an error.
// If there is a PendingFinishedPromise waiting, that promise is updated to return false.
func (pm *PendingMap) Put(pieceID storj.PieceID, pendingTransfer *PendingTransfer) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.doneSending {
return Error.New("cannot add work: pending map already finished")
}
if _, ok := pm.data[pieceID]; ok {
return Error.New("piece ID already exists in pending map")
}
pm.data[pieceID] = pendingTransfer
if pm.finishedPromise != nil {
pm.finishedPromise.addedWork()
pm.finishedPromise = nil
}
return nil
}
// Get returns the pending transfer item from the map, if it exists.
func (pm *PendingMap) Get(pieceID storj.PieceID) (*PendingTransfer, bool) {
pm.mu.RLock()
defer pm.mu.RUnlock()
pendingTransfer, ok := pm.data[pieceID]
return pendingTransfer, ok
}
// Length returns the number of elements in the map.
func (pm *PendingMap) Length() int {
pm.mu.RLock()
defer pm.mu.RUnlock()
return len(pm.data)
}
// Delete removes the pending transfer item from the map and returns an error if the data does not exist.
func (pm *PendingMap) Delete(pieceID storj.PieceID) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if _, ok := pm.data[pieceID]; !ok {
return Error.New("piece ID does not exist in pending map")
}
delete(pm.data, pieceID)
return nil
}
// IsFinishedPromise returns a promise for the caller to wait on to determine the finished status of the pending map.
// If we have enough information to determine the finished status, we update the promise to have an answer immediately.
// Otherwise, we attach the promise to the pending map to be updated and cleared by either Put or DoneSending (whichever happens first).
func (pm *PendingMap) IsFinishedPromise() *PendingFinishedPromise {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.finishedPromise != nil {
return pm.finishedPromise
}
newPromise := newFinishedPromise()
if len(pm.data) > 0 {
newPromise.addedWork()
return newPromise
}
if pm.doneSending {
newPromise.finishCalled(pm.doneSendingErr)
return newPromise
}
pm.finishedPromise = newPromise
return newPromise
}
// DoneSending is called (with an optional error) when no more work will be added to the map.
// If DoneSending has already been called, an error is returned.
// If a PendingFinishedPromise is waiting on a response, it is updated to return true.
func (pm *PendingMap) DoneSending(err error) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.doneSending {
return Error.New("DoneSending() already called on pending map")
}
if pm.finishedPromise != nil {
pm.finishedPromise.finishCalled(err)
pm.finishedPromise = nil
}
pm.doneSending = true
pm.doneSendingErr = err
return nil
}