remove kademlia: create upsert query to update uptime (#2999)

* create upsert query for check-in method

* add tests

* fix lint err

* add benchmark test for db query

* fix lint and tests

* add a unit test, fix lint

* add address to tests

* replace print w/ b.Fatal

* refactor query per CR comments

* fix disqualified, only set if null

* fix query

* add version to updatecheckin query

* fix version

* fix tests

* change version for tests

* add version to tests

* add IP, add transport, mv unit test

* use node.address as arg

* add last ip

* fix lint
This commit is contained in:
Jess G 2019-09-19 11:37:31 -07:00 committed by GitHub
parent 45df0c5340
commit 93788e5218
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 977 additions and 66 deletions

View File

@ -25,6 +25,7 @@ import (
"storj.io/storj/pkg/revocation"
"storj.io/storj/pkg/server"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/overlay"
"storj.io/storj/uplink"
"storj.io/storj/uplink/metainfo"
)
@ -130,10 +131,27 @@ func TestDownloadWithSomeNodesOffline(t *testing.T) {
require.NoError(t, err)
// mark node as offline in overlay
_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
info := overlay.NodeCheckInInfo{
NodeID: node.ID(),
IsUp: false,
Address: &pb.NodeAddress{
Address: "1.2.3.4",
},
Version: &pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: false,
},
}
err = satellite.Overlay.Service.UpdateCheckIn(ctx, info)
require.NoError(t, err)
}
}
// confirm that we marked the correct number of storage nodes as offline
nodes, err := satellite.Overlay.DB.SelectStorageNodes(ctx, len(planet.StorageNodes), &overlay.NodeCriteria{})
require.NoError(t, err)
require.Len(t, nodes, len(planet.StorageNodes)-len(nodesToKill))
// we should be able to download data without any of the original nodes
newData, err := ul.Download(ctx, satellite, "testbucket", "test/path")

View File

@ -25,7 +25,7 @@ const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type CheckInRequest struct {
Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"`
Version string `protobuf:"bytes,2,opt,name=version,proto3" json:"version,omitempty"`
Version *NodeVersion `protobuf:"bytes,2,opt,name=version,proto3" json:"version,omitempty"`
Capacity *NodeCapacity `protobuf:"bytes,3,opt,name=capacity,proto3" json:"capacity,omitempty"`
Operator *NodeOperator `protobuf:"bytes,4,opt,name=operator,proto3" json:"operator,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -64,11 +64,11 @@ func (m *CheckInRequest) GetAddress() string {
return ""
}
func (m *CheckInRequest) GetVersion() string {
func (m *CheckInRequest) GetVersion() *NodeVersion {
if m != nil {
return m.Version
}
return ""
return nil
}
func (m *CheckInRequest) GetCapacity() *NodeCapacity {
@ -201,26 +201,27 @@ func init() {
func init() { proto.RegisterFile("contact.proto", fileDescriptor_a5036fff2565fb15) }
var fileDescriptor_a5036fff2565fb15 = []byte{
// 296 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x91, 0x3f, 0x4f, 0xc3, 0x30,
0x10, 0xc5, 0x95, 0x52, 0x91, 0x72, 0x08, 0x0a, 0x06, 0x84, 0x55, 0x18, 0xaa, 0x4c, 0x15, 0x42,
0x19, 0xca, 0xca, 0x44, 0xe8, 0xc0, 0x00, 0x44, 0x61, 0x63, 0x89, 0x5c, 0xe7, 0x14, 0xa2, 0x0a,
0xdb, 0xd8, 0x2e, 0x12, 0xdf, 0x87, 0x0f, 0x8a, 0x1c, 0x3b, 0x41, 0xa5, 0x8c, 0xf7, 0x7e, 0xcf,
0xcf, 0xf7, 0x07, 0x0e, 0xb8, 0x14, 0x96, 0x71, 0x9b, 0x2a, 0x2d, 0xad, 0x24, 0x71, 0x28, 0x27,
0x20, 0x64, 0x85, 0x5e, 0x4c, 0xbe, 0x23, 0x38, 0xcc, 0xde, 0x90, 0xaf, 0x1e, 0x44, 0x81, 0x1f,
0x6b, 0x34, 0x96, 0x50, 0x88, 0x59, 0x55, 0x69, 0x34, 0x86, 0x46, 0xd3, 0x68, 0xb6, 0x57, 0x74,
0xa5, 0x23, 0x9f, 0xa8, 0x4d, 0x23, 0x05, 0x1d, 0x78, 0x12, 0x4a, 0x92, 0xc2, 0x88, 0x33, 0xc5,
0x78, 0x63, 0xbf, 0xe8, 0xce, 0x34, 0x9a, 0xed, 0xcf, 0x49, 0xda, 0xfe, 0xf2, 0x24, 0x2b, 0xcc,
0x02, 0x29, 0x7a, 0x8f, 0xf3, 0x4b, 0x85, 0x9a, 0x59, 0xa9, 0xe9, 0xf0, 0xaf, 0xff, 0x39, 0x90,
0xa2, 0xf7, 0x24, 0x2b, 0x18, 0xf7, 0x5d, 0x1a, 0x25, 0x85, 0x41, 0x72, 0x05, 0xc7, 0xaa, 0x11,
0x75, 0xe9, 0x9e, 0x95, 0x66, 0xcd, 0x79, 0xd7, 0xf0, 0xa8, 0x18, 0x3b, 0xe0, 0x92, 0x5e, 0xbc,
0x4c, 0xae, 0x81, 0xb4, 0x5e, 0xd4, 0x5a, 0xea, 0xf2, 0x1d, 0x8d, 0x61, 0x35, 0x86, 0x19, 0x8e,
0x1c, 0x59, 0x38, 0xf0, 0xe8, 0xf5, 0xe4, 0x14, 0x48, 0xe6, 0x57, 0x95, 0x37, 0xa2, 0x0e, 0x6b,
0x49, 0xce, 0xe0, 0x64, 0x43, 0xf5, 0x6d, 0xcc, 0x73, 0x88, 0x83, 0x4c, 0x16, 0x30, 0xca, 0xc3,
0xc7, 0xe4, 0x22, 0xed, 0x96, 0xbf, 0x1d, 0x35, 0xb9, 0xfc, 0x1f, 0x86, 0xc4, 0x7b, 0x18, 0xb6,
0x11, 0xb7, 0x10, 0x87, 0x99, 0xc9, 0xf9, 0xef, 0x83, 0x8d, 0x5b, 0x4d, 0xe8, 0x36, 0xf0, 0x29,
0x77, 0xc3, 0xd7, 0x81, 0x5a, 0x2e, 0x77, 0xdb, 0x2b, 0xdf, 0xfc, 0x04, 0x00, 0x00, 0xff, 0xff,
0xbf, 0x07, 0xc0, 0x67, 0x0b, 0x02, 0x00, 0x00,
// 308 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x91, 0x4d, 0x4f, 0xf2, 0x40,
0x14, 0x85, 0x53, 0x5e, 0xf2, 0x16, 0xae, 0x51, 0xe4, 0xaa, 0xb1, 0x41, 0x17, 0xa4, 0x2b, 0xa2,
0xa6, 0x0b, 0xdc, 0xba, 0x12, 0x59, 0xb8, 0x50, 0xc9, 0x98, 0xb8, 0x70, 0x43, 0xca, 0xf4, 0x06,
0x1b, 0xe2, 0x4c, 0x9d, 0x19, 0x4c, 0xfc, 0x65, 0xfe, 0x3d, 0x33, 0x1f, 0x2d, 0x41, 0x5c, 0xce,
0x39, 0xcf, 0x3d, 0x73, 0x3f, 0x60, 0x9f, 0x4b, 0x61, 0x72, 0x6e, 0xb2, 0x4a, 0x49, 0x23, 0x31,
0x0e, 0xcf, 0x01, 0x08, 0x59, 0x90, 0x17, 0xd3, 0xef, 0x08, 0x0e, 0x26, 0x6f, 0xc4, 0x57, 0xf7,
0x82, 0xd1, 0xc7, 0x9a, 0xb4, 0xc1, 0x04, 0xe2, 0xbc, 0x28, 0x14, 0x69, 0x9d, 0x44, 0xc3, 0x68,
0xd4, 0x65, 0xf5, 0x13, 0x2f, 0x21, 0xfe, 0x24, 0xa5, 0x4b, 0x29, 0x92, 0xd6, 0x30, 0x1a, 0xed,
0x8d, 0xfb, 0x99, 0x8b, 0x7a, 0x94, 0x05, 0xbd, 0x78, 0x83, 0xd5, 0x04, 0x66, 0xd0, 0xe1, 0x79,
0x95, 0xf3, 0xd2, 0x7c, 0x25, 0xff, 0x1c, 0x8d, 0x1b, 0x7a, 0x12, 0x1c, 0xd6, 0x30, 0x96, 0x97,
0x15, 0xa9, 0xdc, 0x48, 0x95, 0xb4, 0x7f, 0xf3, 0x4f, 0xc1, 0x61, 0x0d, 0x93, 0xae, 0xa0, 0xd7,
0x34, 0xae, 0x2b, 0x29, 0x34, 0xe1, 0x05, 0xf4, 0xab, 0x52, 0x2c, 0xe7, 0xb6, 0x6c, 0xae, 0xd7,
0x9c, 0xd7, 0x33, 0x74, 0x58, 0xcf, 0x1a, 0x36, 0xe9, 0xd9, 0xcb, 0x78, 0x05, 0xe8, 0x58, 0x52,
0x4a, 0xaa, 0xf9, 0x3b, 0x69, 0x9d, 0x2f, 0xc9, 0x8d, 0xd5, 0x65, 0x87, 0xd6, 0x99, 0x5a, 0xe3,
0xc1, 0xeb, 0xe9, 0x31, 0xe0, 0xc4, 0x6f, 0x6f, 0x56, 0x8a, 0x65, 0xd8, 0x54, 0x7a, 0x02, 0x47,
0x5b, 0xaa, 0x6f, 0x63, 0x3c, 0x83, 0x38, 0xc8, 0x38, 0x85, 0xce, 0x2c, 0x7c, 0x8c, 0x67, 0x59,
0x7d, 0x8f, 0xdd, 0xa8, 0xc1, 0xf9, 0xdf, 0x66, 0x48, 0xbc, 0x83, 0xb6, 0x8b, 0xb8, 0x81, 0x38,
0xcc, 0x8c, 0xa7, 0x9b, 0x82, 0xad, 0xf3, 0x0d, 0x92, 0x5d, 0xc3, 0xa7, 0xdc, 0xb6, 0x5f, 0x5b,
0xd5, 0x62, 0xf1, 0xdf, 0x1d, 0xfe, 0xfa, 0x27, 0x00, 0x00, 0xff, 0xff, 0xa0, 0xa1, 0x19, 0xad,
0x1e, 0x02, 0x00, 0x00,
}
type DRPCContactClient interface {

View File

@ -18,7 +18,7 @@ service Node {
message CheckInRequest {
string address = 1;
string version = 2;
node.NodeVersion version = 2;
node.NodeCapacity capacity = 3;
node.NodeOperator operator = 4;
}

View File

@ -167,7 +167,7 @@
{
"id": 2,
"name": "version",
"type": "string"
"type": "node.NodeVersion"
},
{
"id": 3,

View File

@ -17,6 +17,7 @@ import (
"storj.io/storj/internal/testrand"
"storj.io/storj/pkg/encryption"
"storj.io/storj/pkg/paths"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite"
"storj.io/storj/satellite/audit"
@ -204,7 +205,25 @@ func TestDisqualifiedNodeRemainsDisqualified(t *testing.T) {
disqualifiedNode := planet.StorageNodes[0]
disqualifyNode(t, ctx, satellitePeer, disqualifiedNode.ID())
_, err := satellitePeer.DB.OverlayCache().UpdateUptime(ctx, disqualifiedNode.ID(), true, 0, 1, 0)
info := overlay.NodeCheckInInfo{
NodeID: disqualifiedNode.ID(),
IsUp: true,
Address: &pb.NodeAddress{
Address: "1.2.3.4",
},
Version: &pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: false,
},
}
config := overlay.NodeSelectionConfig{
UptimeReputationLambda: 0,
UptimeReputationWeight: 1,
UptimeReputationDQ: 0,
}
err := satellitePeer.DB.OverlayCache().UpdateCheckIn(ctx, info, config)
require.NoError(t, err)
assert.True(t, isDisqualified(t, ctx, satellitePeer, disqualifiedNode.ID()))
@ -233,17 +252,25 @@ func isDisqualified(t *testing.T, ctx *testcontext.Context, satellite *testplane
return node.Disqualified != nil
}
func disqualifyNode(t *testing.T, ctx *testcontext.Context, satellite *testplanet.SatelliteSystem, nodeID storj.NodeID) {
_, err := satellite.DB.OverlayCache().BatchUpdateStats(ctx, []*overlay.UpdateRequest{{
info := overlay.NodeCheckInInfo{
NodeID: nodeID,
IsUp: true,
AuditSuccess: false,
AuditLambda: 0,
AuditWeight: 1,
AuditDQ: 0.5,
UptimeLambda: 1,
UptimeWeight: 1,
UptimeDQ: 0.5,
}}, 100)
IsUp: false,
Address: &pb.NodeAddress{
Address: "1.2.3.4",
},
Version: &pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: false,
},
}
config := overlay.NodeSelectionConfig{
UptimeReputationLambda: 1,
UptimeReputationWeight: 1,
UptimeReputationDQ: 1,
}
err := satellite.DB.OverlayCache().UpdateCheckIn(ctx, info, config)
require.NoError(t, err)
assert.True(t, isDisqualified(t, ctx, satellite, nodeID))
}

View File

@ -0,0 +1,111 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package contact_test
import (
"context"
"os"
"testing"
"time"
"storj.io/storj/internal/testcontext"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func BenchmarkUpdateCheckIn(b *testing.B) {
postgresSetup := os.Getenv("STORJ_POSTGRES_TEST")
if postgresSetup == "" {
b.Fatal("postgres must be configured with env var: STORJ_SIM_POSTGRES")
return
}
satellitedbtest.Bench(b, func(b *testing.B, db satellite.DB) {
ctx := testcontext.New(b)
defer ctx.Cleanup()
benchmarkOld(ctx, b, db)
benchmarkNew(ctx, b, db)
})
}
var node = overlay.NodeCheckInInfo{
NodeID: storj.NodeID{1},
Address: &pb.NodeAddress{
Address: "1.2.4.4",
},
IsUp: true,
Capacity: &pb.NodeCapacity{
FreeBandwidth: int64(1234),
FreeDisk: int64(5678),
},
Operator: &pb.NodeOperator{
Email: "test@email.com",
Wallet: "0x123",
},
Version: &pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: false,
},
}
var config = overlay.NodeSelectionConfig{
UptimeReputationLambda: 0.99,
UptimeReputationWeight: 1.0,
UptimeReputationDQ: 0,
}
func benchmarkOld(ctx context.Context, b *testing.B, db satellite.DB) {
b.Run("old", func(b *testing.B) {
for i := 0; i < b.N; i++ {
value := pb.Node{
Id: node.NodeID,
Address: &pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: node.Address.GetAddress(),
},
}
err := db.OverlayCache().UpdateAddress(ctx, &value, config)
if err != nil {
b.Fatal(err)
return
}
_, err = db.OverlayCache().UpdateUptime(ctx, node.NodeID, node.IsUp, config.UptimeReputationLambda, config.UptimeReputationWeight, config.UptimeReputationDQ)
if err != nil {
b.Fatal(err)
return
}
pbInfo := pb.InfoResponse{
Operator: node.Operator,
Capacity: node.Capacity,
Type: pb.NodeType_STORAGE,
}
_, err = db.OverlayCache().UpdateNodeInfo(ctx, node.NodeID, &pbInfo)
if err != nil {
b.Fatal(err)
return
}
}
})
return
}
func benchmarkNew(ctx context.Context, b *testing.B, db satellite.DB) {
b.Run("new", func(b *testing.B) {
for i := 0; i < b.N; i++ {
node.NodeID = storj.NodeID{2}
err := db.OverlayCache().UpdateCheckIn(ctx, node, config)
if err != nil {
b.Fatal(err)
return
}
}
})
}

View File

@ -39,7 +39,7 @@ func TestSatelliteContactEndpoint(t *testing.T) {
peerCtx := peer.NewContext(ctx, &grpcPeer)
resp, err := planet.Satellites[0].Contact.Endpoint.CheckIn(peerCtx, &pb.CheckInRequest{
Address: nodeDossier.Address.GetAddress(),
Version: nodeDossier.Version.GetVersion(),
Version: &nodeDossier.Version,
Capacity: &nodeDossier.Capacity,
Operator: &nodeDossier.Operator,
})

View File

@ -18,6 +18,7 @@ import (
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/overlay"
)
// Endpoint implements the contact service Endpoints.
@ -53,32 +54,28 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) (
return nil, status.Error(codes.Internal, Error.Wrap(err).Error())
}
lastIP, err := overlay.GetNetwork(ctx, req.Address)
if err != nil {
return nil, status.Error(codes.Internal, Error.Wrap(err).Error())
}
pingNodeSuccess, pingErrorMessage, err := endpoint.pingBack(ctx, req, nodeID)
if err != nil {
return nil, status.Error(codes.Internal, Error.Wrap(err).Error())
}
err = endpoint.service.overlay.Put(ctx, nodeID, pb.Node{
Id: nodeID,
nodeInfo := overlay.NodeCheckInInfo{
NodeID: peerID.ID,
Address: &pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: req.Address,
Transport: pb.NodeTransport_TCP_TLS_GRPC,
},
})
if err != nil {
return nil, status.Error(codes.Internal, Error.Wrap(err).Error())
LastIP: lastIP,
IsUp: pingNodeSuccess,
Capacity: req.Capacity,
Operator: req.Operator,
Version: req.Version,
}
// TODO(jg): We are making 2 requests to the database, one to update uptime and
// the other to update the capacity and operator info. We should combine these into
// one to reduce db connections. Consider adding batching and using a stored procedure.
_, err = endpoint.service.overlay.UpdateUptime(ctx, nodeID, pingNodeSuccess)
if err != nil {
return nil, status.Error(codes.Internal, Error.Wrap(err).Error())
}
nodeInfo := pb.InfoResponse{Operator: req.GetOperator(), Capacity: req.GetCapacity(), Version: &pb.NodeVersion{Version: req.Version}}
_, err = endpoint.service.overlay.UpdateNodeInfo(ctx, nodeID, &nodeInfo)
err = endpoint.service.overlay.UpdateCheckIn(ctx, nodeInfo)
if err != nil {
return nil, status.Error(codes.Internal, Error.Wrap(err).Error())
}

View File

@ -68,6 +68,8 @@ type DB interface {
UpdateNodeInfo(ctx context.Context, node storj.NodeID, nodeInfo *pb.InfoResponse) (stats *NodeDossier, err error)
// UpdateUptime updates a single storagenode's uptime stats.
UpdateUptime(ctx context.Context, nodeID storj.NodeID, isUp bool, lambda, weight, uptimeDQ float64) (stats *NodeStats, err error)
// UpdateCheckIn updates a single storagenode's check-in stats.
UpdateCheckIn(ctx context.Context, node NodeCheckInInfo, config NodeSelectionConfig) (err error)
// AllPieceCounts returns a map of node IDs to piece counts from the db.
AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int, err error)
@ -75,6 +77,17 @@ type DB interface {
UpdatePieceCounts(ctx context.Context, pieceCounts map[storj.NodeID]int) (err error)
}
// NodeCheckInInfo contains all the info that will be updated when a node checkins
type NodeCheckInInfo struct {
NodeID storj.NodeID
Address *pb.NodeAddress
LastIP string
IsUp bool
Operator *pb.NodeOperator
Capacity *pb.NodeCapacity
Version *pb.NodeVersion
}
// FindStorageNodesRequest defines easy request parameters.
type FindStorageNodesRequest struct {
MinimumRequiredNodes int
@ -383,6 +396,12 @@ func (service *Service) UpdateUptime(ctx context.Context, nodeID storj.NodeID, i
return service.db.UpdateUptime(ctx, nodeID, isUp, lambda, weight, uptimeDQ)
}
// UpdateCheckIn updates a single storagenode's check-in info.
func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo) (err error) {
defer mon.Task()(&ctx)(&err)
return service.db.UpdateCheckIn(ctx, node, service.config.Node)
}
// ConnFailure implements the Transport Observer `ConnFailure` function
func (service *Service) ConnFailure(ctx context.Context, node *pb.Node, failureError error) {
var err error

View File

@ -406,3 +406,155 @@ func TestNodeInfo(t *testing.T) {
assert.Equal(t, planet.StorageNodes[0].Local().Version.Version, node.Version.Version)
})
}
func TestUpdateCheckIn(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
// setup
nodeID := storj.NodeID{1, 2, 3}
expectedEmail := "test@email.com"
expectedAddress := "1.2.4.4"
info := overlay.NodeCheckInInfo{
NodeID: nodeID,
Address: &pb.NodeAddress{
Address: expectedAddress,
},
IsUp: true,
Capacity: &pb.NodeCapacity{
FreeBandwidth: int64(1234),
FreeDisk: int64(5678),
},
Operator: &pb.NodeOperator{
Email: expectedEmail,
Wallet: "0x123",
},
Version: &pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: false,
},
}
expectedNode := &overlay.NodeDossier{
Node: pb.Node{
Id: nodeID,
LastIp: info.LastIP,
Address: &pb.NodeAddress{
Address: info.Address.GetAddress(),
Transport: pb.NodeTransport_TCP_TLS_GRPC,
},
},
Type: pb.NodeType_STORAGE,
Operator: pb.NodeOperator{
Email: info.Operator.GetEmail(),
Wallet: info.Operator.GetWallet(),
},
Capacity: pb.NodeCapacity{
FreeBandwidth: info.Capacity.GetFreeBandwidth(),
FreeDisk: info.Capacity.GetFreeDisk(),
},
Reputation: overlay.NodeStats{
UptimeCount: 1,
UptimeSuccessCount: 1,
UptimeReputationAlpha: 1,
},
Version: pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: false,
},
Contained: false,
Disqualified: nil,
PieceCount: 0,
}
config := overlay.NodeSelectionConfig{
UptimeReputationLambda: 0.99,
UptimeReputationWeight: 1.0,
UptimeReputationDQ: 0,
}
// confirm the node doesn't exist in nodes table yet
_, err := db.OverlayCache().Get(ctx, nodeID)
require.Error(t, err)
require.Contains(t, err.Error(), "node not found")
// check-in for that node id, which should add the node
// to the nodes tables in the database
startOfTest := time.Now().UTC()
err = db.OverlayCache().UpdateCheckIn(ctx, info, config)
require.NoError(t, err)
// confirm that the node is now in the nodes table with the
// correct fields set
actualNode, err := db.OverlayCache().Get(ctx, nodeID)
require.NoError(t, err)
require.True(t, actualNode.Reputation.LastContactSuccess.After(startOfTest))
require.True(t, actualNode.Reputation.LastContactFailure.Equal(time.Time{}.UTC()))
// we need to overwrite the times so that the deep equal considers them the same
expectedNode.Reputation.LastContactSuccess = actualNode.Reputation.LastContactSuccess
expectedNode.Reputation.LastContactFailure = actualNode.Reputation.LastContactFailure
expectedNode.Version.Timestamp = actualNode.Version.Timestamp
require.Equal(t, actualNode, expectedNode)
// confirm that we can update the address field
startOfUpdateTest := time.Now().UTC()
expectedAddress = "9.8.7.6"
updatedInfo := overlay.NodeCheckInInfo{
NodeID: nodeID,
Address: &pb.NodeAddress{
Address: expectedAddress,
},
IsUp: true,
Capacity: &pb.NodeCapacity{
FreeBandwidth: int64(12355),
},
Version: &pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: false,
},
}
// confirm that the updated node is in the nodes table with the
// correct updated fields set
err = db.OverlayCache().UpdateCheckIn(ctx, updatedInfo, config)
require.NoError(t, err)
updatedNode, err := db.OverlayCache().Get(ctx, nodeID)
require.NoError(t, err)
require.True(t, updatedNode.Reputation.LastContactSuccess.After(startOfUpdateTest))
require.True(t, updatedNode.Reputation.LastContactFailure.Equal(time.Time{}.UTC()))
require.Equal(t, updatedNode.Address.GetAddress(), expectedAddress)
require.Equal(t, updatedNode.Reputation.UptimeSuccessCount, actualNode.Reputation.UptimeSuccessCount+1)
require.Equal(t, updatedNode.Capacity.GetFreeBandwidth(), int64(12355))
// confirm we can udpate IsUp field
startOfUpdateTest2 := time.Now().UTC()
updatedInfo2 := overlay.NodeCheckInInfo{
NodeID: nodeID,
Address: &pb.NodeAddress{
Address: "9.8.7.6",
},
IsUp: false,
Capacity: &pb.NodeCapacity{
FreeBandwidth: int64(12355),
},
Version: &pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: false,
},
}
err = db.OverlayCache().UpdateCheckIn(ctx, updatedInfo2, config)
require.NoError(t, err)
updated2Node, err := db.OverlayCache().Get(ctx, nodeID)
require.NoError(t, err)
require.True(t, updated2Node.Reputation.LastContactSuccess.Equal(updatedNode.Reputation.LastContactSuccess))
require.Equal(t, updated2Node.Reputation.UptimeSuccessCount, updatedNode.Reputation.UptimeSuccessCount)
require.True(t, updated2Node.Reputation.LastContactFailure.After(startOfUpdateTest2))
})
}

View File

@ -182,12 +182,12 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
}
{ // TestUpdateUptimeExists
{ // test UpdateCheckIn updates the reputation correctly when the node is offline/online
nodeID := storj.NodeID{1}
// get the existing node info that is stored in nodes table
node, err := cache.Get(ctx, nodeID)
require.NoError(t, err)
alpha := node.Reputation.UptimeReputationAlpha
beta := node.Reputation.UptimeReputationBeta
@ -195,23 +195,50 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
weight := 0.876
dq := float64(0) // don't disqualify for any reason
stats, err := cache.UpdateUptime(ctx, nodeID, false, lambda, weight, dq)
info := overlay.NodeCheckInInfo{
NodeID: nodeID,
Address: &pb.NodeAddress{
Address: "1.2.3.4",
},
IsUp: false,
Version: &pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: false,
},
}
config := overlay.NodeSelectionConfig{
UptimeReputationLambda: lambda,
UptimeReputationWeight: weight,
UptimeReputationDQ: dq,
}
// update check-in when node is offline
err = cache.UpdateCheckIn(ctx, info, config)
require.NoError(t, err)
node, err = cache.Get(ctx, nodeID)
require.NoError(t, err)
expectedAlpha := lambda * alpha
expectedBeta := lambda*beta + weight
require.EqualValues(t, stats.UptimeReputationAlpha, expectedAlpha)
require.EqualValues(t, stats.UptimeReputationBeta, expectedBeta)
// confirm the reputation is updated correctly when node is offline
require.EqualValues(t, node.Reputation.UptimeReputationAlpha, expectedAlpha)
require.EqualValues(t, node.Reputation.UptimeReputationBeta, expectedBeta)
alpha = expectedAlpha
beta = expectedBeta
stats, err = cache.UpdateUptime(ctx, nodeID, true, lambda, weight, dq)
info.IsUp = true
// update check-in when node is online
err = cache.UpdateCheckIn(ctx, info, config)
require.NoError(t, err)
node, err = cache.Get(ctx, nodeID)
require.NoError(t, err)
expectedAlpha = lambda*alpha + weight
expectedBeta = lambda * beta
require.EqualValues(t, stats.UptimeReputationAlpha, expectedAlpha)
require.EqualValues(t, stats.UptimeReputationBeta, expectedBeta)
// confirm the reputation is updated correctly when node is online
require.EqualValues(t, node.Reputation.UptimeReputationAlpha, expectedAlpha)
require.EqualValues(t, node.Reputation.UptimeReputationBeta, expectedBeta)
}
}

View File

@ -934,6 +934,13 @@ func (m *lockedOverlayCache) UpdateAddress(ctx context.Context, value *pb.Node,
return m.db.UpdateAddress(ctx, value, defaults)
}
// UpdateCheckIn updates a single storagenode's check-in stats.
func (m *lockedOverlayCache) UpdateCheckIn(ctx context.Context, node overlay.NodeCheckInInfo, config overlay.NodeSelectionConfig) (err error) {
m.Lock()
defer m.Unlock()
return m.db.UpdateCheckIn(ctx, node, config)
}
// UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version.
func (m *lockedOverlayCache) UpdateNodeInfo(ctx context.Context, node storj.NodeID, nodeInfo *pb.InfoResponse) (stats *overlay.NodeDossier, err error) {
m.Lock()

View File

@ -1235,6 +1235,26 @@ func (db *DB) PostgresMigration() *migrate.Migration {
);`,
},
},
{
DB: db.db,
Description: "Add defaults to nodes table",
Version: 57,
Action: migrate.SQL{
`ALTER TABLE nodes ALTER COLUMN contained SET DEFAULT false;`,
`ALTER TABLE nodes ALTER COLUMN piece_count SET DEFAULT 0;`,
`ALTER TABLE nodes ALTER COLUMN major SET DEFAULT 0;`,
`ALTER TABLE nodes ALTER COLUMN minor SET DEFAULT 0;`,
`ALTER TABLE nodes ALTER COLUMN audit_success_count SET DEFAULT 0;`,
`ALTER TABLE nodes ALTER COLUMN total_audit_count SET DEFAULT 0;`,
`ALTER TABLE nodes ALTER COLUMN patch SET DEFAULT 0;`,
`ALTER TABLE nodes ALTER COLUMN hash SET DEFAULT '';`,
`ALTER TABLE nodes ALTER COLUMN release SET DEFAULT false;`,
`ALTER TABLE nodes ALTER COLUMN latency_90 SET DEFAULT 0;`,
`ALTER TABLE nodes ALTER COLUMN timestamp SET DEFAULT '0001-01-01 00:00:00+00';`,
`ALTER TABLE nodes ALTER COLUMN created_at SET DEFAULT current_timestamp;`,
`ALTER TABLE nodes ALTER COLUMN updated_at SET DEFAULT current_timestamp;`,
},
},
},
}
}

View File

@ -1338,3 +1338,128 @@ func populateUpdateFields(dbNode *dbx.Node, updateReq *overlay.UpdateRequest) db
return updateFields
}
// UpdateCheckIn updates a single storagenode with info from when the the node last checked in.
func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeCheckInInfo, config overlay.NodeSelectionConfig) (err error) {
defer mon.Task()(&ctx)(&err)
if node.Address.GetAddress() == "" {
return Error.New("error UpdateCheckIn: missing the storage node address")
}
switch t := cache.db.Driver().(type) {
case *sqlite3.SQLiteDriver:
value := pb.Node{
Id: node.NodeID,
Address: node.Address,
LastIp: node.LastIP,
}
err := cache.UpdateAddress(ctx, &value, config)
if err != nil {
return Error.Wrap(err)
}
_, err = cache.UpdateUptime(ctx, node.NodeID, node.IsUp, config.UptimeReputationLambda, config.UptimeReputationWeight, config.UptimeReputationDQ)
if err != nil {
return Error.Wrap(err)
}
pbInfo := pb.InfoResponse{
Operator: node.Operator,
Capacity: node.Capacity,
Type: pb.NodeType_STORAGE,
}
_, err = cache.UpdateNodeInfo(ctx, node.NodeID, &pbInfo)
if err != nil {
return Error.Wrap(err)
}
case *pq.Driver:
// v is a single feedback value that allows us to update both alpha and beta
var v float64 = -1
if node.IsUp {
v = 1
}
uptimeReputationAlpha := config.UptimeReputationLambda*config.UptimeReputationAlpha0 + config.UptimeReputationWeight*(1+v)/2
uptimeReputationBeta := config.UptimeReputationLambda*config.UptimeReputationBeta0 + config.UptimeReputationWeight*(1-v)/2
semVer, err := version.NewSemVer(node.Version.GetVersion())
if err != nil {
return Error.New("unable to convert version to semVer")
}
start := time.Now()
query := `
INSERT INTO nodes
(
id, address, last_net, protocol, type,
email, wallet, free_bandwidth, free_disk,
uptime_success_count, total_uptime_count,
last_contact_success,
last_contact_failure,
audit_reputation_alpha, audit_reputation_beta, uptime_reputation_alpha, uptime_reputation_beta,
major, minor, patch, hash, timestamp, release
)
VALUES (
$1, $2, $3, $4, $5,
$6, $7, $8, $9,
$10::bool::int, 1,
CASE WHEN $10 IS TRUE THEN current_timestamp
ELSE '0001-01-01 00:00:00+00'
END,
CASE WHEN $10 IS FALSE THEN current_timestamp
ELSE '0001-01-01 00:00:00+00'
END,
$11, $12, $13, $14,
$18, $19, $20, $21, $22, $23
)
ON CONFLICT (id)
DO UPDATE
SET
address=$2,
last_net=$3,
protocol=$4,
email=$6,
wallet=$7,
free_bandwidth=$8,
free_disk=$9,
total_uptime_count=nodes.total_uptime_count+1,
uptime_reputation_alpha=$16::numeric*nodes.uptime_reputation_alpha + $17::numeric*$10::bool::int,
uptime_reputation_beta=$16::numeric*nodes.uptime_reputation_beta + $17::numeric*(NOT $10)::bool::int,
uptime_success_count = nodes.uptime_success_count + $10::bool::int,
last_contact_success = CASE WHEN $10 IS TRUE
THEN current_timestamp
ELSE nodes.last_contact_success
END,
last_contact_failure = CASE WHEN $10 IS FALSE
THEN current_timestamp
ELSE nodes.last_contact_failure
END,
-- this disqualified case statement resolves to:
-- when (new.uptime_reputation_alpha /(new.uptime_reputation_alpha + new.uptime_reputation_beta)) <= config.UptimeReputationDQ
disqualified = CASE WHEN (($16::numeric*nodes.uptime_reputation_alpha + $17::numeric*$10::bool::int) / (($16::numeric*nodes.uptime_reputation_alpha + $17::numeric*$10::bool::int) + ($16::numeric*nodes.uptime_reputation_beta + $17::numeric*(NOT $10)::bool::int))) <= $15 AND nodes.disqualified IS NULL
THEN current_timestamp
ELSE nodes.disqualified
END;
`
_, err = cache.db.ExecContext(ctx, query,
// args $1 - $5
node.NodeID.Bytes(), node.Address.GetAddress(), node.LastIP, node.Address.GetTransport(), int(pb.NodeType_STORAGE),
// args $6 - $9
node.Operator.GetEmail(), node.Operator.GetWallet(), node.Capacity.GetFreeBandwidth(), node.Capacity.GetFreeDisk(),
// args $10
node.IsUp,
// args $11 - $14
config.AuditReputationAlpha0, config.AuditReputationBeta0, uptimeReputationAlpha, uptimeReputationBeta,
// args $15 - $17
config.UptimeReputationDQ, config.UptimeReputationLambda, config.UptimeReputationWeight,
// args $18 - $23
semVer.Major, semVer.Minor, semVer.Patch, node.Version.GetCommitHash(), node.Version.Timestamp, node.Version.GetRelease(),
)
if err != nil {
return Error.Wrap(err)
}
mon.FloatVal("UpdateCheckIn query execution time (seconds)").Observe(time.Since(start).Seconds())
default:
return Error.New("Unsupported database %t", t)
}
return nil
}

View File

@ -0,0 +1,407 @@
-- AUTOGENERATED BY gopkg.in/spacemonkeygo/dbx.v1
-- DO NOT EDIT
CREATE TABLE accounting_rollups
(
id bigserial NOT NULL,
node_id bytea NOT NULL,
start_time timestamp with time zone NOT NULL,
put_total bigint NOT NULL,
get_total bigint NOT NULL,
get_audit_total bigint NOT NULL,
get_repair_total bigint NOT NULL,
put_repair_total bigint NOT NULL,
at_rest_total double precision NOT NULL,
PRIMARY KEY (id)
);
CREATE TABLE accounting_timestamps
(
name text NOT NULL,
value timestamp with time zone NOT NULL,
PRIMARY KEY (name)
);
CREATE TABLE bucket_bandwidth_rollups
(
bucket_name bytea NOT NULL,
project_id bytea NOT NULL,
interval_start timestamp NOT NULL,
interval_seconds integer NOT NULL,
action integer NOT NULL,
inline bigint NOT NULL,
allocated bigint NOT NULL,
settled bigint NOT NULL,
PRIMARY KEY (bucket_name, project_id, interval_start, action)
);
CREATE TABLE bucket_storage_tallies
(
bucket_name bytea NOT NULL,
project_id bytea NOT NULL,
interval_start timestamp NOT NULL,
inline bigint NOT NULL,
remote bigint NOT NULL,
remote_segments_count integer NOT NULL,
inline_segments_count integer NOT NULL,
object_count integer NOT NULL,
metadata_size bigint NOT NULL,
PRIMARY KEY (bucket_name, project_id, interval_start)
);
CREATE TABLE bucket_usages
(
id bytea NOT NULL,
bucket_id bytea NOT NULL,
rollup_end_time timestamp with time zone NOT NULL,
remote_stored_data bigint NOT NULL,
inline_stored_data bigint NOT NULL,
remote_segments integer NOT NULL,
inline_segments integer NOT NULL,
objects integer NOT NULL,
metadata_size bigint NOT NULL,
repair_egress bigint NOT NULL,
get_egress bigint NOT NULL,
audit_egress bigint NOT NULL,
PRIMARY KEY (id)
);
CREATE TABLE injuredsegments
(
path bytea NOT NULL,
data bytea NOT NULL,
attempted timestamp,
PRIMARY KEY (path)
);
CREATE TABLE irreparabledbs
(
segmentpath bytea NOT NULL,
segmentdetail bytea NOT NULL,
pieces_lost_count bigint NOT NULL,
seg_damaged_unix_sec bigint NOT NULL,
repair_attempt_count bigint NOT NULL,
PRIMARY KEY (segmentpath)
);
CREATE TABLE nodes
(
id bytea NOT NULL,
address text NOT NULL,
last_net text NOT NULL,
protocol integer NOT NULL,
type integer NOT NULL,
email text NOT NULL,
wallet text NOT NULL,
free_bandwidth bigint NOT NULL,
free_disk bigint NOT NULL,
piece_count bigint NOT NULL,
major bigint NOT NULL,
minor bigint NOT NULL,
patch bigint NOT NULL,
hash text NOT NULL,
timestamp timestamp with time zone NOT NULL,
release boolean NOT NULL,
latency_90 bigint NOT NULL,
audit_success_count bigint NOT NULL,
total_audit_count bigint NOT NULL,
uptime_success_count bigint NOT NULL,
total_uptime_count bigint NOT NULL,
created_at timestamp with time zone NOT NULL,
updated_at timestamp with time zone NOT NULL,
last_contact_success timestamp with time zone NOT NULL,
last_contact_failure timestamp with time zone NOT NULL,
contained boolean NOT NULL,
disqualified timestamp with time zone,
audit_reputation_alpha double precision NOT NULL,
audit_reputation_beta double precision NOT NULL,
uptime_reputation_alpha double precision NOT NULL,
uptime_reputation_beta double precision NOT NULL,
exit_initiated_at timestamp with time zone,
exit_loop_completed_at timestamp with time zone,
exit_finished_at timestamp with time zone,
PRIMARY KEY (id)
);
CREATE TABLE offers
(
id serial NOT NULL,
name text NOT NULL,
description text NOT NULL,
award_credit_in_cents integer NOT NULL,
invitee_credit_in_cents integer NOT NULL,
award_credit_duration_days integer,
invitee_credit_duration_days integer,
redeemable_cap integer,
expires_at timestamp with time zone NOT NULL,
created_at timestamp with time zone NOT NULL,
status integer NOT NULL,
type integer NOT NULL,
PRIMARY KEY (id)
);
CREATE TABLE peer_identities
(
node_id bytea NOT NULL,
leaf_serial_number bytea NOT NULL,
chain bytea NOT NULL,
updated_at timestamp with time zone NOT NULL,
PRIMARY KEY (node_id)
);
CREATE TABLE pending_audits
(
node_id bytea NOT NULL,
piece_id bytea NOT NULL,
stripe_index bigint NOT NULL,
share_size bigint NOT NULL,
expected_share_hash bytea NOT NULL,
reverify_count bigint NOT NULL,
path bytea NOT NULL,
PRIMARY KEY (node_id)
);
CREATE TABLE projects
(
id bytea NOT NULL,
name text NOT NULL,
description text NOT NULL,
usage_limit bigint NOT NULL,
partner_id bytea,
owner_id bytea NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY (id)
);
CREATE TABLE registration_tokens
(
secret bytea NOT NULL,
owner_id bytea,
project_limit integer NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY (secret),
UNIQUE (owner_id)
);
CREATE TABLE reset_password_tokens
(
secret bytea NOT NULL,
owner_id bytea NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY (secret),
UNIQUE (owner_id)
);
CREATE TABLE serial_numbers
(
id serial NOT NULL,
serial_number bytea NOT NULL,
bucket_id bytea NOT NULL,
expires_at timestamp NOT NULL,
PRIMARY KEY (id)
);
CREATE TABLE storagenode_bandwidth_rollups
(
storagenode_id bytea NOT NULL,
interval_start timestamp NOT NULL,
interval_seconds integer NOT NULL,
action integer NOT NULL,
allocated bigint NOT NULL,
settled bigint NOT NULL,
PRIMARY KEY (storagenode_id, interval_start, action)
);
CREATE TABLE storagenode_storage_tallies
(
id bigserial NOT NULL,
node_id bytea NOT NULL,
interval_end_time timestamp with time zone NOT NULL,
data_total double precision NOT NULL,
PRIMARY KEY (id)
);
CREATE TABLE users (
id bytea NOT NULL,
email text NOT NULL,
normalized_email text NOT NULL,
full_name text NOT NULL,
short_name text,
password_hash bytea NOT NULL,
status integer NOT NULL,
partner_id bytea,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE value_attributions
(
project_id bytea NOT NULL,
bucket_name bytea NOT NULL,
partner_id bytea NOT NULL,
last_updated timestamp NOT NULL,
PRIMARY KEY (project_id, bucket_name)
);
CREATE TABLE api_keys
(
id bytea NOT NULL,
project_id bytea NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
head bytea NOT NULL,
name text NOT NULL,
secret bytea NOT NULL,
partner_id bytea,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY (id),
UNIQUE (head),
UNIQUE (name, project_id)
);
CREATE TABLE bucket_metainfos
(
id bytea NOT NULL,
project_id bytea NOT NULL REFERENCES projects (id),
name bytea NOT NULL,
partner_id bytea,
path_cipher integer NOT NULL,
created_at timestamp with time zone NOT NULL,
default_segment_size integer NOT NULL,
default_encryption_cipher_suite integer NOT NULL,
default_encryption_block_size integer NOT NULL,
default_redundancy_algorithm integer NOT NULL,
default_redundancy_share_size integer NOT NULL,
default_redundancy_required_shares integer NOT NULL,
default_redundancy_repair_shares integer NOT NULL,
default_redundancy_optimal_shares integer NOT NULL,
default_redundancy_total_shares integer NOT NULL,
PRIMARY KEY (id),
UNIQUE (name, project_id)
);
CREATE TABLE project_invoice_stamps
(
project_id bytea NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
invoice_id bytea NOT NULL,
start_date timestamp with time zone NOT NULL,
end_date timestamp with time zone NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY (project_id, start_date, end_date),
UNIQUE (invoice_id)
);
CREATE TABLE project_members
(
member_id bytea NOT NULL REFERENCES users (id) ON DELETE CASCADE,
project_id bytea NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY (member_id, project_id)
);
CREATE TABLE used_serials
(
serial_number_id integer NOT NULL REFERENCES serial_numbers (id) ON DELETE CASCADE,
storage_node_id bytea NOT NULL,
PRIMARY KEY (serial_number_id, storage_node_id)
);
CREATE TABLE user_credits
(
id serial NOT NULL,
user_id bytea NOT NULL REFERENCES users (id) ON DELETE CASCADE,
offer_id integer NOT NULL REFERENCES offers (id),
referred_by bytea REFERENCES users (id) ON DELETE SET NULL,
type text NOT NULL,
credits_earned_in_cents integer NOT NULL,
credits_used_in_cents integer NOT NULL,
expires_at timestamp with time zone NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY (id)
);
CREATE TABLE user_payments
(
user_id bytea NOT NULL REFERENCES users (id) ON DELETE CASCADE,
customer_id bytea NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY (user_id),
UNIQUE (customer_id)
);
CREATE TABLE project_payments
(
id bytea NOT NULL,
project_id bytea NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
payer_id bytea NOT NULL REFERENCES user_payments (user_id) ON DELETE CASCADE,
payment_method_id bytea NOT NULL,
is_default boolean NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY (id)
);
CREATE TABLE graceful_exit_progress (
node_id bytea NOT NULL,
bytes_transferred bigint NOT NULL,
updated_at timestamp with time zone NOT NULL,
PRIMARY KEY ( node_id )
);
CREATE TABLE graceful_exit_transfer_queue (
node_id bytea NOT NULL,
path bytea NOT NULL,
piece_num integer NOT NULL,
durability_ratio double precision NOT NULL,
queued_at timestamp with time zone NOT NULL,
requested_at timestamp with time zone,
last_failed_at timestamp with time zone,
last_failed_code integer,
failed_count integer,
finished_at timestamp with time zone,
PRIMARY KEY ( node_id, path )
);
CREATE INDEX bucket_name_project_id_interval_start_interval_seconds ON bucket_bandwidth_rollups ( bucket_name, project_id, interval_start, interval_seconds );
CREATE UNIQUE INDEX bucket_id_rollup ON bucket_usages ( bucket_id, rollup_end_time );
CREATE INDEX node_last_ip ON nodes ( last_net );
CREATE UNIQUE INDEX serial_number ON serial_numbers ( serial_number );
CREATE INDEX serial_numbers_expires_at_index ON serial_numbers ( expires_at );
CREATE INDEX storagenode_id_interval_start_interval_seconds ON storagenode_bandwidth_rollups ( storagenode_id, interval_start, interval_seconds );
CREATE UNIQUE INDEX credits_earned_user_id_offer_id ON user_credits (id, offer_id) WHERE credits_earned_in_cents=0;
INSERT INTO "accounting_rollups"("id", "node_id", "start_time", "put_total", "get_total", "get_audit_total", "get_repair_total", "put_repair_total", "at_rest_total") VALUES (1, E'\\367M\\177\\251]t/\\022\\256\\214\\265\\025\\224\\204:\\217\\212\\0102<\\321\\374\\020&\\271Qc\\325\\261\\354\\246\\233'::bytea, '2019-02-09 00:00:00+00', 1000, 2000, 3000, 4000, 0, 5000);
INSERT INTO "accounting_timestamps" VALUES ('LastAtRestTally', '0001-01-01 00:00:00+00');
INSERT INTO "accounting_timestamps" VALUES ('LastRollup', '0001-01-01 00:00:00+00');
INSERT INTO "accounting_timestamps" VALUES ('LastBandwidthTally', '0001-01-01 00:00:00+00');
INSERT INTO "nodes"("id", "address", "last_net", "protocol", "type", "email", "wallet", "free_bandwidth", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "uptime_success_count", "total_uptime_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "audit_reputation_alpha", "audit_reputation_beta", "uptime_reputation_alpha", "uptime_reputation_beta") VALUES (E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001', '127.0.0.1:55516', '', 0, 4, '', '', -1, -1, 0, 0, 1, 0, '', 'epoch', false, 0, 0, 5, 0, 5, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, 50, 5, 100, 5);
INSERT INTO "nodes"("id", "address", "last_net", "protocol", "type", "email", "wallet", "free_bandwidth", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "uptime_success_count", "total_uptime_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "audit_reputation_alpha", "audit_reputation_beta", "uptime_reputation_alpha", "uptime_reputation_beta") VALUES (E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n', '127.0.0.1:55518', '', 0, 4, '', '', -1, -1, 0, 0, 1, 0, '', 'epoch', false, 0, 0, 0, 3, 3, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, 50, 0, 100, 0);
INSERT INTO "nodes"("id", "address", "last_net", "protocol", "type", "email", "wallet", "free_bandwidth", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "uptime_success_count", "total_uptime_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "audit_reputation_alpha", "audit_reputation_beta", "uptime_reputation_alpha", "uptime_reputation_beta") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014', '127.0.0.1:55517', '', 0, 4, '', '', -1, -1, 0, 0, 1, 0, '', 'epoch', false, 0, 0, 0, 0, 0, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, 50, 0, 100, 0);
INSERT INTO "nodes"("id", "address", "last_net", "protocol", "type", "email", "wallet", "free_bandwidth", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "uptime_success_count", "total_uptime_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "audit_reputation_alpha", "audit_reputation_beta", "uptime_reputation_alpha", "uptime_reputation_beta") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\015', '127.0.0.1:55519', '', 0, 4, '', '', -1, -1, 0, 0, 1, 0, '', 'epoch', false, 0, 1, 2, 1, 2, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, 50, 1, 100, 1);
INSERT INTO "nodes"("id", "address", "last_net", "protocol", "type", "email", "wallet", "free_bandwidth", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "uptime_success_count", "total_uptime_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "audit_reputation_alpha", "audit_reputation_beta", "uptime_reputation_alpha", "uptime_reputation_beta") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\016', '127.0.0.1:55520', '', 0, 4, '', '', -1, -1, 0, 0, 1, 0, '', 'epoch', false, 0, 300, 400, 300, 400, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, 300, 100, 300, 100);
INSERT INTO "users"("id", "full_name", "short_name", "email", "normalized_email", "password_hash", "status", "partner_id", "created_at") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 'Noahson', 'William', '1email1@mail.test', '1EMAIL1@MAIL.TEST', E'some_readable_hash'::bytea, 1, NULL, '2019-02-14 08:28:24.614594+00');
INSERT INTO "projects"("id", "name", "description", "usage_limit", "partner_id", "owner_id", "created_at") VALUES (E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, 'ProjectName', 'projects description', 0, NULL, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, '2019-02-14 08:28:24.254934+00');
INSERT INTO "projects"("id", "name", "description", "usage_limit", "partner_id", "owner_id", "created_at") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, 'projName1', 'Test project 1', 0, NULL, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, '2019-02-14 08:28:24.636949+00');
INSERT INTO "project_members"("member_id", "project_id", "created_at") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, '2019-02-14 08:28:24.677953+00');
INSERT INTO "project_members"("member_id", "project_id", "created_at") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, '2019-02-13 08:28:24.677953+00');
INSERT INTO "irreparabledbs" ("segmentpath", "segmentdetail", "pieces_lost_count", "seg_damaged_unix_sec", "repair_attempt_count") VALUES ('\x49616d5365676d656e746b6579696e666f30', '\x49616d5365676d656e7464657461696c696e666f30', 10, 1550159554, 10);
INSERT INTO "injuredsegments" ("path", "data") VALUES ('0', '\x0a0130120100');
INSERT INTO "injuredsegments" ("path", "data") VALUES ('here''s/a/great/path', '\x0a136865726527732f612f67726561742f70617468120a0102030405060708090a');
INSERT INTO "injuredsegments" ("path", "data") VALUES ('yet/another/cool/path', '\x0a157965742f616e6f746865722f636f6f6c2f70617468120a0102030405060708090a');
INSERT INTO "injuredsegments" ("path", "data") VALUES ('so/many/iconic/paths/to/choose/from', '\x0a23736f2f6d616e792f69636f6e69632f70617468732f746f2f63686f6f73652f66726f6d120a0102030405060708090a');
INSERT INTO "bucket_usages" ("id", "bucket_id", "rollup_end_time", "remote_stored_data", "inline_stored_data", "remote_segments", "inline_segments", "objects", "metadata_size", "repair_egress", "get_egress", "audit_egress") VALUES (E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001",'::bytea, E'\\366\\146\\032\\321\\316\\161\\070\\133\\302\\271",'::bytea, '2019-03-06 08:28:24.677953+00', 10, 11, 12, 13, 14, 15, 16, 17, 18);
INSERT INTO "registration_tokens" ("secret", "owner_id", "project_limit", "created_at") VALUES (E'\\070\\127\\144\\013\\332\\344\\102\\376\\306\\056\\303\\130\\106\\132\\321\\276\\321\\274\\170\\264\\054\\333\\221\\116\\154\\221\\335\\070\\220\\146\\344\\216'::bytea, null, 1, '2019-02-14 08:28:24.677953+00');
INSERT INTO "serial_numbers" ("id", "serial_number", "bucket_id", "expires_at") VALUES (1, E'0123456701234567'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014/testbucket'::bytea, '2019-03-06 08:28:24.677953+00');
INSERT INTO "used_serials" ("serial_number_id", "storage_node_id") VALUES (1, E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n');
INSERT INTO "storagenode_bandwidth_rollups" ("storagenode_id", "interval_start", "interval_seconds", "action", "allocated", "settled") VALUES (E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n', '2019-03-06 08:00:00.000000+00', 3600, 1, 1024, 2024);
INSERT INTO "storagenode_storage_tallies" VALUES (1, E'\\3510\\323\\225"~\\036<\\342\\330m\\0253Jhr\\246\\233K\\246#\\2303\\351\\256\\275j\\212UM\\362\\207', '2019-02-14 08:16:57.812849+00', 1000);
INSERT INTO "bucket_bandwidth_rollups" ("bucket_name", "project_id", "interval_start", "interval_seconds", "action", "inline", "allocated", "settled") VALUES (E'testbucket'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea,'2019-03-06 08:00:00.000000+00', 3600, 1, 1024, 2024, 3024);
INSERT INTO "bucket_storage_tallies" ("bucket_name", "project_id", "interval_start", "inline", "remote", "remote_segments_count", "inline_segments_count", "object_count", "metadata_size") VALUES (E'testbucket'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea,'2019-03-06 08:00:00.000000+00', 4024, 5024, 0, 0, 0, 0);
INSERT INTO "bucket_bandwidth_rollups" ("bucket_name", "project_id", "interval_start", "interval_seconds", "action", "inline", "allocated", "settled") VALUES (E'testbucket'::bytea, E'\\170\\160\\157\\370\\274\\366\\113\\364\\272\\235\\301\\243\\321\\102\\321\\136'::bytea,'2019-03-06 08:00:00.000000+00', 3600, 1, 1024, 2024, 3024);
INSERT INTO "bucket_storage_tallies" ("bucket_name", "project_id", "interval_start", "inline", "remote", "remote_segments_count", "inline_segments_count", "object_count", "metadata_size") VALUES (E'testbucket'::bytea, E'\\170\\160\\157\\370\\274\\366\\113\\364\\272\\235\\301\\243\\321\\102\\321\\136'::bytea,'2019-03-06 08:00:00.000000+00', 4024, 5024, 0, 0, 0, 0);
INSERT INTO "reset_password_tokens" ("secret", "owner_id", "created_at") VALUES (E'\\070\\127\\144\\013\\332\\344\\102\\376\\306\\056\\303\\130\\106\\132\\321\\276\\321\\274\\170\\264\\054\\333\\221\\116\\154\\221\\335\\070\\220\\146\\344\\216'::bytea, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, '2019-05-08 08:28:24.677953+00');
INSERT INTO "offers" ("name", "description", "award_credit_in_cents", "invitee_credit_in_cents", "award_credit_duration_days", "invitee_credit_duration_days", "redeemable_cap", "expires_at", "created_at", "status", "type") VALUES ('testOffer', 'Test offer 1', 0, 0, 14, 14, 50, '2019-03-14 08:28:24.636949+00', '2019-02-14 08:28:24.636949+00', 0, 0);
INSERT INTO "offers" ("name","description","award_credit_in_cents","award_credit_duration_days", "invitee_credit_in_cents","invitee_credit_duration_days", "expires_at","created_at","status","type") VALUES ('Default free credit offer','Is active when no active free credit offer',0, NULL,300, 14, '2119-03-14 08:28:24.636949+00','2019-07-14 08:28:24.636949+00',1,1);
INSERT INTO "api_keys" ("id", "project_id", "head", "name", "secret", "partner_id", "created_at") VALUES (E'\\334/\\302;\\225\\355O\\323\\276f\\247\\354/6\\241\\033'::bytea, E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, E'\\111\\142\\147\\304\\132\\375\\070\\163\\270\\160\\251\\370\\126\\063\\351\\037\\257\\071\\143\\375\\351\\320\\253\\232\\220\\260\\075\\173\\306\\307\\115\\136'::bytea, 'key 2', E'\\254\\011\\315\\333\\273\\365\\001\\071\\024\\154\\253\\332\\301\\216\\361\\074\\221\\367\\251\\231\\274\\333\\300\\367\\001\\272\\327\\111\\315\\123\\042\\016'::bytea, NULL, '2019-02-14 08:28:24.267934+00');
INSERT INTO "user_payments" ("user_id", "customer_id", "created_at") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, E'\\022\\217/\\014\\376!K\\023\\276'::bytea, '2019-06-01 08:28:24.267934+00');
INSERT INTO "project_invoice_stamps" ("project_id", "invoice_id", "start_date", "end_date", "created_at") VALUES (E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, E'\\363\\311\\033w\\222\\303,'::bytea, '2019-06-01 08:28:24.267934+00', '2019-06-29 08:28:24.267934+00', '2019-06-01 08:28:24.267934+00');
INSERT INTO "value_attributions" ("project_id", "bucket_name", "partner_id", "last_updated") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, E''::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea,'2019-02-14 08:07:31.028103+00');
INSERT INTO "user_credits" ("id", "user_id", "offer_id", "referred_by", "credits_earned_in_cents", "credits_used_in_cents", "type", "expires_at", "created_at") VALUES (1, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 1, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 200, 0, 'invalid', '2019-10-01 08:28:24.267934+00', '2019-06-01 08:28:24.267934+00');
INSERT INTO "bucket_metainfos" ("id", "project_id", "name", "partner_id", "created_at", "path_cipher", "default_segment_size", "default_encryption_cipher_suite", "default_encryption_block_size", "default_redundancy_algorithm", "default_redundancy_share_size", "default_redundancy_required_shares", "default_redundancy_repair_shares", "default_redundancy_optimal_shares", "default_redundancy_total_shares") VALUES (E'\\334/\\302;\\225\\355O\\323\\276f\\247\\354/6\\241\\033'::bytea, E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, E'testbucketuniquename'::bytea, NULL, '2019-06-14 08:28:24.677953+00', 1, 65536, 1, 8192, 1, 4096, 4, 6, 8, 10);
INSERT INTO "project_payments" ("id", "project_id", "payer_id", "payment_method_id", "is_default","created_at") VALUES (E'\\334/\\302;\\225\\355O\\323\\276f\\247\\354/6\\241\\033'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, E'\\022\\217/\\014\\376!K\\023\\276'::bytea, true, '2019-06-01 08:28:24.267934+00');
INSERT INTO "pending_audits" ("node_id", "piece_id", "stripe_index", "share_size", "expected_share_hash", "reverify_count", "path") VALUES (E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001'::bytea, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 5, 1024, E'\\070\\127\\144\\013\\332\\344\\102\\376\\306\\056\\303\\130\\106\\132\\321\\276\\321\\274\\170\\264\\054\\333\\221\\116\\154\\221\\335\\070\\220\\146\\344\\216'::bytea, 1, 'not null');
INSERT INTO "peer_identities" VALUES (E'\\334/\\302;\\225\\355O\\323\\276f\\247\\354/6\\241\\033'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, '2019-02-14 08:07:31.335028+00');
INSERT INTO "graceful_exit_progress" ("node_id", "bytes_transferred", "updated_at") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\016', 1000000000000000, '2019-09-12 10:07:31.028103+00');
INSERT INTO "graceful_exit_transfer_queue" ("node_id", "path", "piece_num", "durability_ratio", "queued_at", "requested_at", "last_failed_at", "last_failed_code", "failed_count", "finished_at") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\016', E'f8419768-5baa-4901-b3ba-62808013ec45/s0/test3/\\240\\243\\223n \\334~b}\\2624)\\250m\\201\\202\\235\\276\\361\\3304\\323\\352\\311\\361\\353;\\326\\311', 8, 1.0, '2019-09-12 10:07:31.028103+00', '2019-09-12 10:07:32.028103+00', null, null, 0, '2019-09-12 10:07:33.028103+00');
INSERT INTO "graceful_exit_transfer_queue" ("node_id", "path", "piece_num", "durability_ratio", "queued_at", "requested_at", "last_failed_at", "last_failed_code", "failed_count", "finished_at") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\016', E'f8419768-5baa-4901-b3ba-62808013ec45/s0/test3/\\240\\243\\223n \\334~b}\\2624)\\250m\\201\\202\\235\\276\\361\\3304\\323\\352\\311\\361\\353;\\326\\312', 8, 1.0, '2019-09-12 10:07:31.028103+00', '2019-09-12 10:07:32.028103+00', null, null, 0, '2019-09-12 10:07:33.028103+00');
-- NEW DATA --

View File

@ -104,7 +104,7 @@ func (chore *Chore) pingSatellites(ctx context.Context) (err error) {
}()
_, err = pb.NewNodeClient(conn).CheckIn(ctx, &pb.CheckInRequest{
Address: self.Address.GetAddress(),
Version: self.Version.GetVersion(),
Version: &self.Version,
Capacity: &self.Capacity,
Operator: &self.Operator,
})