storagenode/trust: refactor GetAddress to GetNodeURL

Most places now need the NodeURL rather than the ID and Address
separately. This simplifies code in multiple places.

Change-Id: I52621d8ca52296a8b5bf7afbc1001cf8bfb44239
This commit is contained in:
Egon Elbre 2020-05-19 19:11:30 +03:00 committed by Stefan Benten
parent ed627144ed
commit 94b2b315f7
13 changed files with 78 additions and 96 deletions

View File

@ -295,7 +295,7 @@ func TestRecvTimeout(t *testing.T) {
store := pieces.NewStore(zaptest.NewLogger(t), storageNodeDB.Pieces(), nil, nil, storageNodeDB.PieceSpaceUsedDB(), pieces.DefaultConfig) store := pieces.NewStore(zaptest.NewLogger(t), storageNodeDB.Pieces(), nil, nil, storageNodeDB.PieceSpaceUsedDB(), pieces.DefaultConfig)
// run the SN chore again to start processing transfers. // run the SN chore again to start processing transfers.
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), store, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.ID(), satellite.Addr(), worker := gracefulexit.NewWorker(zaptest.NewLogger(t), store, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.NodeURL(),
gracefulexit.Config{ gracefulexit.Config{
ChoreInterval: 0, ChoreInterval: 0,
NumWorkers: 2, NumWorkers: 2,

View File

@ -150,7 +150,7 @@ func (s *Service) GetDashboardData(ctx context.Context) (_ *Dashboard, err error
} }
for _, rep := range stats { for _, rep := range stats {
url, err := s.trust.GetAddress(ctx, rep.SatelliteID) url, err := s.trust.GetNodeURL(ctx, rep.SatelliteID)
if err != nil { if err != nil {
return nil, SNOServiceErr.Wrap(err) return nil, SNOServiceErr.Wrap(err)
} }
@ -160,7 +160,7 @@ func (s *Service) GetDashboardData(ctx context.Context) (_ *Dashboard, err error
ID: rep.SatelliteID, ID: rep.SatelliteID,
Disqualified: rep.Disqualified, Disqualified: rep.Disqualified,
Suspended: rep.Suspended, Suspended: rep.Suspended,
URL: url, URL: url.Address,
}, },
) )
} }

View File

@ -108,21 +108,18 @@ func (service *Service) pingSatellite(ctx context.Context, satellite storj.NodeI
func (service *Service) pingSatelliteOnce(ctx context.Context, id storj.NodeID) (err error) { func (service *Service) pingSatelliteOnce(ctx context.Context, id storj.NodeID) (err error) {
defer mon.Task()(&ctx, id)(&err) defer mon.Task()(&ctx, id)(&err)
self := service.Local() nodeurl, err := service.trust.GetNodeURL(ctx, id)
address, err := service.trust.GetAddress(ctx, id)
if err != nil { if err != nil {
return errPingSatellite.Wrap(err) return errPingSatellite.Wrap(err)
} }
conn, err := service.dialer.DialNodeURL(ctx, storj.NodeURL{ conn, err := service.dialer.DialNodeURL(ctx, nodeurl)
ID: id,
Address: address,
})
if err != nil { if err != nil {
return errPingSatellite.Wrap(err) return errPingSatellite.Wrap(err)
} }
defer func() { err = errs.Combine(err, conn.Close()) }() defer func() { err = errs.Combine(err, conn.Close()) }()
self := service.Local()
_, err = pb.NewDRPCNodeClient(conn).CheckIn(ctx, &pb.CheckInRequest{ _, err = pb.NewDRPCNodeClient(conn).CheckIn(ctx, &pb.CheckInRequest{
Address: self.Address.GetAddress(), Address: self.Address.GetAddress(),
Version: &self.Version, Version: &self.Version,

View File

@ -70,24 +70,24 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
if satellite.FinishedAt != nil { if satellite.FinishedAt != nil {
continue continue
} }
satelliteID := satellite.SatelliteID
addr, err := chore.trust.GetAddress(ctx, satelliteID) nodeurl, err := chore.trust.GetNodeURL(ctx, satellite.SatelliteID)
if err != nil { if err != nil {
chore.log.Error("failed to get satellite address.", zap.Error(err)) chore.log.Error("failed to get satellite address.", zap.Error(err))
continue continue
} }
worker := NewWorker(chore.log, chore.store, chore.satelliteDB, chore.dialer, satelliteID, addr, chore.config) worker := NewWorker(chore.log, chore.store, chore.satelliteDB, chore.dialer, nodeurl, chore.config)
if _, ok := chore.exitingMap.LoadOrStore(satelliteID, worker); ok { if _, ok := chore.exitingMap.LoadOrStore(nodeurl.ID, worker); ok {
// already running a worker for this satellite // already running a worker for this satellite
chore.log.Debug("skipping for satellite, worker already exists.", zap.Stringer("Satellite ID", satelliteID)) chore.log.Debug("skipping for satellite, worker already exists.", zap.Stringer("Satellite ID", nodeurl.ID))
continue continue
} }
chore.limiter.Go(ctx, func() { chore.limiter.Go(ctx, func() {
err := worker.Run(ctx, func() { err := worker.Run(ctx, func() {
chore.log.Debug("finished for satellite.", zap.Stringer("Satellite ID", satelliteID)) chore.log.Debug("finished for satellite.", zap.Stringer("Satellite ID", nodeurl.ID))
chore.exitingMap.Delete(satelliteID) chore.exitingMap.Delete(nodeurl.ID)
}) })
if err != nil { if err != nil {

View File

@ -62,7 +62,7 @@ func (e *Endpoint) GetNonExitingSatellites(ctx context.Context, req *pb.GetNonEx
} }
// get domain name // get domain name
domain, err := e.trust.GetAddress(ctx, trusted) nodeurl, err := e.trust.GetNodeURL(ctx, trusted)
if err != nil { if err != nil {
e.log.Debug("graceful exit: get satellite domian name", zap.Stringer("Satellite ID", trusted), zap.Error(err)) e.log.Debug("graceful exit: get satellite domian name", zap.Stringer("Satellite ID", trusted), zap.Error(err))
continue continue
@ -74,7 +74,7 @@ func (e *Endpoint) GetNonExitingSatellites(ctx context.Context, req *pb.GetNonEx
continue continue
} }
availableSatellites = append(availableSatellites, &pb.NonExitingSatellite{ availableSatellites = append(availableSatellites, &pb.NonExitingSatellite{
DomainName: domain, DomainName: nodeurl.Address,
NodeId: trusted, NodeId: trusted,
SpaceUsed: float64(piecesContentSize), SpaceUsed: float64(piecesContentSize),
}) })
@ -89,7 +89,7 @@ func (e *Endpoint) GetNonExitingSatellites(ctx context.Context, req *pb.GetNonEx
func (e *Endpoint) InitiateGracefulExit(ctx context.Context, req *pb.InitiateGracefulExitRequest) (*pb.ExitProgress, error) { func (e *Endpoint) InitiateGracefulExit(ctx context.Context, req *pb.InitiateGracefulExitRequest) (*pb.ExitProgress, error) {
e.log.Debug("initialize graceful exit: start", zap.Stringer("Satellite ID", req.NodeId)) e.log.Debug("initialize graceful exit: start", zap.Stringer("Satellite ID", req.NodeId))
domain, err := e.trust.GetAddress(ctx, req.NodeId) nodeurl, err := e.trust.GetNodeURL(ctx, req.NodeId)
if err != nil { if err != nil {
e.log.Debug("initialize graceful exit: retrieve satellite address", zap.Error(err)) e.log.Debug("initialize graceful exit: retrieve satellite address", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
@ -109,7 +109,7 @@ func (e *Endpoint) InitiateGracefulExit(ctx context.Context, req *pb.InitiateGra
} }
return &pb.ExitProgress{ return &pb.ExitProgress{
DomainName: domain, DomainName: nodeurl.Address,
NodeId: req.NodeId, NodeId: req.NodeId,
PercentComplete: float32(0), PercentComplete: float32(0),
}, nil }, nil
@ -126,7 +126,7 @@ func (e *Endpoint) GetExitProgress(ctx context.Context, req *pb.GetExitProgressR
Progress: make([]*pb.ExitProgress, 0, len(exitProgress)), Progress: make([]*pb.ExitProgress, 0, len(exitProgress)),
} }
for _, progress := range exitProgress { for _, progress := range exitProgress {
domain, err := e.trust.GetAddress(ctx, progress.SatelliteID) nodeurl, err := e.trust.GetNodeURL(ctx, progress.SatelliteID)
if err != nil { if err != nil {
e.log.Debug("graceful exit: get satellite domain name", zap.Stringer("Satellite ID", progress.SatelliteID), zap.Error(err)) e.log.Debug("graceful exit: get satellite domain name", zap.Stringer("Satellite ID", progress.SatelliteID), zap.Error(err))
continue continue
@ -145,7 +145,7 @@ func (e *Endpoint) GetExitProgress(ctx context.Context, req *pb.GetExitProgressR
resp.Progress = append(resp.Progress, resp.Progress = append(resp.Progress,
&pb.ExitProgress{ &pb.ExitProgress{
DomainName: domain, DomainName: nodeurl.Address,
NodeId: progress.SatelliteID, NodeId: progress.SatelliteID,
PercentComplete: percentCompleted, PercentComplete: percentCompleted,
Successful: exitSucceeded, Successful: exitSucceeded,

View File

@ -34,23 +34,21 @@ type Worker struct {
satelliteDB satellites.DB satelliteDB satellites.DB
dialer rpc.Dialer dialer rpc.Dialer
limiter *sync2.Limiter limiter *sync2.Limiter
satelliteID storj.NodeID satelliteURL storj.NodeURL
satelliteAddr string
ecclient ecclient.Client ecclient ecclient.Client
minBytesPerSecond memory.Size minBytesPerSecond memory.Size
minDownloadTimeout time.Duration minDownloadTimeout time.Duration
} }
// NewWorker instantiates Worker. // NewWorker instantiates Worker.
func NewWorker(log *zap.Logger, store *pieces.Store, satelliteDB satellites.DB, dialer rpc.Dialer, satelliteID storj.NodeID, satelliteAddr string, config Config) *Worker { func NewWorker(log *zap.Logger, store *pieces.Store, satelliteDB satellites.DB, dialer rpc.Dialer, satelliteURL storj.NodeURL, config Config) *Worker {
return &Worker{ return &Worker{
log: log, log: log,
store: store, store: store,
satelliteDB: satelliteDB, satelliteDB: satelliteDB,
dialer: dialer, dialer: dialer,
limiter: sync2.NewLimiter(config.NumConcurrentTransfers), limiter: sync2.NewLimiter(config.NumConcurrentTransfers),
satelliteID: satelliteID, satelliteURL: satelliteURL,
satelliteAddr: satelliteAddr,
ecclient: ecclient.NewClient(log, dialer, 0), ecclient: ecclient.NewClient(log, dialer, 0),
minBytesPerSecond: config.MinBytesPerSecond, minBytesPerSecond: config.MinBytesPerSecond,
minDownloadTimeout: config.MinDownloadTimeout, minDownloadTimeout: config.MinDownloadTimeout,
@ -65,10 +63,7 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
worker.log.Debug("running worker") worker.log.Debug("running worker")
conn, err := worker.dialer.DialNodeURL(ctx, storj.NodeURL{ conn, err := worker.dialer.DialNodeURL(ctx, worker.satelliteURL)
ID: worker.satelliteID,
Address: worker.satelliteAddr,
})
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
@ -91,7 +86,7 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
} }
if errs2.IsRPC(err, rpcstatus.FailedPrecondition) { if errs2.IsRPC(err, rpcstatus.FailedPrecondition) {
// delete the entry from satellite table and inform graceful exit has failed to start // delete the entry from satellite table and inform graceful exit has failed to start
deleteErr := worker.satelliteDB.CancelGracefulExit(ctx, worker.satelliteID) deleteErr := worker.satelliteDB.CancelGracefulExit(ctx, worker.satelliteURL.ID)
if deleteErr != nil { if deleteErr != nil {
// TODO: what to do now? // TODO: what to do now?
return errs.Combine(deleteErr, err) return errs.Combine(deleteErr, err)
@ -113,7 +108,7 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
err = worker.transferPiece(ctx, transferPieceMsg, c) err = worker.transferPiece(ctx, transferPieceMsg, c)
if err != nil { if err != nil {
worker.log.Error("failed to transfer piece.", worker.log.Error("failed to transfer piece.",
zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Error(errs.Wrap(err))) zap.Error(errs.Wrap(err)))
} }
}) })
@ -125,7 +120,7 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
err := worker.deleteOnePiece(ctx, pieceID) err := worker.deleteOnePiece(ctx, pieceID)
if err != nil { if err != nil {
worker.log.Error("failed to delete piece.", worker.log.Error("failed to delete piece.",
zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("Piece ID", pieceID), zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err))) zap.Error(errs.Wrap(err)))
} }
@ -133,25 +128,25 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
case *pb.SatelliteMessage_ExitFailed: case *pb.SatelliteMessage_ExitFailed:
worker.log.Error("graceful exit failed.", worker.log.Error("graceful exit failed.",
zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("reason", msg.ExitFailed.Reason)) zap.Stringer("reason", msg.ExitFailed.Reason))
exitFailedBytes, err := pb.Marshal(msg.ExitFailed) exitFailedBytes, err := pb.Marshal(msg.ExitFailed)
if err != nil { if err != nil {
worker.log.Error("failed to marshal exit failed message.") worker.log.Error("failed to marshal exit failed message.")
} }
err = worker.satelliteDB.CompleteGracefulExit(ctx, worker.satelliteID, time.Now(), satellites.ExitFailed, exitFailedBytes) err = worker.satelliteDB.CompleteGracefulExit(ctx, worker.satelliteURL.ID, time.Now(), satellites.ExitFailed, exitFailedBytes)
return errs.Wrap(err) return errs.Wrap(err)
case *pb.SatelliteMessage_ExitCompleted: case *pb.SatelliteMessage_ExitCompleted:
worker.log.Info("graceful exit completed.", zap.Stringer("Satellite ID", worker.satelliteID)) worker.log.Info("graceful exit completed.", zap.Stringer("Satellite ID", worker.satelliteURL.ID))
exitCompletedBytes, err := pb.Marshal(msg.ExitCompleted) exitCompletedBytes, err := pb.Marshal(msg.ExitCompleted)
if err != nil { if err != nil {
worker.log.Error("failed to marshal exit completed message.") worker.log.Error("failed to marshal exit completed message.")
} }
err = worker.satelliteDB.CompleteGracefulExit(ctx, worker.satelliteID, time.Now(), satellites.ExitSucceeded, exitCompletedBytes) err = worker.satelliteDB.CompleteGracefulExit(ctx, worker.satelliteURL.ID, time.Now(), satellites.ExitSucceeded, exitCompletedBytes)
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
@ -161,7 +156,7 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
default: default:
// TODO handle err // TODO handle err
worker.log.Error("unknown graceful exit message.", zap.Stringer("Satellite ID", worker.satelliteID)) worker.log.Error("unknown graceful exit message.", zap.Stringer("Satellite ID", worker.satelliteURL.ID))
} }
} }
} }
@ -174,14 +169,14 @@ type gracefulExitStream interface {
func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.TransferPiece, c gracefulExitStream) error { func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.TransferPiece, c gracefulExitStream) error {
pieceID := transferPiece.OriginalPieceId pieceID := transferPiece.OriginalPieceId
reader, err := worker.store.Reader(ctx, worker.satelliteID, pieceID) reader, err := worker.store.Reader(ctx, worker.satelliteURL.ID, pieceID)
if err != nil { if err != nil {
transferErr := pb.TransferFailed_UNKNOWN transferErr := pb.TransferFailed_UNKNOWN
if errs.Is(err, os.ErrNotExist) { if errs.Is(err, os.ErrNotExist) {
transferErr = pb.TransferFailed_NOT_FOUND transferErr = pb.TransferFailed_NOT_FOUND
} }
worker.log.Error("failed to get piece reader.", worker.log.Error("failed to get piece reader.",
zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("Piece ID", pieceID), zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err))) zap.Error(errs.Wrap(err)))
worker.handleFailure(ctx, transferErr, pieceID, c.Send) worker.handleFailure(ctx, transferErr, pieceID, c.Send)
@ -191,10 +186,10 @@ func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.Trans
addrLimit := transferPiece.GetAddressedOrderLimit() addrLimit := transferPiece.GetAddressedOrderLimit()
pk := transferPiece.PrivateKey pk := transferPiece.PrivateKey
originalHash, originalOrderLimit, err := worker.store.GetHashAndLimit(ctx, worker.satelliteID, pieceID, reader) originalHash, originalOrderLimit, err := worker.store.GetHashAndLimit(ctx, worker.satelliteURL.ID, pieceID, reader)
if err != nil { if err != nil {
worker.log.Error("failed to get piece hash and order limit.", worker.log.Error("failed to get piece hash and order limit.",
zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("Piece ID", pieceID), zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err))) zap.Error(errs.Wrap(err)))
worker.handleFailure(ctx, pb.TransferFailed_UNKNOWN, pieceID, c.Send) worker.handleFailure(ctx, pb.TransferFailed_UNKNOWN, pieceID, c.Send)
@ -216,13 +211,13 @@ func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.Trans
if err != nil { if err != nil {
if piecestore.ErrVerifyUntrusted.Has(err) { if piecestore.ErrVerifyUntrusted.Has(err) {
worker.log.Error("failed hash verification.", worker.log.Error("failed hash verification.",
zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("Piece ID", pieceID), zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err))) zap.Error(errs.Wrap(err)))
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send) worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
} else { } else {
worker.log.Error("failed to put piece.", worker.log.Error("failed to put piece.",
zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("Piece ID", pieceID), zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err))) zap.Error(errs.Wrap(err)))
// TODO look at error type to decide on the transfer error // TODO look at error type to decide on the transfer error
@ -234,7 +229,7 @@ func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.Trans
if !bytes.Equal(originalHash.Hash, pieceHash.Hash) { if !bytes.Equal(originalHash.Hash, pieceHash.Hash) {
worker.log.Error("piece hash from new storagenode does not match", worker.log.Error("piece hash from new storagenode does not match",
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId), zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("Piece ID", pieceID)) zap.Stringer("Piece ID", pieceID))
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send) worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
return Error.New("piece hash from new storagenode does not match") return Error.New("piece hash from new storagenode does not match")
@ -242,7 +237,7 @@ func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.Trans
if pieceHash.PieceId != addrLimit.Limit.PieceId { if pieceHash.PieceId != addrLimit.Limit.PieceId {
worker.log.Error("piece id from new storagenode does not match order limit", worker.log.Error("piece id from new storagenode does not match order limit",
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId), zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("Piece ID", pieceID)) zap.Stringer("Piece ID", pieceID))
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send) worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
return Error.New("piece id from new storagenode does not match order limit") return Error.New("piece id from new storagenode does not match order limit")
@ -253,7 +248,7 @@ func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.Trans
if err != nil { if err != nil {
worker.log.Error("invalid piece hash signature from new storagenode", worker.log.Error("invalid piece hash signature from new storagenode",
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId), zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("Piece ID", pieceID), zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err))) zap.Error(errs.Wrap(err)))
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send) worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
@ -272,53 +267,53 @@ func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.Trans
} }
worker.log.Info("piece transferred to new storagenode", worker.log.Info("piece transferred to new storagenode",
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId), zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("Piece ID", pieceID)) zap.Stringer("Piece ID", pieceID))
return c.Send(success) return c.Send(success)
} }
// deleteOnePiece deletes one piece stored for a satellite // deleteOnePiece deletes one piece stored for a satellite
func (worker *Worker) deleteOnePiece(ctx context.Context, pieceID storj.PieceID) error { func (worker *Worker) deleteOnePiece(ctx context.Context, pieceID storj.PieceID) error {
piece, err := worker.store.Reader(ctx, worker.satelliteID, pieceID) piece, err := worker.store.Reader(ctx, worker.satelliteURL.ID, pieceID)
if err != nil { if err != nil {
if !errs2.IsCanceled(err) { if !errs2.IsCanceled(err) {
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteID), worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("Piece ID", pieceID), zap.Error(err)) zap.Stringer("Piece ID", pieceID), zap.Error(err))
} }
return err return err
} }
err = worker.deletePiece(ctx, pieceID) err = worker.deletePiece(ctx, pieceID)
if err != nil { if err != nil {
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteID), zap.Error(err)) worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteURL.ID), zap.Error(err))
return err return err
} }
// update graceful exit progress // update graceful exit progress
size := piece.Size() size := piece.Size()
return worker.satelliteDB.UpdateGracefulExit(ctx, worker.satelliteID, size) return worker.satelliteDB.UpdateGracefulExit(ctx, worker.satelliteURL.ID, size)
} }
// deletePiece deletes one piece stored for a satellite, without updating satellite Graceful Exit status // deletePiece deletes one piece stored for a satellite, without updating satellite Graceful Exit status
func (worker *Worker) deletePiece(ctx context.Context, pieceID storj.PieceID) error { func (worker *Worker) deletePiece(ctx context.Context, pieceID storj.PieceID) error {
err := worker.store.Delete(ctx, worker.satelliteID, pieceID) err := worker.store.Delete(ctx, worker.satelliteURL.ID, pieceID)
if err != nil { if err != nil {
worker.log.Debug("failed to delete a piece", worker.log.Debug("failed to delete a piece",
zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("Piece ID", pieceID), zap.Stringer("Piece ID", pieceID),
zap.Error(err)) zap.Error(err))
delErr := worker.store.DeleteFailed(ctx, pieces.ExpiredInfo{ delErr := worker.store.DeleteFailed(ctx, pieces.ExpiredInfo{
SatelliteID: worker.satelliteID, SatelliteID: worker.satelliteURL.ID,
PieceID: pieceID, PieceID: pieceID,
InPieceInfo: true, InPieceInfo: true,
}, time.Now().UTC()) }, time.Now().UTC())
if delErr != nil { if delErr != nil {
worker.log.Debug("failed to mark a deletion failure for a piece", worker.log.Debug("failed to mark a deletion failure for a piece",
zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("Piece ID", pieceID), zap.Error(err)) zap.Stringer("Piece ID", pieceID), zap.Error(err))
} }
return errs.Combine(err, delErr) return errs.Combine(err, delErr)
} }
worker.log.Debug("delete piece", worker.log.Debug("delete piece",
zap.Stringer("Satellite ID", worker.satelliteID), zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("Piece ID", pieceID)) zap.Stringer("Piece ID", pieceID))
return err return err
} }
@ -326,12 +321,12 @@ func (worker *Worker) deletePiece(ctx context.Context, pieceID storj.PieceID) er
// deleteAllPieces deletes pieces stored for a satellite // deleteAllPieces deletes pieces stored for a satellite
func (worker *Worker) deleteAllPieces(ctx context.Context) error { func (worker *Worker) deleteAllPieces(ctx context.Context) error {
var totalDeleted int64 var totalDeleted int64
err := worker.store.WalkSatellitePieces(ctx, worker.satelliteID, func(piece pieces.StoredPieceAccess) error { err := worker.store.WalkSatellitePieces(ctx, worker.satelliteURL.ID, func(piece pieces.StoredPieceAccess) error {
err := worker.deletePiece(ctx, piece.PieceID()) err := worker.deletePiece(ctx, piece.PieceID())
if err == nil { if err == nil {
_, size, err := piece.Size(ctx) _, size, err := piece.Size(ctx)
if err != nil { if err != nil {
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteID), worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Stringer("Piece ID", piece.PieceID()), zap.Error(err)) zap.Stringer("Piece ID", piece.PieceID()), zap.Error(err))
} }
totalDeleted += size totalDeleted += size
@ -339,10 +334,10 @@ func (worker *Worker) deleteAllPieces(ctx context.Context) error {
return err return err
}) })
if err != nil && !errs2.IsCanceled(err) { if err != nil && !errs2.IsCanceled(err) {
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteID), zap.Error(err)) worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteURL.ID), zap.Error(err))
} }
// update graceful exit progress // update graceful exit progress
return worker.satelliteDB.UpdateGracefulExit(ctx, worker.satelliteID, totalDeleted) return worker.satelliteDB.UpdateGracefulExit(ctx, worker.satelliteURL.ID, totalDeleted)
} }
func (worker *Worker) handleFailure(ctx context.Context, transferError pb.TransferFailed_Error, pieceID pb.PieceID, send func(*pb.StorageNodeMessage) error) { func (worker *Worker) handleFailure(ctx context.Context, transferError pb.TransferFailed_Error, pieceID pb.PieceID, send func(*pb.StorageNodeMessage) error) {
@ -357,7 +352,7 @@ func (worker *Worker) handleFailure(ctx context.Context, transferError pb.Transf
sendErr := send(failure) sendErr := send(failure)
if sendErr != nil { if sendErr != nil {
worker.log.Error("unable to send failure.", zap.Stringer("Satellite ID", worker.satelliteID)) worker.log.Error("unable to send failure.", zap.Stringer("Satellite ID", worker.satelliteURL.ID))
} }
} }

View File

@ -69,7 +69,7 @@ func TestWorkerSuccess(t *testing.T) {
require.Len(t, queueItems, 1) require.Len(t, queueItems, 1)
// run the SN chore again to start processing transfers. // run the SN chore again to start processing transfers.
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.Storage2.Store, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.ID(), satellite.Addr(), worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.Storage2.Store, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.NodeURL(),
gracefulexit.Config{ gracefulexit.Config{
ChoreInterval: 0, ChoreInterval: 0,
NumWorkers: 2, NumWorkers: 2,
@ -147,7 +147,7 @@ func TestWorkerTimeout(t *testing.T) {
store := pieces.NewStore(zaptest.NewLogger(t), storageNodeDB.Pieces(), nil, nil, storageNodeDB.PieceSpaceUsedDB(), pieces.DefaultConfig) store := pieces.NewStore(zaptest.NewLogger(t), storageNodeDB.Pieces(), nil, nil, storageNodeDB.PieceSpaceUsedDB(), pieces.DefaultConfig)
// run the SN chore again to start processing transfers. // run the SN chore again to start processing transfers.
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), store, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.ID(), satellite.Addr(), worker := gracefulexit.NewWorker(zaptest.NewLogger(t), store, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.NodeURL(),
gracefulexit.Config{ gracefulexit.Config{
ChoreInterval: 0, ChoreInterval: 0,
NumWorkers: 2, NumWorkers: 2,
@ -209,7 +209,7 @@ func TestWorkerFailure_IneligibleNodeAge(t *testing.T) {
err = exitingNode.DB.Satellites().InitiateGracefulExit(ctx, satellite.ID(), time.Now(), piecesContentSize) err = exitingNode.DB.Satellites().InitiateGracefulExit(ctx, satellite.ID(), time.Now(), piecesContentSize)
require.NoError(t, err) require.NoError(t, err)
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.Storage2.Store, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.ID(), satellite.Addr(), worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.Storage2.Store, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.NodeURL(),
gracefulexit.Config{ gracefulexit.Config{
ChoreInterval: 0, ChoreInterval: 0,
NumWorkers: 2, NumWorkers: 2,

View File

@ -304,15 +304,12 @@ func (service *Service) AllHeldbackHistory(ctx context.Context, id storj.NodeID)
func (service *Service) dial(ctx context.Context, satelliteID storj.NodeID) (_ *Client, err error) { func (service *Service) dial(ctx context.Context, satelliteID storj.NodeID) (_ *Client, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
address, err := service.trust.GetAddress(ctx, satelliteID) nodeurl, err := service.trust.GetNodeURL(ctx, satelliteID)
if err != nil { if err != nil {
return nil, errs.New("unable to find satellite %s: %w", satelliteID, err) return nil, errs.New("unable to find satellite %s: %w", satelliteID, err)
} }
conn, err := service.dialer.DialNodeURL(ctx, storj.NodeURL{ conn, err := service.dialer.DialNodeURL(ctx, nodeurl)
ID: satelliteID,
Address: address,
})
if err != nil { if err != nil {
return nil, errs.New("unable to connect to the satellite %s: %w", satelliteID, err) return nil, errs.New("unable to connect to the satellite %s: %w", satelliteID, err)
} }

View File

@ -145,15 +145,12 @@ func (s *Service) GetPricingModel(ctx context.Context, satelliteID storj.NodeID)
func (s *Service) dial(ctx context.Context, satelliteID storj.NodeID) (_ *Client, err error) { func (s *Service) dial(ctx context.Context, satelliteID storj.NodeID) (_ *Client, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
address, err := s.trust.GetAddress(ctx, satelliteID) nodeurl, err := s.trust.GetNodeURL(ctx, satelliteID)
if err != nil { if err != nil {
return nil, errs.New("unable to find satellite %s: %w", satelliteID, err) return nil, errs.New("unable to find satellite %s: %w", satelliteID, err)
} }
conn, err := s.dialer.DialNodeURL(ctx, storj.NodeURL{ conn, err := s.dialer.DialNodeURL(ctx, nodeurl)
ID: satelliteID,
Address: address,
})
if err != nil { if err != nil {
return nil, errs.New("unable to connect to the satellite %s: %w", satelliteID, err) return nil, errs.New("unable to connect to the satellite %s: %w", satelliteID, err)
} }

View File

@ -271,15 +271,12 @@ func (service *Service) settle(ctx context.Context, log *zap.Logger, satelliteID
log.Info("sending", zap.Int("count", len(orders))) log.Info("sending", zap.Int("count", len(orders)))
defer log.Info("finished") defer log.Info("finished")
address, err := service.trust.GetAddress(ctx, satelliteID) nodeurl, err := service.trust.GetNodeURL(ctx, satelliteID)
if err != nil { if err != nil {
return OrderError.New("unable to get satellite address: %w", err) return OrderError.New("unable to get satellite address: %w", err)
} }
conn, err := service.dialer.DialNodeURL(ctx, storj.NodeURL{ conn, err := service.dialer.DialNodeURL(ctx, nodeurl)
ID: satelliteID,
Address: address,
})
if err != nil { if err != nil {
return OrderError.New("unable to connect to the satellite: %w", err) return OrderError.New("unable to connect to the satellite: %w", err)
} }

View File

@ -103,14 +103,11 @@ func (localTime *LocalTime) Check(ctx context.Context) (err error) {
func (localTime *LocalTime) getSatelliteTime(ctx context.Context, satelliteID storj.NodeID) (_ *pb.GetTimeResponse, err error) { func (localTime *LocalTime) getSatelliteTime(ctx context.Context, satelliteID storj.NodeID) (_ *pb.GetTimeResponse, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
address, err := localTime.trust.GetAddress(ctx, satelliteID) nodeurl, err := localTime.trust.GetNodeURL(ctx, satelliteID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn, err := localTime.dialer.DialNodeURL(ctx, storj.NodeURL{ conn, err := localTime.dialer.DialNodeURL(ctx, nodeurl)
ID: satelliteID,
Address: address,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -160,15 +160,15 @@ func (pool *Pool) GetSatellites(ctx context.Context) (satellites []storj.NodeID)
return satellites return satellites
} }
// GetAddress returns the address of a satellite in the trusted list // GetNodeURL returns the node url of a satellite in the trusted list.
func (pool *Pool) GetAddress(ctx context.Context, id storj.NodeID) (_ string, err error) { func (pool *Pool) GetNodeURL(ctx context.Context, id storj.NodeID) (_ storj.NodeURL, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
info, err := pool.getInfo(id) info, err := pool.getInfo(id)
if err != nil { if err != nil {
return "", err return storj.NodeURL{}, err
} }
return info.url.Address, nil return info.url, nil
} }
// Refresh refreshes the set of trusted satellites in the pool. Concurrent // Refresh refreshes the set of trusted satellites in the pool. Concurrent

View File

@ -151,9 +151,9 @@ func TestPoolGetAddress(t *testing.T) {
id := testrand.NodeID() id := testrand.NodeID()
// Assert the ID is not trusted // Assert the ID is not trusted
address, err := pool.GetAddress(context.Background(), id) nodeurl, err := pool.GetNodeURL(context.Background(), id)
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id)) require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
require.Empty(t, address) require.Empty(t, nodeurl)
// Refresh the pool with the new trust entry // Refresh the pool with the new trust entry
source.entries = []trust.Entry{ source.entries = []trust.Entry{
@ -168,9 +168,10 @@ func TestPoolGetAddress(t *testing.T) {
require.NoError(t, pool.Refresh(context.Background())) require.NoError(t, pool.Refresh(context.Background()))
// Assert the ID is now trusted and the correct address is returned // Assert the ID is now trusted and the correct address is returned
address, err = pool.GetAddress(context.Background(), id) nodeurl, err = pool.GetNodeURL(context.Background(), id)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "foo.test:7777", address) require.Equal(t, id, nodeurl.ID)
require.Equal(t, "foo.test:7777", nodeurl.Address)
// Refresh the pool with an updated trust entry with a new address // Refresh the pool with an updated trust entry with a new address
source.entries = []trust.Entry{ source.entries = []trust.Entry{
@ -185,9 +186,10 @@ func TestPoolGetAddress(t *testing.T) {
require.NoError(t, pool.Refresh(context.Background())) require.NoError(t, pool.Refresh(context.Background()))
// Assert the ID is now trusted and the correct address is returned // Assert the ID is now trusted and the correct address is returned
address, err = pool.GetAddress(context.Background(), id) nodeurl, err = pool.GetNodeURL(context.Background(), id)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "bar.test:7777", address) require.Equal(t, id, nodeurl.ID)
require.Equal(t, "bar.test:7777", nodeurl.Address)
} }
func newPoolTest(t *testing.T) (*testcontext.Context, *trust.Pool, *fakeSource, *fakeIdentityResolver) { func newPoolTest(t *testing.T) (*testcontext.Context, *trust.Pool, *fakeSource, *fakeIdentityResolver) {