From f8906ce000ef3cd26443cafb42ee17eadddb1b39 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Tue, 15 Jan 2019 11:08:45 -0500 Subject: [PATCH] Change overlay cache database interface (#1047) --- internal/testplanet/node.go | 2 +- pkg/audit/cursor_test.go | 31 +- pkg/overlay/cache.go | 94 +-- pkg/overlay/cache_test.go | 42 +- pkg/overlay/client_test.go | 95 ++- pkg/overlay/config.go | 3 +- pkg/overlay/config_test.go | 19 - pkg/overlay/server.go | 64 +- pkg/overlay/server_test.go | 33 +- satellite/db.go | 4 +- satellite/satellitedb/database.go | 6 +- satellite/satellitedb/dbx/satellitedb.dbx | 39 +- satellite/satellitedb/dbx/satellitedb.dbx.go | 796 +++++++++++++----- .../dbx/satellitedb.dbx.postgres.sql | 21 +- .../dbx/satellitedb.dbx.sqlite3.sql | 21 +- satellite/satellitedb/locked.go | 59 +- satellite/satellitedb/overlaycache.go | 282 +++++-- .../satellitedb/satellitedbtest/utils.go | 2 +- 18 files changed, 1045 insertions(+), 568 deletions(-) delete mode 100644 pkg/overlay/config_test.go diff --git a/internal/testplanet/node.go b/internal/testplanet/node.go index 6589bda90..67bbbeae9 100644 --- a/internal/testplanet/node.go +++ b/internal/testplanet/node.go @@ -182,7 +182,7 @@ func (node *Node) initOverlay(planet *Planet) error { node.Kademlia = kad node.StatDB = node.Database.StatDB() - node.Overlay = overlay.NewCache(teststore.New(), node.StatDB) + node.Overlay = overlay.NewCache(node.Database.OverlayCache(), node.StatDB) node.Discovery = discovery.NewDiscovery(node.Log.Named("discovery"), node.Overlay, node.Kademlia, node.StatDB) return nil diff --git a/pkg/audit/cursor_test.go b/pkg/audit/cursor_test.go index 25228bebf..b9abc2841 100644 --- a/pkg/audit/cursor_test.go +++ b/pkg/audit/cursor_test.go @@ -4,21 +4,22 @@ package audit import ( - "context" "crypto/rand" "errors" "math" "math/big" "reflect" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" - "storj.io/storj/internal/testidentity" + "storj.io/storj/internal/testcontext" + "storj.io/storj/internal/testplanet" "storj.io/storj/internal/teststorj" "storj.io/storj/pkg/auth" - "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/pointerdb" "storj.io/storj/pkg/storage/meta" @@ -27,7 +28,6 @@ import ( ) var ( - ctx = context.Background() ErrNoList = errors.New("list error: failed to get list") ErrNoNum = errors.New("num error: failed to get num") ) @@ -38,10 +38,17 @@ func TestAuditSegment(t *testing.T) { count int } - ca, err := testidentity.NewTestCA(ctx) - assert.NoError(t, err) - identity, err := ca.NewIdentity() - assert.NoError(t, err) + tctx := testcontext.New(t) + defer tctx.Cleanup() + + planet, err := testplanet.New(t, 1, 4, 1) + require.NoError(t, err) + defer tctx.Check(planet.Shutdown) + + planet.Start(tctx) + + // we wait a second for all the nodes to complete bootstrapping off the satellite + time.Sleep(2 * time.Second) // note: to simulate better, // change limit in library to 5 in @@ -92,15 +99,15 @@ func TestAuditSegment(t *testing.T) { }, } - ctx = auth.WithAPIKey(ctx, nil) + ctx := auth.WithAPIKey(tctx, nil) // PointerDB instantiation db := teststore.New() c := pointerdb.Config{MaxInlineSegmentSize: 8000} - cache := overlay.NewCache(teststore.New(), nil) - - pointers := pointerdb.NewServer(db, cache, zap.NewNop(), c, identity) + //TODO: use planet PointerDB directly + cache := planet.Satellites[0].Overlay + pointers := pointerdb.NewServer(db, cache, zap.NewNop(), c, planet.Satellites[0].Identity) // create a pdb client and instance 
of audit cursor := NewCursor(pointers) diff --git a/pkg/overlay/cache.go b/pkg/overlay/cache.go index c7f5cb71c..77f72af81 100644 --- a/pkg/overlay/cache.go +++ b/pkg/overlay/cache.go @@ -5,8 +5,8 @@ package overlay import ( "context" + "errors" - "github.com/gogo/protobuf/proto" "github.com/zeebo/errs" "go.uber.org/zap" @@ -21,9 +21,6 @@ const ( OverlayBucket = "overlay" ) -// ErrDelete is returned when there is a problem deleting a node from the cache -var ErrDelete = errs.New("error deleting node") - // ErrEmptyNode is returned when the nodeID is empty var ErrEmptyNode = errs.New("empty node ID") @@ -36,20 +33,35 @@ var ErrBucketNotFound = errs.New("Bucket not found") // OverlayError creates class of errors for stack traces var OverlayError = errs.Class("Overlay Error") +// DB implements the database for overlay.Cache +type DB interface { + // Get looks up the node by nodeID + Get(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error) + // GetAll looks up nodes based on the ids from the overlay cache + GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) + // List lists nodes starting from cursor + List(ctx context.Context, cursor storj.NodeID, limit int) ([]*pb.Node, error) + // Update updates node information + Update(ctx context.Context, value *pb.Node) error + // Delete deletes node based on id + Delete(ctx context.Context, id storj.NodeID) error +} + // Cache is used to store overlay data in Redis type Cache struct { - db storage.KeyValueStore + db DB statDB statdb.DB } // NewCache returns a new Cache -func NewCache(db storage.KeyValueStore, sdb statdb.DB) *Cache { +func NewCache(db DB, sdb statdb.DB) *Cache { return &Cache{db: db, statDB: sdb} } // Inspect lists limited number of items in the cache func (cache *Cache) Inspect(ctx context.Context) (storage.Keys, error) { - return cache.db.List(nil, 0) + // TODO: implement inspection tools + return nil, errors.New("not implemented") } // Get looks up the provided nodeID from the overlay cache @@ -58,51 +70,16 @@ func (cache *Cache) Get(ctx context.Context, nodeID storj.NodeID) (*pb.Node, err return nil, ErrEmptyNode } - b, err := cache.db.Get(nodeID.Bytes()) - if err != nil { - if storage.ErrKeyNotFound.Has(err) { - return nil, ErrNodeNotFound - } - return nil, err - } - if b == nil { - return nil, ErrNodeNotFound - } - - na := &pb.Node{} - if err := proto.Unmarshal(b, na); err != nil { - return nil, err - } - return na, nil + return cache.db.Get(ctx, nodeID) } -// GetAll looks up the provided nodeIDs from the overlay cache -func (cache *Cache) GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) { - if len(nodeIDs) == 0 { - return nil, OverlayError.New("no nodeIDs provided") +// GetAll looks up the provided ids from the overlay cache +func (cache *Cache) GetAll(ctx context.Context, ids storj.NodeIDList) ([]*pb.Node, error) { + if len(ids) == 0 { + return nil, OverlayError.New("no ids provided") } - var ks storage.Keys - for _, v := range nodeIDs { - ks = append(ks, v.Bytes()) - } - vs, err := cache.db.GetAll(ks) - if err != nil { - return nil, err - } - var ns []*pb.Node - for _, v := range vs { - if v == nil { - ns = append(ns, nil) - continue - } - na := &pb.Node{} - err := proto.Unmarshal(v, na) - if err != nil { - return nil, OverlayError.New("could not unmarshal non-nil node: %v", err) - } - ns = append(ns, na) - } - return ns, nil + + return cache.db.GetAll(ctx, ids) } // Put adds a nodeID to the redis cache with a binary representation of proto defined Node @@ -112,12 +89,16 @@ 
func (cache *Cache) Put(ctx context.Context, nodeID storj.NodeID, value pb.Node) if nodeID.IsZero() { return nil } + if nodeID != value.Id { + return errors.New("invalid request") + } // get existing node rep, or create a new statdb node with 0 rep stats, err := cache.statDB.CreateEntryIfNotExists(ctx, nodeID) if err != nil { return err } + value.Reputation = &pb.NodeStats{ AuditSuccessRatio: stats.AuditSuccessRatio, AuditSuccessCount: stats.AuditSuccessCount, @@ -127,12 +108,7 @@ func (cache *Cache) Put(ctx context.Context, nodeID storj.NodeID, value pb.Node) UptimeCount: stats.UptimeCount, } - data, err := proto.Marshal(&value) - if err != nil { - return err - } - - return cache.db.Put(nodeID.Bytes(), data) + return cache.db.Update(ctx, &value) } // Delete will remove the node from the cache. Used when a node hard disconnects or fails @@ -141,13 +117,7 @@ func (cache *Cache) Delete(ctx context.Context, id storj.NodeID) error { if id.IsZero() { return ErrEmptyNode } - - err := cache.db.Delete(id.Bytes()) - if err != nil { - return ErrDelete - } - - return nil + return cache.db.Delete(ctx, id) } // ConnFailure implements the Transport Observer `ConnFailure` function diff --git a/pkg/overlay/cache_test.go b/pkg/overlay/cache_test.go index 2a2f7aa7f..609c446cb 100644 --- a/pkg/overlay/cache_test.go +++ b/pkg/overlay/cache_test.go @@ -11,18 +11,24 @@ import ( "github.com/stretchr/testify/assert" "storj.io/storj/internal/testcontext" - "storj.io/storj/internal/testplanet" "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/statdb" "storj.io/storj/pkg/storj" "storj.io/storj/satellite" "storj.io/storj/satellite/satellitedb/satellitedbtest" - "storj.io/storj/storage" - "storj.io/storj/storage/teststore" ) -func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore, sdb statdb.DB) { +func TestCache_Database(t *testing.T) { + satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + testCache(ctx, t, db.OverlayCache(), db.StatDB()) + }) +} + +func testCache(ctx context.Context, t *testing.T, store overlay.DB, sdb statdb.DB) { valid1ID := storj.NodeID{} valid2ID := storj.NodeID{} missingID := storj.NodeID{} @@ -65,11 +71,7 @@ func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore, s assert.True(t, err == overlay.ErrNodeNotFound) assert.Nil(t, invalid2) - if storeClient, ok := store.(*teststore.Client); ok { - storeClient.ForceError++ - _, err := cache.Get(ctx, valid1ID) - assert.Error(t, err) - } + // TODO: add erroring database test } { // GetAll @@ -92,11 +94,7 @@ func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore, s _, err = cache.GetAll(ctx, storj.NodeIDList{}) assert.True(t, overlay.OverlayError.Has(err)) - if storeClient, ok := store.(*teststore.Client); ok { - storeClient.ForceError++ - _, err := cache.GetAll(ctx, storj.NodeIDList{valid1ID, valid2ID}) - assert.Error(t, err) - } + // TODO: add erroring database test } { // Delete @@ -120,19 +118,3 @@ func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore, s assert.True(t, err == overlay.ErrEmptyNode) } } - -func TestCache_Masterdb(t *testing.T) { - ctx := testcontext.New(t) - defer ctx.Cleanup() - - planet, err := testplanet.New(t, 1, 4, 0) - if err != nil { - t.Fatal(err) - } - defer ctx.Check(planet.Shutdown) - planet.Start(ctx) - - satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) { - testCache(ctx, t, db.OverlayCache(), db.StatDB()) - }) -} diff --git 
a/pkg/overlay/client_test.go b/pkg/overlay/client_test.go index ad67c7a46..abce75938 100644 --- a/pkg/overlay/client_test.go +++ b/pkg/overlay/client_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "storj.io/storj/internal/testcontext" "storj.io/storj/internal/testidentity" @@ -43,6 +44,22 @@ func TestNewClient(t *testing.T) { } func TestChoose(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + planet, err := testplanet.New(t, 1, 4, 1) + require.NoError(t, err) + + planet.Start(ctx) + // we wait a second for all the nodes to complete bootstrapping off the satellite + time.Sleep(2 * time.Second) + defer ctx.Check(planet.Shutdown) + + oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0]) + if err != nil { + t.Fatal(err) + } + n1 := &pb.Node{Id: storj.NodeID{1}, Type: pb.NodeType_STORAGE} n2 := &pb.Node{Id: storj.NodeID{2}, Type: pb.NodeType_STORAGE} n3 := &pb.Node{Id: storj.NodeID{3}, Type: pb.NodeType_STORAGE} @@ -57,13 +74,6 @@ func TestChoose(t *testing.T) { id3 := storj.NodeID{3} id4 := storj.NodeID{4} - ctx := testcontext.New(t) - defer ctx.Cleanup() - - planet, cleanup := getPlanet(ctx, t) - defer cleanup() - oc := getOverlayClient(t, planet) - cases := []struct { limit int space int64 @@ -122,9 +132,18 @@ func TestLookup(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() - planet, cleanup := getPlanet(ctx, t) - defer cleanup() - oc := getOverlayClient(t, planet) + planet, err := testplanet.New(t, 1, 4, 1) + require.NoError(t, err) + + planet.Start(ctx) + // we wait a second for all the nodes to complete bootstrapping off the satellite + time.Sleep(2 * time.Second) + defer ctx.Check(planet.Shutdown) + + oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0]) + if err != nil { + t.Fatal(err) + } nid1 := planet.StorageNodes[0].ID() @@ -158,9 +177,18 @@ func TestBulkLookup(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() - planet, cleanup := getPlanet(ctx, t) - defer cleanup() - oc := getOverlayClient(t, planet) + planet, err := testplanet.New(t, 1, 4, 1) + require.NoError(t, err) + + planet.Start(ctx) + // we wait a second for all the nodes to complete bootstrapping off the satellite + time.Sleep(2 * time.Second) + defer ctx.Check(planet.Shutdown) + + oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0]) + if err != nil { + t.Fatal(err) + } nid1 := planet.StorageNodes[0].ID() nid2 := planet.StorageNodes[1].ID() @@ -189,9 +217,18 @@ func TestBulkLookupV2(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() - planet, cleanup := getPlanet(ctx, t) - defer cleanup() - oc := getOverlayClient(t, planet) + planet, err := testplanet.New(t, 1, 4, 1) + require.NoError(t, err) + + planet.Start(ctx) + // we wait a second for all the nodes to complete bootstrapping off the satellite + time.Sleep(2 * time.Second) + defer ctx.Check(planet.Shutdown) + + oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0]) + if err != nil { + t.Fatal(err) + } cache := planet.Satellites[0].Overlay @@ -248,29 +285,3 @@ func TestBulkLookupV2(t *testing.T) { } } } - -func getPlanet(ctx *testcontext.Context, t *testing.T) (planet *testplanet.Planet, f func()) { - planet, err := testplanet.New(t, 1, 4, 1) - if err != nil { - t.Fatal(err) - } - - planet.Start(ctx) - // we wait a second for all the nodes to complete bootstrapping off the satellite - time.Sleep(2 * time.Second) - - f = func() { - ctx.Check(planet.Shutdown) - } - - return planet, f -} - -func getOverlayClient(t 
*testing.T, planet *testplanet.Planet) (oc overlay.Client) { - oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0]) - if err != nil { - t.Fatal(err) - } - - return oc -} diff --git a/pkg/overlay/config.go b/pkg/overlay/config.go index 7fb4db2e0..326dd0459 100644 --- a/pkg/overlay/config.go +++ b/pkg/overlay/config.go @@ -17,7 +17,6 @@ import ( "storj.io/storj/pkg/statdb" "storj.io/storj/pkg/storj" "storj.io/storj/pkg/utils" - "storj.io/storj/storage" ) var ( @@ -64,7 +63,7 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) ( sdb, ok := ctx.Value("masterdb").(interface { StatDB() statdb.DB - OverlayCache() storage.KeyValueStore + OverlayCache() DB }) if !ok { return Error.Wrap(errs.New("unable to get master db instance")) diff --git a/pkg/overlay/config_test.go b/pkg/overlay/config_test.go deleted file mode 100644 index 1cb3beedc..000000000 --- a/pkg/overlay/config_test.go +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright (C) 2018 Storj Labs, Inc. -// See LICENSE for copying information. -package overlay - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestRun(t *testing.T) { - ctx := context.Background() - - // run with nil, pass pointer to Kademlia in context - err := Config{}.Run(ctx, nil) - assert.Error(t, err) - assert.Equal(t, "overlay error: unable to get master db instance", err.Error()) -} diff --git a/pkg/overlay/server.go b/pkg/overlay/server.go index 8c737df9e..8ddb96da4 100644 --- a/pkg/overlay/server.go +++ b/pkg/overlay/server.go @@ -8,7 +8,6 @@ import ( "context" "fmt" - "github.com/gogo/protobuf/proto" "github.com/zeebo/errs" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -17,7 +16,6 @@ import ( "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" - "storj.io/storj/storage" ) // ServerError creates class of errors for stack traces @@ -120,52 +118,31 @@ func (server *Server) FindStorageNodes(ctx context.Context, req *pb.FindStorageN }, nil } -func (server *Server) getNodes(ctx context.Context, keys storage.Keys) ([]*pb.Node, error) { - values, err := server.cache.db.GetAll(keys) - if err != nil { - return nil, Error.Wrap(err) - } - - nodes := []*pb.Node{} - for _, v := range values { - n := &pb.Node{} - if err := proto.Unmarshal(v, n); err != nil { - return nil, Error.Wrap(err) - } - - nodes = append(nodes, n) - } - - return nodes, nil - -} - -func (server *Server) populate(ctx context.Context, startID storj.NodeID, maxNodes int64, - minRestrictions *pb.NodeRestrictions, minReputation *pb.NodeStats, +// TODO: nicer method arguments +func (server *Server) populate(ctx context.Context, + startID storj.NodeID, maxNodes int64, + minRestrictions *pb.NodeRestrictions, + minReputation *pb.NodeStats, excluded storj.NodeIDList) ([]*pb.Node, storj.NodeID, error) { + // TODO: move the query into db limit := int(maxNodes * 2) - keys, err := server.cache.db.List(startID.Bytes(), limit) + nodes, err := server.cache.db.List(ctx, startID, limit) if err != nil { server.log.Error("Error listing nodes", zap.Error(err)) return nil, storj.NodeID{}, Error.Wrap(err) } - if len(keys) <= 0 { - server.log.Info("No Keys returned from List operation") - return []*pb.Node{}, startID, nil - } - - // TODO: should this be `var result []*pb.Node` ? 
+ var nextStart storj.NodeID result := []*pb.Node{} - nodes, err := server.getNodes(ctx, keys) - if err != nil { - server.log.Error("Error getting nodes", zap.Error(err)) - return nil, storj.NodeID{}, Error.Wrap(err) - } - for _, v := range nodes { + if v == nil { + continue + } + + nextStart = v.Id if v.Type != pb.NodeType_STORAGE { + server.log.Debug("not storage node = " + v.Id.String() + " was " + v.Type.String()) continue } @@ -179,19 +156,12 @@ func (server *Server) populate(ctx context.Context, startID storj.NodeID, maxNod reputation.GetAuditSuccessRatio() < minReputation.GetAuditSuccessRatio() || reputation.GetAuditCount() < minReputation.GetAuditCount() || contains(excluded, v.Id) { + server.log.Debug("excluded = " + v.Id.String()) continue } - result = append(result, v) - } - var nextStart storj.NodeID - if len(keys) < limit { - nextStart = storj.NodeID{} - } else { - nextStart, err = storj.NodeIDFromBytes(keys[len(keys)-1]) - } - if err != nil { - return nil, storj.NodeID{}, Error.Wrap(err) + server.log.Debug("append " + v.Id.String() + " - " + v.Type.String()) + result = append(result, v) } return result, nextStart, nil diff --git a/pkg/overlay/server_test.go b/pkg/overlay/server_test.go index f93ec6949..5f44a0137 100644 --- a/pkg/overlay/server_test.go +++ b/pkg/overlay/server_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "storj.io/storj/internal/testcontext" "storj.io/storj/internal/testplanet" @@ -34,17 +35,21 @@ func TestServer(t *testing.T) { // TODO: handle cleanup { // FindStorageNodes - result, err := server.FindStorageNodes(ctx, &pb.FindStorageNodesRequest{Opts: &pb.OverlayOptions{Amount: 2}}) - if assert.NoError(t, err) && assert.NotNil(t, result) { - assert.Len(t, result.Nodes, 2) - } + result, err := server.FindStorageNodes(ctx, &pb.FindStorageNodesRequest{ + Opts: &pb.OverlayOptions{Amount: 2}, + }) + require.NoError(t, err) + require.NotNil(t, result) + assert.Len(t, result.Nodes, 2) } { // Lookup - result, err := server.Lookup(ctx, &pb.LookupRequest{NodeId: planet.StorageNodes[0].ID()}) - if assert.NoError(t, err) && assert.NotNil(t, result) { - assert.Equal(t, result.Node.Address.Address, planet.StorageNodes[0].Addr()) - } + result, err := server.Lookup(ctx, &pb.LookupRequest{ + NodeId: planet.StorageNodes[0].ID(), + }) + require.NoError(t, err) + require.NotNil(t, result) + assert.Equal(t, result.Node.Address.Address, planet.StorageNodes[0].Addr()) } { // BulkLookup @@ -56,11 +61,13 @@ func TestServer(t *testing.T) { }, }) - if assert.NoError(t, err) && assert.NotNil(t, result) && assert.Len(t, result.LookupResponse, 3) { - for i, resp := range result.LookupResponse { - if assert.NotNil(t, resp.Node) { - assert.Equal(t, resp.Node.Address.Address, planet.StorageNodes[i].Addr()) - } + require.NoError(t, err) + require.NotNil(t, result) + require.Len(t, result.LookupResponse, 3) + + for i, resp := range result.LookupResponse { + if assert.NotNil(t, resp.Node) { + assert.Equal(t, resp.Node.Address.Address, planet.StorageNodes[i].Addr()) } } } diff --git a/satellite/db.go b/satellite/db.go index df2c4cb96..4be93b22b 100644 --- a/satellite/db.go +++ b/satellite/db.go @@ -8,8 +8,8 @@ import ( "storj.io/storj/pkg/bwagreement" "storj.io/storj/pkg/datarepair/irreparable" "storj.io/storj/pkg/datarepair/queue" + "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/statdb" - "storj.io/storj/storage" ) // DB is the master database for the satellite @@ -24,7 +24,7 @@ type DB interface { // StatDB returns 
database for storing node statistics StatDB() statdb.DB // OverlayCache returns database for caching overlay information - OverlayCache() storage.KeyValueStore + OverlayCache() overlay.DB // Accounting returns database for storing information about data use Accounting() accounting.DB // RepairQueue returns queue for segments that need repairing diff --git a/satellite/satellitedb/database.go b/satellite/satellitedb/database.go index 767c47adf..1c4a5ff1e 100644 --- a/satellite/satellitedb/database.go +++ b/satellite/satellitedb/database.go @@ -11,11 +11,11 @@ import ( "storj.io/storj/pkg/bwagreement" "storj.io/storj/pkg/datarepair/irreparable" "storj.io/storj/pkg/datarepair/queue" + "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/statdb" "storj.io/storj/pkg/utils" "storj.io/storj/satellite" dbx "storj.io/storj/satellite/satellitedb/dbx" - "storj.io/storj/storage" ) var ( @@ -52,7 +52,7 @@ func New(databaseURL string) (satellite.DB, error) { // NewInMemory creates instance of Sqlite in memory satellite database func NewInMemory() (satellite.DB, error) { - return New("sqlite3://file::memory:?mode=memory&cache=shared") + return New("sqlite3://file::memory:?mode=memory") } // BandwidthAgreement is a getter for bandwidth agreement repository @@ -71,7 +71,7 @@ func (db *DB) StatDB() statdb.DB { } // OverlayCache is a getter for overlay cache repository -func (db *DB) OverlayCache() storage.KeyValueStore { +func (db *DB) OverlayCache() overlay.DB { return &overlaycache{db: db.db} } diff --git a/satellite/satellitedb/dbx/satellitedb.dbx b/satellite/satellitedb/dbx/satellitedb.dbx index c767fd343..bd295d56b 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx +++ b/satellite/satellitedb/dbx/satellitedb.dbx @@ -156,31 +156,46 @@ read one ( //--- overlaycache ---// model overlay_cache_node ( - key key - unique key + key node_id + unique node_id - field key blob - field value blob ( updatable ) + field node_id blob + field node_type int + + field address text (updatable) // TODO: use compressed format + field protocol int (updatable) + + field operator_email text (updatable) + field operator_wallet text (updatable) //TODO: use compressed format + + field free_bandwidth int64 (updatable) + field free_disk int64 (updatable) + + field latency_90 int64 (updatable) + + field audit_success_ratio float64 (updatable) + field audit_uptime_ratio float64 (updatable) + field audit_count int64 (updatable) + field audit_success_count int64 (updatable) + + field uptime_count int64 (updatable) + field uptime_success_count int64 (updatable) ) create overlay_cache_node ( ) read one ( select overlay_cache_node - where overlay_cache_node.key = ? + where overlay_cache_node.node_id = ? ) read limitoffset ( select overlay_cache_node + where overlay_cache_node.node_id >= ? ) -read limitoffset ( - select overlay_cache_node - where overlay_cache_node.key >= ? -) - -update overlay_cache_node ( where overlay_cache_node.key = ? ) -delete overlay_cache_node ( where overlay_cache_node.key = ? ) +update overlay_cache_node ( where overlay_cache_node.node_id = ? ) +delete overlay_cache_node ( where overlay_cache_node.node_id = ? 
) //--- repairqueue ---// diff --git a/satellite/satellitedb/dbx/satellitedb.dbx.go b/satellite/satellitedb/dbx/satellitedb.dbx.go index f1d509d98..13ad82fa2 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx.go +++ b/satellite/satellitedb/dbx/satellitedb.dbx.go @@ -333,10 +333,23 @@ CREATE TABLE nodes ( PRIMARY KEY ( id ) ); CREATE TABLE overlay_cache_nodes ( - key bytea NOT NULL, - value bytea NOT NULL, - PRIMARY KEY ( key ), - UNIQUE ( key ) + node_id bytea NOT NULL, + node_type integer NOT NULL, + address text NOT NULL, + protocol integer NOT NULL, + operator_email text NOT NULL, + operator_wallet text NOT NULL, + free_bandwidth bigint NOT NULL, + free_disk bigint NOT NULL, + latency_90 bigint NOT NULL, + audit_success_ratio double precision NOT NULL, + audit_uptime_ratio double precision NOT NULL, + audit_count bigint NOT NULL, + audit_success_count bigint NOT NULL, + uptime_count bigint NOT NULL, + uptime_success_count bigint NOT NULL, + PRIMARY KEY ( node_id ), + UNIQUE ( node_id ) );` } @@ -461,10 +474,23 @@ CREATE TABLE nodes ( PRIMARY KEY ( id ) ); CREATE TABLE overlay_cache_nodes ( - key BLOB NOT NULL, - value BLOB NOT NULL, - PRIMARY KEY ( key ), - UNIQUE ( key ) + node_id BLOB NOT NULL, + node_type INTEGER NOT NULL, + address TEXT NOT NULL, + protocol INTEGER NOT NULL, + operator_email TEXT NOT NULL, + operator_wallet TEXT NOT NULL, + free_bandwidth INTEGER NOT NULL, + free_disk INTEGER NOT NULL, + latency_90 INTEGER NOT NULL, + audit_success_ratio REAL NOT NULL, + audit_uptime_ratio REAL NOT NULL, + audit_count INTEGER NOT NULL, + audit_success_count INTEGER NOT NULL, + uptime_count INTEGER NOT NULL, + uptime_success_count INTEGER NOT NULL, + PRIMARY KEY ( node_id ), + UNIQUE ( node_id ) );` } @@ -1336,53 +1362,325 @@ func (f Node_UpdatedAt_Field) value() interface{} { func (Node_UpdatedAt_Field) _Column() string { return "updated_at" } type OverlayCacheNode struct { - Key []byte - Value []byte + NodeId []byte + NodeType int + Address string + Protocol int + OperatorEmail string + OperatorWallet string + FreeBandwidth int64 + FreeDisk int64 + Latency90 int64 + AuditSuccessRatio float64 + AuditUptimeRatio float64 + AuditCount int64 + AuditSuccessCount int64 + UptimeCount int64 + UptimeSuccessCount int64 } func (OverlayCacheNode) _Table() string { return "overlay_cache_nodes" } type OverlayCacheNode_Update_Fields struct { - Value OverlayCacheNode_Value_Field + Address OverlayCacheNode_Address_Field + Protocol OverlayCacheNode_Protocol_Field + OperatorEmail OverlayCacheNode_OperatorEmail_Field + OperatorWallet OverlayCacheNode_OperatorWallet_Field + FreeBandwidth OverlayCacheNode_FreeBandwidth_Field + FreeDisk OverlayCacheNode_FreeDisk_Field + Latency90 OverlayCacheNode_Latency90_Field + AuditSuccessRatio OverlayCacheNode_AuditSuccessRatio_Field + AuditUptimeRatio OverlayCacheNode_AuditUptimeRatio_Field + AuditCount OverlayCacheNode_AuditCount_Field + AuditSuccessCount OverlayCacheNode_AuditSuccessCount_Field + UptimeCount OverlayCacheNode_UptimeCount_Field + UptimeSuccessCount OverlayCacheNode_UptimeSuccessCount_Field } -type OverlayCacheNode_Key_Field struct { +type OverlayCacheNode_NodeId_Field struct { _set bool _null bool _value []byte } -func OverlayCacheNode_Key(v []byte) OverlayCacheNode_Key_Field { - return OverlayCacheNode_Key_Field{_set: true, _value: v} +func OverlayCacheNode_NodeId(v []byte) OverlayCacheNode_NodeId_Field { + return OverlayCacheNode_NodeId_Field{_set: true, _value: v} } -func (f OverlayCacheNode_Key_Field) value() interface{} { +func (f 
OverlayCacheNode_NodeId_Field) value() interface{} { if !f._set || f._null { return nil } return f._value } -func (OverlayCacheNode_Key_Field) _Column() string { return "key" } +func (OverlayCacheNode_NodeId_Field) _Column() string { return "node_id" } -type OverlayCacheNode_Value_Field struct { +type OverlayCacheNode_NodeType_Field struct { _set bool _null bool - _value []byte + _value int } -func OverlayCacheNode_Value(v []byte) OverlayCacheNode_Value_Field { - return OverlayCacheNode_Value_Field{_set: true, _value: v} +func OverlayCacheNode_NodeType(v int) OverlayCacheNode_NodeType_Field { + return OverlayCacheNode_NodeType_Field{_set: true, _value: v} } -func (f OverlayCacheNode_Value_Field) value() interface{} { +func (f OverlayCacheNode_NodeType_Field) value() interface{} { if !f._set || f._null { return nil } return f._value } -func (OverlayCacheNode_Value_Field) _Column() string { return "value" } +func (OverlayCacheNode_NodeType_Field) _Column() string { return "node_type" } + +type OverlayCacheNode_Address_Field struct { + _set bool + _null bool + _value string +} + +func OverlayCacheNode_Address(v string) OverlayCacheNode_Address_Field { + return OverlayCacheNode_Address_Field{_set: true, _value: v} +} + +func (f OverlayCacheNode_Address_Field) value() interface{} { + if !f._set || f._null { + return nil + } + return f._value +} + +func (OverlayCacheNode_Address_Field) _Column() string { return "address" } + +type OverlayCacheNode_Protocol_Field struct { + _set bool + _null bool + _value int +} + +func OverlayCacheNode_Protocol(v int) OverlayCacheNode_Protocol_Field { + return OverlayCacheNode_Protocol_Field{_set: true, _value: v} +} + +func (f OverlayCacheNode_Protocol_Field) value() interface{} { + if !f._set || f._null { + return nil + } + return f._value +} + +func (OverlayCacheNode_Protocol_Field) _Column() string { return "protocol" } + +type OverlayCacheNode_OperatorEmail_Field struct { + _set bool + _null bool + _value string +} + +func OverlayCacheNode_OperatorEmail(v string) OverlayCacheNode_OperatorEmail_Field { + return OverlayCacheNode_OperatorEmail_Field{_set: true, _value: v} +} + +func (f OverlayCacheNode_OperatorEmail_Field) value() interface{} { + if !f._set || f._null { + return nil + } + return f._value +} + +func (OverlayCacheNode_OperatorEmail_Field) _Column() string { return "operator_email" } + +type OverlayCacheNode_OperatorWallet_Field struct { + _set bool + _null bool + _value string +} + +func OverlayCacheNode_OperatorWallet(v string) OverlayCacheNode_OperatorWallet_Field { + return OverlayCacheNode_OperatorWallet_Field{_set: true, _value: v} +} + +func (f OverlayCacheNode_OperatorWallet_Field) value() interface{} { + if !f._set || f._null { + return nil + } + return f._value +} + +func (OverlayCacheNode_OperatorWallet_Field) _Column() string { return "operator_wallet" } + +type OverlayCacheNode_FreeBandwidth_Field struct { + _set bool + _null bool + _value int64 +} + +func OverlayCacheNode_FreeBandwidth(v int64) OverlayCacheNode_FreeBandwidth_Field { + return OverlayCacheNode_FreeBandwidth_Field{_set: true, _value: v} +} + +func (f OverlayCacheNode_FreeBandwidth_Field) value() interface{} { + if !f._set || f._null { + return nil + } + return f._value +} + +func (OverlayCacheNode_FreeBandwidth_Field) _Column() string { return "free_bandwidth" } + +type OverlayCacheNode_FreeDisk_Field struct { + _set bool + _null bool + _value int64 +} + +func OverlayCacheNode_FreeDisk(v int64) OverlayCacheNode_FreeDisk_Field { + return 
OverlayCacheNode_FreeDisk_Field{_set: true, _value: v} +} + +func (f OverlayCacheNode_FreeDisk_Field) value() interface{} { + if !f._set || f._null { + return nil + } + return f._value +} + +func (OverlayCacheNode_FreeDisk_Field) _Column() string { return "free_disk" } + +type OverlayCacheNode_Latency90_Field struct { + _set bool + _null bool + _value int64 +} + +func OverlayCacheNode_Latency90(v int64) OverlayCacheNode_Latency90_Field { + return OverlayCacheNode_Latency90_Field{_set: true, _value: v} +} + +func (f OverlayCacheNode_Latency90_Field) value() interface{} { + if !f._set || f._null { + return nil + } + return f._value +} + +func (OverlayCacheNode_Latency90_Field) _Column() string { return "latency_90" } + +type OverlayCacheNode_AuditSuccessRatio_Field struct { + _set bool + _null bool + _value float64 +} + +func OverlayCacheNode_AuditSuccessRatio(v float64) OverlayCacheNode_AuditSuccessRatio_Field { + return OverlayCacheNode_AuditSuccessRatio_Field{_set: true, _value: v} +} + +func (f OverlayCacheNode_AuditSuccessRatio_Field) value() interface{} { + if !f._set || f._null { + return nil + } + return f._value +} + +func (OverlayCacheNode_AuditSuccessRatio_Field) _Column() string { return "audit_success_ratio" } + +type OverlayCacheNode_AuditUptimeRatio_Field struct { + _set bool + _null bool + _value float64 +} + +func OverlayCacheNode_AuditUptimeRatio(v float64) OverlayCacheNode_AuditUptimeRatio_Field { + return OverlayCacheNode_AuditUptimeRatio_Field{_set: true, _value: v} +} + +func (f OverlayCacheNode_AuditUptimeRatio_Field) value() interface{} { + if !f._set || f._null { + return nil + } + return f._value +} + +func (OverlayCacheNode_AuditUptimeRatio_Field) _Column() string { return "audit_uptime_ratio" } + +type OverlayCacheNode_AuditCount_Field struct { + _set bool + _null bool + _value int64 +} + +func OverlayCacheNode_AuditCount(v int64) OverlayCacheNode_AuditCount_Field { + return OverlayCacheNode_AuditCount_Field{_set: true, _value: v} +} + +func (f OverlayCacheNode_AuditCount_Field) value() interface{} { + if !f._set || f._null { + return nil + } + return f._value +} + +func (OverlayCacheNode_AuditCount_Field) _Column() string { return "audit_count" } + +type OverlayCacheNode_AuditSuccessCount_Field struct { + _set bool + _null bool + _value int64 +} + +func OverlayCacheNode_AuditSuccessCount(v int64) OverlayCacheNode_AuditSuccessCount_Field { + return OverlayCacheNode_AuditSuccessCount_Field{_set: true, _value: v} +} + +func (f OverlayCacheNode_AuditSuccessCount_Field) value() interface{} { + if !f._set || f._null { + return nil + } + return f._value +} + +func (OverlayCacheNode_AuditSuccessCount_Field) _Column() string { return "audit_success_count" } + +type OverlayCacheNode_UptimeCount_Field struct { + _set bool + _null bool + _value int64 +} + +func OverlayCacheNode_UptimeCount(v int64) OverlayCacheNode_UptimeCount_Field { + return OverlayCacheNode_UptimeCount_Field{_set: true, _value: v} +} + +func (f OverlayCacheNode_UptimeCount_Field) value() interface{} { + if !f._set || f._null { + return nil + } + return f._value +} + +func (OverlayCacheNode_UptimeCount_Field) _Column() string { return "uptime_count" } + +type OverlayCacheNode_UptimeSuccessCount_Field struct { + _set bool + _null bool + _value int64 +} + +func OverlayCacheNode_UptimeSuccessCount(v int64) OverlayCacheNode_UptimeSuccessCount_Field { + return OverlayCacheNode_UptimeSuccessCount_Field{_set: true, _value: v} +} + +func (f OverlayCacheNode_UptimeSuccessCount_Field) value() interface{} { + if 
!f._set || f._null { + return nil + } + return f._value +} + +func (OverlayCacheNode_UptimeSuccessCount_Field) _Column() string { return "uptime_success_count" } func toUTC(t time.Time) time.Time { return t.UTC() @@ -1726,19 +2024,45 @@ func (obj *postgresImpl) Create_Node(ctx context.Context, } func (obj *postgresImpl) Create_OverlayCacheNode(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field, - overlay_cache_node_value OverlayCacheNode_Value_Field) ( + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field, + overlay_cache_node_node_type OverlayCacheNode_NodeType_Field, + overlay_cache_node_address OverlayCacheNode_Address_Field, + overlay_cache_node_protocol OverlayCacheNode_Protocol_Field, + overlay_cache_node_operator_email OverlayCacheNode_OperatorEmail_Field, + overlay_cache_node_operator_wallet OverlayCacheNode_OperatorWallet_Field, + overlay_cache_node_free_bandwidth OverlayCacheNode_FreeBandwidth_Field, + overlay_cache_node_free_disk OverlayCacheNode_FreeDisk_Field, + overlay_cache_node_latency_90 OverlayCacheNode_Latency90_Field, + overlay_cache_node_audit_success_ratio OverlayCacheNode_AuditSuccessRatio_Field, + overlay_cache_node_audit_uptime_ratio OverlayCacheNode_AuditUptimeRatio_Field, + overlay_cache_node_audit_count OverlayCacheNode_AuditCount_Field, + overlay_cache_node_audit_success_count OverlayCacheNode_AuditSuccessCount_Field, + overlay_cache_node_uptime_count OverlayCacheNode_UptimeCount_Field, + overlay_cache_node_uptime_success_count OverlayCacheNode_UptimeSuccessCount_Field) ( overlay_cache_node *OverlayCacheNode, err error) { - __key_val := overlay_cache_node_key.value() - __value_val := overlay_cache_node_value.value() + __node_id_val := overlay_cache_node_node_id.value() + __node_type_val := overlay_cache_node_node_type.value() + __address_val := overlay_cache_node_address.value() + __protocol_val := overlay_cache_node_protocol.value() + __operator_email_val := overlay_cache_node_operator_email.value() + __operator_wallet_val := overlay_cache_node_operator_wallet.value() + __free_bandwidth_val := overlay_cache_node_free_bandwidth.value() + __free_disk_val := overlay_cache_node_free_disk.value() + __latency_90_val := overlay_cache_node_latency_90.value() + __audit_success_ratio_val := overlay_cache_node_audit_success_ratio.value() + __audit_uptime_ratio_val := overlay_cache_node_audit_uptime_ratio.value() + __audit_count_val := overlay_cache_node_audit_count.value() + __audit_success_count_val := overlay_cache_node_audit_success_count.value() + __uptime_count_val := overlay_cache_node_uptime_count.value() + __uptime_success_count_val := overlay_cache_node_uptime_success_count.value() - var __embed_stmt = __sqlbundle_Literal("INSERT INTO overlay_cache_nodes ( key, value ) VALUES ( ?, ? ) RETURNING overlay_cache_nodes.key, overlay_cache_nodes.value") + var __embed_stmt = __sqlbundle_Literal("INSERT INTO overlay_cache_nodes ( node_id, node_type, address, protocol, operator_email, operator_wallet, free_bandwidth, free_disk, latency_90, audit_success_ratio, audit_uptime_ratio, audit_count, audit_success_count, uptime_count, uptime_success_count ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? 
) RETURNING overlay_cache_nodes.node_id, overlay_cache_nodes.node_type, overlay_cache_nodes.address, overlay_cache_nodes.protocol, overlay_cache_nodes.operator_email, overlay_cache_nodes.operator_wallet, overlay_cache_nodes.free_bandwidth, overlay_cache_nodes.free_disk, overlay_cache_nodes.latency_90, overlay_cache_nodes.audit_success_ratio, overlay_cache_nodes.audit_uptime_ratio, overlay_cache_nodes.audit_count, overlay_cache_nodes.audit_success_count, overlay_cache_nodes.uptime_count, overlay_cache_nodes.uptime_success_count") var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __key_val, __value_val) + obj.logStmt(__stmt, __node_id_val, __node_type_val, __address_val, __protocol_val, __operator_email_val, __operator_wallet_val, __free_bandwidth_val, __free_disk_val, __latency_90_val, __audit_success_ratio_val, __audit_uptime_ratio_val, __audit_count_val, __audit_success_count_val, __uptime_count_val, __uptime_success_count_val) overlay_cache_node = &OverlayCacheNode{} - err = obj.driver.QueryRow(__stmt, __key_val, __value_val).Scan(&overlay_cache_node.Key, &overlay_cache_node.Value) + err = obj.driver.QueryRow(__stmt, __node_id_val, __node_type_val, __address_val, __protocol_val, __operator_email_val, __operator_wallet_val, __free_bandwidth_val, __free_disk_val, __latency_90_val, __audit_success_ratio_val, __audit_uptime_ratio_val, __audit_count_val, __audit_success_count_val, __uptime_count_val, __uptime_success_count_val).Scan(&overlay_cache_node.NodeId, &overlay_cache_node.NodeType, &overlay_cache_node.Address, &overlay_cache_node.Protocol, &overlay_cache_node.OperatorEmail, &overlay_cache_node.OperatorWallet, &overlay_cache_node.FreeBandwidth, &overlay_cache_node.FreeDisk, &overlay_cache_node.Latency90, &overlay_cache_node.AuditSuccessRatio, &overlay_cache_node.AuditUptimeRatio, &overlay_cache_node.AuditCount, &overlay_cache_node.AuditSuccessCount, &overlay_cache_node.UptimeCount, &overlay_cache_node.UptimeSuccessCount) if err != nil { return nil, obj.makeErr(err) } @@ -2060,20 +2384,20 @@ func (obj *postgresImpl) Get_Node_By_Id(ctx context.Context, } -func (obj *postgresImpl) Get_OverlayCacheNode_By_Key(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field) ( +func (obj *postgresImpl) Get_OverlayCacheNode_By_NodeId(ctx context.Context, + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field) ( overlay_cache_node *OverlayCacheNode, err error) { - var __embed_stmt = __sqlbundle_Literal("SELECT overlay_cache_nodes.key, overlay_cache_nodes.value FROM overlay_cache_nodes WHERE overlay_cache_nodes.key = ?") + var __embed_stmt = __sqlbundle_Literal("SELECT overlay_cache_nodes.node_id, overlay_cache_nodes.node_type, overlay_cache_nodes.address, overlay_cache_nodes.protocol, overlay_cache_nodes.operator_email, overlay_cache_nodes.operator_wallet, overlay_cache_nodes.free_bandwidth, overlay_cache_nodes.free_disk, overlay_cache_nodes.latency_90, overlay_cache_nodes.audit_success_ratio, overlay_cache_nodes.audit_uptime_ratio, overlay_cache_nodes.audit_count, overlay_cache_nodes.audit_success_count, overlay_cache_nodes.uptime_count, overlay_cache_nodes.uptime_success_count FROM overlay_cache_nodes WHERE overlay_cache_nodes.node_id = ?") var __values []interface{} - __values = append(__values, overlay_cache_node_key.value()) + __values = append(__values, overlay_cache_node_node_id.value()) var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) obj.logStmt(__stmt, __values...) 
overlay_cache_node = &OverlayCacheNode{} - err = obj.driver.QueryRow(__stmt, __values...).Scan(&overlay_cache_node.Key, &overlay_cache_node.Value) + err = obj.driver.QueryRow(__stmt, __values...).Scan(&overlay_cache_node.NodeId, &overlay_cache_node.NodeType, &overlay_cache_node.Address, &overlay_cache_node.Protocol, &overlay_cache_node.OperatorEmail, &overlay_cache_node.OperatorWallet, &overlay_cache_node.FreeBandwidth, &overlay_cache_node.FreeDisk, &overlay_cache_node.Latency90, &overlay_cache_node.AuditSuccessRatio, &overlay_cache_node.AuditUptimeRatio, &overlay_cache_node.AuditCount, &overlay_cache_node.AuditSuccessCount, &overlay_cache_node.UptimeCount, &overlay_cache_node.UptimeSuccessCount) if err != nil { return nil, obj.makeErr(err) } @@ -2081,14 +2405,15 @@ func (obj *postgresImpl) Get_OverlayCacheNode_By_Key(ctx context.Context, } -func (obj *postgresImpl) Limited_OverlayCacheNode(ctx context.Context, +func (obj *postgresImpl) Limited_OverlayCacheNode_By_NodeId_GreaterOrEqual(ctx context.Context, + overlay_cache_node_node_id_greater_or_equal OverlayCacheNode_NodeId_Field, limit int, offset int64) ( rows []*OverlayCacheNode, err error) { - var __embed_stmt = __sqlbundle_Literal("SELECT overlay_cache_nodes.key, overlay_cache_nodes.value FROM overlay_cache_nodes LIMIT ? OFFSET ?") + var __embed_stmt = __sqlbundle_Literal("SELECT overlay_cache_nodes.node_id, overlay_cache_nodes.node_type, overlay_cache_nodes.address, overlay_cache_nodes.protocol, overlay_cache_nodes.operator_email, overlay_cache_nodes.operator_wallet, overlay_cache_nodes.free_bandwidth, overlay_cache_nodes.free_disk, overlay_cache_nodes.latency_90, overlay_cache_nodes.audit_success_ratio, overlay_cache_nodes.audit_uptime_ratio, overlay_cache_nodes.audit_count, overlay_cache_nodes.audit_success_count, overlay_cache_nodes.uptime_count, overlay_cache_nodes.uptime_success_count FROM overlay_cache_nodes WHERE overlay_cache_nodes.node_id >= ? LIMIT ? OFFSET ?") var __values []interface{} - __values = append(__values) + __values = append(__values, overlay_cache_node_node_id_greater_or_equal.value()) __values = append(__values, limit, offset) @@ -2103,43 +2428,7 @@ func (obj *postgresImpl) Limited_OverlayCacheNode(ctx context.Context, for __rows.Next() { overlay_cache_node := &OverlayCacheNode{} - err = __rows.Scan(&overlay_cache_node.Key, &overlay_cache_node.Value) - if err != nil { - return nil, obj.makeErr(err) - } - rows = append(rows, overlay_cache_node) - } - if err := __rows.Err(); err != nil { - return nil, obj.makeErr(err) - } - return rows, nil - -} - -func (obj *postgresImpl) Limited_OverlayCacheNode_By_Key_GreaterOrEqual(ctx context.Context, - overlay_cache_node_key_greater_or_equal OverlayCacheNode_Key_Field, - limit int, offset int64) ( - rows []*OverlayCacheNode, err error) { - - var __embed_stmt = __sqlbundle_Literal("SELECT overlay_cache_nodes.key, overlay_cache_nodes.value FROM overlay_cache_nodes WHERE overlay_cache_nodes.key >= ? LIMIT ? OFFSET ?") - - var __values []interface{} - __values = append(__values, overlay_cache_node_key_greater_or_equal.value()) - - __values = append(__values, limit, offset) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) - - __rows, err := obj.driver.Query(__stmt, __values...) 
- if err != nil { - return nil, obj.makeErr(err) - } - defer __rows.Close() - - for __rows.Next() { - overlay_cache_node := &OverlayCacheNode{} - err = __rows.Scan(&overlay_cache_node.Key, &overlay_cache_node.Value) + err = __rows.Scan(&overlay_cache_node.NodeId, &overlay_cache_node.NodeType, &overlay_cache_node.Address, &overlay_cache_node.Protocol, &overlay_cache_node.OperatorEmail, &overlay_cache_node.OperatorWallet, &overlay_cache_node.FreeBandwidth, &overlay_cache_node.FreeDisk, &overlay_cache_node.Latency90, &overlay_cache_node.AuditSuccessRatio, &overlay_cache_node.AuditUptimeRatio, &overlay_cache_node.AuditCount, &overlay_cache_node.AuditSuccessCount, &overlay_cache_node.UptimeCount, &overlay_cache_node.UptimeSuccessCount) if err != nil { return nil, obj.makeErr(err) } @@ -2454,28 +2743,88 @@ func (obj *postgresImpl) Update_Node_By_Id(ctx context.Context, return node, nil } -func (obj *postgresImpl) Update_OverlayCacheNode_By_Key(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field, +func (obj *postgresImpl) Update_OverlayCacheNode_By_NodeId(ctx context.Context, + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field, update OverlayCacheNode_Update_Fields) ( overlay_cache_node *OverlayCacheNode, err error) { var __sets = &__sqlbundle_Hole{} - var __embed_stmt = __sqlbundle_Literals{Join: "", SQLs: []__sqlbundle_SQL{__sqlbundle_Literal("UPDATE overlay_cache_nodes SET "), __sets, __sqlbundle_Literal(" WHERE overlay_cache_nodes.key = ? RETURNING overlay_cache_nodes.key, overlay_cache_nodes.value")}} + var __embed_stmt = __sqlbundle_Literals{Join: "", SQLs: []__sqlbundle_SQL{__sqlbundle_Literal("UPDATE overlay_cache_nodes SET "), __sets, __sqlbundle_Literal(" WHERE overlay_cache_nodes.node_id = ? RETURNING overlay_cache_nodes.node_id, overlay_cache_nodes.node_type, overlay_cache_nodes.address, overlay_cache_nodes.protocol, overlay_cache_nodes.operator_email, overlay_cache_nodes.operator_wallet, overlay_cache_nodes.free_bandwidth, overlay_cache_nodes.free_disk, overlay_cache_nodes.latency_90, overlay_cache_nodes.audit_success_ratio, overlay_cache_nodes.audit_uptime_ratio, overlay_cache_nodes.audit_count, overlay_cache_nodes.audit_success_count, overlay_cache_nodes.uptime_count, overlay_cache_nodes.uptime_success_count")}} __sets_sql := __sqlbundle_Literals{Join: ", "} var __values []interface{} var __args []interface{} - if update.Value._set { - __values = append(__values, update.Value.value()) - __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("value = ?")) + if update.Address._set { + __values = append(__values, update.Address.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("address = ?")) + } + + if update.Protocol._set { + __values = append(__values, update.Protocol.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("protocol = ?")) + } + + if update.OperatorEmail._set { + __values = append(__values, update.OperatorEmail.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("operator_email = ?")) + } + + if update.OperatorWallet._set { + __values = append(__values, update.OperatorWallet.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("operator_wallet = ?")) + } + + if update.FreeBandwidth._set { + __values = append(__values, update.FreeBandwidth.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("free_bandwidth = ?")) + } + + if update.FreeDisk._set { + __values = append(__values, update.FreeDisk.value()) + __sets_sql.SQLs = 
append(__sets_sql.SQLs, __sqlbundle_Literal("free_disk = ?")) + } + + if update.Latency90._set { + __values = append(__values, update.Latency90.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("latency_90 = ?")) + } + + if update.AuditSuccessRatio._set { + __values = append(__values, update.AuditSuccessRatio.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_success_ratio = ?")) + } + + if update.AuditUptimeRatio._set { + __values = append(__values, update.AuditUptimeRatio.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_uptime_ratio = ?")) + } + + if update.AuditCount._set { + __values = append(__values, update.AuditCount.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_count = ?")) + } + + if update.AuditSuccessCount._set { + __values = append(__values, update.AuditSuccessCount.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_success_count = ?")) + } + + if update.UptimeCount._set { + __values = append(__values, update.UptimeCount.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("uptime_count = ?")) + } + + if update.UptimeSuccessCount._set { + __values = append(__values, update.UptimeSuccessCount.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("uptime_success_count = ?")) } if len(__sets_sql.SQLs) == 0 { return nil, emptyUpdate() } - __args = append(__args, overlay_cache_node_key.value()) + __args = append(__args, overlay_cache_node_node_id.value()) __values = append(__values, __args...) __sets.SQL = __sets_sql @@ -2484,7 +2833,7 @@ func (obj *postgresImpl) Update_OverlayCacheNode_By_Key(ctx context.Context, obj.logStmt(__stmt, __values...) overlay_cache_node = &OverlayCacheNode{} - err = obj.driver.QueryRow(__stmt, __values...).Scan(&overlay_cache_node.Key, &overlay_cache_node.Value) + err = obj.driver.QueryRow(__stmt, __values...).Scan(&overlay_cache_node.NodeId, &overlay_cache_node.NodeType, &overlay_cache_node.Address, &overlay_cache_node.Protocol, &overlay_cache_node.OperatorEmail, &overlay_cache_node.OperatorWallet, &overlay_cache_node.FreeBandwidth, &overlay_cache_node.FreeDisk, &overlay_cache_node.Latency90, &overlay_cache_node.AuditSuccessRatio, &overlay_cache_node.AuditUptimeRatio, &overlay_cache_node.AuditCount, &overlay_cache_node.AuditSuccessCount, &overlay_cache_node.UptimeCount, &overlay_cache_node.UptimeSuccessCount) if err == sql.ErrNoRows { return nil, nil } @@ -2650,14 +2999,14 @@ func (obj *postgresImpl) Delete_Node_By_Id(ctx context.Context, } -func (obj *postgresImpl) Delete_OverlayCacheNode_By_Key(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field) ( +func (obj *postgresImpl) Delete_OverlayCacheNode_By_NodeId(ctx context.Context, + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field) ( deleted bool, err error) { - var __embed_stmt = __sqlbundle_Literal("DELETE FROM overlay_cache_nodes WHERE overlay_cache_nodes.key = ?") + var __embed_stmt = __sqlbundle_Literal("DELETE FROM overlay_cache_nodes WHERE overlay_cache_nodes.node_id = ?") var __values []interface{} - __values = append(__values, overlay_cache_node_key.value()) + __values = append(__values, overlay_cache_node_node_id.value()) var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) obj.logStmt(__stmt, __values...) 
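
The sqlite3 half of the generated code below mirrors the Postgres half. The practical effect of the schema change is that callers now set individual columns through generated field constructors instead of marshaling a protobuf blob into a single value column. Here is a minimal sketch of that usage, assuming pb.Node exposes Address, Restrictions, and Reputation getters as used elsewhere in this patch (the actual mapping lives in satellite/satellitedb/overlaycache.go, which this patch rewrites but which is not part of this excerpt):

package sketch // illustrative only, not part of the patch

import (
	"context"

	"storj.io/storj/pkg/pb"
	dbx "storj.io/storj/satellite/satellitedb/dbx"
)

// updateOverlayCacheNode is a hypothetical helper showing how a pb.Node
// maps onto the generated update call. The pb.Node getters used here are
// assumptions based on their use elsewhere in this patch.
func updateOverlayCacheNode(ctx context.Context, db *dbx.DB, node *pb.Node) error {
	updated, err := db.Update_OverlayCacheNode_By_NodeId(ctx,
		dbx.OverlayCacheNode_NodeId(node.Id.Bytes()),
		dbx.OverlayCacheNode_Update_Fields{
			// protobuf getters are nil-safe, so a node missing
			// Address, Restrictions, or Reputation writes zero values
			Address:           dbx.OverlayCacheNode_Address(node.GetAddress().GetAddress()),
			Protocol:          dbx.OverlayCacheNode_Protocol(int(node.GetAddress().GetTransport())),
			FreeBandwidth:     dbx.OverlayCacheNode_FreeBandwidth(node.GetRestrictions().GetFreeBandwidth()),
			FreeDisk:          dbx.OverlayCacheNode_FreeDisk(node.GetRestrictions().GetFreeDisk()),
			AuditSuccessRatio: dbx.OverlayCacheNode_AuditSuccessRatio(node.GetReputation().GetAuditSuccessRatio()),
			AuditCount:        dbx.OverlayCacheNode_AuditCount(node.GetReputation().GetAuditCount()),
			UptimeCount:       dbx.OverlayCacheNode_UptimeCount(node.GetReputation().GetUptimeCount()),
		},
	)
	if err != nil || updated != nil {
		return err
	}
	// Update_OverlayCacheNode_By_NodeId returns (nil, nil) when no row
	// matches, so a real upsert would fall back to Create_OverlayCacheNode
	// with the full fifteen-column insert here.
	return nil
}

Note that node_id and node_type are not declared updatable in the dbx model, so OverlayCacheNode_Update_Fields carries no NodeId or NodeType entries; changing either requires deleting and re-creating the row.
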
@@ -2988,18 +3337,44 @@ func (obj *sqlite3Impl) Create_Node(ctx context.Context, } func (obj *sqlite3Impl) Create_OverlayCacheNode(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field, - overlay_cache_node_value OverlayCacheNode_Value_Field) ( + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field, + overlay_cache_node_node_type OverlayCacheNode_NodeType_Field, + overlay_cache_node_address OverlayCacheNode_Address_Field, + overlay_cache_node_protocol OverlayCacheNode_Protocol_Field, + overlay_cache_node_operator_email OverlayCacheNode_OperatorEmail_Field, + overlay_cache_node_operator_wallet OverlayCacheNode_OperatorWallet_Field, + overlay_cache_node_free_bandwidth OverlayCacheNode_FreeBandwidth_Field, + overlay_cache_node_free_disk OverlayCacheNode_FreeDisk_Field, + overlay_cache_node_latency_90 OverlayCacheNode_Latency90_Field, + overlay_cache_node_audit_success_ratio OverlayCacheNode_AuditSuccessRatio_Field, + overlay_cache_node_audit_uptime_ratio OverlayCacheNode_AuditUptimeRatio_Field, + overlay_cache_node_audit_count OverlayCacheNode_AuditCount_Field, + overlay_cache_node_audit_success_count OverlayCacheNode_AuditSuccessCount_Field, + overlay_cache_node_uptime_count OverlayCacheNode_UptimeCount_Field, + overlay_cache_node_uptime_success_count OverlayCacheNode_UptimeSuccessCount_Field) ( overlay_cache_node *OverlayCacheNode, err error) { - __key_val := overlay_cache_node_key.value() - __value_val := overlay_cache_node_value.value() + __node_id_val := overlay_cache_node_node_id.value() + __node_type_val := overlay_cache_node_node_type.value() + __address_val := overlay_cache_node_address.value() + __protocol_val := overlay_cache_node_protocol.value() + __operator_email_val := overlay_cache_node_operator_email.value() + __operator_wallet_val := overlay_cache_node_operator_wallet.value() + __free_bandwidth_val := overlay_cache_node_free_bandwidth.value() + __free_disk_val := overlay_cache_node_free_disk.value() + __latency_90_val := overlay_cache_node_latency_90.value() + __audit_success_ratio_val := overlay_cache_node_audit_success_ratio.value() + __audit_uptime_ratio_val := overlay_cache_node_audit_uptime_ratio.value() + __audit_count_val := overlay_cache_node_audit_count.value() + __audit_success_count_val := overlay_cache_node_audit_success_count.value() + __uptime_count_val := overlay_cache_node_uptime_count.value() + __uptime_success_count_val := overlay_cache_node_uptime_success_count.value() - var __embed_stmt = __sqlbundle_Literal("INSERT INTO overlay_cache_nodes ( key, value ) VALUES ( ?, ? )") + var __embed_stmt = __sqlbundle_Literal("INSERT INTO overlay_cache_nodes ( node_id, node_type, address, protocol, operator_email, operator_wallet, free_bandwidth, free_disk, latency_90, audit_success_ratio, audit_uptime_ratio, audit_count, audit_success_count, uptime_count, uptime_success_count ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? 
)") var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __key_val, __value_val) + obj.logStmt(__stmt, __node_id_val, __node_type_val, __address_val, __protocol_val, __operator_email_val, __operator_wallet_val, __free_bandwidth_val, __free_disk_val, __latency_90_val, __audit_success_ratio_val, __audit_uptime_ratio_val, __audit_count_val, __audit_success_count_val, __uptime_count_val, __uptime_success_count_val) - __res, err := obj.driver.Exec(__stmt, __key_val, __value_val) + __res, err := obj.driver.Exec(__stmt, __node_id_val, __node_type_val, __address_val, __protocol_val, __operator_email_val, __operator_wallet_val, __free_bandwidth_val, __free_disk_val, __latency_90_val, __audit_success_ratio_val, __audit_uptime_ratio_val, __audit_count_val, __audit_success_count_val, __uptime_count_val, __uptime_success_count_val) if err != nil { return nil, obj.makeErr(err) } @@ -3328,20 +3703,20 @@ func (obj *sqlite3Impl) Get_Node_By_Id(ctx context.Context, } -func (obj *sqlite3Impl) Get_OverlayCacheNode_By_Key(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field) ( +func (obj *sqlite3Impl) Get_OverlayCacheNode_By_NodeId(ctx context.Context, + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field) ( overlay_cache_node *OverlayCacheNode, err error) { - var __embed_stmt = __sqlbundle_Literal("SELECT overlay_cache_nodes.key, overlay_cache_nodes.value FROM overlay_cache_nodes WHERE overlay_cache_nodes.key = ?") + var __embed_stmt = __sqlbundle_Literal("SELECT overlay_cache_nodes.node_id, overlay_cache_nodes.node_type, overlay_cache_nodes.address, overlay_cache_nodes.protocol, overlay_cache_nodes.operator_email, overlay_cache_nodes.operator_wallet, overlay_cache_nodes.free_bandwidth, overlay_cache_nodes.free_disk, overlay_cache_nodes.latency_90, overlay_cache_nodes.audit_success_ratio, overlay_cache_nodes.audit_uptime_ratio, overlay_cache_nodes.audit_count, overlay_cache_nodes.audit_success_count, overlay_cache_nodes.uptime_count, overlay_cache_nodes.uptime_success_count FROM overlay_cache_nodes WHERE overlay_cache_nodes.node_id = ?") var __values []interface{} - __values = append(__values, overlay_cache_node_key.value()) + __values = append(__values, overlay_cache_node_node_id.value()) var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) obj.logStmt(__stmt, __values...) 
overlay_cache_node = &OverlayCacheNode{} - err = obj.driver.QueryRow(__stmt, __values...).Scan(&overlay_cache_node.Key, &overlay_cache_node.Value) + err = obj.driver.QueryRow(__stmt, __values...).Scan(&overlay_cache_node.NodeId, &overlay_cache_node.NodeType, &overlay_cache_node.Address, &overlay_cache_node.Protocol, &overlay_cache_node.OperatorEmail, &overlay_cache_node.OperatorWallet, &overlay_cache_node.FreeBandwidth, &overlay_cache_node.FreeDisk, &overlay_cache_node.Latency90, &overlay_cache_node.AuditSuccessRatio, &overlay_cache_node.AuditUptimeRatio, &overlay_cache_node.AuditCount, &overlay_cache_node.AuditSuccessCount, &overlay_cache_node.UptimeCount, &overlay_cache_node.UptimeSuccessCount) if err != nil { return nil, obj.makeErr(err) } @@ -3349,14 +3724,15 @@ func (obj *sqlite3Impl) Get_OverlayCacheNode_By_Key(ctx context.Context, } -func (obj *sqlite3Impl) Limited_OverlayCacheNode(ctx context.Context, +func (obj *sqlite3Impl) Limited_OverlayCacheNode_By_NodeId_GreaterOrEqual(ctx context.Context, + overlay_cache_node_node_id_greater_or_equal OverlayCacheNode_NodeId_Field, limit int, offset int64) ( rows []*OverlayCacheNode, err error) { - var __embed_stmt = __sqlbundle_Literal("SELECT overlay_cache_nodes.key, overlay_cache_nodes.value FROM overlay_cache_nodes LIMIT ? OFFSET ?") + var __embed_stmt = __sqlbundle_Literal("SELECT overlay_cache_nodes.node_id, overlay_cache_nodes.node_type, overlay_cache_nodes.address, overlay_cache_nodes.protocol, overlay_cache_nodes.operator_email, overlay_cache_nodes.operator_wallet, overlay_cache_nodes.free_bandwidth, overlay_cache_nodes.free_disk, overlay_cache_nodes.latency_90, overlay_cache_nodes.audit_success_ratio, overlay_cache_nodes.audit_uptime_ratio, overlay_cache_nodes.audit_count, overlay_cache_nodes.audit_success_count, overlay_cache_nodes.uptime_count, overlay_cache_nodes.uptime_success_count FROM overlay_cache_nodes WHERE overlay_cache_nodes.node_id >= ? LIMIT ? OFFSET ?") var __values []interface{} - __values = append(__values) + __values = append(__values, overlay_cache_node_node_id_greater_or_equal.value()) __values = append(__values, limit, offset) @@ -3371,43 +3747,7 @@ func (obj *sqlite3Impl) Limited_OverlayCacheNode(ctx context.Context, for __rows.Next() { overlay_cache_node := &OverlayCacheNode{} - err = __rows.Scan(&overlay_cache_node.Key, &overlay_cache_node.Value) - if err != nil { - return nil, obj.makeErr(err) - } - rows = append(rows, overlay_cache_node) - } - if err := __rows.Err(); err != nil { - return nil, obj.makeErr(err) - } - return rows, nil - -} - -func (obj *sqlite3Impl) Limited_OverlayCacheNode_By_Key_GreaterOrEqual(ctx context.Context, - overlay_cache_node_key_greater_or_equal OverlayCacheNode_Key_Field, - limit int, offset int64) ( - rows []*OverlayCacheNode, err error) { - - var __embed_stmt = __sqlbundle_Literal("SELECT overlay_cache_nodes.key, overlay_cache_nodes.value FROM overlay_cache_nodes WHERE overlay_cache_nodes.key >= ? LIMIT ? OFFSET ?") - - var __values []interface{} - __values = append(__values, overlay_cache_node_key_greater_or_equal.value()) - - __values = append(__values, limit, offset) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) - - __rows, err := obj.driver.Query(__stmt, __values...) 
- if err != nil { - return nil, obj.makeErr(err) - } - defer __rows.Close() - - for __rows.Next() { - overlay_cache_node := &OverlayCacheNode{} - err = __rows.Scan(&overlay_cache_node.Key, &overlay_cache_node.Value) + err = __rows.Scan(&overlay_cache_node.NodeId, &overlay_cache_node.NodeType, &overlay_cache_node.Address, &overlay_cache_node.Protocol, &overlay_cache_node.OperatorEmail, &overlay_cache_node.OperatorWallet, &overlay_cache_node.FreeBandwidth, &overlay_cache_node.FreeDisk, &overlay_cache_node.Latency90, &overlay_cache_node.AuditSuccessRatio, &overlay_cache_node.AuditUptimeRatio, &overlay_cache_node.AuditCount, &overlay_cache_node.AuditSuccessCount, &overlay_cache_node.UptimeCount, &overlay_cache_node.UptimeSuccessCount) if err != nil { return nil, obj.makeErr(err) } @@ -3772,28 +4112,88 @@ func (obj *sqlite3Impl) Update_Node_By_Id(ctx context.Context, return node, nil } -func (obj *sqlite3Impl) Update_OverlayCacheNode_By_Key(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field, +func (obj *sqlite3Impl) Update_OverlayCacheNode_By_NodeId(ctx context.Context, + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field, update OverlayCacheNode_Update_Fields) ( overlay_cache_node *OverlayCacheNode, err error) { var __sets = &__sqlbundle_Hole{} - var __embed_stmt = __sqlbundle_Literals{Join: "", SQLs: []__sqlbundle_SQL{__sqlbundle_Literal("UPDATE overlay_cache_nodes SET "), __sets, __sqlbundle_Literal(" WHERE overlay_cache_nodes.key = ?")}} + var __embed_stmt = __sqlbundle_Literals{Join: "", SQLs: []__sqlbundle_SQL{__sqlbundle_Literal("UPDATE overlay_cache_nodes SET "), __sets, __sqlbundle_Literal(" WHERE overlay_cache_nodes.node_id = ?")}} __sets_sql := __sqlbundle_Literals{Join: ", "} var __values []interface{} var __args []interface{} - if update.Value._set { - __values = append(__values, update.Value.value()) - __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("value = ?")) + if update.Address._set { + __values = append(__values, update.Address.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("address = ?")) + } + + if update.Protocol._set { + __values = append(__values, update.Protocol.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("protocol = ?")) + } + + if update.OperatorEmail._set { + __values = append(__values, update.OperatorEmail.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("operator_email = ?")) + } + + if update.OperatorWallet._set { + __values = append(__values, update.OperatorWallet.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("operator_wallet = ?")) + } + + if update.FreeBandwidth._set { + __values = append(__values, update.FreeBandwidth.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("free_bandwidth = ?")) + } + + if update.FreeDisk._set { + __values = append(__values, update.FreeDisk.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("free_disk = ?")) + } + + if update.Latency90._set { + __values = append(__values, update.Latency90.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("latency_90 = ?")) + } + + if update.AuditSuccessRatio._set { + __values = append(__values, update.AuditSuccessRatio.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_success_ratio = ?")) + } + + if update.AuditUptimeRatio._set { + __values = append(__values, update.AuditUptimeRatio.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, 
__sqlbundle_Literal("audit_uptime_ratio = ?")) + } + + if update.AuditCount._set { + __values = append(__values, update.AuditCount.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_count = ?")) + } + + if update.AuditSuccessCount._set { + __values = append(__values, update.AuditSuccessCount.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("audit_success_count = ?")) + } + + if update.UptimeCount._set { + __values = append(__values, update.UptimeCount.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("uptime_count = ?")) + } + + if update.UptimeSuccessCount._set { + __values = append(__values, update.UptimeSuccessCount.value()) + __sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("uptime_success_count = ?")) } if len(__sets_sql.SQLs) == 0 { return nil, emptyUpdate() } - __args = append(__args, overlay_cache_node_key.value()) + __args = append(__args, overlay_cache_node_node_id.value()) __values = append(__values, __args...) __sets.SQL = __sets_sql @@ -3807,12 +4207,12 @@ func (obj *sqlite3Impl) Update_OverlayCacheNode_By_Key(ctx context.Context, return nil, obj.makeErr(err) } - var __embed_stmt_get = __sqlbundle_Literal("SELECT overlay_cache_nodes.key, overlay_cache_nodes.value FROM overlay_cache_nodes WHERE overlay_cache_nodes.key = ?") + var __embed_stmt_get = __sqlbundle_Literal("SELECT overlay_cache_nodes.node_id, overlay_cache_nodes.node_type, overlay_cache_nodes.address, overlay_cache_nodes.protocol, overlay_cache_nodes.operator_email, overlay_cache_nodes.operator_wallet, overlay_cache_nodes.free_bandwidth, overlay_cache_nodes.free_disk, overlay_cache_nodes.latency_90, overlay_cache_nodes.audit_success_ratio, overlay_cache_nodes.audit_uptime_ratio, overlay_cache_nodes.audit_count, overlay_cache_nodes.audit_success_count, overlay_cache_nodes.uptime_count, overlay_cache_nodes.uptime_success_count FROM overlay_cache_nodes WHERE overlay_cache_nodes.node_id = ?") var __stmt_get = __sqlbundle_Render(obj.dialect, __embed_stmt_get) obj.logStmt("(IMPLIED) "+__stmt_get, __args...) 
- err = obj.driver.QueryRow(__stmt_get, __args...).Scan(&overlay_cache_node.Key, &overlay_cache_node.Value) + err = obj.driver.QueryRow(__stmt_get, __args...).Scan(&overlay_cache_node.NodeId, &overlay_cache_node.NodeType, &overlay_cache_node.Address, &overlay_cache_node.Protocol, &overlay_cache_node.OperatorEmail, &overlay_cache_node.OperatorWallet, &overlay_cache_node.FreeBandwidth, &overlay_cache_node.FreeDisk, &overlay_cache_node.Latency90, &overlay_cache_node.AuditSuccessRatio, &overlay_cache_node.AuditUptimeRatio, &overlay_cache_node.AuditCount, &overlay_cache_node.AuditSuccessCount, &overlay_cache_node.UptimeCount, &overlay_cache_node.UptimeSuccessCount) if err == sql.ErrNoRows { return nil, nil } @@ -3978,14 +4378,14 @@ func (obj *sqlite3Impl) Delete_Node_By_Id(ctx context.Context, } -func (obj *sqlite3Impl) Delete_OverlayCacheNode_By_Key(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field) ( +func (obj *sqlite3Impl) Delete_OverlayCacheNode_By_NodeId(ctx context.Context, + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field) ( deleted bool, err error) { - var __embed_stmt = __sqlbundle_Literal("DELETE FROM overlay_cache_nodes WHERE overlay_cache_nodes.key = ?") + var __embed_stmt = __sqlbundle_Literal("DELETE FROM overlay_cache_nodes WHERE overlay_cache_nodes.node_id = ?") var __values []interface{} - __values = append(__values, overlay_cache_node_key.value()) + __values = append(__values, overlay_cache_node_node_id.value()) var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) obj.logStmt(__stmt, __values...) @@ -4142,13 +4542,13 @@ func (obj *sqlite3Impl) getLastOverlayCacheNode(ctx context.Context, pk int64) ( overlay_cache_node *OverlayCacheNode, err error) { - var __embed_stmt = __sqlbundle_Literal("SELECT overlay_cache_nodes.key, overlay_cache_nodes.value FROM overlay_cache_nodes WHERE _rowid_ = ?") + var __embed_stmt = __sqlbundle_Literal("SELECT overlay_cache_nodes.node_id, overlay_cache_nodes.node_type, overlay_cache_nodes.address, overlay_cache_nodes.protocol, overlay_cache_nodes.operator_email, overlay_cache_nodes.operator_wallet, overlay_cache_nodes.free_bandwidth, overlay_cache_nodes.free_disk, overlay_cache_nodes.latency_90, overlay_cache_nodes.audit_success_ratio, overlay_cache_nodes.audit_uptime_ratio, overlay_cache_nodes.audit_count, overlay_cache_nodes.audit_success_count, overlay_cache_nodes.uptime_count, overlay_cache_nodes.uptime_success_count FROM overlay_cache_nodes WHERE _rowid_ = ?") var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) obj.logStmt(__stmt, pk) overlay_cache_node = &OverlayCacheNode{} - err = obj.driver.QueryRow(__stmt, pk).Scan(&overlay_cache_node.Key, &overlay_cache_node.Value) + err = obj.driver.QueryRow(__stmt, pk).Scan(&overlay_cache_node.NodeId, &overlay_cache_node.NodeType, &overlay_cache_node.Address, &overlay_cache_node.Protocol, &overlay_cache_node.OperatorEmail, &overlay_cache_node.OperatorWallet, &overlay_cache_node.FreeBandwidth, &overlay_cache_node.FreeDisk, &overlay_cache_node.Latency90, &overlay_cache_node.AuditSuccessRatio, &overlay_cache_node.AuditUptimeRatio, &overlay_cache_node.AuditCount, &overlay_cache_node.AuditSuccessCount, &overlay_cache_node.UptimeCount, &overlay_cache_node.UptimeSuccessCount) if err != nil { return nil, obj.makeErr(err) } @@ -4456,14 +4856,27 @@ func (rx *Rx) Create_Node(ctx context.Context, } func (rx *Rx) Create_OverlayCacheNode(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field, - overlay_cache_node_value OverlayCacheNode_Value_Field) ( + 
overlay_cache_node_node_id OverlayCacheNode_NodeId_Field, + overlay_cache_node_node_type OverlayCacheNode_NodeType_Field, + overlay_cache_node_address OverlayCacheNode_Address_Field, + overlay_cache_node_protocol OverlayCacheNode_Protocol_Field, + overlay_cache_node_operator_email OverlayCacheNode_OperatorEmail_Field, + overlay_cache_node_operator_wallet OverlayCacheNode_OperatorWallet_Field, + overlay_cache_node_free_bandwidth OverlayCacheNode_FreeBandwidth_Field, + overlay_cache_node_free_disk OverlayCacheNode_FreeDisk_Field, + overlay_cache_node_latency_90 OverlayCacheNode_Latency90_Field, + overlay_cache_node_audit_success_ratio OverlayCacheNode_AuditSuccessRatio_Field, + overlay_cache_node_audit_uptime_ratio OverlayCacheNode_AuditUptimeRatio_Field, + overlay_cache_node_audit_count OverlayCacheNode_AuditCount_Field, + overlay_cache_node_audit_success_count OverlayCacheNode_AuditSuccessCount_Field, + overlay_cache_node_uptime_count OverlayCacheNode_UptimeCount_Field, + overlay_cache_node_uptime_success_count OverlayCacheNode_UptimeSuccessCount_Field) ( overlay_cache_node *OverlayCacheNode, err error) { var tx *Tx if tx, err = rx.getTx(ctx); err != nil { return } - return tx.Create_OverlayCacheNode(ctx, overlay_cache_node_key, overlay_cache_node_value) + return tx.Create_OverlayCacheNode(ctx, overlay_cache_node_node_id, overlay_cache_node_node_type, overlay_cache_node_address, overlay_cache_node_protocol, overlay_cache_node_operator_email, overlay_cache_node_operator_wallet, overlay_cache_node_free_bandwidth, overlay_cache_node_free_disk, overlay_cache_node_latency_90, overlay_cache_node_audit_success_ratio, overlay_cache_node_audit_uptime_ratio, overlay_cache_node_audit_count, overlay_cache_node_audit_success_count, overlay_cache_node_uptime_count, overlay_cache_node_uptime_success_count) } @@ -4538,14 +4951,14 @@ func (rx *Rx) Delete_Node_By_Id(ctx context.Context, return tx.Delete_Node_By_Id(ctx, node_id) } -func (rx *Rx) Delete_OverlayCacheNode_By_Key(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field) ( +func (rx *Rx) Delete_OverlayCacheNode_By_NodeId(ctx context.Context, + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field) ( deleted bool, err error) { var tx *Tx if tx, err = rx.getTx(ctx); err != nil { return } - return tx.Delete_OverlayCacheNode_By_Key(ctx, overlay_cache_node_key) + return tx.Delete_OverlayCacheNode_By_NodeId(ctx, overlay_cache_node_node_id) } func (rx *Rx) Find_AccountingTimestamps_Value_By_Name(ctx context.Context, @@ -4617,14 +5030,14 @@ func (rx *Rx) Get_Node_By_Id(ctx context.Context, return tx.Get_Node_By_Id(ctx, node_id) } -func (rx *Rx) Get_OverlayCacheNode_By_Key(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field) ( +func (rx *Rx) Get_OverlayCacheNode_By_NodeId(ctx context.Context, + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field) ( overlay_cache_node *OverlayCacheNode, err error) { var tx *Tx if tx, err = rx.getTx(ctx); err != nil { return } - return tx.Get_OverlayCacheNode_By_Key(ctx, overlay_cache_node_key) + return tx.Get_OverlayCacheNode_By_NodeId(ctx, overlay_cache_node_node_id) } func (rx *Rx) Limited_Bwagreement(ctx context.Context, @@ -4647,25 +5060,15 @@ func (rx *Rx) Limited_Injuredsegment(ctx context.Context, return tx.Limited_Injuredsegment(ctx, limit, offset) } -func (rx *Rx) Limited_OverlayCacheNode(ctx context.Context, +func (rx *Rx) Limited_OverlayCacheNode_By_NodeId_GreaterOrEqual(ctx context.Context, + overlay_cache_node_node_id_greater_or_equal OverlayCacheNode_NodeId_Field, 
limit int, offset int64) ( rows []*OverlayCacheNode, err error) { var tx *Tx if tx, err = rx.getTx(ctx); err != nil { return } - return tx.Limited_OverlayCacheNode(ctx, limit, offset) -} - -func (rx *Rx) Limited_OverlayCacheNode_By_Key_GreaterOrEqual(ctx context.Context, - overlay_cache_node_key_greater_or_equal OverlayCacheNode_Key_Field, - limit int, offset int64) ( - rows []*OverlayCacheNode, err error) { - var tx *Tx - if tx, err = rx.getTx(ctx); err != nil { - return - } - return tx.Limited_OverlayCacheNode_By_Key_GreaterOrEqual(ctx, overlay_cache_node_key_greater_or_equal, limit, offset) + return tx.Limited_OverlayCacheNode_By_NodeId_GreaterOrEqual(ctx, overlay_cache_node_node_id_greater_or_equal, limit, offset) } func (rx *Rx) Update_AccountingRaw_By_Id(ctx context.Context, @@ -4723,15 +5126,15 @@ func (rx *Rx) Update_Node_By_Id(ctx context.Context, return tx.Update_Node_By_Id(ctx, node_id, update) } -func (rx *Rx) Update_OverlayCacheNode_By_Key(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field, +func (rx *Rx) Update_OverlayCacheNode_By_NodeId(ctx context.Context, + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field, update OverlayCacheNode_Update_Fields) ( overlay_cache_node *OverlayCacheNode, err error) { var tx *Tx if tx, err = rx.getTx(ctx); err != nil { return } - return tx.Update_OverlayCacheNode_By_Key(ctx, overlay_cache_node_key, update) + return tx.Update_OverlayCacheNode_By_NodeId(ctx, overlay_cache_node_node_id, update) } type Methods interface { @@ -4799,8 +5202,21 @@ type Methods interface { node *Node, err error) Create_OverlayCacheNode(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field, - overlay_cache_node_value OverlayCacheNode_Value_Field) ( + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field, + overlay_cache_node_node_type OverlayCacheNode_NodeType_Field, + overlay_cache_node_address OverlayCacheNode_Address_Field, + overlay_cache_node_protocol OverlayCacheNode_Protocol_Field, + overlay_cache_node_operator_email OverlayCacheNode_OperatorEmail_Field, + overlay_cache_node_operator_wallet OverlayCacheNode_OperatorWallet_Field, + overlay_cache_node_free_bandwidth OverlayCacheNode_FreeBandwidth_Field, + overlay_cache_node_free_disk OverlayCacheNode_FreeDisk_Field, + overlay_cache_node_latency_90 OverlayCacheNode_Latency90_Field, + overlay_cache_node_audit_success_ratio OverlayCacheNode_AuditSuccessRatio_Field, + overlay_cache_node_audit_uptime_ratio OverlayCacheNode_AuditUptimeRatio_Field, + overlay_cache_node_audit_count OverlayCacheNode_AuditCount_Field, + overlay_cache_node_audit_success_count OverlayCacheNode_AuditSuccessCount_Field, + overlay_cache_node_uptime_count OverlayCacheNode_UptimeCount_Field, + overlay_cache_node_uptime_success_count OverlayCacheNode_UptimeSuccessCount_Field) ( overlay_cache_node *OverlayCacheNode, err error) Delete_AccountingRaw_By_Id(ctx context.Context, @@ -4831,8 +5247,8 @@ type Methods interface { node_id Node_Id_Field) ( deleted bool, err error) - Delete_OverlayCacheNode_By_Key(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field) ( + Delete_OverlayCacheNode_By_NodeId(ctx context.Context, + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field) ( deleted bool, err error) Find_AccountingTimestamps_Value_By_Name(ctx context.Context, @@ -4862,8 +5278,8 @@ type Methods interface { node_id Node_Id_Field) ( node *Node, err error) - Get_OverlayCacheNode_By_Key(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field) ( + 
Get_OverlayCacheNode_By_NodeId(ctx context.Context, + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field) ( overlay_cache_node *OverlayCacheNode, err error) Limited_Bwagreement(ctx context.Context, @@ -4874,12 +5290,8 @@ type Methods interface { limit int, offset int64) ( rows []*Injuredsegment, err error) - Limited_OverlayCacheNode(ctx context.Context, - limit int, offset int64) ( - rows []*OverlayCacheNode, err error) - - Limited_OverlayCacheNode_By_Key_GreaterOrEqual(ctx context.Context, - overlay_cache_node_key_greater_or_equal OverlayCacheNode_Key_Field, + Limited_OverlayCacheNode_By_NodeId_GreaterOrEqual(ctx context.Context, + overlay_cache_node_node_id_greater_or_equal OverlayCacheNode_NodeId_Field, limit int, offset int64) ( rows []*OverlayCacheNode, err error) @@ -4908,8 +5320,8 @@ type Methods interface { update Node_Update_Fields) ( node *Node, err error) - Update_OverlayCacheNode_By_Key(ctx context.Context, - overlay_cache_node_key OverlayCacheNode_Key_Field, + Update_OverlayCacheNode_By_NodeId(ctx context.Context, + overlay_cache_node_node_id OverlayCacheNode_NodeId_Field, update OverlayCacheNode_Update_Fields) ( overlay_cache_node *OverlayCacheNode, err error) } diff --git a/satellite/satellitedb/dbx/satellitedb.dbx.postgres.sql b/satellite/satellitedb/dbx/satellitedb.dbx.postgres.sql index b69a83f96..32fe5fed3 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx.postgres.sql +++ b/satellite/satellitedb/dbx/satellitedb.dbx.postgres.sql @@ -60,8 +60,21 @@ CREATE TABLE nodes ( PRIMARY KEY ( id ) ); CREATE TABLE overlay_cache_nodes ( - key bytea NOT NULL, - value bytea NOT NULL, - PRIMARY KEY ( key ), - UNIQUE ( key ) + node_id bytea NOT NULL, + node_type integer NOT NULL, + address text NOT NULL, + protocol integer NOT NULL, + operator_email text NOT NULL, + operator_wallet text NOT NULL, + free_bandwidth bigint NOT NULL, + free_disk bigint NOT NULL, + latency_90 bigint NOT NULL, + audit_success_ratio double precision NOT NULL, + audit_uptime_ratio double precision NOT NULL, + audit_count bigint NOT NULL, + audit_success_count bigint NOT NULL, + uptime_count bigint NOT NULL, + uptime_success_count bigint NOT NULL, + PRIMARY KEY ( node_id ), + UNIQUE ( node_id ) ); diff --git a/satellite/satellitedb/dbx/satellitedb.dbx.sqlite3.sql b/satellite/satellitedb/dbx/satellitedb.dbx.sqlite3.sql index c05850671..08aa02d49 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx.sqlite3.sql +++ b/satellite/satellitedb/dbx/satellitedb.dbx.sqlite3.sql @@ -60,8 +60,21 @@ CREATE TABLE nodes ( PRIMARY KEY ( id ) ); CREATE TABLE overlay_cache_nodes ( - key BLOB NOT NULL, - value BLOB NOT NULL, - PRIMARY KEY ( key ), - UNIQUE ( key ) + node_id BLOB NOT NULL, + node_type INTEGER NOT NULL, + address TEXT NOT NULL, + protocol INTEGER NOT NULL, + operator_email TEXT NOT NULL, + operator_wallet TEXT NOT NULL, + free_bandwidth INTEGER NOT NULL, + free_disk INTEGER NOT NULL, + latency_90 INTEGER NOT NULL, + audit_success_ratio REAL NOT NULL, + audit_uptime_ratio REAL NOT NULL, + audit_count INTEGER NOT NULL, + audit_success_count INTEGER NOT NULL, + uptime_count INTEGER NOT NULL, + uptime_success_count INTEGER NOT NULL, + PRIMARY KEY ( node_id ), + UNIQUE ( node_id ) ); diff --git a/satellite/satellitedb/locked.go b/satellite/satellitedb/locked.go index 623a577e2..33701ca66 100644 --- a/satellite/satellitedb/locked.go +++ b/satellite/satellitedb/locked.go @@ -14,11 +14,11 @@ import ( "storj.io/storj/pkg/bwagreement" "storj.io/storj/pkg/datarepair/irreparable" "storj.io/storj/pkg/datarepair/queue" 
+ "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/statdb" "storj.io/storj/pkg/storj" "storj.io/storj/satellite" - "storj.io/storj/storage" ) // locked implements a locking wrapper around satellite.DB. @@ -68,7 +68,7 @@ func (m *locked) Irreparable() irreparable.DB { } // OverlayCache returns database for caching overlay information -func (m *locked) OverlayCache() storage.KeyValueStore { +func (m *locked) OverlayCache() overlay.DB { m.Lock() defer m.Unlock() return &lockedOverlayCache{m.Locker, m.db.OverlayCache()} @@ -169,66 +169,45 @@ func (m *lockedIrreparable) IncrementRepairAttempts(ctx context.Context, segment return m.db.IncrementRepairAttempts(ctx, segmentInfo) } -// lockedOverlayCache implements locking wrapper for storage.KeyValueStore +// lockedOverlayCache implements locking wrapper for overlay.DB type lockedOverlayCache struct { sync.Locker - db storage.KeyValueStore + db overlay.DB } -// Close closes the store -func (m *lockedOverlayCache) Close() error { +// Delete deletes node based on id +func (m *lockedOverlayCache) Delete(ctx context.Context, id storj.NodeID) error { m.Lock() defer m.Unlock() - return m.db.Close() + return m.db.Delete(ctx, id) } -// Delete deletes key and the value -func (m *lockedOverlayCache) Delete(a0 storage.Key) error { +// Get looks up the node by nodeID +func (m *lockedOverlayCache) Get(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error) { m.Lock() defer m.Unlock() - return m.db.Delete(a0) + return m.db.Get(ctx, nodeID) } -// Get gets a value to store -func (m *lockedOverlayCache) Get(a0 storage.Key) (storage.Value, error) { +// GetAll looks up nodes based on the ids from the overlay cache +func (m *lockedOverlayCache) GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) { m.Lock() defer m.Unlock() - return m.db.Get(a0) + return m.db.GetAll(ctx, nodeIDs) } -// GetAll gets all values from the store -func (m *lockedOverlayCache) GetAll(a0 storage.Keys) (storage.Values, error) { +// List lists nodes starting from cursor +func (m *lockedOverlayCache) List(ctx context.Context, cursor storj.NodeID, limit int) ([]*pb.Node, error) { m.Lock() defer m.Unlock() - return m.db.GetAll(a0) + return m.db.List(ctx, cursor, limit) } -// Iterate iterates over items based on opts -func (m *lockedOverlayCache) Iterate(opts storage.IterateOptions, fn func(storage.Iterator) error) error { +// Update updates node information +func (m *lockedOverlayCache) Update(ctx context.Context, value *pb.Node) error { m.Lock() defer m.Unlock() - return m.db.Iterate(opts, fn) -} - -// List lists all keys starting from start and upto limit items -func (m *lockedOverlayCache) List(start storage.Key, limit int) (storage.Keys, error) { - m.Lock() - defer m.Unlock() - return m.db.List(start, limit) -} - -// Put adds a value to store -func (m *lockedOverlayCache) Put(a0 storage.Key, a1 storage.Value) error { - m.Lock() - defer m.Unlock() - return m.db.Put(a0, a1) -} - -// ReverseList lists all keys in revers order -func (m *lockedOverlayCache) ReverseList(a0 storage.Key, a1 int) (storage.Keys, error) { - m.Lock() - defer m.Unlock() - return m.db.ReverseList(a0, a1) + return m.db.Update(ctx, value) } // lockedRepairQueue implements locking wrapper for queue.RepairQueue diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index db937e4a1..0d3000059 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -6,122 +6,240 @@ package satellitedb import ( "context" 
"database/sql" - "errors" - "storj.io/storj/pkg/utils" + "github.com/zeebo/errs" + + "storj.io/storj/pkg/overlay" + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/storj" dbx "storj.io/storj/satellite/satellitedb/dbx" "storj.io/storj/storage" ) +var _ overlay.DB = (*overlaycache)(nil) + type overlaycache struct { db *dbx.DB } -func (o *overlaycache) Put(key storage.Key, value storage.Value) error { - if key.IsZero() { - return storage.ErrEmptyKey.New("") - } - ctx := context.Background() // TODO: fix - - tx, err := o.db.Open(ctx) - if err != nil { - return Error.Wrap(err) +// Get looks up the node by nodeID +func (cache *overlaycache) Get(ctx context.Context, id storj.NodeID) (*pb.Node, error) { + if id.IsZero() { + return nil, overlay.ErrEmptyNode } - _, err = tx.Get_OverlayCacheNode_By_Key(ctx, dbx.OverlayCacheNode_Key(key)) - if err != nil { - _, err = tx.Create_OverlayCacheNode( - ctx, - dbx.OverlayCacheNode_Key(key), - dbx.OverlayCacheNode_Value(value), - ) - if err != nil { - return Error.Wrap(utils.CombineErrors(err, tx.Rollback())) - } - } else { - updateFields := dbx.OverlayCacheNode_Update_Fields{} - updateFields.Value = dbx.OverlayCacheNode_Value(value) - _, err := tx.Update_OverlayCacheNode_By_Key( - ctx, - dbx.OverlayCacheNode_Key(key), - updateFields, - ) - if err != nil { - return Error.Wrap(utils.CombineErrors(err, tx.Rollback())) - } - } - return Error.Wrap(tx.Commit()) -} - -func (o *overlaycache) Get(key storage.Key) (storage.Value, error) { - if key.IsZero() { - return nil, storage.ErrEmptyKey.New("") - } - - ctx := context.Background() // TODO: fix - - node, err := o.db.Get_OverlayCacheNode_By_Key(ctx, dbx.OverlayCacheNode_Key(key)) + node, err := cache.db.Get_OverlayCacheNode_By_NodeId(ctx, + dbx.OverlayCacheNode_NodeId(id.Bytes()), + ) if err == sql.ErrNoRows { - return nil, storage.ErrKeyNotFound.New(key.String()) + return nil, overlay.ErrNodeNotFound } if err != nil { return nil, err } - return node.Value, nil + + return convertOverlayNode(node) } -func (o *overlaycache) GetAll(keys storage.Keys) (storage.Values, error) { - values := make([]storage.Value, len(keys)) - for i, key := range keys { - value, err := o.Get(key) - if err == nil { - values[i] = value +// GetAll looks up nodes based on the ids from the overlay cache +func (cache *overlaycache) GetAll(ctx context.Context, ids storj.NodeIDList) ([]*pb.Node, error) { + infos := make([]*pb.Node, len(ids)) + for i, id := range ids { + // TODO: abort on canceled context + info, err := cache.Get(ctx, id) + if err != nil { + continue } + infos[i] = info } - return values, nil + return infos, nil } -func (o *overlaycache) Delete(key storage.Key) error { - ctx := context.Background() // TODO: fix - _, err := o.db.Delete_OverlayCacheNode_By_Key(ctx, dbx.OverlayCacheNode_Key(key)) - return err -} - -func (o *overlaycache) List(start storage.Key, limit int) (keys storage.Keys, err error) { - ctx := context.Background() // TODO: fix +// List lists nodes starting from cursor +func (cache *overlaycache) List(ctx context.Context, cursor storj.NodeID, limit int) ([]*pb.Node, error) { + // TODO: handle this nicer if limit <= 0 || limit > storage.LookupLimit { limit = storage.LookupLimit } - var rows []*dbx.OverlayCacheNode - if start == nil { - rows, err = o.db.Limited_OverlayCacheNode(ctx, limit, 0) - } else { - rows, err = o.db.Limited_OverlayCacheNode_By_Key_GreaterOrEqual(ctx, dbx.OverlayCacheNode_Key(start), limit, 0) - } + dbxInfos, err := cache.db.Limited_OverlayCacheNode_By_NodeId_GreaterOrEqual(ctx, + 
dbx.OverlayCacheNode_NodeId(cursor.Bytes()), + limit, 0, + ) if err != nil { - return []storage.Key{}, err + return nil, err } - keys = make([]storage.Key, len(rows)) - for i, row := range rows { - keys[i] = row.Key + infos := make([]*pb.Node, len(dbxInfos)) + for i, dbxInfo := range dbxInfos { + infos[i], err = convertOverlayNode(dbxInfo) + if err != nil { + return nil, err + } + } + return infos, nil +} + +// Update updates node information +func (cache *overlaycache) Update(ctx context.Context, info *pb.Node) (err error) { + if info == nil || info.Id.IsZero() { + return overlay.ErrEmptyNode } - return keys, nil + tx, err := cache.db.Open(ctx) + if err != nil { + return Error.Wrap(err) + } + + // TODO: use upsert + _, err = tx.Get_OverlayCacheNode_By_NodeId(ctx, + dbx.OverlayCacheNode_NodeId(info.Id.Bytes()), + ) + + address := info.Address + if address == nil { + address = &pb.NodeAddress{} + } + + metadata := info.Metadata + if metadata == nil { + metadata = &pb.NodeMetadata{} + } + + restrictions := info.Restrictions + if restrictions == nil { + restrictions = &pb.NodeRestrictions{ + FreeBandwidth: -1, + FreeDisk: -1, + } + } + + reputation := info.Reputation + if reputation == nil { + reputation = &pb.NodeStats{} + } + + if err != nil { + _, err = tx.Create_OverlayCacheNode( + ctx, + dbx.OverlayCacheNode_NodeId(info.Id.Bytes()), + + dbx.OverlayCacheNode_NodeType(int(info.Type)), + dbx.OverlayCacheNode_Address(address.Address), + dbx.OverlayCacheNode_Protocol(int(address.Transport)), + + dbx.OverlayCacheNode_OperatorEmail(metadata.Email), + dbx.OverlayCacheNode_OperatorWallet(metadata.Wallet), + + dbx.OverlayCacheNode_FreeBandwidth(restrictions.FreeBandwidth), + dbx.OverlayCacheNode_FreeDisk(restrictions.FreeDisk), + + dbx.OverlayCacheNode_Latency90(reputation.Latency_90), + dbx.OverlayCacheNode_AuditSuccessRatio(reputation.AuditSuccessRatio), + dbx.OverlayCacheNode_AuditUptimeRatio(reputation.UptimeRatio), + dbx.OverlayCacheNode_AuditCount(reputation.AuditCount), + dbx.OverlayCacheNode_AuditSuccessCount(reputation.AuditSuccessCount), + + dbx.OverlayCacheNode_UptimeCount(reputation.UptimeCount), + dbx.OverlayCacheNode_UptimeSuccessCount(reputation.UptimeSuccessCount), + ) + if err != nil { + return Error.Wrap(errs.Combine(err, tx.Rollback())) + } + } else { + update := dbx.OverlayCacheNode_Update_Fields{ + // TODO: should we be able to update node type? 
+ Address: dbx.OverlayCacheNode_Address(address.Address), + Protocol: dbx.OverlayCacheNode_Protocol(int(address.Transport)), + + Latency90: dbx.OverlayCacheNode_Latency90(reputation.Latency_90), + AuditSuccessRatio: dbx.OverlayCacheNode_AuditSuccessRatio(reputation.AuditSuccessRatio), + AuditUptimeRatio: dbx.OverlayCacheNode_AuditUptimeRatio(reputation.UptimeRatio), + AuditCount: dbx.OverlayCacheNode_AuditCount(reputation.AuditCount), + AuditSuccessCount: dbx.OverlayCacheNode_AuditSuccessCount(reputation.AuditSuccessCount), + UptimeCount: dbx.OverlayCacheNode_UptimeCount(reputation.UptimeCount), + UptimeSuccessCount: dbx.OverlayCacheNode_UptimeSuccessCount(reputation.UptimeSuccessCount), + } + + if info.Metadata != nil { + update.OperatorEmail = dbx.OverlayCacheNode_OperatorEmail(info.Metadata.Email) + update.OperatorWallet = dbx.OverlayCacheNode_OperatorWallet(info.Metadata.Wallet) + } + + if info.Restrictions != nil { + update.FreeBandwidth = dbx.OverlayCacheNode_FreeBandwidth(restrictions.FreeBandwidth) + update.FreeDisk = dbx.OverlayCacheNode_FreeDisk(restrictions.FreeDisk) + } + + _, err := tx.Update_OverlayCacheNode_By_NodeId(ctx, + dbx.OverlayCacheNode_NodeId(info.Id.Bytes()), + update, + ) + if err != nil { + return Error.Wrap(errs.Combine(err, tx.Rollback())) + } + } + + return Error.Wrap(tx.Commit()) } -// ReverseList lists all keys in revers order -func (o *overlaycache) ReverseList(start storage.Key, limit int) (storage.Keys, error) { - return nil, errors.New("not implemented") +// Delete deletes node based on id +func (cache *overlaycache) Delete(ctx context.Context, id storj.NodeID) error { + _, err := cache.db.Delete_OverlayCacheNode_By_NodeId(ctx, + dbx.OverlayCacheNode_NodeId(id.Bytes()), + ) + return err } -// Iterate iterates over items based on opts -func (o *overlaycache) Iterate(opts storage.IterateOptions, fn func(storage.Iterator) error) error { - return errors.New("not implemented") -} +func convertOverlayNode(info *dbx.OverlayCacheNode) (*pb.Node, error) { + if info == nil { + return nil, Error.New("missing info") + } -// Close closes the store -func (o *overlaycache) Close() error { - return errors.New("not implemented") + id, err := storj.NodeIDFromBytes(info.NodeId) + if err != nil { + return nil, err + } + + node := &pb.Node{ + Id: id, + Type: pb.NodeType(info.NodeType), + Address: &pb.NodeAddress{ + Address: info.Address, + Transport: pb.NodeTransport(info.Protocol), + }, + Metadata: &pb.NodeMetadata{ + Email: info.OperatorEmail, + Wallet: info.OperatorWallet, + }, + Restrictions: &pb.NodeRestrictions{ + FreeBandwidth: info.FreeBandwidth, + FreeDisk: info.FreeDisk, + }, + Reputation: &pb.NodeStats{ + NodeId: id, + Latency_90: info.Latency90, + AuditSuccessRatio: info.AuditSuccessRatio, + UptimeRatio: info.AuditUptimeRatio, + AuditCount: info.AuditCount, + AuditSuccessCount: info.AuditSuccessCount, + UptimeCount: info.UptimeCount, + UptimeSuccessCount: info.UptimeSuccessCount, + }, + } + + if node.Address.Address == "" { + node.Address = nil + } + if node.Metadata.Email == "" && node.Metadata.Wallet == "" { + node.Metadata = nil + } + if node.Restrictions.FreeBandwidth < 0 && node.Restrictions.FreeDisk < 0 { + node.Restrictions = nil + } + if node.Reputation.Latency_90 < 0 { + node.Reputation = nil + } + + return node, nil } diff --git a/satellite/satellitedb/satellitedbtest/utils.go b/satellite/satellitedb/satellitedbtest/utils.go index 4bf2df6c7..024b6153b 100644 --- a/satellite/satellitedb/satellitedbtest/utils.go +++ 
b/satellite/satellitedb/satellitedbtest/utils.go @@ -17,7 +17,7 @@ import ( const ( // postgres connstring that works with docker-compose defaultPostgresConn = "postgres://storj:storj-pass@test-postgres/teststorj?sslmode=disable" - defaultSqliteConn = "sqlite3://file::memory:?mode=memory&cache=shared" + defaultSqliteConn = "sqlite3://file::memory:?mode=memory" ) var (