Store the uplinks public key on the satellite so that it can verify bandwidth requests in the future (#1042)

* integrated with bwagreement & psserver

* integrated with pointerdb

* code review updates

* refactor after code review

* uplinkdb rename to certdb

* Code review changes
This commit is contained in:
aligeti 2019-02-07 14:22:49 -05:00 committed by GitHub
parent ef61c170b1
commit b736ae4823
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 767 additions and 33 deletions

View File

@ -5,9 +5,11 @@ package tally_test
import (
"context"
"crypto/ecdsa"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testcontext"
@ -15,6 +17,7 @@ import (
"storj.io/storj/pkg/bwagreement/testbwagreement"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/piecestore/psserver/psdb"
"storj.io/storj/satellite"
)
func TestQueryNoAgreements(t *testing.T) {
@ -32,7 +35,9 @@ func TestQueryWithBw(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
sendGeneratedAgreements(ctx, t, planet)
db := planet.Satellites[0].DB
sendGeneratedAgreements(ctx, t, db, planet)
tally := planet.Satellites[0].Accounting.Tally
tallyEnd, bwTotals, err := tally.QueryBW(ctx)
require.NoError(t, err)
@ -50,7 +55,7 @@ func TestQueryWithBw(t *testing.T) {
})
}
func sendGeneratedAgreements(ctx context.Context, t *testing.T, planet *testplanet.Planet) {
func sendGeneratedAgreements(ctx context.Context, t *testing.T, db satellite.DB, planet *testplanet.Planet) {
satID := planet.Satellites[0].Identity
upID := planet.Uplinks[0].Identity
snID := planet.StorageNodes[0].Identity
@ -67,6 +72,8 @@ func sendGeneratedAgreements(ctx context.Context, t *testing.T, planet *testplan
for i, action := range actions {
pba, err := testbwagreement.GeneratePayerBandwidthAllocation(action, satID, upID, time.Hour)
require.NoError(t, err)
err = db.CertDB().SavePublicKey(ctx, pba.UplinkId, upID.Leaf.PublicKey.(*ecdsa.PublicKey))
assert.NoError(t, err)
rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, snID.ID, upID, 1000)
require.NoError(t, err)
agreements[i] = &psdb.Agreement{Agreement: *rba}

View File

@ -5,14 +5,19 @@ package bwagreement
import (
"context"
"crypto"
"crypto/ecdsa"
"strings"
"time"
"github.com/gogo/protobuf/proto"
"github.com/gtank/cryptopasta"
"github.com/zeebo/errs"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/auth"
"storj.io/storj/pkg/certdb"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
@ -50,15 +55,17 @@ type DB interface {
// Server is an implementation of the pb.BandwidthServer interface
type Server struct {
db DB
bwdb DB
certdb certdb.DB
pkey crypto.PublicKey
NodeID storj.NodeID
logger *zap.Logger
}
// NewServer creates instance of Server
func NewServer(db DB, logger *zap.Logger, nodeID storj.NodeID) *Server {
func NewServer(db DB, upldb certdb.DB, pkey crypto.PublicKey, logger *zap.Logger, nodeID storj.NodeID) *Server {
// TODO: reorder arguments, rename logger -> log
return &Server{db: db, logger: logger, NodeID: nodeID}
return &Server{bwdb: db, certdb: upldb, pkey: pkey, logger: logger, NodeID: nodeID}
}
// Close closes resources
@ -85,16 +92,13 @@ func (s *Server) BandwidthAgreements(ctx context.Context, rba *pb.RenterBandwidt
if exp.Before(time.Now().UTC()) {
return reply, pb.ErrPayer.Wrap(auth.ErrExpired.New("%v vs %v", exp, time.Now().UTC()))
}
//verify message crypto
if err := auth.VerifyMsg(rba, pba.UplinkId); err != nil {
return reply, pb.ErrRenter.Wrap(err)
}
if err := auth.VerifyMsg(&pba, pba.SatelliteId); err != nil {
return reply, pb.ErrPayer.Wrap(err)
if err = s.verifySignature(ctx, rba); err != nil {
return reply, err
}
//save and return rersults
if err = s.db.CreateAgreement(ctx, rba); err != nil {
if err = s.bwdb.CreateAgreement(ctx, rba); err != nil {
if strings.Contains(err.Error(), "UNIQUE constraint failed") ||
strings.Contains(err.Error(), "violates unique constraint") {
return reply, pb.ErrPayer.Wrap(auth.ErrSerial.Wrap(err))
@ -106,3 +110,56 @@ func (s *Server) BandwidthAgreements(ctx context.Context, rba *pb.RenterBandwidt
s.logger.Debug("Stored Agreement...")
return reply, nil
}
func (s *Server) verifySignature(ctx context.Context, rba *pb.RenterBandwidthAllocation) error {
pba := rba.GetPayerAllocation()
// Get renter's public key from uplink agreement db
uplinkInfo, err := s.certdb.GetPublicKey(ctx, pba.UplinkId)
if err != nil {
return pb.ErrRenter.Wrap(auth.ErrVerify.New("Failed to unmarshal PayerBandwidthAllocation: %+v", err))
}
signatureLength := uplinkInfo.Curve.Params().P.BitLen() / 8
if len(rba.GetSignature()) < signatureLength {
return pb.ErrRenter.Wrap(auth.ErrSigLen.New("%d vs %d", len(rba.GetSignature()), signatureLength))
}
// verify Renter's (uplink) signature
rbad := *rba
rbad.SetSignature(nil)
rbad.SetCerts(nil)
rbadBytes, err := proto.Marshal(&rbad)
if err != nil {
return Error.New("marshalling error: %+v", err)
}
if ok := cryptopasta.Verify(rbadBytes, rba.GetSignature(), uplinkInfo); !ok {
return pb.ErrRenter.Wrap(auth.ErrVerify.New("%+v", ok))
}
// satellite public key
k, ok := s.pkey.(*ecdsa.PublicKey)
if !ok {
return Error.New("UnsupportedKey error: %+v", s.pkey)
}
signatureLength = k.Curve.Params().P.BitLen() / 8
if len(pba.GetSignature()) < signatureLength {
return pb.ErrPayer.Wrap(auth.ErrSigLen.New("%d vs %d", len(pba.GetSignature()), signatureLength))
}
// verify Payer's (satellite) signature
pbad := pba
pbad.SetSignature(nil)
pbad.SetCerts(nil)
pbadBytes, err := proto.Marshal(&pbad)
if err != nil {
return Error.New("marshalling error: %+v", err)
}
if ok := cryptopasta.Verify(pbadBytes, pba.GetSignature(), k); !ok {
return pb.ErrPayer.Wrap(auth.ErrVerify.New("%+v", ok))
}
return nil
}

View File

@ -37,7 +37,7 @@ func TestBandwidthAgreement(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
testDatabase(ctx, t, db.BandwidthAgreement())
testDatabase(ctx, t, db)
})
}
@ -59,16 +59,18 @@ func getPeerContext(ctx context.Context, t *testing.T) (context.Context, storj.N
return peer.NewContext(ctx, grpcPeer), nodeID
}
func testDatabase(ctx context.Context, t *testing.T, bwdb bwagreement.DB) {
func testDatabase(ctx context.Context, t *testing.T, db satellite.DB) {
upID, err := testidentity.NewTestIdentity(ctx)
assert.NoError(t, err)
satID, err := testidentity.NewTestIdentity(ctx)
assert.NoError(t, err)
satellite := bwagreement.NewServer(bwdb, zap.NewNop(), satID.ID)
satellite := bwagreement.NewServer(db.BandwidthAgreement(), db.CertDB(), satID.Leaf.PublicKey.(*ecdsa.PublicKey), zap.NewNop(), satID.ID)
{ // TestSameSerialNumberBandwidthAgreements
pbaFile1, err := testbwagreement.GeneratePayerBandwidthAllocation(pb.BandwidthAction_GET, satID, upID, time.Hour)
assert.NoError(t, err)
err = db.CertDB().SavePublicKey(ctx, pbaFile1.UplinkId, upID.Leaf.PublicKey.(*ecdsa.PublicKey))
assert.NoError(t, err)
ctxSN1, storageNode1 := getPeerContext(ctx, t)
rbaNode1, err := testbwagreement.GenerateRenterBandwidthAllocation(pbaFile1, storageNode1, upID, 666)
@ -97,6 +99,8 @@ func testDatabase(ctx context.Context, t *testing.T, bwdb bwagreement.DB) {
{
pbaFile2, err := testbwagreement.GeneratePayerBandwidthAllocation(pb.BandwidthAction_GET, satID, upID, time.Hour)
assert.NoError(t, err)
err = db.CertDB().SavePublicKey(ctx, pbaFile2.UplinkId, upID.Leaf.PublicKey.(*ecdsa.PublicKey))
assert.NoError(t, err)
rbaNode1, err := testbwagreement.GenerateRenterBandwidthAllocation(pbaFile2, storageNode1, upID, 666)
assert.NoError(t, err)
@ -130,6 +134,8 @@ func testDatabase(ctx context.Context, t *testing.T, bwdb bwagreement.DB) {
{ // storage nodes can submit a bwagreement that will expire in 30 seconds
pba, err := testbwagreement.GeneratePayerBandwidthAllocation(pb.BandwidthAction_GET, satID, upID, 30*time.Second)
assert.NoError(t, err)
err = db.CertDB().SavePublicKey(ctx, pba.UplinkId, upID.Leaf.PublicKey.(*ecdsa.PublicKey))
assert.NoError(t, err)
ctxSN1, storageNode1 := getPeerContext(ctx, t)
rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, storageNode1, upID, 666)
@ -143,6 +149,8 @@ func testDatabase(ctx context.Context, t *testing.T, bwdb bwagreement.DB) {
{ // storage nodes can't submit a bwagreement that expires right now
pba, err := testbwagreement.GeneratePayerBandwidthAllocation(pb.BandwidthAction_GET, satID, upID, 0*time.Second)
assert.NoError(t, err)
err = db.CertDB().SavePublicKey(ctx, pba.UplinkId, upID.Leaf.PublicKey.(*ecdsa.PublicKey))
assert.NoError(t, err)
ctxSN1, storageNode1 := getPeerContext(ctx, t)
rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, storageNode1, upID, 666)
@ -156,6 +164,8 @@ func testDatabase(ctx context.Context, t *testing.T, bwdb bwagreement.DB) {
{ // storage nodes can't submit a bwagreement that expires yesterday
pba, err := testbwagreement.GeneratePayerBandwidthAllocation(pb.BandwidthAction_GET, satID, upID, -23*time.Hour-55*time.Second)
assert.NoError(t, err)
err = db.CertDB().SavePublicKey(ctx, pba.UplinkId, upID.Leaf.PublicKey.(*ecdsa.PublicKey))
assert.NoError(t, err)
ctxSN1, storageNode1 := getPeerContext(ctx, t)
rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, storageNode1, upID, 666)
@ -172,6 +182,8 @@ func testDatabase(ctx context.Context, t *testing.T, bwdb bwagreement.DB) {
if !assert.NoError(t, err) {
t.Fatal(err)
}
err = db.CertDB().SavePublicKey(ctx, pba.UplinkId, upID.Leaf.PublicKey.(*ecdsa.PublicKey))
assert.NoError(t, err)
ctxSN1, storageNode1 := getPeerContext(ctx, t)
rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, storageNode1, upID, 666)
@ -216,7 +228,7 @@ func testDatabase(ctx context.Context, t *testing.T, bwdb bwagreement.DB) {
manipSignature := GetSignature(t, &manipRBA, manipPrivKey)
// Using self created signature + public key
reply, err := callBWA(ctxSN1, t, satellite, manipSignature, &manipRBA, manipCerts)
assert.True(t, auth.ErrSigner.Has(err) && pb.ErrRenter.Has(err), err.Error())
assert.True(t, pb.ErrRenter.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
@ -229,7 +241,7 @@ func testDatabase(ctx context.Context, t *testing.T, bwdb bwagreement.DB) {
manipSignature := GetSignature(t, &manipRBA, manipPrivKey)
// Using self created signature + public key
reply, err := callBWA(ctxSN1, t, satellite, manipSignature, &manipRBA, manipCerts)
assert.True(t, auth.ErrVerify.Has(err) && pb.ErrPayer.Has(err), err.Error())
assert.True(t, auth.ErrVerify.Has(err) && pb.ErrRenter.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
@ -244,7 +256,7 @@ func testDatabase(ctx context.Context, t *testing.T, bwdb bwagreement.DB) {
manipSignature = GetSignature(t, &manipRBA, manipPrivKey)
// Using self created Payer and Renter bwagreement signatures
reply, err := callBWA(ctxSN1, t, satellite, manipSignature, &manipRBA, manipCerts)
assert.True(t, auth.ErrVerify.Has(err) && pb.ErrPayer.Has(err), err.Error())
assert.True(t, auth.ErrVerify.Has(err) && pb.ErrRenter.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
@ -260,7 +272,7 @@ func testDatabase(ctx context.Context, t *testing.T, bwdb bwagreement.DB) {
manipSignature = GetSignature(t, &manipRBA, manipPrivKey)
// Using self created Payer and Renter bwagreement signatures
reply, err := callBWA(ctxSN1, t, satellite, manipSignature, &manipRBA, manipCerts)
assert.True(t, auth.ErrSigner.Has(err) && pb.ErrPayer.Has(err), err.Error())
assert.True(t, pb.ErrRenter.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
@ -287,6 +299,8 @@ func testDatabase(ctx context.Context, t *testing.T, bwdb bwagreement.DB) {
ctxSN2, storageNode2 := getPeerContext(ctx, t)
pba, err := testbwagreement.GeneratePayerBandwidthAllocation(pb.BandwidthAction_GET, satID, upID, time.Hour)
assert.NoError(t, err)
err = db.CertDB().SavePublicKey(ctx, pba.UplinkId, upID.Leaf.PublicKey.(*ecdsa.PublicKey))
assert.NoError(t, err)
{ // Storage node sends an corrupted signuature to force a satellite crash
rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, storageNode1, upID, 666)

View File

@ -28,6 +28,7 @@ func GeneratePayerBandwidthAllocation(action pb.BandwidthAction, satID *identity
Action: action,
CreatedUnixSec: time.Now().Unix(),
}
return pba, auth.SignMessage(pba, *satID)
}

20
pkg/certdb/certdb.go Normal file
View File

@ -0,0 +1,20 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package certdb
import (
"context"
"crypto"
"crypto/ecdsa"
"storj.io/storj/pkg/storj"
)
// DB stores uplink public keys.
type DB interface {
// SavePublicKey adds a new bandwidth agreement.
SavePublicKey(context.Context, storj.NodeID, crypto.PublicKey) error
// GetPublicKey gets the public key of uplink corresponding to uplink id
GetPublicKey(context.Context, storj.NodeID) (*ecdsa.PublicKey, error)
}

56
pkg/certdb/certdb_test.go Normal file
View File

@ -0,0 +1,56 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package certdb_test
import (
"context"
"crypto/ecdsa"
"crypto/x509"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testidentity"
"storj.io/storj/pkg/certdb"
"storj.io/storj/satellite"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func TestCertDB(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
testDatabase(ctx, t, db.CertDB())
})
}
func testDatabase(ctx context.Context, t *testing.T, upldb certdb.DB) {
//testing variables
upID, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
publicKeyEcdsa, _ := upID.Leaf.PublicKey.(*ecdsa.PublicKey)
upIDpubbytes, err := x509.MarshalPKIXPublicKey(publicKeyEcdsa)
require.NoError(t, err)
{ // New entry
err := upldb.SavePublicKey(ctx, upID.ID, upID.Leaf.PublicKey.(*ecdsa.PublicKey))
assert.NoError(t, err)
}
{ // New entry
err := upldb.SavePublicKey(ctx, upID.ID, upID.Leaf.PublicKey)
assert.NoError(t, err)
}
{ // Get the corresponding Public key for the serialnum
pubkey, err := upldb.GetPublicKey(ctx, upID.ID)
assert.NoError(t, err)
pubbytes, err := x509.MarshalPKIXPublicKey(pubkey)
assert.NoError(t, err)
assert.EqualValues(t, upIDpubbytes, pubbytes)
}
}

View File

@ -5,11 +5,13 @@ package pointerdb
import (
"context"
"crypto/ecdsa"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
"storj.io/storj/pkg/auth"
"storj.io/storj/pkg/certdb"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
)
@ -18,13 +20,15 @@ import (
type AllocationSigner struct {
satelliteIdentity *identity.FullIdentity
bwExpiration int
certdb certdb.DB
}
// NewAllocationSigner creates new instance
func NewAllocationSigner(satelliteIdentity *identity.FullIdentity, bwExpiration int) *AllocationSigner {
func NewAllocationSigner(satelliteIdentity *identity.FullIdentity, bwExpiration int, upldb certdb.DB) *AllocationSigner {
return &AllocationSigner{
satelliteIdentity: satelliteIdentity,
bwExpiration: bwExpiration,
certdb: upldb,
}
}
@ -41,6 +45,17 @@ func (allocation *AllocationSigner) PayerBandwidthAllocation(ctx context.Context
// convert ttl from days to seconds
ttl := allocation.bwExpiration
ttl *= 86400
pk, ok := peerIdentity.Leaf.PublicKey.(*ecdsa.PublicKey)
if !ok {
return nil, Error.New("UnsupportedKey error: %+v", peerIdentity.Leaf.PublicKey)
}
// store the corresponding uplink's id and public key into certDB db
err = allocation.certdb.SavePublicKey(ctx, peerIdentity.ID, pk)
if err != nil {
return nil, err
}
pba = &pb.PayerBandwidthAllocation{
SatelliteId: allocation.satelliteIdentity.ID,
UplinkId: peerIdentity.ID,

View File

@ -1,7 +1,7 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package pointerdb
package pointerdb_test
import (
"context"
@ -24,9 +24,11 @@ import (
"storj.io/storj/internal/testidentity"
"storj.io/storj/pkg/auth"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/pointerdb"
"storj.io/storj/pkg/storage/meta"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/storage"
"storj.io/storj/storage/teststore"
)
@ -61,8 +63,8 @@ func TestServicePut(t *testing.T) {
errTag := fmt.Sprintf("Test case #%d", i)
db := teststore.New()
service := NewService(zap.NewNop(), db)
s := Server{service: service, logger: zap.NewNop(), apiKeys: apiKeys}
service := pointerdb.NewService(zap.NewNop(), db)
s := pointerdb.NewServer(zap.NewNop(), service, nil, nil, pointerdb.Config{}, nil, apiKeys)
path := "a/b/c"
pr := pb.Pointer{}
@ -97,6 +99,14 @@ func TestServiceGet(t *testing.T) {
validAPIKey := console.APIKey{}
apiKeys := &mockAPIKeys{}
// creating in-memory db and opening connection
satdb, err := satellitedb.NewInMemory()
defer func() {
err = satdb.Close()
assert.NoError(t, err)
}()
err = satdb.CreateTables()
assert.NoError(t, err)
for i, tt := range []struct {
apiKey []byte
@ -113,10 +123,10 @@ func TestServiceGet(t *testing.T) {
errTag := fmt.Sprintf("Test case #%d", i)
db := teststore.New()
service := NewService(zap.NewNop(), db)
allocation := NewAllocationSigner(identity, 45)
service := pointerdb.NewService(zap.NewNop(), db)
allocation := pointerdb.NewAllocationSigner(identity, 45, satdb.CertDB())
s := NewServer(zap.NewNop(), service, allocation, nil, Config{}, identity, apiKeys)
s := pointerdb.NewServer(zap.NewNop(), service, allocation, nil, pointerdb.Config{}, identity, apiKeys)
path := "a/b/c"
@ -168,8 +178,8 @@ func TestServiceDelete(t *testing.T) {
db := teststore.New()
_ = db.Put(storage.Key(storj.JoinPaths(apiKeys.info.ProjectID.String(), path)), storage.Value("hello"))
service := NewService(zap.NewNop(), db)
s := Server{service: service, logger: zap.NewNop(), apiKeys: apiKeys}
service := pointerdb.NewService(zap.NewNop(), db)
s := pointerdb.NewServer(zap.NewNop(), service, nil, nil, pointerdb.Config{}, nil, apiKeys)
if tt.err != nil {
db.ForceError++
@ -191,8 +201,8 @@ func TestServiceList(t *testing.T) {
apiKeys := &mockAPIKeys{}
db := teststore.New()
service := NewService(zap.NewNop(), db)
server := Server{service: service, logger: zap.NewNop(), apiKeys: apiKeys}
service := pointerdb.NewService(zap.NewNop(), db)
server := pointerdb.NewServer(zap.NewNop(), service, nil, nil, pointerdb.Config{}, nil, apiKeys)
pointer := &pb.Pointer{}
pointer.CreationDate = ptypes.TimestampNow()

View File

@ -22,6 +22,7 @@ import (
"storj.io/storj/pkg/audit"
"storj.io/storj/pkg/auth/grpcauth"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/certdb"
"storj.io/storj/pkg/datarepair/checker"
"storj.io/storj/pkg/datarepair/irreparable"
"storj.io/storj/pkg/datarepair/queue"
@ -58,6 +59,8 @@ type DB interface {
// BandwidthAgreement returns database for storing bandwidth agreements
BandwidthAgreement() bwagreement.DB
// CertDB returns database for storing uplink's public key & ID
CertDB() certdb.DB
// StatDB returns database for storing node statistics
StatDB() statdb.DB
// OverlayCache returns database for caching overlay information
@ -287,7 +290,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (*
peer.Metainfo.Database = storelogger.New(peer.Log.Named("pdb"), db)
peer.Metainfo.Service = pointerdb.NewService(peer.Log.Named("pointerdb"), peer.Metainfo.Database)
peer.Metainfo.Allocation = pointerdb.NewAllocationSigner(peer.Identity, config.PointerDB.BwExpiration)
peer.Metainfo.Allocation = pointerdb.NewAllocationSigner(peer.Identity, config.PointerDB.BwExpiration, peer.DB.CertDB())
peer.Metainfo.Endpoint = pointerdb.NewServer(peer.Log.Named("pointerdb:endpoint"),
peer.Metainfo.Service,
peer.Metainfo.Allocation,
@ -299,7 +302,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (*
}
{ // setup agreements
bwServer := bwagreement.NewServer(peer.DB.BandwidthAgreement(), peer.Log.Named("agreements"), peer.Identity.ID)
bwServer := bwagreement.NewServer(peer.DB.BandwidthAgreement(), peer.DB.CertDB(), peer.Identity.Leaf.PublicKey, peer.Log.Named("agreements"), peer.Identity.ID)
peer.Agreements.Endpoint = bwServer
pb.RegisterBandwidthServer(peer.Public.Server.GRPC(), peer.Agreements.Endpoint)
}

View File

@ -0,0 +1,78 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"crypto"
"crypto/ecdsa"
"crypto/x509"
"github.com/zeebo/errs"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/utils"
dbx "storj.io/storj/satellite/satellitedb/dbx"
)
type certDB struct {
db *dbx.DB
}
func (b *certDB) SavePublicKey(ctx context.Context, nodeID storj.NodeID, publicKey crypto.PublicKey) error {
tx, err := b.db.Open(ctx)
if err != nil {
return Error.Wrap(err)
}
_, err = tx.Get_CertRecord_By_Id(ctx, dbx.CertRecord_Id(nodeID.Bytes()))
if err != nil {
// no rows err, so create/insert an entry
publicKeyEcdsa, ok := publicKey.(*ecdsa.PublicKey)
if !ok {
return Error.Wrap(utils.CombineErrors(errs.New("Uplink Private Key is not a valid *ecdsa.PrivateKey"), tx.Rollback()))
}
pubbytes, err := x509.MarshalPKIXPublicKey(publicKeyEcdsa)
if err != nil {
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
}
if err != nil {
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
}
_, err = tx.Create_CertRecord(ctx,
dbx.CertRecord_Publickey(pubbytes),
dbx.CertRecord_Id(nodeID.Bytes()),
)
if err != nil {
return Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
}
} else {
// nodeID entry already exists, just return
return Error.Wrap(tx.Rollback())
}
return Error.Wrap(tx.Commit())
}
func (b *certDB) GetPublicKey(ctx context.Context, nodeID storj.NodeID) (*ecdsa.PublicKey, error) {
dbxInfo, err := b.db.Get_CertRecord_By_Id(ctx, dbx.CertRecord_Id(nodeID.Bytes()))
if err != nil {
return nil, err
}
pubkey, err := x509.ParsePKIXPublicKey(dbxInfo.Publickey)
if err != nil {
return nil, Error.Wrap(Error.New("Failed to extract Public Key from RenterBandwidthAllocation: %+v", err))
}
// Typecast public key
pkey, ok := pubkey.(*ecdsa.PublicKey)
if !ok {
return nil, Error.Wrap(Error.New("UnsupportedKey error: %+v", pubkey))
}
return pkey, nil
}

View File

@ -11,6 +11,7 @@ import (
"storj.io/storj/internal/migrate"
"storj.io/storj/pkg/accounting"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/certdb"
"storj.io/storj/pkg/datarepair/irreparable"
"storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/overlay"
@ -89,6 +90,11 @@ func (db *DB) BandwidthAgreement() bwagreement.DB {
return &bandwidthagreement{db: db.db}
}
// CertDB is a getter for uplink's specific info like public key, id, etc...
func (db *DB) CertDB() certdb.DB {
return &certDB{db: db.db}
}
// // PointerDB is a getter for PointerDB repository
// func (db *DB) PointerDB() pointerdb.DB {
// return &pointerDB{db: db.db}

View File

@ -333,4 +333,23 @@ read all (
select api_key
where api_key.project_id = ?
orderby asc api_key.name
)
)
//--- certRecord ---//
model certRecord (
key id
field publickey blob //--uplink public key--//
field id blob //--uplink node id --//
field update_at timestamp ( autoinsert, autoupdate )
)
create certRecord ( )
delete certRecord ( where certRecord.id = ? )
update certRecord ( where certRecord.id = ? )
read one (
select certRecord
where certRecord.id = ?
)

View File

@ -309,6 +309,12 @@ CREATE TABLE bwagreements (
expires_at timestamp with time zone NOT NULL,
PRIMARY KEY ( serialnum )
);
CREATE TABLE certRecords (
publickey bytea NOT NULL,
id bytea NOT NULL,
update_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE injuredsegments (
id bigserial NOT NULL,
info bytea NOT NULL,
@ -485,6 +491,12 @@ CREATE TABLE bwagreements (
expires_at TIMESTAMP NOT NULL,
PRIMARY KEY ( serialnum )
);
CREATE TABLE certRecords (
publickey BLOB NOT NULL,
id BLOB NOT NULL,
update_at TIMESTAMP NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE injuredsegments (
id INTEGER NOT NULL,
info BLOB NOT NULL,
@ -1137,6 +1149,74 @@ func (f Bwagreement_ExpiresAt_Field) value() interface{} {
func (Bwagreement_ExpiresAt_Field) _Column() string { return "expires_at" }
type CertRecord struct {
Publickey []byte
Id []byte
UpdateAt time.Time
}
func (CertRecord) _Table() string { return "certRecords" }
type CertRecord_Update_Fields struct {
}
type CertRecord_Publickey_Field struct {
_set bool
_null bool
_value []byte
}
func CertRecord_Publickey(v []byte) CertRecord_Publickey_Field {
return CertRecord_Publickey_Field{_set: true, _value: v}
}
func (f CertRecord_Publickey_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (CertRecord_Publickey_Field) _Column() string { return "publickey" }
type CertRecord_Id_Field struct {
_set bool
_null bool
_value []byte
}
func CertRecord_Id(v []byte) CertRecord_Id_Field {
return CertRecord_Id_Field{_set: true, _value: v}
}
func (f CertRecord_Id_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (CertRecord_Id_Field) _Column() string { return "id" }
type CertRecord_UpdateAt_Field struct {
_set bool
_null bool
_value time.Time
}
func CertRecord_UpdateAt(v time.Time) CertRecord_UpdateAt_Field {
return CertRecord_UpdateAt_Field{_set: true, _value: v}
}
func (f CertRecord_UpdateAt_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (CertRecord_UpdateAt_Field) _Column() string { return "update_at" }
type Injuredsegment struct {
Id int64
Info []byte
@ -2769,6 +2849,30 @@ func (obj *postgresImpl) Create_ApiKey(ctx context.Context,
}
func (obj *postgresImpl) Create_CertRecord(ctx context.Context,
certRecord_publickey CertRecord_Publickey_Field,
certRecord_id CertRecord_Id_Field) (
certRecord *CertRecord, err error) {
__now := obj.db.Hooks.Now().UTC()
__publickey_val := certRecord_publickey.value()
__id_val := certRecord_id.value()
__update_at_val := __now
var __embed_stmt = __sqlbundle_Literal("INSERT INTO certRecords ( publickey, id, update_at ) VALUES ( ?, ?, ? ) RETURNING certRecords.publickey, certRecords.id, certRecords.update_at")
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __publickey_val, __id_val, __update_at_val)
certRecord = &CertRecord{}
err = obj.driver.QueryRow(__stmt, __publickey_val, __id_val, __update_at_val).Scan(&certRecord.Publickey, &certRecord.Id, &certRecord.UpdateAt)
if err != nil {
return nil, obj.makeErr(err)
}
return certRecord, nil
}
func (obj *postgresImpl) Limited_Bwagreement(ctx context.Context,
limit int, offset int64) (
rows []*Bwagreement, err error) {
@ -3567,6 +3671,27 @@ func (obj *postgresImpl) All_ApiKey_By_ProjectId_OrderBy_Asc_Name(ctx context.Co
}
func (obj *postgresImpl) Get_CertRecord_By_Id(ctx context.Context,
certRecord_id CertRecord_Id_Field) (
certRecord *CertRecord, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT certRecords.publickey, certRecords.id, certRecords.update_at FROM certRecords WHERE certRecords.id = ?")
var __values []interface{}
__values = append(__values, certRecord_id.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
certRecord = &CertRecord{}
err = obj.driver.QueryRow(__stmt, __values...).Scan(&certRecord.Publickey, &certRecord.Id, &certRecord.UpdateAt)
if err != nil {
return nil, obj.makeErr(err)
}
return certRecord, nil
}
func (obj *postgresImpl) Update_Irreparabledb_By_Segmentpath(ctx context.Context,
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field,
update Irreparabledb_Update_Fields) (
@ -3963,6 +4088,42 @@ func (obj *postgresImpl) Update_ApiKey_By_Id(ctx context.Context,
return api_key, nil
}
func (obj *postgresImpl) Update_CertRecord_By_Id(ctx context.Context,
certRecord_id CertRecord_Id_Field,
update CertRecord_Update_Fields) (
certRecord *CertRecord, err error) {
var __sets = &__sqlbundle_Hole{}
var __embed_stmt = __sqlbundle_Literals{Join: "", SQLs: []__sqlbundle_SQL{__sqlbundle_Literal("UPDATE certRecords SET "), __sets, __sqlbundle_Literal(" WHERE certRecords.id = ? RETURNING certRecords.publickey, certRecords.id, certRecords.update_at")}}
__sets_sql := __sqlbundle_Literals{Join: ", "}
var __values []interface{}
var __args []interface{}
__now := obj.db.Hooks.Now().UTC()
__values = append(__values, __now)
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("update_at = ?"))
__args = append(__args, certRecord_id.value())
__values = append(__values, __args...)
__sets.SQL = __sets_sql
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
certRecord = &CertRecord{}
err = obj.driver.QueryRow(__stmt, __values...).Scan(&certRecord.Publickey, &certRecord.Id, &certRecord.UpdateAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, obj.makeErr(err)
}
return certRecord, nil
}
func (obj *postgresImpl) Delete_Irreparabledb_By_Segmentpath(ctx context.Context,
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field) (
deleted bool, err error) {
@ -4224,6 +4385,32 @@ func (obj *postgresImpl) Delete_ApiKey_By_Id(ctx context.Context,
}
func (obj *postgresImpl) Delete_CertRecord_By_Id(ctx context.Context,
certRecord_id CertRecord_Id_Field) (
deleted bool, err error) {
var __embed_stmt = __sqlbundle_Literal("DELETE FROM certRecords WHERE certRecords.id = ?")
var __values []interface{}
__values = append(__values, certRecord_id.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__res, err := obj.driver.Exec(__stmt, __values...)
if err != nil {
return false, obj.makeErr(err)
}
__count, err := __res.RowsAffected()
if err != nil {
return false, obj.makeErr(err)
}
return __count > 0, nil
}
func (impl postgresImpl) isConstraintError(err error) (
constraint string, ok bool) {
if e, ok := err.(*pq.Error); ok {
@ -4312,6 +4499,16 @@ func (obj *postgresImpl) deleteAll(ctx context.Context) (count int64, err error)
return 0, obj.makeErr(err)
}
__count, err = __res.RowsAffected()
if err != nil {
return 0, obj.makeErr(err)
}
count += __count
__res, err = obj.driver.Exec("DELETE FROM certRecords;")
if err != nil {
return 0, obj.makeErr(err)
}
__count, err = __res.RowsAffected()
if err != nil {
return 0, obj.makeErr(err)
@ -4747,6 +4944,33 @@ func (obj *sqlite3Impl) Create_ApiKey(ctx context.Context,
}
func (obj *sqlite3Impl) Create_CertRecord(ctx context.Context,
certRecord_publickey CertRecord_Publickey_Field,
certRecord_id CertRecord_Id_Field) (
certRecord *CertRecord, err error) {
__now := obj.db.Hooks.Now().UTC()
__publickey_val := certRecord_publickey.value()
__id_val := certRecord_id.value()
__update_at_val := __now
var __embed_stmt = __sqlbundle_Literal("INSERT INTO certRecords ( publickey, id, update_at ) VALUES ( ?, ?, ? )")
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __publickey_val, __id_val, __update_at_val)
__res, err := obj.driver.Exec(__stmt, __publickey_val, __id_val, __update_at_val)
if err != nil {
return nil, obj.makeErr(err)
}
__pk, err := __res.LastInsertId()
if err != nil {
return nil, obj.makeErr(err)
}
return obj.getLastCertRecord(ctx, __pk)
}
func (obj *sqlite3Impl) Limited_Bwagreement(ctx context.Context,
limit int, offset int64) (
rows []*Bwagreement, err error) {
@ -5545,6 +5769,27 @@ func (obj *sqlite3Impl) All_ApiKey_By_ProjectId_OrderBy_Asc_Name(ctx context.Con
}
func (obj *sqlite3Impl) Get_CertRecord_By_Id(ctx context.Context,
certRecord_id CertRecord_Id_Field) (
certRecord *CertRecord, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT certRecords.publickey, certRecords.id, certRecords.update_at FROM certRecords WHERE certRecords.id = ?")
var __values []interface{}
__values = append(__values, certRecord_id.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
certRecord = &CertRecord{}
err = obj.driver.QueryRow(__stmt, __values...).Scan(&certRecord.Publickey, &certRecord.Id, &certRecord.UpdateAt)
if err != nil {
return nil, obj.makeErr(err)
}
return certRecord, nil
}
func (obj *sqlite3Impl) Update_Irreparabledb_By_Segmentpath(ctx context.Context,
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field,
update Irreparabledb_Update_Fields) (
@ -6011,6 +6256,52 @@ func (obj *sqlite3Impl) Update_ApiKey_By_Id(ctx context.Context,
return api_key, nil
}
func (obj *sqlite3Impl) Update_CertRecord_By_Id(ctx context.Context,
certRecord_id CertRecord_Id_Field,
update CertRecord_Update_Fields) (
certRecord *CertRecord, err error) {
var __sets = &__sqlbundle_Hole{}
var __embed_stmt = __sqlbundle_Literals{Join: "", SQLs: []__sqlbundle_SQL{__sqlbundle_Literal("UPDATE certRecords SET "), __sets, __sqlbundle_Literal(" WHERE certRecords.id = ?")}}
__sets_sql := __sqlbundle_Literals{Join: ", "}
var __values []interface{}
var __args []interface{}
__now := obj.db.Hooks.Now().UTC()
__values = append(__values, __now)
__sets_sql.SQLs = append(__sets_sql.SQLs, __sqlbundle_Literal("update_at = ?"))
__args = append(__args, certRecord_id.value())
__values = append(__values, __args...)
__sets.SQL = __sets_sql
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
certRecord = &CertRecord{}
_, err = obj.driver.Exec(__stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
var __embed_stmt_get = __sqlbundle_Literal("SELECT certRecords.publickey, certRecords.id, certRecords.update_at FROM certRecords WHERE certRecords.id = ?")
var __stmt_get = __sqlbundle_Render(obj.dialect, __embed_stmt_get)
obj.logStmt("(IMPLIED) "+__stmt_get, __args...)
err = obj.driver.QueryRow(__stmt_get, __args...).Scan(&certRecord.Publickey, &certRecord.Id, &certRecord.UpdateAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, obj.makeErr(err)
}
return certRecord, nil
}
func (obj *sqlite3Impl) Delete_Irreparabledb_By_Segmentpath(ctx context.Context,
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field) (
deleted bool, err error) {
@ -6272,6 +6563,32 @@ func (obj *sqlite3Impl) Delete_ApiKey_By_Id(ctx context.Context,
}
func (obj *sqlite3Impl) Delete_CertRecord_By_Id(ctx context.Context,
certRecord_id CertRecord_Id_Field) (
deleted bool, err error) {
var __embed_stmt = __sqlbundle_Literal("DELETE FROM certRecords WHERE certRecords.id = ?")
var __values []interface{}
__values = append(__values, certRecord_id.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__res, err := obj.driver.Exec(__stmt, __values...)
if err != nil {
return false, obj.makeErr(err)
}
__count, err := __res.RowsAffected()
if err != nil {
return false, obj.makeErr(err)
}
return __count > 0, nil
}
func (obj *sqlite3Impl) getLastBwagreement(ctx context.Context,
pk int64) (
bwagreement *Bwagreement, err error) {
@ -6488,6 +6805,24 @@ func (obj *sqlite3Impl) getLastApiKey(ctx context.Context,
}
func (obj *sqlite3Impl) getLastCertRecord(ctx context.Context,
pk int64) (
certRecord *CertRecord, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT certRecords.publickey, certRecords.id, certRecords.update_at FROM certRecords WHERE _rowid_ = ?")
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, pk)
certRecord = &CertRecord{}
err = obj.driver.QueryRow(__stmt, pk).Scan(&certRecord.Publickey, &certRecord.Id, &certRecord.UpdateAt)
if err != nil {
return nil, obj.makeErr(err)
}
return certRecord, nil
}
func (impl sqlite3Impl) isConstraintError(err error) (
constraint string, ok bool) {
if e, ok := err.(sqlite3.Error); ok {
@ -6581,6 +6916,16 @@ func (obj *sqlite3Impl) deleteAll(ctx context.Context) (count int64, err error)
return 0, obj.makeErr(err)
}
__count, err = __res.RowsAffected()
if err != nil {
return 0, obj.makeErr(err)
}
count += __count
__res, err = obj.driver.Exec("DELETE FROM certRecords;")
if err != nil {
return 0, obj.makeErr(err)
}
__count, err = __res.RowsAffected()
if err != nil {
return 0, obj.makeErr(err)
@ -6855,6 +7200,18 @@ func (rx *Rx) Create_Bwagreement(ctx context.Context,
}
func (rx *Rx) Create_CertRecord(ctx context.Context,
certRecord_publickey CertRecord_Publickey_Field,
certRecord_id CertRecord_Id_Field) (
certRecord *CertRecord, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Create_CertRecord(ctx, certRecord_publickey, certRecord_id)
}
func (rx *Rx) Create_Injuredsegment(ctx context.Context,
injuredsegment_info Injuredsegment_Info_Field) (
injuredsegment *Injuredsegment, err error) {
@ -6993,6 +7350,16 @@ func (rx *Rx) Delete_ApiKey_By_Id(ctx context.Context,
return tx.Delete_ApiKey_By_Id(ctx, api_key_id)
}
func (rx *Rx) Delete_CertRecord_By_Id(ctx context.Context,
certRecord_id CertRecord_Id_Field) (
deleted bool, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Delete_CertRecord_By_Id(ctx, certRecord_id)
}
func (rx *Rx) Delete_Injuredsegment_By_Id(ctx context.Context,
injuredsegment_id Injuredsegment_Id_Field) (
deleted bool, err error) {
@ -7123,6 +7490,16 @@ func (rx *Rx) Get_ApiKey_By_Key(ctx context.Context,
return tx.Get_ApiKey_By_Key(ctx, api_key_key)
}
func (rx *Rx) Get_CertRecord_By_Id(ctx context.Context,
certRecord_id CertRecord_Id_Field) (
certRecord *CertRecord, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Get_CertRecord_By_Id(ctx, certRecord_id)
}
func (rx *Rx) Get_Irreparabledb_By_Segmentpath(ctx context.Context,
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field) (
irreparabledb *Irreparabledb, err error) {
@ -7257,6 +7634,17 @@ func (rx *Rx) Update_ApiKey_By_Id(ctx context.Context,
return tx.Update_ApiKey_By_Id(ctx, api_key_id, update)
}
func (rx *Rx) Update_CertRecord_By_Id(ctx context.Context,
certRecord_id CertRecord_Id_Field,
update CertRecord_Update_Fields) (
certRecord *CertRecord, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Update_CertRecord_By_Id(ctx, certRecord_id, update)
}
func (rx *Rx) Update_Irreparabledb_By_Segmentpath(ctx context.Context,
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field,
update Irreparabledb_Update_Fields) (
@ -7394,6 +7782,11 @@ type Methods interface {
bwagreement_expires_at Bwagreement_ExpiresAt_Field) (
bwagreement *Bwagreement, err error)
Create_CertRecord(ctx context.Context,
certRecord_publickey CertRecord_Publickey_Field,
certRecord_id CertRecord_Id_Field) (
certRecord *CertRecord, err error)
Create_Injuredsegment(ctx context.Context,
injuredsegment_info Injuredsegment_Info_Field) (
injuredsegment *Injuredsegment, err error)
@ -7465,6 +7858,10 @@ type Methods interface {
api_key_id ApiKey_Id_Field) (
deleted bool, err error)
Delete_CertRecord_By_Id(ctx context.Context,
certRecord_id CertRecord_Id_Field) (
deleted bool, err error)
Delete_Injuredsegment_By_Id(ctx context.Context,
injuredsegment_id Injuredsegment_Id_Field) (
deleted bool, err error)
@ -7517,6 +7914,10 @@ type Methods interface {
api_key_key ApiKey_Key_Field) (
api_key *ApiKey, err error)
Get_CertRecord_By_Id(ctx context.Context,
certRecord_id CertRecord_Id_Field) (
certRecord *CertRecord, err error)
Get_Irreparabledb_By_Segmentpath(ctx context.Context,
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field) (
irreparabledb *Irreparabledb, err error)
@ -7573,6 +7974,11 @@ type Methods interface {
update ApiKey_Update_Fields) (
api_key *ApiKey, err error)
Update_CertRecord_By_Id(ctx context.Context,
certRecord_id CertRecord_Id_Field,
update CertRecord_Update_Fields) (
certRecord *CertRecord, err error)
Update_Irreparabledb_By_Segmentpath(ctx context.Context,
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field,
update Irreparabledb_Update_Fields) (

View File

@ -36,6 +36,12 @@ CREATE TABLE bwagreements (
expires_at timestamp with time zone NOT NULL,
PRIMARY KEY ( serialnum )
);
CREATE TABLE certRecords (
publickey bytea NOT NULL,
id bytea NOT NULL,
update_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE injuredsegments (
id bigserial NOT NULL,
info bytea NOT NULL,

View File

@ -36,6 +36,12 @@ CREATE TABLE bwagreements (
expires_at TIMESTAMP NOT NULL,
PRIMARY KEY ( serialnum )
);
CREATE TABLE certRecords (
publickey BLOB NOT NULL,
id BLOB NOT NULL,
update_at TIMESTAMP NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE injuredsegments (
id INTEGER NOT NULL,
info BLOB NOT NULL,

View File

@ -7,6 +7,8 @@ package satellitedb
import (
"context"
"crypto"
"crypto/ecdsa"
"sync"
"time"
@ -14,6 +16,7 @@ import (
"storj.io/storj/pkg/accounting"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/certdb"
"storj.io/storj/pkg/datarepair/irreparable"
"storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/overlay"
@ -131,6 +134,33 @@ func (m *lockedBandwidthAgreement) GetUplinkStats(ctx context.Context, a1 time.T
return m.db.GetUplinkStats(ctx, a1, a2)
}
// CertDB returns database for storing uplink's public key & ID
func (m *locked) CertDB() certdb.DB {
m.Lock()
defer m.Unlock()
return &lockedCertDB{m.Locker, m.db.CertDB()}
}
// lockedCertDB implements locking wrapper for certdb.DB
type lockedCertDB struct {
sync.Locker
db certdb.DB
}
// GetPublicKey gets the public key of uplink corresponding to uplink id
func (m *lockedCertDB) GetPublicKey(ctx context.Context, a1 storj.NodeID) (*ecdsa.PublicKey, error) {
m.Lock()
defer m.Unlock()
return m.db.GetPublicKey(ctx, a1)
}
// SavePublicKey adds a new bandwidth agreement.
func (m *lockedCertDB) SavePublicKey(ctx context.Context, a1 storj.NodeID, a2 crypto.PublicKey) error {
m.Lock()
defer m.Unlock()
return m.db.SavePublicKey(ctx, a1, a2)
}
// Close closes the database
func (m *locked) Close() error {
m.Lock()