Satellite voucher service (#2043)
* set up voucher service skeleton, basic test * add VetNode db method * basic test for VetNode * encode and sign voucher functions * fill out and sign vouchers * test pass/fail voucher request * match EncodeVoucher to other Encode functions
This commit is contained in:
parent
24c8132975
commit
590b1a5a1d
@ -51,6 +51,7 @@ import (
|
||||
"storj.io/storj/satellite/mailservice"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/satellite/satellitedb"
|
||||
"storj.io/storj/satellite/vouchers"
|
||||
"storj.io/storj/storagenode"
|
||||
"storj.io/storj/storagenode/collector"
|
||||
"storj.io/storj/storagenode/orders"
|
||||
@ -504,6 +505,9 @@ func (planet *Planet) newSatellites(count int) ([]*satellite.Peer, error) {
|
||||
PasswordCost: console.TestPasswordCost,
|
||||
AuthTokenSecret: "my-suppa-secret-key",
|
||||
},
|
||||
Vouchers: vouchers.Config{
|
||||
Expiration: 30,
|
||||
},
|
||||
Version: planet.NewVersionConfig(),
|
||||
}
|
||||
if planet.config.Reconfigure.Satellite != nil {
|
||||
|
@ -35,3 +35,12 @@ func EncodePieceHash(hash *pb.PieceHash) ([]byte, error) {
|
||||
hash.Signature = signature
|
||||
return out, err
|
||||
}
|
||||
|
||||
// EncodeVoucher encodes voucher into bytes for signing.
|
||||
func EncodeVoucher(voucher *pb.Voucher) ([]byte, error) {
|
||||
signature := voucher.SatelliteSignature
|
||||
voucher.SatelliteSignature = nil
|
||||
out, err := proto.Marshal(voucher)
|
||||
voucher.SatelliteSignature = signature
|
||||
return out, err
|
||||
}
|
||||
|
@ -70,3 +70,20 @@ func SignPieceHash(signer Signer, unsigned *pb.PieceHash) (*pb.PieceHash, error)
|
||||
|
||||
return &signed, nil
|
||||
}
|
||||
|
||||
// SignVoucher signs the voucher using the specified signer
|
||||
// Signer is a satellite
|
||||
func SignVoucher(signer Signer, unsigned *pb.Voucher) (*pb.Voucher, error) {
|
||||
bytes, err := EncodeVoucher(unsigned)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
signed := *unsigned
|
||||
signed.SatelliteSignature, err = signer.HashAndSign(bytes)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return &signed, nil
|
||||
}
|
||||
|
@ -48,7 +48,8 @@ type DB interface {
|
||||
KnownUnreliableOrOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
|
||||
// Paginate will page through the database nodes
|
||||
Paginate(ctx context.Context, offset int64, limit int) ([]*NodeDossier, bool, error)
|
||||
|
||||
// VetNode returns whether or not the node reaches reputable thresholds
|
||||
VetNode(ctx context.Context, id storj.NodeID, criteria *NodeCriteria) (bool, error)
|
||||
// CreateStats initializes the stats for node.
|
||||
CreateStats(ctx context.Context, nodeID storj.NodeID, initial *NodeStats) (stats *NodeStats, err error)
|
||||
// Update updates node address
|
||||
@ -282,6 +283,22 @@ func (cache *Cache) Create(ctx context.Context, nodeID storj.NodeID, initial *No
|
||||
return cache.db.CreateStats(ctx, nodeID, initial)
|
||||
}
|
||||
|
||||
// VetNode returns whether or not the node reaches reputable thresholds
|
||||
func (cache *Cache) VetNode(ctx context.Context, nodeID storj.NodeID) (reputable bool, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
criteria := &NodeCriteria{
|
||||
AuditCount: cache.preferences.AuditCount,
|
||||
AuditSuccessRatio: cache.preferences.AuditSuccessRatio,
|
||||
UptimeCount: cache.preferences.UptimeCount,
|
||||
UptimeSuccessRatio: cache.preferences.UptimeRatio,
|
||||
}
|
||||
reputable, err = cache.db.VetNode(ctx, nodeID, criteria)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return reputable, nil
|
||||
}
|
||||
|
||||
// UpdateStats all parts of single storagenode's stats.
|
||||
func (cache *Cache) UpdateStats(ctx context.Context, request *UpdateRequest) (stats *NodeStats, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
@ -11,9 +11,11 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testplanet"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
@ -174,3 +176,36 @@ func TestRandomizedSelection(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestVetNode(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||
config.Overlay.Node.AuditCount = 1
|
||||
config.Overlay.Node.AuditSuccessRatio = 1
|
||||
config.Overlay.Node.UptimeCount = 1
|
||||
config.Overlay.Node.UptimeRatio = 1
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
var err error
|
||||
satellite := planet.Satellites[0]
|
||||
service := satellite.Overlay.Service
|
||||
|
||||
_, err = satellite.DB.OverlayCache().UpdateStats(ctx, &overlay.UpdateRequest{
|
||||
NodeID: planet.StorageNodes[0].ID(),
|
||||
IsUp: true,
|
||||
AuditSuccess: true,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
reputable, err := service.VetNode(ctx, planet.StorageNodes[0].ID())
|
||||
require.NoError(t, err)
|
||||
assert.True(t, reputable)
|
||||
|
||||
reputable, err = service.VetNode(ctx, planet.StorageNodes[1].ID())
|
||||
require.NoError(t, err)
|
||||
assert.False(t, reputable)
|
||||
})
|
||||
}
|
||||
|
@ -51,6 +51,7 @@ import (
|
||||
"storj.io/storj/satellite/mailservice/simulate"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/satellite/orders"
|
||||
"storj.io/storj/satellite/vouchers"
|
||||
"storj.io/storj/storage"
|
||||
"storj.io/storj/storage/boltdb"
|
||||
)
|
||||
@ -114,6 +115,8 @@ type Config struct {
|
||||
Mail mailservice.Config
|
||||
Console consoleweb.Config
|
||||
|
||||
Vouchers vouchers.Config
|
||||
|
||||
Version version.Config
|
||||
}
|
||||
|
||||
@ -191,6 +194,10 @@ type Peer struct {
|
||||
Service *mailservice.Service
|
||||
}
|
||||
|
||||
Vouchers struct {
|
||||
Service *vouchers.Service
|
||||
}
|
||||
|
||||
Console struct {
|
||||
Listener net.Listener
|
||||
Service *console.Service
|
||||
@ -313,6 +320,26 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve
|
||||
peer.Discovery.Service = discovery.New(peer.Log.Named("discovery"), peer.Overlay.Service, peer.Kademlia.Service, config)
|
||||
}
|
||||
|
||||
{ // setup vouchers
|
||||
log.Debug("Setting up vouchers")
|
||||
config := config.Vouchers
|
||||
if config.Expiration < 0 {
|
||||
return nil, errs.New("voucher expiration (%d) must be > 0", config.Expiration)
|
||||
}
|
||||
expirationHours := config.Expiration * 24
|
||||
duration, err := time.ParseDuration(fmt.Sprintf("%dh", expirationHours))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
peer.Vouchers.Service = vouchers.NewService(
|
||||
peer.Log.Named("vouchers"),
|
||||
signing.SignerFromFullIdentity(peer.Identity),
|
||||
peer.Overlay.Service,
|
||||
duration,
|
||||
)
|
||||
pb.RegisterVouchersServer(peer.Server.GRPC(), peer.Vouchers.Service)
|
||||
}
|
||||
|
||||
{ // setup live accounting
|
||||
log.Debug("Setting up live accounting")
|
||||
config := config.LiveAccounting
|
||||
|
@ -700,6 +700,13 @@ func (m *lockedOverlayCache) Paginate(ctx context.Context, offset int64, limit i
|
||||
return m.db.Paginate(ctx, offset, limit)
|
||||
}
|
||||
|
||||
// VetNode returns whether or not the node reaches reputable thresholds
|
||||
func (m *lockedOverlayCache) VetNode(ctx context.Context, id storj.NodeID, criteria *overlay.NodeCriteria) (bool, error) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
return m.db.VetNode(ctx, id, criteria)
|
||||
}
|
||||
|
||||
// SelectNewStorageNodes looks up nodes based on new node criteria
|
||||
func (m *lockedOverlayCache) SelectNewStorageNodes(ctx context.Context, count int, criteria *overlay.NodeCriteria) ([]*pb.Node, error) {
|
||||
m.Lock()
|
||||
|
@ -339,6 +339,29 @@ func (cache *overlaycache) Get(ctx context.Context, id storj.NodeID) (*overlay.N
|
||||
return convertDBNode(node)
|
||||
}
|
||||
|
||||
// VetNode returns whether or not the node reaches reputable thresholds
|
||||
func (cache *overlaycache) VetNode(ctx context.Context, id storj.NodeID, criteria *overlay.NodeCriteria) (bool, error) {
|
||||
row := cache.db.QueryRow(cache.db.Rebind(`SELECT id
|
||||
FROM nodes
|
||||
WHERE id = ?
|
||||
AND type = ?
|
||||
AND total_audit_count >= ?
|
||||
AND audit_success_ratio >= ?
|
||||
AND total_uptime_count >= ?
|
||||
AND uptime_ratio >= ?
|
||||
`), id, pb.NodeType_STORAGE, criteria.AuditCount, criteria.AuditSuccessRatio,
|
||||
criteria.UptimeCount, criteria.UptimeSuccessRatio)
|
||||
var bytes *[]byte
|
||||
err := row.Scan(&bytes)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// KnownUnreliableOrOffline filters a set of nodes to unreliable or offlines node, independent of new
|
||||
func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIds storj.NodeIDList) (badNodes storj.NodeIDList, err error) {
|
||||
if len(nodeIds) == 0 {
|
||||
|
79
satellite/vouchers/service.go
Normal file
79
satellite/vouchers/service.go
Normal file
@ -0,0 +1,79 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package vouchers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/auth/signing"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pb"
|
||||
)
|
||||
|
||||
// Config contains voucher service configuration parameters
|
||||
type Config struct {
|
||||
Expiration int `help:"number of days before a voucher expires" default:"30"`
|
||||
}
|
||||
|
||||
// Service for issuing signed vouchers
|
||||
type Service struct {
|
||||
log *zap.Logger
|
||||
satellite signing.Signer
|
||||
cache *overlay.Cache
|
||||
expiration time.Duration
|
||||
}
|
||||
|
||||
// Error the default vouchers errs class
|
||||
var (
|
||||
Error = errs.Class("vouchers error")
|
||||
)
|
||||
|
||||
// NewService creates a new service for issuing signed vouchers
|
||||
func NewService(log *zap.Logger, satellite signing.Signer, cache *overlay.Cache, expiration time.Duration) *Service {
|
||||
return &Service{
|
||||
log: log,
|
||||
satellite: satellite,
|
||||
cache: cache,
|
||||
expiration: expiration,
|
||||
}
|
||||
}
|
||||
|
||||
// Request receives a voucher request and returns a voucher and an error
|
||||
func (service *Service) Request(ctx context.Context, req *pb.VoucherRequest) (*pb.Voucher, error) {
|
||||
peer, err := identity.PeerIdentityFromContext(ctx)
|
||||
if err != nil {
|
||||
return &pb.Voucher{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
reputable, err := service.cache.VetNode(ctx, peer.ID)
|
||||
if err != nil {
|
||||
return &pb.Voucher{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
service.log.Debug("Node reputation", zap.Bool("reputable", reputable))
|
||||
|
||||
if !reputable {
|
||||
return &pb.Voucher{}, Error.New("Request rejected. Node not reputable")
|
||||
}
|
||||
|
||||
expirationTime := time.Now().UTC().Add(service.expiration)
|
||||
expiration, err := ptypes.TimestampProto(expirationTime)
|
||||
if err != nil {
|
||||
return &pb.Voucher{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
unsigned := &pb.Voucher{
|
||||
SatelliteId: service.satellite.ID(),
|
||||
StorageNodeId: peer.ID,
|
||||
Expiration: expiration,
|
||||
}
|
||||
|
||||
return signing.SignVoucher(service.satellite, unsigned)
|
||||
}
|
74
satellite/vouchers/vouchers_test.go
Normal file
74
satellite/vouchers/vouchers_test.go
Normal file
@ -0,0 +1,74 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package vouchers_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testplanet"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/storagenode"
|
||||
)
|
||||
|
||||
func TestVouchers(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||
config.Overlay.Node.AuditCount = 1
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
tests := []struct {
|
||||
node *storagenode.Peer
|
||||
reputable bool
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
node: planet.StorageNodes[0],
|
||||
reputable: true,
|
||||
},
|
||||
{
|
||||
node: planet.StorageNodes[1],
|
||||
reputable: false,
|
||||
expectedErr: "rpc error: code = Unknown desc = vouchers error: Request rejected. Node not reputable",
|
||||
},
|
||||
}
|
||||
|
||||
satellite := planet.Satellites[0].Local().Node
|
||||
|
||||
for _, tt := range tests {
|
||||
if tt.reputable {
|
||||
_, err := planet.Satellites[0].DB.OverlayCache().UpdateStats(ctx, &overlay.UpdateRequest{
|
||||
NodeID: tt.node.ID(),
|
||||
IsUp: true,
|
||||
AuditSuccess: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
conn, err := tt.node.Transport.DialNode(ctx, &satellite)
|
||||
require.NoError(t, err)
|
||||
|
||||
client := pb.NewVouchersClient(conn)
|
||||
|
||||
voucher, err := client.Request(ctx, &pb.VoucherRequest{})
|
||||
if tt.reputable {
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, voucher)
|
||||
assert.Equal(t, tt.node.ID(), voucher.StorageNodeId)
|
||||
} else {
|
||||
assert.Equal(t, tt.expectedErr, err.Error())
|
||||
assert.Nil(t, voucher)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
3
scripts/testdata/satellite-config.yaml.lock
vendored
3
scripts/testdata/satellite-config.yaml.lock
vendored
@ -253,3 +253,6 @@ server.private-address: "127.0.0.1:7778"
|
||||
# server address to check its version against
|
||||
# version.server-address: "https://version.alpha.storj.io"
|
||||
|
||||
# number of days before a voucher expires
|
||||
# vouchers.expiration: 30
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user