Change overlay cache database interface (#1047)

This commit is contained in:
Egon Elbre 2019-01-15 11:08:45 -05:00 committed by GitHub
parent ab0c18de3a
commit f8906ce000
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1045 additions and 568 deletions

View File

@ -182,7 +182,7 @@ func (node *Node) initOverlay(planet *Planet) error {
node.Kademlia = kad
node.StatDB = node.Database.StatDB()
node.Overlay = overlay.NewCache(teststore.New(), node.StatDB)
node.Overlay = overlay.NewCache(node.Database.OverlayCache(), node.StatDB)
node.Discovery = discovery.NewDiscovery(node.Log.Named("discovery"), node.Overlay, node.Kademlia, node.StatDB)
return nil

View File

@ -4,21 +4,22 @@
package audit
import (
"context"
"crypto/rand"
"errors"
"math"
"math/big"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/storj/internal/testidentity"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/internal/teststorj"
"storj.io/storj/pkg/auth"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/pointerdb"
"storj.io/storj/pkg/storage/meta"
@ -27,7 +28,6 @@ import (
)
var (
ctx = context.Background()
ErrNoList = errors.New("list error: failed to get list")
ErrNoNum = errors.New("num error: failed to get num")
)
@ -38,10 +38,17 @@ func TestAuditSegment(t *testing.T) {
count int
}
ca, err := testidentity.NewTestCA(ctx)
assert.NoError(t, err)
identity, err := ca.NewIdentity()
assert.NoError(t, err)
tctx := testcontext.New(t)
defer tctx.Cleanup()
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer tctx.Check(planet.Shutdown)
planet.Start(tctx)
// we wait a second for all the nodes to complete bootstrapping off the satellite
time.Sleep(2 * time.Second)
// note: to simulate better,
// change limit in library to 5 in
@ -92,15 +99,15 @@ func TestAuditSegment(t *testing.T) {
},
}
ctx = auth.WithAPIKey(ctx, nil)
ctx := auth.WithAPIKey(tctx, nil)
// PointerDB instantiation
db := teststore.New()
c := pointerdb.Config{MaxInlineSegmentSize: 8000}
cache := overlay.NewCache(teststore.New(), nil)
pointers := pointerdb.NewServer(db, cache, zap.NewNop(), c, identity)
//TODO: use planet PointerDB directly
cache := planet.Satellites[0].Overlay
pointers := pointerdb.NewServer(db, cache, zap.NewNop(), c, planet.Satellites[0].Identity)
// create a pdb client and instance of audit
cursor := NewCursor(pointers)

View File

@ -5,8 +5,8 @@ package overlay
import (
"context"
"errors"
"github.com/gogo/protobuf/proto"
"github.com/zeebo/errs"
"go.uber.org/zap"
@ -21,9 +21,6 @@ const (
OverlayBucket = "overlay"
)
// ErrDelete is returned when there is a problem deleting a node from the cache
var ErrDelete = errs.New("error deleting node")
// ErrEmptyNode is returned when the nodeID is empty
var ErrEmptyNode = errs.New("empty node ID")
@ -36,20 +33,35 @@ var ErrBucketNotFound = errs.New("Bucket not found")
// OverlayError creates class of errors for stack traces
var OverlayError = errs.Class("Overlay Error")
// DB implements the database for overlay.Cache
type DB interface {
// Get looks up the node by nodeID
Get(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error)
// GetAll looks up nodes based on the ids from the overlay cache
GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error)
// List lists nodes starting from cursor
List(ctx context.Context, cursor storj.NodeID, limit int) ([]*pb.Node, error)
// Update updates node information
Update(ctx context.Context, value *pb.Node) error
// Delete deletes node based on id
Delete(ctx context.Context, id storj.NodeID) error
}
// Cache is used to store overlay data in Redis
type Cache struct {
db storage.KeyValueStore
db DB
statDB statdb.DB
}
// NewCache returns a new Cache
func NewCache(db storage.KeyValueStore, sdb statdb.DB) *Cache {
func NewCache(db DB, sdb statdb.DB) *Cache {
return &Cache{db: db, statDB: sdb}
}
// Inspect lists limited number of items in the cache
func (cache *Cache) Inspect(ctx context.Context) (storage.Keys, error) {
return cache.db.List(nil, 0)
// TODO: implement inspection tools
return nil, errors.New("not implemented")
}
// Get looks up the provided nodeID from the overlay cache
@ -58,51 +70,16 @@ func (cache *Cache) Get(ctx context.Context, nodeID storj.NodeID) (*pb.Node, err
return nil, ErrEmptyNode
}
b, err := cache.db.Get(nodeID.Bytes())
if err != nil {
if storage.ErrKeyNotFound.Has(err) {
return nil, ErrNodeNotFound
}
return nil, err
}
if b == nil {
return nil, ErrNodeNotFound
}
na := &pb.Node{}
if err := proto.Unmarshal(b, na); err != nil {
return nil, err
}
return na, nil
return cache.db.Get(ctx, nodeID)
}
// GetAll looks up the provided nodeIDs from the overlay cache
func (cache *Cache) GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) {
if len(nodeIDs) == 0 {
return nil, OverlayError.New("no nodeIDs provided")
// GetAll looks up the provided ids from the overlay cache
func (cache *Cache) GetAll(ctx context.Context, ids storj.NodeIDList) ([]*pb.Node, error) {
if len(ids) == 0 {
return nil, OverlayError.New("no ids provided")
}
var ks storage.Keys
for _, v := range nodeIDs {
ks = append(ks, v.Bytes())
}
vs, err := cache.db.GetAll(ks)
if err != nil {
return nil, err
}
var ns []*pb.Node
for _, v := range vs {
if v == nil {
ns = append(ns, nil)
continue
}
na := &pb.Node{}
err := proto.Unmarshal(v, na)
if err != nil {
return nil, OverlayError.New("could not unmarshal non-nil node: %v", err)
}
ns = append(ns, na)
}
return ns, nil
return cache.db.GetAll(ctx, ids)
}
// Put adds a nodeID to the redis cache with a binary representation of proto defined Node
@ -112,12 +89,16 @@ func (cache *Cache) Put(ctx context.Context, nodeID storj.NodeID, value pb.Node)
if nodeID.IsZero() {
return nil
}
if nodeID != value.Id {
return errors.New("invalid request")
}
// get existing node rep, or create a new statdb node with 0 rep
stats, err := cache.statDB.CreateEntryIfNotExists(ctx, nodeID)
if err != nil {
return err
}
value.Reputation = &pb.NodeStats{
AuditSuccessRatio: stats.AuditSuccessRatio,
AuditSuccessCount: stats.AuditSuccessCount,
@ -127,12 +108,7 @@ func (cache *Cache) Put(ctx context.Context, nodeID storj.NodeID, value pb.Node)
UptimeCount: stats.UptimeCount,
}
data, err := proto.Marshal(&value)
if err != nil {
return err
}
return cache.db.Put(nodeID.Bytes(), data)
return cache.db.Update(ctx, &value)
}
// Delete will remove the node from the cache. Used when a node hard disconnects or fails
@ -141,13 +117,7 @@ func (cache *Cache) Delete(ctx context.Context, id storj.NodeID) error {
if id.IsZero() {
return ErrEmptyNode
}
err := cache.db.Delete(id.Bytes())
if err != nil {
return ErrDelete
}
return nil
return cache.db.Delete(ctx, id)
}
// ConnFailure implements the Transport Observer `ConnFailure` function

View File

@ -11,18 +11,24 @@ import (
"github.com/stretchr/testify/assert"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/statdb"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/storage"
"storj.io/storj/storage/teststore"
)
func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore, sdb statdb.DB) {
func TestCache_Database(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
testCache(ctx, t, db.OverlayCache(), db.StatDB())
})
}
func testCache(ctx context.Context, t *testing.T, store overlay.DB, sdb statdb.DB) {
valid1ID := storj.NodeID{}
valid2ID := storj.NodeID{}
missingID := storj.NodeID{}
@ -65,11 +71,7 @@ func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore, s
assert.True(t, err == overlay.ErrNodeNotFound)
assert.Nil(t, invalid2)
if storeClient, ok := store.(*teststore.Client); ok {
storeClient.ForceError++
_, err := cache.Get(ctx, valid1ID)
assert.Error(t, err)
}
// TODO: add erroring database test
}
{ // GetAll
@ -92,11 +94,7 @@ func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore, s
_, err = cache.GetAll(ctx, storj.NodeIDList{})
assert.True(t, overlay.OverlayError.Has(err))
if storeClient, ok := store.(*teststore.Client); ok {
storeClient.ForceError++
_, err := cache.GetAll(ctx, storj.NodeIDList{valid1ID, valid2ID})
assert.Error(t, err)
}
// TODO: add erroring database test
}
{ // Delete
@ -120,19 +118,3 @@ func testCache(ctx context.Context, t *testing.T, store storage.KeyValueStore, s
assert.True(t, err == overlay.ErrEmptyNode)
}
}
func TestCache_Masterdb(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, err := testplanet.New(t, 1, 4, 0)
if err != nil {
t.Fatal(err)
}
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
testCache(ctx, t, db.OverlayCache(), db.StatDB())
})
}

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testidentity"
@ -43,6 +44,22 @@ func TestNewClient(t *testing.T) {
}
func TestChoose(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
planet.Start(ctx)
// we wait a second for all the nodes to complete bootstrapping off the satellite
time.Sleep(2 * time.Second)
defer ctx.Check(planet.Shutdown)
oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0])
if err != nil {
t.Fatal(err)
}
n1 := &pb.Node{Id: storj.NodeID{1}, Type: pb.NodeType_STORAGE}
n2 := &pb.Node{Id: storj.NodeID{2}, Type: pb.NodeType_STORAGE}
n3 := &pb.Node{Id: storj.NodeID{3}, Type: pb.NodeType_STORAGE}
@ -57,13 +74,6 @@ func TestChoose(t *testing.T) {
id3 := storj.NodeID{3}
id4 := storj.NodeID{4}
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, cleanup := getPlanet(ctx, t)
defer cleanup()
oc := getOverlayClient(t, planet)
cases := []struct {
limit int
space int64
@ -122,9 +132,18 @@ func TestLookup(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, cleanup := getPlanet(ctx, t)
defer cleanup()
oc := getOverlayClient(t, planet)
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
planet.Start(ctx)
// we wait a second for all the nodes to complete bootstrapping off the satellite
time.Sleep(2 * time.Second)
defer ctx.Check(planet.Shutdown)
oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0])
if err != nil {
t.Fatal(err)
}
nid1 := planet.StorageNodes[0].ID()
@ -158,9 +177,18 @@ func TestBulkLookup(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, cleanup := getPlanet(ctx, t)
defer cleanup()
oc := getOverlayClient(t, planet)
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
planet.Start(ctx)
// we wait a second for all the nodes to complete bootstrapping off the satellite
time.Sleep(2 * time.Second)
defer ctx.Check(planet.Shutdown)
oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0])
if err != nil {
t.Fatal(err)
}
nid1 := planet.StorageNodes[0].ID()
nid2 := planet.StorageNodes[1].ID()
@ -189,9 +217,18 @@ func TestBulkLookupV2(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, cleanup := getPlanet(ctx, t)
defer cleanup()
oc := getOverlayClient(t, planet)
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
planet.Start(ctx)
// we wait a second for all the nodes to complete bootstrapping off the satellite
time.Sleep(2 * time.Second)
defer ctx.Check(planet.Shutdown)
oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0])
if err != nil {
t.Fatal(err)
}
cache := planet.Satellites[0].Overlay
@ -248,29 +285,3 @@ func TestBulkLookupV2(t *testing.T) {
}
}
}
func getPlanet(ctx *testcontext.Context, t *testing.T) (planet *testplanet.Planet, f func()) {
planet, err := testplanet.New(t, 1, 4, 1)
if err != nil {
t.Fatal(err)
}
planet.Start(ctx)
// we wait a second for all the nodes to complete bootstrapping off the satellite
time.Sleep(2 * time.Second)
f = func() {
ctx.Check(planet.Shutdown)
}
return planet, f
}
func getOverlayClient(t *testing.T, planet *testplanet.Planet) (oc overlay.Client) {
oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0])
if err != nil {
t.Fatal(err)
}
return oc
}

View File

@ -17,7 +17,6 @@ import (
"storj.io/storj/pkg/statdb"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/utils"
"storj.io/storj/storage"
)
var (
@ -64,7 +63,7 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (
sdb, ok := ctx.Value("masterdb").(interface {
StatDB() statdb.DB
OverlayCache() storage.KeyValueStore
OverlayCache() DB
})
if !ok {
return Error.Wrap(errs.New("unable to get master db instance"))

View File

@ -1,19 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package overlay
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
)
func TestRun(t *testing.T) {
ctx := context.Background()
// run with nil, pass pointer to Kademlia in context
err := Config{}.Run(ctx, nil)
assert.Error(t, err)
assert.Equal(t, "overlay error: unable to get master db instance", err.Error())
}

View File

@ -8,7 +8,6 @@ import (
"context"
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/zeebo/errs"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
@ -17,7 +16,6 @@ import (
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
)
// ServerError creates class of errors for stack traces
@ -120,52 +118,31 @@ func (server *Server) FindStorageNodes(ctx context.Context, req *pb.FindStorageN
}, nil
}
func (server *Server) getNodes(ctx context.Context, keys storage.Keys) ([]*pb.Node, error) {
values, err := server.cache.db.GetAll(keys)
if err != nil {
return nil, Error.Wrap(err)
}
nodes := []*pb.Node{}
for _, v := range values {
n := &pb.Node{}
if err := proto.Unmarshal(v, n); err != nil {
return nil, Error.Wrap(err)
}
nodes = append(nodes, n)
}
return nodes, nil
}
func (server *Server) populate(ctx context.Context, startID storj.NodeID, maxNodes int64,
minRestrictions *pb.NodeRestrictions, minReputation *pb.NodeStats,
// TODO: nicer method arguments
func (server *Server) populate(ctx context.Context,
startID storj.NodeID, maxNodes int64,
minRestrictions *pb.NodeRestrictions,
minReputation *pb.NodeStats,
excluded storj.NodeIDList) ([]*pb.Node, storj.NodeID, error) {
// TODO: move the query into db
limit := int(maxNodes * 2)
keys, err := server.cache.db.List(startID.Bytes(), limit)
nodes, err := server.cache.db.List(ctx, startID, limit)
if err != nil {
server.log.Error("Error listing nodes", zap.Error(err))
return nil, storj.NodeID{}, Error.Wrap(err)
}
if len(keys) <= 0 {
server.log.Info("No Keys returned from List operation")
return []*pb.Node{}, startID, nil
}
// TODO: should this be `var result []*pb.Node` ?
var nextStart storj.NodeID
result := []*pb.Node{}
nodes, err := server.getNodes(ctx, keys)
if err != nil {
server.log.Error("Error getting nodes", zap.Error(err))
return nil, storj.NodeID{}, Error.Wrap(err)
}
for _, v := range nodes {
if v == nil {
continue
}
nextStart = v.Id
if v.Type != pb.NodeType_STORAGE {
server.log.Debug("not storage node = " + v.Id.String() + " was " + v.Type.String())
continue
}
@ -179,19 +156,12 @@ func (server *Server) populate(ctx context.Context, startID storj.NodeID, maxNod
reputation.GetAuditSuccessRatio() < minReputation.GetAuditSuccessRatio() ||
reputation.GetAuditCount() < minReputation.GetAuditCount() ||
contains(excluded, v.Id) {
server.log.Debug("excluded = " + v.Id.String())
continue
}
result = append(result, v)
}
var nextStart storj.NodeID
if len(keys) < limit {
nextStart = storj.NodeID{}
} else {
nextStart, err = storj.NodeIDFromBytes(keys[len(keys)-1])
}
if err != nil {
return nil, storj.NodeID{}, Error.Wrap(err)
server.log.Debug("append " + v.Id.String() + " - " + v.Type.String())
result = append(result, v)
}
return result, nextStart, nil

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
@ -34,17 +35,21 @@ func TestServer(t *testing.T) {
// TODO: handle cleanup
{ // FindStorageNodes
result, err := server.FindStorageNodes(ctx, &pb.FindStorageNodesRequest{Opts: &pb.OverlayOptions{Amount: 2}})
if assert.NoError(t, err) && assert.NotNil(t, result) {
assert.Len(t, result.Nodes, 2)
}
result, err := server.FindStorageNodes(ctx, &pb.FindStorageNodesRequest{
Opts: &pb.OverlayOptions{Amount: 2},
})
require.NoError(t, err)
require.NotNil(t, result)
assert.Len(t, result.Nodes, 2)
}
{ // Lookup
result, err := server.Lookup(ctx, &pb.LookupRequest{NodeId: planet.StorageNodes[0].ID()})
if assert.NoError(t, err) && assert.NotNil(t, result) {
assert.Equal(t, result.Node.Address.Address, planet.StorageNodes[0].Addr())
}
result, err := server.Lookup(ctx, &pb.LookupRequest{
NodeId: planet.StorageNodes[0].ID(),
})
require.NoError(t, err)
require.NotNil(t, result)
assert.Equal(t, result.Node.Address.Address, planet.StorageNodes[0].Addr())
}
{ // BulkLookup
@ -56,11 +61,13 @@ func TestServer(t *testing.T) {
},
})
if assert.NoError(t, err) && assert.NotNil(t, result) && assert.Len(t, result.LookupResponse, 3) {
for i, resp := range result.LookupResponse {
if assert.NotNil(t, resp.Node) {
assert.Equal(t, resp.Node.Address.Address, planet.StorageNodes[i].Addr())
}
require.NoError(t, err)
require.NotNil(t, result)
require.Len(t, result.LookupResponse, 3)
for i, resp := range result.LookupResponse {
if assert.NotNil(t, resp.Node) {
assert.Equal(t, resp.Node.Address.Address, planet.StorageNodes[i].Addr())
}
}
}

View File

@ -8,8 +8,8 @@ import (
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/datarepair/irreparable"
"storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/statdb"
"storj.io/storj/storage"
)
// DB is the master database for the satellite
@ -24,7 +24,7 @@ type DB interface {
// StatDB returns database for storing node statistics
StatDB() statdb.DB
// OverlayCache returns database for caching overlay information
OverlayCache() storage.KeyValueStore
OverlayCache() overlay.DB
// Accounting returns database for storing information about data use
Accounting() accounting.DB
// RepairQueue returns queue for segments that need repairing

View File

@ -11,11 +11,11 @@ import (
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/datarepair/irreparable"
"storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/statdb"
"storj.io/storj/pkg/utils"
"storj.io/storj/satellite"
dbx "storj.io/storj/satellite/satellitedb/dbx"
"storj.io/storj/storage"
)
var (
@ -52,7 +52,7 @@ func New(databaseURL string) (satellite.DB, error) {
// NewInMemory creates instance of Sqlite in memory satellite database
func NewInMemory() (satellite.DB, error) {
return New("sqlite3://file::memory:?mode=memory&cache=shared")
return New("sqlite3://file::memory:?mode=memory")
}
// BandwidthAgreement is a getter for bandwidth agreement repository
@ -71,7 +71,7 @@ func (db *DB) StatDB() statdb.DB {
}
// OverlayCache is a getter for overlay cache repository
func (db *DB) OverlayCache() storage.KeyValueStore {
func (db *DB) OverlayCache() overlay.DB {
return &overlaycache{db: db.db}
}

View File

@ -156,31 +156,46 @@ read one (
//--- overlaycache ---//
model overlay_cache_node (
key key
unique key
key node_id
unique node_id
field key blob
field value blob ( updatable )
field node_id blob
field node_type int
field address text (updatable) // TODO: use compressed format
field protocol int (updatable)
field operator_email text (updatable)
field operator_wallet text (updatable) //TODO: use compressed format
field free_bandwidth int64 (updatable)
field free_disk int64 (updatable)
field latency_90 int64 (updatable)
field audit_success_ratio float64 (updatable)
field audit_uptime_ratio float64 (updatable)
field audit_count int64 (updatable)
field audit_success_count int64 (updatable)
field uptime_count int64 (updatable)
field uptime_success_count int64 (updatable)
)
create overlay_cache_node ( )
read one (
select overlay_cache_node
where overlay_cache_node.key = ?
where overlay_cache_node.node_id = ?
)
read limitoffset (
select overlay_cache_node
where overlay_cache_node.node_id >= ?
)
read limitoffset (
select overlay_cache_node
where overlay_cache_node.key >= ?
)
update overlay_cache_node ( where overlay_cache_node.key = ? )
delete overlay_cache_node ( where overlay_cache_node.key = ? )
update overlay_cache_node ( where overlay_cache_node.node_id = ? )
delete overlay_cache_node ( where overlay_cache_node.node_id = ? )
//--- repairqueue ---//

File diff suppressed because it is too large Load Diff

View File

@ -60,8 +60,21 @@ CREATE TABLE nodes (
PRIMARY KEY ( id )
);
CREATE TABLE overlay_cache_nodes (
key bytea NOT NULL,
value bytea NOT NULL,
PRIMARY KEY ( key ),
UNIQUE ( key )
node_id bytea NOT NULL,
node_type integer NOT NULL,
address text NOT NULL,
protocol integer NOT NULL,
operator_email text NOT NULL,
operator_wallet text NOT NULL,
free_bandwidth bigint NOT NULL,
free_disk bigint NOT NULL,
latency_90 bigint NOT NULL,
audit_success_ratio double precision NOT NULL,
audit_uptime_ratio double precision NOT NULL,
audit_count bigint NOT NULL,
audit_success_count bigint NOT NULL,
uptime_count bigint NOT NULL,
uptime_success_count bigint NOT NULL,
PRIMARY KEY ( node_id ),
UNIQUE ( node_id )
);

View File

@ -60,8 +60,21 @@ CREATE TABLE nodes (
PRIMARY KEY ( id )
);
CREATE TABLE overlay_cache_nodes (
key BLOB NOT NULL,
value BLOB NOT NULL,
PRIMARY KEY ( key ),
UNIQUE ( key )
node_id BLOB NOT NULL,
node_type INTEGER NOT NULL,
address TEXT NOT NULL,
protocol INTEGER NOT NULL,
operator_email TEXT NOT NULL,
operator_wallet TEXT NOT NULL,
free_bandwidth INTEGER NOT NULL,
free_disk INTEGER NOT NULL,
latency_90 INTEGER NOT NULL,
audit_success_ratio REAL NOT NULL,
audit_uptime_ratio REAL NOT NULL,
audit_count INTEGER NOT NULL,
audit_success_count INTEGER NOT NULL,
uptime_count INTEGER NOT NULL,
uptime_success_count INTEGER NOT NULL,
PRIMARY KEY ( node_id ),
UNIQUE ( node_id )
);

View File

@ -14,11 +14,11 @@ import (
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/datarepair/irreparable"
"storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/statdb"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite"
"storj.io/storj/storage"
)
// locked implements a locking wrapper around satellite.DB.
@ -68,7 +68,7 @@ func (m *locked) Irreparable() irreparable.DB {
}
// OverlayCache returns database for caching overlay information
func (m *locked) OverlayCache() storage.KeyValueStore {
func (m *locked) OverlayCache() overlay.DB {
m.Lock()
defer m.Unlock()
return &lockedOverlayCache{m.Locker, m.db.OverlayCache()}
@ -169,66 +169,45 @@ func (m *lockedIrreparable) IncrementRepairAttempts(ctx context.Context, segment
return m.db.IncrementRepairAttempts(ctx, segmentInfo)
}
// lockedOverlayCache implements locking wrapper for storage.KeyValueStore
// lockedOverlayCache implements locking wrapper for overlay.DB
type lockedOverlayCache struct {
sync.Locker
db storage.KeyValueStore
db overlay.DB
}
// Close closes the store
func (m *lockedOverlayCache) Close() error {
// Delete deletes node based on id
func (m *lockedOverlayCache) Delete(ctx context.Context, id storj.NodeID) error {
m.Lock()
defer m.Unlock()
return m.db.Close()
return m.db.Delete(ctx, id)
}
// Delete deletes key and the value
func (m *lockedOverlayCache) Delete(a0 storage.Key) error {
// Get looks up the node by nodeID
func (m *lockedOverlayCache) Get(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error) {
m.Lock()
defer m.Unlock()
return m.db.Delete(a0)
return m.db.Get(ctx, nodeID)
}
// Get gets a value to store
func (m *lockedOverlayCache) Get(a0 storage.Key) (storage.Value, error) {
// GetAll looks up nodes based on the ids from the overlay cache
func (m *lockedOverlayCache) GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) {
m.Lock()
defer m.Unlock()
return m.db.Get(a0)
return m.db.GetAll(ctx, nodeIDs)
}
// GetAll gets all values from the store
func (m *lockedOverlayCache) GetAll(a0 storage.Keys) (storage.Values, error) {
// List lists nodes starting from cursor
func (m *lockedOverlayCache) List(ctx context.Context, cursor storj.NodeID, limit int) ([]*pb.Node, error) {
m.Lock()
defer m.Unlock()
return m.db.GetAll(a0)
return m.db.List(ctx, cursor, limit)
}
// Iterate iterates over items based on opts
func (m *lockedOverlayCache) Iterate(opts storage.IterateOptions, fn func(storage.Iterator) error) error {
// Update updates node information
func (m *lockedOverlayCache) Update(ctx context.Context, value *pb.Node) error {
m.Lock()
defer m.Unlock()
return m.db.Iterate(opts, fn)
}
// List lists all keys starting from start and upto limit items
func (m *lockedOverlayCache) List(start storage.Key, limit int) (storage.Keys, error) {
m.Lock()
defer m.Unlock()
return m.db.List(start, limit)
}
// Put adds a value to store
func (m *lockedOverlayCache) Put(a0 storage.Key, a1 storage.Value) error {
m.Lock()
defer m.Unlock()
return m.db.Put(a0, a1)
}
// ReverseList lists all keys in revers order
func (m *lockedOverlayCache) ReverseList(a0 storage.Key, a1 int) (storage.Keys, error) {
m.Lock()
defer m.Unlock()
return m.db.ReverseList(a0, a1)
return m.db.Update(ctx, value)
}
// lockedRepairQueue implements locking wrapper for queue.RepairQueue

View File

@ -6,122 +6,240 @@ package satellitedb
import (
"context"
"database/sql"
"errors"
"storj.io/storj/pkg/utils"
"github.com/zeebo/errs"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
dbx "storj.io/storj/satellite/satellitedb/dbx"
"storj.io/storj/storage"
)
var _ overlay.DB = (*overlaycache)(nil)
type overlaycache struct {
db *dbx.DB
}
func (o *overlaycache) Put(key storage.Key, value storage.Value) error {
if key.IsZero() {
return storage.ErrEmptyKey.New("")
}
ctx := context.Background() // TODO: fix
tx, err := o.db.Open(ctx)
if err != nil {
return Error.Wrap(err)
// Get looks up the node by nodeID
func (cache *overlaycache) Get(ctx context.Context, id storj.NodeID) (*pb.Node, error) {
if id.IsZero() {
return nil, overlay.ErrEmptyNode
}
_, err = tx.Get_OverlayCacheNode_By_Key(ctx, dbx.OverlayCacheNode_Key(key))
if err != nil {
_, err = tx.Create_OverlayCacheNode(
ctx,
dbx.OverlayCacheNode_Key(key),
dbx.OverlayCacheNode_Value(value),
)
if err != nil {
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
}
} else {
updateFields := dbx.OverlayCacheNode_Update_Fields{}
updateFields.Value = dbx.OverlayCacheNode_Value(value)
_, err := tx.Update_OverlayCacheNode_By_Key(
ctx,
dbx.OverlayCacheNode_Key(key),
updateFields,
)
if err != nil {
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
}
}
return Error.Wrap(tx.Commit())
}
func (o *overlaycache) Get(key storage.Key) (storage.Value, error) {
if key.IsZero() {
return nil, storage.ErrEmptyKey.New("")
}
ctx := context.Background() // TODO: fix
node, err := o.db.Get_OverlayCacheNode_By_Key(ctx, dbx.OverlayCacheNode_Key(key))
node, err := cache.db.Get_OverlayCacheNode_By_NodeId(ctx,
dbx.OverlayCacheNode_NodeId(id.Bytes()),
)
if err == sql.ErrNoRows {
return nil, storage.ErrKeyNotFound.New(key.String())
return nil, overlay.ErrNodeNotFound
}
if err != nil {
return nil, err
}
return node.Value, nil
return convertOverlayNode(node)
}
func (o *overlaycache) GetAll(keys storage.Keys) (storage.Values, error) {
values := make([]storage.Value, len(keys))
for i, key := range keys {
value, err := o.Get(key)
if err == nil {
values[i] = value
// GetAll looks up nodes based on the ids from the overlay cache
func (cache *overlaycache) GetAll(ctx context.Context, ids storj.NodeIDList) ([]*pb.Node, error) {
infos := make([]*pb.Node, len(ids))
for i, id := range ids {
// TODO: abort on canceled context
info, err := cache.Get(ctx, id)
if err != nil {
continue
}
infos[i] = info
}
return values, nil
return infos, nil
}
func (o *overlaycache) Delete(key storage.Key) error {
ctx := context.Background() // TODO: fix
_, err := o.db.Delete_OverlayCacheNode_By_Key(ctx, dbx.OverlayCacheNode_Key(key))
return err
}
func (o *overlaycache) List(start storage.Key, limit int) (keys storage.Keys, err error) {
ctx := context.Background() // TODO: fix
// List lists nodes starting from cursor
func (cache *overlaycache) List(ctx context.Context, cursor storj.NodeID, limit int) ([]*pb.Node, error) {
// TODO: handle this nicer
if limit <= 0 || limit > storage.LookupLimit {
limit = storage.LookupLimit
}
var rows []*dbx.OverlayCacheNode
if start == nil {
rows, err = o.db.Limited_OverlayCacheNode(ctx, limit, 0)
} else {
rows, err = o.db.Limited_OverlayCacheNode_By_Key_GreaterOrEqual(ctx, dbx.OverlayCacheNode_Key(start), limit, 0)
}
dbxInfos, err := cache.db.Limited_OverlayCacheNode_By_NodeId_GreaterOrEqual(ctx,
dbx.OverlayCacheNode_NodeId(cursor.Bytes()),
limit, 0,
)
if err != nil {
return []storage.Key{}, err
return nil, err
}
keys = make([]storage.Key, len(rows))
for i, row := range rows {
keys[i] = row.Key
infos := make([]*pb.Node, len(dbxInfos))
for i, dbxInfo := range dbxInfos {
infos[i], err = convertOverlayNode(dbxInfo)
if err != nil {
return nil, err
}
}
return infos, nil
}
// Update updates node information
func (cache *overlaycache) Update(ctx context.Context, info *pb.Node) (err error) {
if info == nil || info.Id.IsZero() {
return overlay.ErrEmptyNode
}
return keys, nil
tx, err := cache.db.Open(ctx)
if err != nil {
return Error.Wrap(err)
}
// TODO: use upsert
_, err = tx.Get_OverlayCacheNode_By_NodeId(ctx,
dbx.OverlayCacheNode_NodeId(info.Id.Bytes()),
)
address := info.Address
if address == nil {
address = &pb.NodeAddress{}
}
metadata := info.Metadata
if metadata == nil {
metadata = &pb.NodeMetadata{}
}
restrictions := info.Restrictions
if restrictions == nil {
restrictions = &pb.NodeRestrictions{
FreeBandwidth: -1,
FreeDisk: -1,
}
}
reputation := info.Reputation
if reputation == nil {
reputation = &pb.NodeStats{}
}
if err != nil {
_, err = tx.Create_OverlayCacheNode(
ctx,
dbx.OverlayCacheNode_NodeId(info.Id.Bytes()),
dbx.OverlayCacheNode_NodeType(int(info.Type)),
dbx.OverlayCacheNode_Address(address.Address),
dbx.OverlayCacheNode_Protocol(int(address.Transport)),
dbx.OverlayCacheNode_OperatorEmail(metadata.Email),
dbx.OverlayCacheNode_OperatorWallet(metadata.Wallet),
dbx.OverlayCacheNode_FreeBandwidth(restrictions.FreeBandwidth),
dbx.OverlayCacheNode_FreeDisk(restrictions.FreeDisk),
dbx.OverlayCacheNode_Latency90(reputation.Latency_90),
dbx.OverlayCacheNode_AuditSuccessRatio(reputation.AuditSuccessRatio),
dbx.OverlayCacheNode_AuditUptimeRatio(reputation.UptimeRatio),
dbx.OverlayCacheNode_AuditCount(reputation.AuditCount),
dbx.OverlayCacheNode_AuditSuccessCount(reputation.AuditSuccessCount),
dbx.OverlayCacheNode_UptimeCount(reputation.UptimeCount),
dbx.OverlayCacheNode_UptimeSuccessCount(reputation.UptimeSuccessCount),
)
if err != nil {
return Error.Wrap(errs.Combine(err, tx.Rollback()))
}
} else {
update := dbx.OverlayCacheNode_Update_Fields{
// TODO: should we be able to update node type?
Address: dbx.OverlayCacheNode_Address(address.Address),
Protocol: dbx.OverlayCacheNode_Protocol(int(address.Transport)),
Latency90: dbx.OverlayCacheNode_Latency90(info.Reputation.Latency_90),
AuditSuccessRatio: dbx.OverlayCacheNode_AuditSuccessRatio(info.Reputation.AuditSuccessRatio),
AuditUptimeRatio: dbx.OverlayCacheNode_AuditUptimeRatio(info.Reputation.UptimeRatio),
AuditCount: dbx.OverlayCacheNode_AuditCount(info.Reputation.AuditCount),
AuditSuccessCount: dbx.OverlayCacheNode_AuditSuccessCount(info.Reputation.AuditSuccessCount),
UptimeCount: dbx.OverlayCacheNode_UptimeCount(info.Reputation.UptimeCount),
UptimeSuccessCount: dbx.OverlayCacheNode_UptimeSuccessCount(info.Reputation.UptimeSuccessCount),
}
if info.Metadata != nil {
update.OperatorEmail = dbx.OverlayCacheNode_OperatorEmail(info.Metadata.Email)
update.OperatorWallet = dbx.OverlayCacheNode_OperatorWallet(info.Metadata.Wallet)
}
if info.Restrictions != nil {
update.FreeBandwidth = dbx.OverlayCacheNode_FreeBandwidth(restrictions.FreeBandwidth)
update.FreeDisk = dbx.OverlayCacheNode_FreeDisk(restrictions.FreeDisk)
}
_, err := tx.Update_OverlayCacheNode_By_NodeId(ctx,
dbx.OverlayCacheNode_NodeId(info.Id.Bytes()),
update,
)
if err != nil {
return Error.Wrap(errs.Combine(err, tx.Rollback()))
}
}
return Error.Wrap(tx.Commit())
}
// ReverseList lists all keys in revers order
func (o *overlaycache) ReverseList(start storage.Key, limit int) (storage.Keys, error) {
return nil, errors.New("not implemented")
// Delete deletes node based on id
func (cache *overlaycache) Delete(ctx context.Context, id storj.NodeID) error {
_, err := cache.db.Delete_OverlayCacheNode_By_NodeId(ctx,
dbx.OverlayCacheNode_NodeId(id.Bytes()),
)
return err
}
// Iterate iterates over items based on opts
func (o *overlaycache) Iterate(opts storage.IterateOptions, fn func(storage.Iterator) error) error {
return errors.New("not implemented")
}
func convertOverlayNode(info *dbx.OverlayCacheNode) (*pb.Node, error) {
if info == nil {
return nil, Error.New("missing info")
}
// Close closes the store
func (o *overlaycache) Close() error {
return errors.New("not implemented")
id, err := storj.NodeIDFromBytes(info.NodeId)
if err != nil {
return nil, err
}
node := &pb.Node{
Id: id,
Type: pb.NodeType(info.NodeType),
Address: &pb.NodeAddress{
Address: info.Address,
Transport: pb.NodeTransport(info.Protocol),
},
Metadata: &pb.NodeMetadata{
Email: info.OperatorEmail,
Wallet: info.OperatorWallet,
},
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: info.FreeBandwidth,
FreeDisk: info.FreeDisk,
},
Reputation: &pb.NodeStats{
NodeId: id,
Latency_90: info.Latency90,
AuditSuccessRatio: info.AuditSuccessRatio,
UptimeRatio: info.AuditUptimeRatio,
AuditCount: info.AuditCount,
AuditSuccessCount: info.AuditSuccessCount,
UptimeCount: info.UptimeCount,
UptimeSuccessCount: info.UptimeSuccessCount,
},
}
if node.Address.Address == "" {
node.Address = nil
}
if node.Metadata.Email == "" && node.Metadata.Wallet == "" {
node.Metadata = nil
}
if node.Restrictions.FreeBandwidth < 0 && node.Restrictions.FreeDisk < 0 {
node.Restrictions = nil
}
if node.Reputation.Latency_90 < 0 {
node.Reputation = nil
}
return node, nil
}

View File

@ -17,7 +17,7 @@ import (
const (
// postgres connstring that works with docker-compose
defaultPostgresConn = "postgres://storj:storj-pass@test-postgres/teststorj?sslmode=disable"
defaultSqliteConn = "sqlite3://file::memory:?mode=memory&cache=shared"
defaultSqliteConn = "sqlite3://file::memory:?mode=memory"
)
var (