Revert "storagenode/payouts: historical payouts use satellitesDB instead of trustPool"

This reverts commit 4a98dd40e2.
This commit is contained in:
igor gaidaienko 2021-07-29 17:30:57 +03:00 committed by Igor
parent 42a0b92404
commit 92deef4f34
8 changed files with 151 additions and 61 deletions

View File

@ -66,7 +66,7 @@ func TestPayoutsEndpointSummary(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, trustPool.Refresh(ctx)) require.NoError(t, trustPool.Refresh(ctx))
payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites()) payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites(), nil)
require.NoError(t, err) require.NoError(t, err)
estimatedPayoutsService := estimatedpayouts.NewService(db.Bandwidth(), db.Reputation(), db.StorageUsage(), db.Pricing(), db.Satellites(), trustPool) estimatedPayoutsService := estimatedpayouts.NewService(db.Bandwidth(), db.Reputation(), db.StorageUsage(), db.Pricing(), db.Satellites(), trustPool)
endpoint := multinode.NewPayoutEndpoint(log, service, db.Payout(), estimatedPayoutsService, payoutsService) endpoint := multinode.NewPayoutEndpoint(log, service, db.Payout(), estimatedPayoutsService, payoutsService)
@ -159,7 +159,7 @@ func TestPayoutsEndpointEstimations(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, trustPool.Refresh(ctx)) require.NoError(t, trustPool.Refresh(ctx))
payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites()) payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites(), nil)
require.NoError(t, err) require.NoError(t, err)
estimatedPayoutsService := estimatedpayouts.NewService(db.Bandwidth(), db.Reputation(), db.StorageUsage(), db.Pricing(), db.Satellites(), trustPool) estimatedPayoutsService := estimatedpayouts.NewService(db.Bandwidth(), db.Reputation(), db.StorageUsage(), db.Pricing(), db.Satellites(), trustPool)
endpoint := multinode.NewPayoutEndpoint(log, service, db.Payout(), estimatedPayoutsService, payoutsService) endpoint := multinode.NewPayoutEndpoint(log, service, db.Payout(), estimatedPayoutsService, payoutsService)
@ -230,7 +230,7 @@ func TestPayoutsUndistributedEndpoint(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, trustPool.Refresh(ctx)) require.NoError(t, trustPool.Refresh(ctx))
payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites()) payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites(), nil)
require.NoError(t, err) require.NoError(t, err)
estimatedPayoutsService := estimatedpayouts.NewService(db.Bandwidth(), db.Reputation(), db.StorageUsage(), db.Pricing(), db.Satellites(), trustPool) estimatedPayoutsService := estimatedpayouts.NewService(db.Bandwidth(), db.Reputation(), db.StorageUsage(), db.Pricing(), db.Satellites(), trustPool)
endpoint := multinode.NewPayoutEndpoint(log, service, db.Payout(), estimatedPayoutsService, payoutsService) endpoint := multinode.NewPayoutEndpoint(log, service, db.Payout(), estimatedPayoutsService, payoutsService)

View File

@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/testcontext" "storj.io/common/testcontext"
"storj.io/common/testrand" "storj.io/common/testrand"
@ -212,8 +211,7 @@ func TestSatellitePayStubPeriodCached(t *testing.T) {
heldAmountDB := db.Payout() heldAmountDB := db.Payout()
reputationDB := db.Reputation() reputationDB := db.Reputation()
satellitesDB := db.Satellites() satellitesDB := db.Satellites()
log := zaptest.NewLogger(t) service, err := payouts.NewService(nil, heldAmountDB, reputationDB, satellitesDB, nil)
service, err := payouts.NewService(log, heldAmountDB, reputationDB, satellitesDB)
require.NoError(t, err) require.NoError(t, err)
payStub := payouts.PayStub{ payStub := payouts.PayStub{
@ -264,8 +262,7 @@ func TestAllPayStubPeriodCached(t *testing.T) {
heldAmountDB := db.Payout() heldAmountDB := db.Payout()
reputationDB := db.Reputation() reputationDB := db.Reputation()
satellitesDB := db.Satellites() satellitesDB := db.Satellites()
log := zaptest.NewLogger(t) service, err := payouts.NewService(nil, heldAmountDB, reputationDB, satellitesDB, nil)
service, err := payouts.NewService(log, heldAmountDB, reputationDB, satellitesDB)
require.NoError(t, err) require.NoError(t, err)
payStub := payouts.PayStub{ payStub := payouts.PayStub{

View File

@ -20,6 +20,7 @@ import (
"storj.io/storj/private/date" "storj.io/storj/private/date"
"storj.io/storj/storagenode/reputation" "storj.io/storj/storagenode/reputation"
"storj.io/storj/storagenode/satellites" "storj.io/storj/storagenode/satellites"
"storj.io/storj/storagenode/trust"
) )
var ( var (
@ -38,18 +39,28 @@ var (
type Service struct { type Service struct {
log *zap.Logger log *zap.Logger
stefanSatellite storj.NodeID
db DB db DB
reputationDB reputation.DB reputationDB reputation.DB
satellitesDB satellites.DB satellitesDB satellites.DB
trust *trust.Pool
} }
// NewService creates new instance of service. // NewService creates new instance of service.
func NewService(log *zap.Logger, db DB, reputationDB reputation.DB, satelliteDB satellites.DB) (_ *Service, err error) { func NewService(log *zap.Logger, db DB, reputationDB reputation.DB, satelliteDB satellites.DB, trust *trust.Pool) (_ *Service, err error) {
id, err := storj.NodeIDFromString("118UWpMCHzs6CvSgWd9BfFVjw5K9pZbJjkfZJexMtSkmKxvvAW")
if err != nil {
return &Service{}, err
}
return &Service{ return &Service{
log: log, log: log,
db: db, stefanSatellite: id,
reputationDB: reputationDB, db: db,
satellitesDB: satelliteDB, reputationDB: reputationDB,
satellitesDB: satelliteDB,
trust: trust,
}, nil }, nil
} }
@ -162,14 +173,13 @@ func (service *Service) AllPeriods(ctx context.Context) (_ []string, err error)
// AllHeldbackHistory retrieves heldback history for all satellites from storagenode database. // AllHeldbackHistory retrieves heldback history for all satellites from storagenode database.
func (service *Service) AllHeldbackHistory(ctx context.Context) (result []SatelliteHeldHistory, err error) { func (service *Service) AllHeldbackHistory(ctx context.Context) (result []SatelliteHeldHistory, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
satellitesURLs, err := service.satellitesDB.GetSatellitesURLs(ctx) satellitesIDs := service.trust.GetSatellites(ctx)
if err != nil {
return nil, ErrPayoutService.Wrap(err) satellitesIDs = append(satellitesIDs, service.stefanSatellite)
} for i := 0; i < len(satellitesIDs); i++ {
for _, satelliteURL := range satellitesURLs {
var history SatelliteHeldHistory var history SatelliteHeldHistory
helds, err := service.db.SatellitesHeldbackHistory(ctx, satelliteURL.ID) helds, err := service.db.SatellitesHeldbackHistory(ctx, satellitesIDs[i])
if err != nil { if err != nil {
return nil, ErrPayoutService.Wrap(err) return nil, ErrPayoutService.Wrap(err)
} }
@ -178,7 +188,7 @@ func (service *Service) AllHeldbackHistory(ctx context.Context) (result []Satell
continue continue
} }
disposed, err := service.db.SatellitesDisposedHistory(ctx, satelliteURL.ID) disposed, err := service.db.SatellitesDisposedHistory(ctx, satellitesIDs[i])
if err != nil { if err != nil {
return nil, ErrPayoutService.Wrap(err) return nil, ErrPayoutService.Wrap(err)
} }
@ -199,14 +209,18 @@ func (service *Service) AllHeldbackHistory(ctx context.Context) (result []Satell
} }
history.TotalDisposed = disposed history.TotalDisposed = disposed
history.SatelliteID = satelliteURL.ID history.SatelliteID = satellitesIDs[i]
history.SatelliteName = satelliteURL.Address history.SatelliteName = "stefan-benten"
if satelliteURL.Address == "" { if satellitesIDs[i] != service.stefanSatellite {
history.SatelliteName = satelliteURL.ID.String() url, err := service.trust.GetNodeURL(ctx, satellitesIDs[i])
if err != nil {
return nil, ErrPayoutService.Wrap(err)
}
history.SatelliteName = url.Address
} }
stats, err := service.reputationDB.Get(ctx, satelliteURL.ID) stats, err := service.reputationDB.Get(ctx, satellitesIDs[i])
if err != nil { if err != nil {
return nil, ErrPayoutService.Wrap(err) return nil, ErrPayoutService.Wrap(err)
} }
@ -222,14 +236,12 @@ func (service *Service) AllHeldbackHistory(ctx context.Context) (result []Satell
func (service *Service) AllSatellitesPayoutPeriod(ctx context.Context, period string) (result []SatellitePayoutForPeriod, err error) { func (service *Service) AllSatellitesPayoutPeriod(ctx context.Context, period string) (result []SatellitePayoutForPeriod, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
satelliteURLs, err := service.satellitesDB.GetSatellitesURLs(ctx) satelliteIDs := service.trust.GetSatellites(ctx)
if err != nil {
return nil, ErrPayoutService.Wrap(err)
}
for _, satelliteURL := range satelliteURLs {
var payoutForPeriod SatellitePayoutForPeriod
paystub, err := service.db.GetPayStub(ctx, satelliteURL.ID, period)
satelliteIDs = append(satelliteIDs, service.stefanSatellite)
for i := 0; i < len(satelliteIDs); i++ {
var payoutForPeriod SatellitePayoutForPeriod
paystub, err := service.db.GetPayStub(ctx, satelliteIDs[i], period)
if err != nil { if err != nil {
if ErrNoPayStubForPeriod.Has(err) { if ErrNoPayStubForPeriod.Has(err) {
continue continue
@ -237,19 +249,19 @@ func (service *Service) AllSatellitesPayoutPeriod(ctx context.Context, period st
return nil, ErrPayoutService.Wrap(err) return nil, ErrPayoutService.Wrap(err)
} }
receipt, err := service.db.GetReceipt(ctx, satelliteURL.ID, period) receipt, err := service.db.GetReceipt(ctx, satelliteIDs[i], period)
if err != nil { if err != nil {
if !ErrNoPayStubForPeriod.Has(err) { if !ErrNoPayStubForPeriod.Has(err) {
return nil, ErrPayoutService.Wrap(err) return nil, ErrPayoutService.Wrap(err)
} }
} }
stats, err := service.reputationDB.Get(ctx, satelliteURL.ID) stats, err := service.reputationDB.Get(ctx, satelliteIDs[i])
if err != nil { if err != nil {
return nil, ErrPayoutService.Wrap(err) return nil, ErrPayoutService.Wrap(err)
} }
satellite, err := service.satellitesDB.GetSatellite(ctx, satelliteURL.ID) satellite, err := service.satellitesDB.GetSatellite(ctx, satelliteIDs[i])
if err != nil { if err != nil {
if errors.Is(err, sql.ErrNoRows) { if errors.Is(err, sql.ErrNoRows) {
payoutForPeriod.IsExitComplete = false payoutForPeriod.IsExitComplete = false
@ -258,6 +270,15 @@ func (service *Service) AllSatellitesPayoutPeriod(ctx context.Context, period st
return nil, ErrPayoutService.Wrap(err) return nil, ErrPayoutService.Wrap(err)
} }
if satelliteIDs[i] != service.stefanSatellite {
url, err := service.trust.GetNodeURL(ctx, satelliteIDs[i])
if err != nil {
return nil, ErrPayoutService.Wrap(err)
}
payoutForPeriod.SatelliteURL = url.Address
}
if satellite.Status == satellites.ExitSucceeded { if satellite.Status == satellites.ExitSucceeded {
payoutForPeriod.IsExitComplete = true payoutForPeriod.IsExitComplete = true
} }
@ -283,17 +304,12 @@ func (service *Service) AllSatellitesPayoutPeriod(ctx context.Context, period st
payoutForPeriod.Age = int64(date.MonthsCountSince(stats.JoinedAt)) payoutForPeriod.Age = int64(date.MonthsCountSince(stats.JoinedAt))
payoutForPeriod.Disposed = paystub.Disposed payoutForPeriod.Disposed = paystub.Disposed
payoutForPeriod.Earned = earned payoutForPeriod.Earned = earned
payoutForPeriod.SatelliteID = satelliteURL.ID.String() payoutForPeriod.SatelliteID = satelliteIDs[i].String()
payoutForPeriod.SatelliteURL = satelliteURL.Address
payoutForPeriod.SurgePercent = paystub.SurgePercent payoutForPeriod.SurgePercent = paystub.SurgePercent
payoutForPeriod.Paid = paystub.Paid payoutForPeriod.Paid = paystub.Paid
payoutForPeriod.HeldPercent = heldPercent payoutForPeriod.HeldPercent = heldPercent
payoutForPeriod.Distributed = paystub.Distributed payoutForPeriod.Distributed = paystub.Distributed
if satelliteURL.Address == "" {
payoutForPeriod.SatelliteURL = satelliteURL.ID.String()
}
result = append(result, payoutForPeriod) result = append(result, payoutForPeriod)
} }
@ -309,22 +325,20 @@ func (service *Service) HeldAmountHistory(ctx context.Context) (_ []HeldAmountHi
return nil, ErrPayoutService.Wrap(err) return nil, ErrPayoutService.Wrap(err)
} }
satelliteURLs, err := service.satellitesDB.GetSatellitesURLs(ctx) trustedSatellites := service.trust.GetSatellites(ctx)
if err != nil {
return nil, ErrPayoutService.Wrap(err) for _, trustedSatellite := range trustedSatellites {
}
for _, satelliteURL := range satelliteURLs {
var found bool var found bool
for _, satelliteHeldHistory := range heldHistory { for _, satelliteHeldHistory := range heldHistory {
if satelliteURL.ID.Compare(satelliteHeldHistory.SatelliteID) == 0 { if trustedSatellite.Compare(satelliteHeldHistory.SatelliteID) == 0 {
found = true found = true
break break
} }
} }
if !found { if !found {
heldHistory = append(heldHistory, HeldAmountHistory{ heldHistory = append(heldHistory, HeldAmountHistory{
SatelliteID: satelliteURL.ID, SatelliteID: trustedSatellite,
}) })
} }
} }

View File

@ -4,16 +4,22 @@
package payouts_test package payouts_test
import ( import (
"context"
"errors"
"sync"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
"storj.io/common/identity"
"storj.io/common/storj"
"storj.io/common/testcontext" "storj.io/common/testcontext"
"storj.io/common/testrand" "storj.io/common/testrand"
"storj.io/storj/storagenode" "storj.io/storj/storagenode"
"storj.io/storj/storagenode/payouts" "storj.io/storj/storagenode/payouts"
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest" "storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
"storj.io/storj/storagenode/trust"
) )
func TestServiceHeldAmountHistory(t *testing.T) { func TestServiceHeldAmountHistory(t *testing.T) {
@ -21,17 +27,42 @@ func TestServiceHeldAmountHistory(t *testing.T) {
log := zaptest.NewLogger(t) log := zaptest.NewLogger(t)
payoutsDB := db.Payout() payoutsDB := db.Payout()
satellitesDB := db.Satellites() satellitesDB := db.Satellites()
source := &fakeSource{}
pool, err := trust.NewPool(log, newFakeIdentityResolver(), trust.Config{
Sources: []trust.Source{source},
CachePath: ctx.File("trust-cache.json"),
}, satellitesDB)
require.NoError(t, err)
satelliteID1 := testrand.NodeID() satelliteID1 := testrand.NodeID()
satelliteID2 := testrand.NodeID() satelliteID2 := testrand.NodeID()
satelliteID3 := testrand.NodeID() satelliteID3 := testrand.NodeID()
err := satellitesDB.SetAddress(ctx, satelliteID1, "foo.test:7777") // populate pool
require.NoError(t, err) source.entries = []trust.Entry{
err = satellitesDB.SetAddress(ctx, satelliteID2, "bar.test:7777") {
require.NoError(t, err) SatelliteURL: trust.SatelliteURL{
err = satellitesDB.SetAddress(ctx, satelliteID3, "baz.test:7777") ID: satelliteID1,
require.NoError(t, err) Host: "foo.test",
Port: 7777,
},
},
{
SatelliteURL: trust.SatelliteURL{
ID: satelliteID2,
Host: "bar.test",
Port: 7777,
},
},
{
SatelliteURL: trust.SatelliteURL{
ID: satelliteID3,
Host: "baz.test",
Port: 7777,
},
},
}
require.NoError(t, pool.Refresh(context.Background()))
// add paystubs // add paystubs
paystubs := []payouts.PayStub{ paystubs := []payouts.PayStub{
@ -93,7 +124,7 @@ func TestServiceHeldAmountHistory(t *testing.T) {
}, },
} }
service, err := payouts.NewService(log, payoutsDB, db.Reputation(), db.Satellites()) service, err := payouts.NewService(log, payoutsDB, db.Reputation(), db.Satellites(), pool)
require.NoError(t, err) require.NoError(t, err)
history, err := service.HeldAmountHistory(ctx) history, err := service.HeldAmountHistory(ctx)
@ -101,3 +132,50 @@ func TestServiceHeldAmountHistory(t *testing.T) {
require.ElementsMatch(t, expected, history) require.ElementsMatch(t, expected, history)
}) })
} }
type fakeSource struct {
name string
static bool
entries []trust.Entry
err error
}
func (s *fakeSource) String() string {
return s.name
}
func (s *fakeSource) Static() bool {
return s.static
}
func (s *fakeSource) FetchEntries(context.Context) ([]trust.Entry, error) {
return s.entries, s.err
}
type fakeIdentityResolver struct {
mu sync.Mutex
identities map[storj.NodeURL]*identity.PeerIdentity
}
func newFakeIdentityResolver() *fakeIdentityResolver {
return &fakeIdentityResolver{
identities: make(map[storj.NodeURL]*identity.PeerIdentity),
}
}
func (resolver *fakeIdentityResolver) SetIdentity(url storj.NodeURL, identity *identity.PeerIdentity) {
resolver.mu.Lock()
defer resolver.mu.Unlock()
resolver.identities[url] = identity
}
func (resolver *fakeIdentityResolver) ResolveIdentity(ctx context.Context, url storj.NodeURL) (*identity.PeerIdentity, error) {
resolver.mu.Lock()
defer resolver.mu.Unlock()
identity := resolver.identities[url]
if identity == nil {
return nil, errors.New("no identity")
}
return identity, nil
}

View File

@ -569,6 +569,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.DB.Payout(), peer.DB.Payout(),
peer.DB.Reputation(), peer.DB.Reputation(),
peer.DB.Satellites(), peer.DB.Satellites(),
peer.Storage2.Trust,
) )
if err != nil { if err != nil {
return nil, errs.Combine(err, peer.Close()) return nil, errs.Combine(err, peer.Close())

View File

@ -22,14 +22,14 @@ func TestSatellitesDB(t *testing.T) {
err := satellitesDB.SetAddress(ctx, id, "test_addr1") err := satellitesDB.SetAddress(ctx, id, "test_addr1")
require.NoError(t, err) require.NoError(t, err)
satellites, err := satellitesDB.GetSatellitesURLs(ctx) satellites, err := satellitesDB.GetSatellitesUrls(ctx)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, satellites[0].Address, "test_addr1") require.Equal(t, satellites[0].Address, "test_addr1")
err = satellitesDB.SetAddress(ctx, id, "test_addr2") err = satellitesDB.SetAddress(ctx, id, "test_addr2")
require.NoError(t, err) require.NoError(t, err)
satellites, err = satellitesDB.GetSatellitesURLs(ctx) satellites, err = satellitesDB.GetSatellitesUrls(ctx)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, satellites[0].Address, "test_addr2") require.Equal(t, satellites[0].Address, "test_addr2")
}) })

View File

@ -52,8 +52,8 @@ type DB interface {
SetAddress(ctx context.Context, satelliteID storj.NodeID, address string) error SetAddress(ctx context.Context, satelliteID storj.NodeID, address string) error
// GetSatellite retrieves that satellite by ID // GetSatellite retrieves that satellite by ID
GetSatellite(ctx context.Context, satelliteID storj.NodeID) (satellite Satellite, err error) GetSatellite(ctx context.Context, satelliteID storj.NodeID) (satellite Satellite, err error)
// GetSatellitesURLs retrieves all satellite's id and urls. // GetSatellitesUrls retrieves all satellite's id and urls.
GetSatellitesURLs(ctx context.Context) (satelliteURLs []storj.NodeURL, err error) GetSatellitesUrls(ctx context.Context) (satelliteURLs []storj.NodeURL, err error)
// InitiateGracefulExit updates the database to reflect the beginning of a graceful exit // InitiateGracefulExit updates the database to reflect the beginning of a graceful exit
InitiateGracefulExit(ctx context.Context, satelliteID storj.NodeID, intitiatedAt time.Time, startingDiskUsage int64) error InitiateGracefulExit(ctx context.Context, satelliteID storj.NodeID, intitiatedAt time.Time, startingDiskUsage int64) error
// CancelGracefulExit removes that satellite by ID // CancelGracefulExit removes that satellite by ID

View File

@ -59,8 +59,8 @@ func (db *satellitesDB) GetSatellite(ctx context.Context, satelliteID storj.Node
return satellite, rows.Err() return satellite, rows.Err()
} }
// GetSatellitesURLs retrieves all satellite's id and urls. // GetSatellitesUrls retrieves all satellite's id and urls.
func (db *satellitesDB) GetSatellitesURLs(ctx context.Context) (satelliteURLs []storj.NodeURL, err error) { func (db *satellitesDB) GetSatellitesUrls(ctx context.Context) (satelliteURLs []storj.NodeURL, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
query := `SELECT query := `SELECT