From 92deef4f348d38e82c85c61b0cb87f78fbf1b76c Mon Sep 17 00:00:00 2001 From: igor gaidaienko Date: Thu, 29 Jul 2021 17:30:57 +0300 Subject: [PATCH] Revert "storagenode/payouts: historical payouts use satellitesDB instead of trustPool" This reverts commit 4a98dd40e2d227e76e5e13c586f6bf614d736ba2. --- storagenode/multinode/payout_test.go | 6 +- storagenode/payouts/db_test.go | 7 +- storagenode/payouts/service.go | 94 ++++++++++++++----------- storagenode/payouts/service_test.go | 92 ++++++++++++++++++++++-- storagenode/peer.go | 1 + storagenode/satellites/db_test.go | 4 +- storagenode/satellites/satellites.go | 4 +- storagenode/storagenodedb/satellites.go | 4 +- 8 files changed, 151 insertions(+), 61 deletions(-) diff --git a/storagenode/multinode/payout_test.go b/storagenode/multinode/payout_test.go index 78564007e..0da3dbb93 100644 --- a/storagenode/multinode/payout_test.go +++ b/storagenode/multinode/payout_test.go @@ -66,7 +66,7 @@ func TestPayoutsEndpointSummary(t *testing.T) { require.NoError(t, err) require.NoError(t, trustPool.Refresh(ctx)) - payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites()) + payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites(), nil) require.NoError(t, err) estimatedPayoutsService := estimatedpayouts.NewService(db.Bandwidth(), db.Reputation(), db.StorageUsage(), db.Pricing(), db.Satellites(), trustPool) endpoint := multinode.NewPayoutEndpoint(log, service, db.Payout(), estimatedPayoutsService, payoutsService) @@ -159,7 +159,7 @@ func TestPayoutsEndpointEstimations(t *testing.T) { require.NoError(t, err) require.NoError(t, trustPool.Refresh(ctx)) - payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites()) + payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites(), nil) require.NoError(t, err) estimatedPayoutsService := estimatedpayouts.NewService(db.Bandwidth(), db.Reputation(), db.StorageUsage(), db.Pricing(), db.Satellites(), trustPool) endpoint := multinode.NewPayoutEndpoint(log, service, db.Payout(), estimatedPayoutsService, payoutsService) @@ -230,7 +230,7 @@ func TestPayoutsUndistributedEndpoint(t *testing.T) { require.NoError(t, err) require.NoError(t, trustPool.Refresh(ctx)) - payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites()) + payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites(), nil) require.NoError(t, err) estimatedPayoutsService := estimatedpayouts.NewService(db.Bandwidth(), db.Reputation(), db.StorageUsage(), db.Pricing(), db.Satellites(), trustPool) endpoint := multinode.NewPayoutEndpoint(log, service, db.Payout(), estimatedPayoutsService, payoutsService) diff --git a/storagenode/payouts/db_test.go b/storagenode/payouts/db_test.go index 2cfb5dd0e..bd82960e1 100644 --- a/storagenode/payouts/db_test.go +++ b/storagenode/payouts/db_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" "storj.io/common/testcontext" "storj.io/common/testrand" @@ -212,8 +211,7 @@ func TestSatellitePayStubPeriodCached(t *testing.T) { heldAmountDB := db.Payout() reputationDB := db.Reputation() satellitesDB := db.Satellites() - log := zaptest.NewLogger(t) - service, err := payouts.NewService(log, heldAmountDB, reputationDB, satellitesDB) + service, err := payouts.NewService(nil, heldAmountDB, reputationDB, satellitesDB, nil) require.NoError(t, err) payStub := payouts.PayStub{ @@ -264,8 +262,7 @@ func TestAllPayStubPeriodCached(t *testing.T) { heldAmountDB := db.Payout() reputationDB := db.Reputation() satellitesDB := db.Satellites() - log := zaptest.NewLogger(t) - service, err := payouts.NewService(log, heldAmountDB, reputationDB, satellitesDB) + service, err := payouts.NewService(nil, heldAmountDB, reputationDB, satellitesDB, nil) require.NoError(t, err) payStub := payouts.PayStub{ diff --git a/storagenode/payouts/service.go b/storagenode/payouts/service.go index c524e0f44..4ea221503 100644 --- a/storagenode/payouts/service.go +++ b/storagenode/payouts/service.go @@ -20,6 +20,7 @@ import ( "storj.io/storj/private/date" "storj.io/storj/storagenode/reputation" "storj.io/storj/storagenode/satellites" + "storj.io/storj/storagenode/trust" ) var ( @@ -38,18 +39,28 @@ var ( type Service struct { log *zap.Logger + stefanSatellite storj.NodeID + db DB reputationDB reputation.DB satellitesDB satellites.DB + trust *trust.Pool } // NewService creates new instance of service. -func NewService(log *zap.Logger, db DB, reputationDB reputation.DB, satelliteDB satellites.DB) (_ *Service, err error) { +func NewService(log *zap.Logger, db DB, reputationDB reputation.DB, satelliteDB satellites.DB, trust *trust.Pool) (_ *Service, err error) { + id, err := storj.NodeIDFromString("118UWpMCHzs6CvSgWd9BfFVjw5K9pZbJjkfZJexMtSkmKxvvAW") + if err != nil { + return &Service{}, err + } + return &Service{ - log: log, - db: db, - reputationDB: reputationDB, - satellitesDB: satelliteDB, + log: log, + stefanSatellite: id, + db: db, + reputationDB: reputationDB, + satellitesDB: satelliteDB, + trust: trust, }, nil } @@ -162,14 +173,13 @@ func (service *Service) AllPeriods(ctx context.Context) (_ []string, err error) // AllHeldbackHistory retrieves heldback history for all satellites from storagenode database. func (service *Service) AllHeldbackHistory(ctx context.Context) (result []SatelliteHeldHistory, err error) { defer mon.Task()(&ctx)(&err) - satellitesURLs, err := service.satellitesDB.GetSatellitesURLs(ctx) - if err != nil { - return nil, ErrPayoutService.Wrap(err) - } - for _, satelliteURL := range satellitesURLs { + satellitesIDs := service.trust.GetSatellites(ctx) + + satellitesIDs = append(satellitesIDs, service.stefanSatellite) + for i := 0; i < len(satellitesIDs); i++ { var history SatelliteHeldHistory - helds, err := service.db.SatellitesHeldbackHistory(ctx, satelliteURL.ID) + helds, err := service.db.SatellitesHeldbackHistory(ctx, satellitesIDs[i]) if err != nil { return nil, ErrPayoutService.Wrap(err) } @@ -178,7 +188,7 @@ func (service *Service) AllHeldbackHistory(ctx context.Context) (result []Satell continue } - disposed, err := service.db.SatellitesDisposedHistory(ctx, satelliteURL.ID) + disposed, err := service.db.SatellitesDisposedHistory(ctx, satellitesIDs[i]) if err != nil { return nil, ErrPayoutService.Wrap(err) } @@ -199,14 +209,18 @@ func (service *Service) AllHeldbackHistory(ctx context.Context) (result []Satell } history.TotalDisposed = disposed - history.SatelliteID = satelliteURL.ID - history.SatelliteName = satelliteURL.Address + history.SatelliteID = satellitesIDs[i] + history.SatelliteName = "stefan-benten" - if satelliteURL.Address == "" { - history.SatelliteName = satelliteURL.ID.String() + if satellitesIDs[i] != service.stefanSatellite { + url, err := service.trust.GetNodeURL(ctx, satellitesIDs[i]) + if err != nil { + return nil, ErrPayoutService.Wrap(err) + } + history.SatelliteName = url.Address } - stats, err := service.reputationDB.Get(ctx, satelliteURL.ID) + stats, err := service.reputationDB.Get(ctx, satellitesIDs[i]) if err != nil { return nil, ErrPayoutService.Wrap(err) } @@ -222,14 +236,12 @@ func (service *Service) AllHeldbackHistory(ctx context.Context) (result []Satell func (service *Service) AllSatellitesPayoutPeriod(ctx context.Context, period string) (result []SatellitePayoutForPeriod, err error) { defer mon.Task()(&ctx)(&err) - satelliteURLs, err := service.satellitesDB.GetSatellitesURLs(ctx) - if err != nil { - return nil, ErrPayoutService.Wrap(err) - } - for _, satelliteURL := range satelliteURLs { - var payoutForPeriod SatellitePayoutForPeriod - paystub, err := service.db.GetPayStub(ctx, satelliteURL.ID, period) + satelliteIDs := service.trust.GetSatellites(ctx) + satelliteIDs = append(satelliteIDs, service.stefanSatellite) + for i := 0; i < len(satelliteIDs); i++ { + var payoutForPeriod SatellitePayoutForPeriod + paystub, err := service.db.GetPayStub(ctx, satelliteIDs[i], period) if err != nil { if ErrNoPayStubForPeriod.Has(err) { continue @@ -237,19 +249,19 @@ func (service *Service) AllSatellitesPayoutPeriod(ctx context.Context, period st return nil, ErrPayoutService.Wrap(err) } - receipt, err := service.db.GetReceipt(ctx, satelliteURL.ID, period) + receipt, err := service.db.GetReceipt(ctx, satelliteIDs[i], period) if err != nil { if !ErrNoPayStubForPeriod.Has(err) { return nil, ErrPayoutService.Wrap(err) } } - stats, err := service.reputationDB.Get(ctx, satelliteURL.ID) + stats, err := service.reputationDB.Get(ctx, satelliteIDs[i]) if err != nil { return nil, ErrPayoutService.Wrap(err) } - satellite, err := service.satellitesDB.GetSatellite(ctx, satelliteURL.ID) + satellite, err := service.satellitesDB.GetSatellite(ctx, satelliteIDs[i]) if err != nil { if errors.Is(err, sql.ErrNoRows) { payoutForPeriod.IsExitComplete = false @@ -258,6 +270,15 @@ func (service *Service) AllSatellitesPayoutPeriod(ctx context.Context, period st return nil, ErrPayoutService.Wrap(err) } + if satelliteIDs[i] != service.stefanSatellite { + url, err := service.trust.GetNodeURL(ctx, satelliteIDs[i]) + if err != nil { + return nil, ErrPayoutService.Wrap(err) + } + + payoutForPeriod.SatelliteURL = url.Address + } + if satellite.Status == satellites.ExitSucceeded { payoutForPeriod.IsExitComplete = true } @@ -283,17 +304,12 @@ func (service *Service) AllSatellitesPayoutPeriod(ctx context.Context, period st payoutForPeriod.Age = int64(date.MonthsCountSince(stats.JoinedAt)) payoutForPeriod.Disposed = paystub.Disposed payoutForPeriod.Earned = earned - payoutForPeriod.SatelliteID = satelliteURL.ID.String() - payoutForPeriod.SatelliteURL = satelliteURL.Address + payoutForPeriod.SatelliteID = satelliteIDs[i].String() payoutForPeriod.SurgePercent = paystub.SurgePercent payoutForPeriod.Paid = paystub.Paid payoutForPeriod.HeldPercent = heldPercent payoutForPeriod.Distributed = paystub.Distributed - if satelliteURL.Address == "" { - payoutForPeriod.SatelliteURL = satelliteURL.ID.String() - } - result = append(result, payoutForPeriod) } @@ -309,22 +325,20 @@ func (service *Service) HeldAmountHistory(ctx context.Context) (_ []HeldAmountHi return nil, ErrPayoutService.Wrap(err) } - satelliteURLs, err := service.satellitesDB.GetSatellitesURLs(ctx) - if err != nil { - return nil, ErrPayoutService.Wrap(err) - } - for _, satelliteURL := range satelliteURLs { + trustedSatellites := service.trust.GetSatellites(ctx) + + for _, trustedSatellite := range trustedSatellites { var found bool for _, satelliteHeldHistory := range heldHistory { - if satelliteURL.ID.Compare(satelliteHeldHistory.SatelliteID) == 0 { + if trustedSatellite.Compare(satelliteHeldHistory.SatelliteID) == 0 { found = true break } } if !found { heldHistory = append(heldHistory, HeldAmountHistory{ - SatelliteID: satelliteURL.ID, + SatelliteID: trustedSatellite, }) } } diff --git a/storagenode/payouts/service_test.go b/storagenode/payouts/service_test.go index 11b030066..f7f88427b 100644 --- a/storagenode/payouts/service_test.go +++ b/storagenode/payouts/service_test.go @@ -4,16 +4,22 @@ package payouts_test import ( + "context" + "errors" + "sync" "testing" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" + "storj.io/common/identity" + "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/storagenode" "storj.io/storj/storagenode/payouts" "storj.io/storj/storagenode/storagenodedb/storagenodedbtest" + "storj.io/storj/storagenode/trust" ) func TestServiceHeldAmountHistory(t *testing.T) { @@ -21,17 +27,42 @@ func TestServiceHeldAmountHistory(t *testing.T) { log := zaptest.NewLogger(t) payoutsDB := db.Payout() satellitesDB := db.Satellites() + source := &fakeSource{} + pool, err := trust.NewPool(log, newFakeIdentityResolver(), trust.Config{ + Sources: []trust.Source{source}, + CachePath: ctx.File("trust-cache.json"), + }, satellitesDB) + require.NoError(t, err) satelliteID1 := testrand.NodeID() satelliteID2 := testrand.NodeID() satelliteID3 := testrand.NodeID() - err := satellitesDB.SetAddress(ctx, satelliteID1, "foo.test:7777") - require.NoError(t, err) - err = satellitesDB.SetAddress(ctx, satelliteID2, "bar.test:7777") - require.NoError(t, err) - err = satellitesDB.SetAddress(ctx, satelliteID3, "baz.test:7777") - require.NoError(t, err) + // populate pool + source.entries = []trust.Entry{ + { + SatelliteURL: trust.SatelliteURL{ + ID: satelliteID1, + Host: "foo.test", + Port: 7777, + }, + }, + { + SatelliteURL: trust.SatelliteURL{ + ID: satelliteID2, + Host: "bar.test", + Port: 7777, + }, + }, + { + SatelliteURL: trust.SatelliteURL{ + ID: satelliteID3, + Host: "baz.test", + Port: 7777, + }, + }, + } + require.NoError(t, pool.Refresh(context.Background())) // add paystubs paystubs := []payouts.PayStub{ @@ -93,7 +124,7 @@ func TestServiceHeldAmountHistory(t *testing.T) { }, } - service, err := payouts.NewService(log, payoutsDB, db.Reputation(), db.Satellites()) + service, err := payouts.NewService(log, payoutsDB, db.Reputation(), db.Satellites(), pool) require.NoError(t, err) history, err := service.HeldAmountHistory(ctx) @@ -101,3 +132,50 @@ func TestServiceHeldAmountHistory(t *testing.T) { require.ElementsMatch(t, expected, history) }) } + +type fakeSource struct { + name string + static bool + entries []trust.Entry + err error +} + +func (s *fakeSource) String() string { + return s.name +} + +func (s *fakeSource) Static() bool { + return s.static +} + +func (s *fakeSource) FetchEntries(context.Context) ([]trust.Entry, error) { + return s.entries, s.err +} + +type fakeIdentityResolver struct { + mu sync.Mutex + identities map[storj.NodeURL]*identity.PeerIdentity +} + +func newFakeIdentityResolver() *fakeIdentityResolver { + return &fakeIdentityResolver{ + identities: make(map[storj.NodeURL]*identity.PeerIdentity), + } +} + +func (resolver *fakeIdentityResolver) SetIdentity(url storj.NodeURL, identity *identity.PeerIdentity) { + resolver.mu.Lock() + defer resolver.mu.Unlock() + resolver.identities[url] = identity +} + +func (resolver *fakeIdentityResolver) ResolveIdentity(ctx context.Context, url storj.NodeURL) (*identity.PeerIdentity, error) { + resolver.mu.Lock() + defer resolver.mu.Unlock() + + identity := resolver.identities[url] + if identity == nil { + return nil, errors.New("no identity") + } + return identity, nil +} diff --git a/storagenode/peer.go b/storagenode/peer.go index 5842877f8..b2a2e6b41 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -569,6 +569,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten peer.DB.Payout(), peer.DB.Reputation(), peer.DB.Satellites(), + peer.Storage2.Trust, ) if err != nil { return nil, errs.Combine(err, peer.Close()) diff --git a/storagenode/satellites/db_test.go b/storagenode/satellites/db_test.go index 6e2c71a8b..636813ef6 100644 --- a/storagenode/satellites/db_test.go +++ b/storagenode/satellites/db_test.go @@ -22,14 +22,14 @@ func TestSatellitesDB(t *testing.T) { err := satellitesDB.SetAddress(ctx, id, "test_addr1") require.NoError(t, err) - satellites, err := satellitesDB.GetSatellitesURLs(ctx) + satellites, err := satellitesDB.GetSatellitesUrls(ctx) require.NoError(t, err) require.Equal(t, satellites[0].Address, "test_addr1") err = satellitesDB.SetAddress(ctx, id, "test_addr2") require.NoError(t, err) - satellites, err = satellitesDB.GetSatellitesURLs(ctx) + satellites, err = satellitesDB.GetSatellitesUrls(ctx) require.NoError(t, err) require.Equal(t, satellites[0].Address, "test_addr2") }) diff --git a/storagenode/satellites/satellites.go b/storagenode/satellites/satellites.go index 06435c3ab..9e08c422e 100644 --- a/storagenode/satellites/satellites.go +++ b/storagenode/satellites/satellites.go @@ -52,8 +52,8 @@ type DB interface { SetAddress(ctx context.Context, satelliteID storj.NodeID, address string) error // GetSatellite retrieves that satellite by ID GetSatellite(ctx context.Context, satelliteID storj.NodeID) (satellite Satellite, err error) - // GetSatellitesURLs retrieves all satellite's id and urls. - GetSatellitesURLs(ctx context.Context) (satelliteURLs []storj.NodeURL, err error) + // GetSatellitesUrls retrieves all satellite's id and urls. + GetSatellitesUrls(ctx context.Context) (satelliteURLs []storj.NodeURL, err error) // InitiateGracefulExit updates the database to reflect the beginning of a graceful exit InitiateGracefulExit(ctx context.Context, satelliteID storj.NodeID, intitiatedAt time.Time, startingDiskUsage int64) error // CancelGracefulExit removes that satellite by ID diff --git a/storagenode/storagenodedb/satellites.go b/storagenode/storagenodedb/satellites.go index 716e50cb2..302b810e1 100644 --- a/storagenode/storagenodedb/satellites.go +++ b/storagenode/storagenodedb/satellites.go @@ -59,8 +59,8 @@ func (db *satellitesDB) GetSatellite(ctx context.Context, satelliteID storj.Node return satellite, rows.Err() } -// GetSatellitesURLs retrieves all satellite's id and urls. -func (db *satellitesDB) GetSatellitesURLs(ctx context.Context) (satelliteURLs []storj.NodeURL, err error) { +// GetSatellitesUrls retrieves all satellite's id and urls. +func (db *satellitesDB) GetSatellitesUrls(ctx context.Context) (satelliteURLs []storj.NodeURL, err error) { defer mon.Task()(&ctx)(&err) query := `SELECT