satellite/geoip: update node check-in to associate a country code
Resolves https://github.com/storj/storj/issues/4247 Change-Id: Idfd71bf1795d48ca3c686066bbdb95b9c6594f00
This commit is contained in:
parent
dda6720dd6
commit
bf51c286d9
5
go.mod
5
go.mod
@ -24,6 +24,7 @@ require (
|
||||
github.com/mattn/go-sqlite3 v1.14.8
|
||||
github.com/nsf/jsondiff v0.0.0-20200515183724-f29ed568f4ce
|
||||
github.com/nsf/termbox-go v0.0.0-20200418040025-38ba6e5628f1
|
||||
github.com/oschwald/maxminddb-golang v1.8.0
|
||||
github.com/pquerna/otp v1.3.0
|
||||
github.com/shopspring/decimal v1.2.0
|
||||
github.com/spacemonkeygo/monkit/v3 v3.0.17
|
||||
@ -39,7 +40,7 @@ require (
|
||||
github.com/zeebo/ini v0.0.0-20210331155437-86af75b4f524
|
||||
go.etcd.io/bbolt v1.3.5
|
||||
go.uber.org/zap v1.16.0
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519
|
||||
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa
|
||||
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
||||
golang.org/x/sys v0.0.0-20211020064051-0ec99a608a1b
|
||||
@ -47,7 +48,7 @@ require (
|
||||
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
|
||||
gopkg.in/segmentio/analytics-go.v3 v3.1.0
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
|
||||
storj.io/common v0.0.0-20211108092228-14e900b161d9
|
||||
storj.io/common v0.0.0-20211109113717-5e82e48abc31
|
||||
storj.io/drpc v0.0.26
|
||||
storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7
|
||||
storj.io/private v0.0.0-20211029202355-a7eae71c382a
|
||||
|
11
go.sum
11
go.sum
@ -389,6 +389,8 @@ github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7
|
||||
github.com/onsi/gomega v1.13.0 h1:7lLHu94wT9Ij0o6EWWclhu0aOh32VxhkwEJvzuWPeak=
|
||||
github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY=
|
||||
github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
|
||||
github.com/oschwald/maxminddb-golang v1.8.0 h1:Uh/DSnGoxsyp/KYbY1AuP0tYEwfs0sCph9p/UMXK/Hk=
|
||||
github.com/oschwald/maxminddb-golang v1.8.0/go.mod h1:RXZtst0N6+FY/3qCNmZMBApR19cdQj43/NM9VkrNAis=
|
||||
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
|
||||
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
||||
github.com/pelletier/go-toml v1.9.0 h1:NOd0BRdOKpPf0SxkL3HxSQOG7rNh+4kl6PHcBPFs7Q0=
|
||||
@ -501,6 +503,7 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stripe/stripe-go/v72 v72.51.0 h1:scXELorHW1SnAfARThO1QayscOsfEIoIAUy0yxoTqxY=
|
||||
@ -601,8 +604,9 @@ golang.org/x/crypto v0.0.0-20210415154028-4f45737414dc/go.mod h1:T9bdIzuCu7OtxOm
|
||||
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa h1:idItI2DDfCokpg0N51B2VtiLdJ4vAuXC9fnCb2gACo4=
|
||||
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
|
||||
@ -707,6 +711,7 @@ golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191210023423-ac6580df4449/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191224085550-c709ea063b76/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200107144601-ef85f5a75ddf/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
@ -880,8 +885,8 @@ sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3
|
||||
storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0=
|
||||
storj.io/common v0.0.0-20210805073808-8e0feb09e92a/go.mod h1:mhZYWpTojKsACxWE66RfXNz19zbyr/uEDVWHJH8dHog=
|
||||
storj.io/common v0.0.0-20211019072056-34a5992b4856/go.mod h1:objobGrIWQwhmTSpSm6Y7ykd40wZjB7CezNfic5YLKg=
|
||||
storj.io/common v0.0.0-20211108092228-14e900b161d9 h1:PxSH22djpdRU1wHS45QE6Yy9oSsBnl1frp2tURKOV1k=
|
||||
storj.io/common v0.0.0-20211108092228-14e900b161d9/go.mod h1:a2Kw7Uipu929OFANfWKLHRoD0JfhgssikEvimd6hbSQ=
|
||||
storj.io/common v0.0.0-20211109113717-5e82e48abc31 h1:E4igB475TvFTJII8hvKBIIuvZzJjb0YdHLqkeQLUt1I=
|
||||
storj.io/common v0.0.0-20211109113717-5e82e48abc31/go.mod h1:a2Kw7Uipu929OFANfWKLHRoD0JfhgssikEvimd6hbSQ=
|
||||
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
|
||||
storj.io/drpc v0.0.24/go.mod h1:ofQUDPQbbIymRDKE0tms48k8bLP5Y+dsI9CbXGv3gko=
|
||||
storj.io/drpc v0.0.26 h1:T6jJzjby7QUa/2XHR1qMxTCENpDHEw4/o+kfDfZQqQI=
|
||||
|
55
satellite/geoip/example_testplanet_test.go
Normal file
55
satellite/geoip/example_testplanet_test.go
Normal file
@ -0,0 +1,55 @@
|
||||
// Copyright (C) 2021 Storj Labs, Inc.
|
||||
// See LICENSE for copying information
|
||||
|
||||
package geoip_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/storj/location"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/storagenode"
|
||||
)
|
||||
|
||||
func TestGeoIPMock(t *testing.T) {
|
||||
testplanet.Run(t,
|
||||
testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 0,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(logger *zap.Logger, index int, config *satellite.Config) {
|
||||
config.Overlay.GeoIP.MockCountries = []string{"US", "GB"}
|
||||
},
|
||||
StorageNode: func(index int, config *storagenode.Config) {
|
||||
config.Server.Address = fmt.Sprintf("127.0.201.%d:0", index+1)
|
||||
},
|
||||
},
|
||||
},
|
||||
func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
// ensure storage nodes checked in with satellite
|
||||
for _, node := range planet.StorageNodes {
|
||||
node.Contact.Chore.TriggerWait(ctx)
|
||||
}
|
||||
|
||||
// expected country codes per node index
|
||||
countryCodes := map[int]location.CountryCode{
|
||||
0: location.UnitedKingdom,
|
||||
1: location.UnitedStates,
|
||||
2: location.UnitedKingdom,
|
||||
}
|
||||
|
||||
// check the country code for each storage nodes
|
||||
for i, node := range planet.StorageNodes {
|
||||
dossier, err := planet.Satellites[0].API.Overlay.DB.Get(ctx, node.ID())
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, countryCodes[i], dossier.CountryCode)
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
32
satellite/geoip/ip2country.go
Normal file
32
satellite/geoip/ip2country.go
Normal file
@ -0,0 +1,32 @@
|
||||
// Copyright (C) 2021 Storj Labs, Inc.
|
||||
// See LICENSE for copying information
|
||||
|
||||
package geoip
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/storj/location"
|
||||
)
|
||||
|
||||
// IPToCountry defines an abstraction for resolving the ISO country code given the string representation of an IP address.
|
||||
type IPToCountry interface {
|
||||
Close() error
|
||||
LookupISOCountryCode(address string) (location.CountryCode, error)
|
||||
}
|
||||
|
||||
func addressToIP(address string) (net.IP, error) {
|
||||
host, _, err := net.SplitHostPort(address)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
|
||||
ip := net.ParseIP(host)
|
||||
if len(ip) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return ip, nil
|
||||
}
|
41
satellite/geoip/ip2country_mock.go
Normal file
41
satellite/geoip/ip2country_mock.go
Normal file
@ -0,0 +1,41 @@
|
||||
// Copyright (C) 2021 Storj Labs, Inc.
|
||||
// See LICENSE for copying information
|
||||
|
||||
package geoip
|
||||
|
||||
import "storj.io/common/storj/location"
|
||||
|
||||
// MockIPToCountry provides a mock solution for looking up country codes in testplanet tests. This is done using the
|
||||
// last byte of the ip address and mod'ing it into a country code.
|
||||
type MockIPToCountry []location.CountryCode
|
||||
|
||||
// NewMockIPToCountry creates a mock IPToCountry based on predefined country list.
|
||||
func NewMockIPToCountry(countries []string) MockIPToCountry {
|
||||
result := MockIPToCountry{}
|
||||
for _, country := range countries {
|
||||
result = append(result, location.ToCountryCode(country))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Close does nothing for the MockIPToCountry.
|
||||
func (m MockIPToCountry) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// LookupISOCountryCode accepts an IP address.
|
||||
func (m MockIPToCountry) LookupISOCountryCode(address string) (location.CountryCode, error) {
|
||||
if len(m) == 0 {
|
||||
return location.CountryCode(0), nil
|
||||
}
|
||||
|
||||
ip, err := addressToIP(address)
|
||||
if err != nil || ip == nil {
|
||||
return location.CountryCode(0), err
|
||||
}
|
||||
|
||||
lastBlock := int(ip[len(ip)-1])
|
||||
return m[lastBlock%len(m)], nil
|
||||
}
|
||||
|
||||
var _ IPToCountry = MockIPToCountry{}
|
59
satellite/geoip/ip2country_mock_test.go
Normal file
59
satellite/geoip/ip2country_mock_test.go
Normal file
@ -0,0 +1,59 @@
|
||||
// Copyright (C) 2021 Storj Labs, Inc.
|
||||
// See LICENSE for copying information
|
||||
|
||||
package geoip_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"storj.io/common/storj/location"
|
||||
"storj.io/storj/satellite/geoip"
|
||||
)
|
||||
|
||||
func TestEmptyIP2CountryMock(t *testing.T) {
|
||||
ipLookup := geoip.MockIPToCountry{}
|
||||
{
|
||||
co, err := ipLookup.LookupISOCountryCode("127.0.0.1")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, location.CountryCode(0), co)
|
||||
}
|
||||
|
||||
{
|
||||
co, err := ipLookup.LookupISOCountryCode("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, location.CountryCode(0), co)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIP2CountryMock(t *testing.T) {
|
||||
ipLookup := geoip.MockIPToCountry{location.UnitedStates, location.Germany, location.France}
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
address string
|
||||
country location.CountryCode
|
||||
errExpected bool
|
||||
}{
|
||||
{"first IP in the pool", "127.0.0.1:1234", location.Germany, false},
|
||||
{"second IP in the pool", "127.0.0.2:1234", location.France, false},
|
||||
{"third IP in the pool", "127.0.0.3:1234", location.UnitedStates, false},
|
||||
{"ipv6", "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:1234", location.Germany, false},
|
||||
{"not an ip address", "not at all", location.CountryCode(0), true},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
co, err := ipLookup.LookupISOCountryCode(tc.address)
|
||||
if tc.errExpected {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tc.country, co)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
}
|
56
satellite/geoip/maxmind.go
Normal file
56
satellite/geoip/maxmind.go
Normal file
@ -0,0 +1,56 @@
|
||||
// Copyright (C) 2021 Storj Labs, Inc.
|
||||
// See LICENSE for copying information
|
||||
|
||||
package geoip
|
||||
|
||||
import (
|
||||
"github.com/oschwald/maxminddb-golang"
|
||||
|
||||
"storj.io/common/storj/location"
|
||||
)
|
||||
|
||||
// OpenMaxmindDB will use the provided filepath to open the target maxmind database.
|
||||
func OpenMaxmindDB(filepath string) (*MaxmindDB, error) {
|
||||
geoIP, err := maxminddb.Open(filepath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &MaxmindDB{
|
||||
db: geoIP,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type ipInfo struct {
|
||||
Country struct {
|
||||
IsoCode string `maxminddb:"iso_code"`
|
||||
} `maxminddb:"country"`
|
||||
}
|
||||
|
||||
// MaxmindDB provides access to GeoIP data via the maxmind geoip databases.
|
||||
type MaxmindDB struct {
|
||||
db *maxminddb.Reader
|
||||
}
|
||||
|
||||
var _ IPToCountry = &MaxmindDB{}
|
||||
|
||||
// Close will disconnect the underlying connection to the database.
|
||||
func (m *MaxmindDB) Close() error {
|
||||
return m.db.Close()
|
||||
}
|
||||
|
||||
// LookupISOCountryCode accepts an IP address.
|
||||
func (m *MaxmindDB) LookupISOCountryCode(address string) (location.CountryCode, error) {
|
||||
ip, err := addressToIP(address)
|
||||
if err != nil || ip == nil {
|
||||
return location.CountryCode(0), err
|
||||
}
|
||||
|
||||
info := &ipInfo{}
|
||||
err = m.db.Lookup(ip, info)
|
||||
if err != nil {
|
||||
return location.CountryCode(0), err
|
||||
}
|
||||
|
||||
return location.ToCountryCode(info.Country.IsoCode), nil
|
||||
}
|
@ -9,11 +9,14 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/storj/location"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
)
|
||||
|
||||
@ -23,6 +26,11 @@ import (
|
||||
func TestCheckIn(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(logger *zap.Logger, index int, config *satellite.Config) {
|
||||
config.Overlay.GeoIP.MockCountries = []string{"US"}
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
sat := planet.Satellites[0]
|
||||
wait := sat.Config.Overlay.NodeCheckInWaitPeriod
|
||||
@ -63,6 +71,8 @@ func TestCheckIn(t *testing.T) {
|
||||
|
||||
require.Equal(t, expectedLastFailure.Truncate(time.Second).UTC(),
|
||||
oldInfo.Reputation.LastContactFailure.Truncate(time.Second).UTC(), testName)
|
||||
|
||||
require.Equal(t, location.CountryCode(location.UnitedStates), oldInfo.CountryCode)
|
||||
}
|
||||
|
||||
infoCheck("First check-in", now, now, lastFail)
|
||||
|
@ -22,6 +22,7 @@ var (
|
||||
type Config struct {
|
||||
Node NodeSelectionConfig
|
||||
NodeSelectionCache UploadSelectionCacheConfig
|
||||
GeoIP GeoIPConfig
|
||||
UpdateStatsBatchSize int `help:"number of update requests to process per transaction" default:"100"`
|
||||
NodeCheckInWaitPeriod time.Duration `help:"the amount of time to wait before accepting a redundant check-in from a node (unmodified info since last check-in)" default:"2h" testDefault:"30s"`
|
||||
}
|
||||
@ -44,6 +45,12 @@ type NodeSelectionConfig struct {
|
||||
AsOfSystemTime AsOfSystemTimeConfig
|
||||
}
|
||||
|
||||
// GeoIPConfig is a configuration struct that helps configure the GeoIP lookup features on the satellite.
|
||||
type GeoIPConfig struct {
|
||||
DB string `help:"the location of the maxmind database containing geoip country information"`
|
||||
MockCountries []string `help:"a mock list of countries the satellite will attribute to nodes (useful for testing)"`
|
||||
}
|
||||
|
||||
func (aost *AsOfSystemTimeConfig) isValid() error {
|
||||
if aost.Enabled {
|
||||
if aost.DefaultInterval >= 0 {
|
||||
|
@ -14,6 +14,8 @@ import (
|
||||
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/storj/location"
|
||||
"storj.io/storj/satellite/geoip"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
)
|
||||
|
||||
@ -111,14 +113,15 @@ type DB interface {
|
||||
|
||||
// NodeCheckInInfo contains all the info that will be updated when a node checkins.
|
||||
type NodeCheckInInfo struct {
|
||||
NodeID storj.NodeID
|
||||
Address *pb.NodeAddress
|
||||
LastNet string
|
||||
LastIPPort string
|
||||
IsUp bool
|
||||
Operator *pb.NodeOperator
|
||||
Capacity *pb.NodeCapacity
|
||||
Version *pb.NodeVersion
|
||||
NodeID storj.NodeID
|
||||
Address *pb.NodeAddress
|
||||
LastNet string
|
||||
LastIPPort string
|
||||
IsUp bool
|
||||
Operator *pb.NodeOperator
|
||||
Capacity *pb.NodeCapacity
|
||||
Version *pb.NodeVersion
|
||||
CountryCode location.CountryCode
|
||||
}
|
||||
|
||||
// InfoResponse contains node dossier info requested from the storage node.
|
||||
@ -234,6 +237,7 @@ type NodeDossier struct {
|
||||
CreatedAt time.Time
|
||||
LastNet string
|
||||
LastIPPort string
|
||||
CountryCode location.CountryCode
|
||||
}
|
||||
|
||||
// NodeStats contains statistics about a node.
|
||||
@ -258,10 +262,11 @@ type NodeLastContact struct {
|
||||
|
||||
// SelectedNode is used as a result for creating orders limits.
|
||||
type SelectedNode struct {
|
||||
ID storj.NodeID
|
||||
Address *pb.NodeAddress
|
||||
LastNet string
|
||||
LastIPPort string
|
||||
ID storj.NodeID
|
||||
Address *pb.NodeAddress
|
||||
LastNet string
|
||||
LastIPPort string
|
||||
CountryCode location.CountryCode
|
||||
}
|
||||
|
||||
// Clone returns a deep clone of the selected node.
|
||||
@ -285,21 +290,33 @@ type Service struct {
|
||||
db DB
|
||||
config Config
|
||||
|
||||
GeoIP geoip.IPToCountry
|
||||
UploadSelectionCache *UploadSelectionCache
|
||||
DownloadSelectionCache *DownloadSelectionCache
|
||||
}
|
||||
|
||||
// NewService returns a new Service.
|
||||
func NewService(log *zap.Logger, db DB, config Config) (*Service, error) {
|
||||
if err := config.Node.AsOfSystemTime.isValid(); err != nil {
|
||||
err := config.Node.AsOfSystemTime.isValid()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var geoIP geoip.IPToCountry = geoip.NewMockIPToCountry(config.GeoIP.MockCountries)
|
||||
if config.GeoIP.DB != "" {
|
||||
geoIP, err = geoip.OpenMaxmindDB(config.GeoIP.DB)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
}
|
||||
|
||||
return &Service{
|
||||
log: log,
|
||||
db: db,
|
||||
config: config,
|
||||
|
||||
GeoIP: geoIP,
|
||||
|
||||
UploadSelectionCache: NewUploadSelectionCache(log, db,
|
||||
config.NodeSelectionCache.Staleness, config.Node,
|
||||
),
|
||||
@ -313,7 +330,9 @@ func NewService(log *zap.Logger, db DB, config Config) (*Service, error) {
|
||||
}
|
||||
|
||||
// Close closes resources.
|
||||
func (service *Service) Close() error { return nil }
|
||||
func (service *Service) Close() error {
|
||||
return service.GeoIP.Close()
|
||||
}
|
||||
|
||||
// Get looks up the provided nodeID from the overlay.
|
||||
func (service *Service) Get(ctx context.Context, nodeID storj.NodeID) (_ *NodeDossier, err error) {
|
||||
@ -478,12 +497,23 @@ not defined which one will end up in the database.
|
||||
*/
|
||||
func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo, timestamp time.Time) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
failureMeter := mon.Meter("geofencing_lookup_failed")
|
||||
|
||||
oldInfo, err := service.Get(ctx, node.NodeID)
|
||||
if err != nil && !ErrNodeNotFound.Has(err) {
|
||||
return Error.New("failed to get node info from DB")
|
||||
}
|
||||
|
||||
if oldInfo == nil {
|
||||
node.CountryCode, err = service.GeoIP.LookupISOCountryCode(node.LastIPPort)
|
||||
if err != nil {
|
||||
failureMeter.Mark(1)
|
||||
service.log.Debug("failed to resolve country code for node",
|
||||
zap.String("node address", node.Address.Address),
|
||||
zap.Stringer("Node ID", node.NodeID),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
return service.db.UpdateCheckIn(ctx, node, timestamp, service.config.Node)
|
||||
}
|
||||
|
||||
@ -508,8 +538,22 @@ func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo,
|
||||
spaceChanged := (node.Capacity == nil && oldInfo.Capacity.FreeDisk != 0) ||
|
||||
(node.Capacity != nil && node.Capacity.FreeDisk != oldInfo.Capacity.FreeDisk)
|
||||
|
||||
if oldInfo.CountryCode == location.CountryCode(0) || oldInfo.LastIPPort != node.LastIPPort {
|
||||
node.CountryCode, err = service.GeoIP.LookupISOCountryCode(node.LastIPPort)
|
||||
if err != nil {
|
||||
failureMeter.Mark(1)
|
||||
service.log.Debug("failed to resolve country code for node",
|
||||
zap.String("node address", node.Address.Address),
|
||||
zap.Stringer("Node ID", node.NodeID),
|
||||
zap.Error(err))
|
||||
}
|
||||
} else {
|
||||
node.CountryCode = oldInfo.CountryCode
|
||||
}
|
||||
|
||||
if dbStale || addrChanged || walletChanged || verChanged || spaceChanged ||
|
||||
oldInfo.LastNet != node.LastNet || oldInfo.LastIPPort != node.LastIPPort {
|
||||
oldInfo.LastNet != node.LastNet || oldInfo.LastIPPort != node.LastIPPort ||
|
||||
oldInfo.CountryCode != node.CountryCode {
|
||||
return service.db.UpdateCheckIn(ctx, node, timestamp, service.config.Node)
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/storj/location"
|
||||
"storj.io/private/dbutil/cockroachutil"
|
||||
"storj.io/private/dbutil/pgutil"
|
||||
"storj.io/private/tagsql"
|
||||
@ -56,7 +57,7 @@ func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, sele
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
query := `
|
||||
SELECT id, address, last_net, last_ip_port, vetted_at
|
||||
SELECT id, address, last_net, last_ip_port, vetted_at, country_code
|
||||
FROM nodes
|
||||
` + cache.db.impl.AsOfSystemInterval(selectionCfg.AsOfSystemTime.DefaultInterval) + `
|
||||
WHERE disqualified IS NULL
|
||||
@ -100,7 +101,9 @@ func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, sele
|
||||
node.Address = &pb.NodeAddress{}
|
||||
var lastIPPort sql.NullString
|
||||
var vettedAt *time.Time
|
||||
err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &vettedAt)
|
||||
var countryCode string
|
||||
err = rows.Scan(&node.ID, &node.Address.Address, &node.LastNet, &lastIPPort, &vettedAt, &countryCode)
|
||||
node.CountryCode = location.ToCountryCode(countryCode)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -932,6 +935,9 @@ func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier,
|
||||
if info.LastIpPort != nil {
|
||||
node.LastIPPort = *info.LastIpPort
|
||||
}
|
||||
if info.CountryCode != nil {
|
||||
node.CountryCode = location.ToCountryCode(*info.CountryCode)
|
||||
}
|
||||
|
||||
return node, nil
|
||||
}
|
||||
@ -1096,7 +1102,8 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
|
||||
ELSE nodes.last_contact_failure
|
||||
END,
|
||||
last_ip_port=$16,
|
||||
wallet_features=$17
|
||||
wallet_features=$17,
|
||||
country_code=$18
|
||||
WHERE id = $1
|
||||
`, // args $1 - $4
|
||||
node.NodeID.Bytes(), node.Address.GetAddress(), node.LastNet, node.Address.GetTransport(),
|
||||
@ -1112,6 +1119,8 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
|
||||
node.LastIPPort,
|
||||
// args $17,
|
||||
walletFeatures,
|
||||
// args $18,
|
||||
node.CountryCode.String(),
|
||||
)
|
||||
|
||||
if err == nil {
|
||||
@ -1130,7 +1139,8 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
|
||||
last_contact_failure,
|
||||
major, minor, patch, hash, timestamp, release,
|
||||
last_ip_port,
|
||||
wallet_features
|
||||
wallet_features,
|
||||
country_code
|
||||
)
|
||||
VALUES (
|
||||
$1, $2, $3, $4, $5,
|
||||
@ -1143,7 +1153,8 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
|
||||
END,
|
||||
$10, $11, $12, $13, $14, $15,
|
||||
$17,
|
||||
$18
|
||||
$18,
|
||||
$19
|
||||
)
|
||||
ON CONFLICT (id)
|
||||
DO UPDATE
|
||||
@ -1164,7 +1175,8 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
|
||||
ELSE nodes.last_contact_failure
|
||||
END,
|
||||
last_ip_port=$17,
|
||||
wallet_features=$18;
|
||||
wallet_features=$18,
|
||||
country_code=$19;
|
||||
`,
|
||||
// args $1 - $5
|
||||
node.NodeID.Bytes(), node.Address.GetAddress(), node.LastNet, node.Address.GetTransport(), int(pb.NodeType_STORAGE),
|
||||
@ -1180,6 +1192,8 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
|
||||
node.LastIPPort,
|
||||
// args $18,
|
||||
walletFeatures,
|
||||
// args $19,
|
||||
node.CountryCode.String(),
|
||||
)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
|
6
scripts/testdata/satellite-config.yaml.lock
vendored
6
scripts/testdata/satellite-config.yaml.lock
vendored
@ -496,6 +496,12 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
||||
# how many concurrent orders to process at once. zero is unlimited
|
||||
# orders.orders-semaphore-size: 2
|
||||
|
||||
# the location of the maxmind database containing geoip country information
|
||||
# overlay.geo-ip.db: ""
|
||||
|
||||
# a mock list of countries the satellite will attribute to nodes (useful for testing)
|
||||
# overlay.geo-ip.mock-countries: '[]'
|
||||
|
||||
# the amount of time to wait before accepting a redundant check-in from a node (unmodified info since last check-in)
|
||||
# overlay.node-check-in-wait-period: 2h0m0s
|
||||
|
||||
|
@ -10,7 +10,7 @@ require (
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/stretchr/testify v1.7.0
|
||||
go.uber.org/zap v1.17.0
|
||||
storj.io/common v0.0.0-20211108092228-14e900b161d9
|
||||
storj.io/common v0.0.0-20211109113717-5e82e48abc31
|
||||
storj.io/gateway-mt v1.14.4-0.20211015103214-01eddbc864fb
|
||||
storj.io/private v0.0.0-20211029202355-a7eae71c382a
|
||||
storj.io/storj v0.12.1-0.20211102170500-1de8a695e84a
|
||||
@ -142,6 +142,7 @@ require (
|
||||
github.com/nxadm/tail v1.4.8 // indirect
|
||||
github.com/olivere/elastic/v7 v7.0.22 // indirect
|
||||
github.com/onsi/ginkgo v1.16.4 // indirect
|
||||
github.com/oschwald/maxminddb-golang v1.8.0 // indirect
|
||||
github.com/pelletier/go-toml v1.9.0 // indirect
|
||||
github.com/philhofer/fwd v1.1.1 // indirect
|
||||
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
|
||||
@ -199,7 +200,7 @@ require (
|
||||
go.opentelemetry.io/otel/trace v0.18.0 // indirect
|
||||
go.uber.org/atomic v1.7.0 // indirect
|
||||
go.uber.org/multierr v1.6.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
|
||||
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa // indirect
|
||||
golang.org/x/mod v0.4.2 // indirect
|
||||
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 // indirect
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
|
||||
|
@ -701,6 +701,8 @@ github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJ
|
||||
github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
|
||||
github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
|
||||
github.com/oschwald/maxminddb-golang v1.7.0/go.mod h1:RXZtst0N6+FY/3qCNmZMBApR19cdQj43/NM9VkrNAis=
|
||||
github.com/oschwald/maxminddb-golang v1.8.0 h1:Uh/DSnGoxsyp/KYbY1AuP0tYEwfs0sCph9p/UMXK/Hk=
|
||||
github.com/oschwald/maxminddb-golang v1.8.0/go.mod h1:RXZtst0N6+FY/3qCNmZMBApR19cdQj43/NM9VkrNAis=
|
||||
github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM=
|
||||
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
|
||||
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
|
||||
@ -1039,8 +1041,9 @@ golang.org/x/crypto v0.0.0-20210415154028-4f45737414dc/go.mod h1:T9bdIzuCu7OtxOm
|
||||
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa h1:idItI2DDfCokpg0N51B2VtiLdJ4vAuXC9fnCb2gACo4=
|
||||
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
|
||||
@ -1396,8 +1399,8 @@ storj.io/common v0.0.0-20210805073808-8e0feb09e92a/go.mod h1:mhZYWpTojKsACxWE66R
|
||||
storj.io/common v0.0.0-20210916151047-6aaeb34bb916/go.mod h1:objobGrIWQwhmTSpSm6Y7ykd40wZjB7CezNfic5YLKg=
|
||||
storj.io/common v0.0.0-20211006105453-d3fff091f9d2/go.mod h1:objobGrIWQwhmTSpSm6Y7ykd40wZjB7CezNfic5YLKg=
|
||||
storj.io/common v0.0.0-20211019072056-34a5992b4856/go.mod h1:objobGrIWQwhmTSpSm6Y7ykd40wZjB7CezNfic5YLKg=
|
||||
storj.io/common v0.0.0-20211108092228-14e900b161d9 h1:PxSH22djpdRU1wHS45QE6Yy9oSsBnl1frp2tURKOV1k=
|
||||
storj.io/common v0.0.0-20211108092228-14e900b161d9/go.mod h1:a2Kw7Uipu929OFANfWKLHRoD0JfhgssikEvimd6hbSQ=
|
||||
storj.io/common v0.0.0-20211109113717-5e82e48abc31 h1:E4igB475TvFTJII8hvKBIIuvZzJjb0YdHLqkeQLUt1I=
|
||||
storj.io/common v0.0.0-20211109113717-5e82e48abc31/go.mod h1:a2Kw7Uipu929OFANfWKLHRoD0JfhgssikEvimd6hbSQ=
|
||||
storj.io/dotworld v0.0.0-20210324183515-0d11aeccd840/go.mod h1:KU9YvEgRrMMiWLvH8pzn1UkoCoxggKIPvQxmNdx7aXQ=
|
||||
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
|
||||
storj.io/drpc v0.0.24/go.mod h1:ofQUDPQbbIymRDKE0tms48k8bLP5Y+dsI9CbXGv3gko=
|
||||
|
Loading…
Reference in New Issue
Block a user