pkg/bwagreements: remove service (#2138)

This commit is contained in:
Egon Elbre 2019-06-06 16:57:58 +03:00 committed by Stefan Benten
parent 503fe0e5ec
commit 28a1201590
24 changed files with 282 additions and 1842 deletions

4
.github/CODEOWNERS vendored
View File

@ -16,7 +16,3 @@
# kademlia
/pkg/dht/ @jenlij
/pkg/kademlia/ @jenlij
# bwagreement
/pkg/bwagreement/ @wthorp

View File

@ -46,11 +46,6 @@ var (
RunE: cmdSetup,
Annotations: map[string]string{"type": "setup"},
}
diagCmd = &cobra.Command{
Use: "diag",
Short: "Diagnostic Tool support",
RunE: cmdDiag,
}
qdiagCmd = &cobra.Command{
Use: "qdiag",
Short: "Repair Queue Diagnostic Tool support",
@ -71,9 +66,6 @@ var (
runCfg Satellite
setupCfg Satellite
diagCfg struct {
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"sqlite3://$CONFDIR/master.db"`
}
qdiagCfg struct {
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"sqlite3://$CONFDIR/master.db"`
QListLimit int `help:"maximum segments that can be requested" default:"1000"`
@ -94,13 +86,11 @@ func init() {
defaults := cfgstruct.DefaultsFlag(rootCmd)
rootCmd.AddCommand(runCmd)
rootCmd.AddCommand(setupCmd)
rootCmd.AddCommand(diagCmd)
rootCmd.AddCommand(qdiagCmd)
rootCmd.AddCommand(reportsCmd)
reportsCmd.AddCommand(nodeUsageCmd)
process.Bind(runCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(setupCmd, &setupCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir), cfgstruct.SetupMode())
process.Bind(diagCmd, &diagCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(qdiagCmd, &qdiagCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(nodeUsageCmd, &nodeUsageCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
}
@ -170,39 +160,6 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) {
return process.SaveConfigWithAllDefaults(cmd.Flags(), filepath.Join(setupDir, "config.yaml"), nil)
}
func cmdDiag(cmd *cobra.Command, args []string) (err error) {
database, err := satellitedb.New(zap.L().Named("db"), diagCfg.Database)
if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err)
}
defer func() {
err := database.Close()
if err != nil {
fmt.Printf("error closing connection to master database on satellite: %+v\n", err)
}
}()
//get all bandwidth agreements rows already ordered
stats, err := database.BandwidthAgreement().GetUplinkStats(context.Background(), time.Time{}, time.Now())
if err != nil {
fmt.Printf("error reading satellite database %v: %v\n", diagCfg.Database, err)
return err
}
// initialize the table header (fields)
const padding = 3
w := tabwriter.NewWriter(os.Stdout, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug)
fmt.Fprintln(w, "UplinkID\tTotal\t# Of Transactions\tPUT Action\tGET Action\t")
// populate the row fields
for _, s := range stats {
fmt.Fprint(w, s.NodeID, "\t", s.TotalBytes, "\t", s.TotalTransactions, "\t", s.PutActionCount, "\t", s.GetActionCount, "\t\n")
}
// display the data
return w.Flush()
}
func cmdQDiag(cmd *cobra.Command, args []string) (err error) {
// open the master db

View File

@ -33,7 +33,6 @@ import (
"storj.io/storj/pkg/accounting/rollup"
"storj.io/storj/pkg/accounting/tally"
"storj.io/storj/pkg/audit"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/datarepair/checker"
"storj.io/storj/pkg/datarepair/repairer"
"storj.io/storj/pkg/discovery"
@ -471,7 +470,6 @@ func (planet *Planet) newSatellites(count int) ([]*satellite.Peer, error) {
Overlay: true,
BwExpiration: 45,
},
BwAgreement: bwagreement.Config{},
Checker: checker.Config{
Interval: 30 * time.Second,
IrreparableInterval: 15 * time.Second,

View File

@ -40,7 +40,7 @@ func ValidateAPIKey(ctx context.Context, actualKey []byte) (err error) {
return Error.New("Could not get api key from context")
}
matches := (1 == subtle.ConstantTimeCompare(actualKey, expectedKey))
matches := 1 == subtle.ConstantTimeCompare(actualKey, expectedKey)
if !matches {
return Error.New("Invalid API credential")
}

View File

@ -1,17 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package auth
import (
"context"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pkcrypto"
)
// GenerateSignature creates signature from identity id
func GenerateSignature(ctx context.Context, data []byte, identity *identity.FullIdentity) (_ []byte, err error) {
defer mon.Task()(&ctx)(&err)
return pkcrypto.HashAndSign(identity.Key, data)
}

View File

@ -1,40 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package auth
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"storj.io/storj/internal/testidentity"
"storj.io/storj/pkg/pkcrypto"
)
func TestGenerateSignature(t *testing.T) {
ctx := context.Background()
ca, err := testidentity.NewTestCA(ctx)
assert.NoError(t, err)
identity, err := ca.NewIdentity()
assert.NoError(t, err)
for _, tt := range []struct {
data []byte
verified bool
}{
{identity.ID.Bytes(), true},
{[]byte("non verifiable data"), false},
} {
signature, err := GenerateSignature(ctx, identity.ID.Bytes(), identity)
assert.NoError(t, err)
verifyError := pkcrypto.HashAndVerifySignature(identity.Leaf.PublicKey, tt.data, signature)
if tt.verified {
assert.NoError(t, verifyError)
} else {
assert.Error(t, verifyError)
}
}
}

View File

@ -1,120 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package auth
import (
"context"
"github.com/gogo/protobuf/proto"
"github.com/zeebo/errs"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/peertls"
"storj.io/storj/pkg/pkcrypto"
"storj.io/storj/pkg/storj"
)
var (
//ErrSign indicates a failure during signing
ErrSign = errs.Class("Failed to sign message")
//ErrVerify indicates a failure during signature validation
ErrVerify = errs.Class("Failed to validate message signature")
//ErrSigLen indicates an invalid signature length
ErrSigLen = errs.Class("Invalid signature length")
//ErrSerial indicates an invalid serial number length
ErrSerial = errs.Class("Invalid SerialNumber")
//ErrExpired indicates the agreement is expired
ErrExpired = errs.Class("Agreement is expired")
//ErrSigner indicates a public key / node id mismatch
ErrSigner = errs.Class("Message public key did not match expected signer")
//ErrBadID indicates a public key / node id mismatch
ErrBadID = errs.Class("Node ID did not match expected id")
//ErrMarshal indicates a failure during serialization
ErrMarshal = errs.Class("Could not marshal item to bytes")
//ErrUnmarshal indicates a failure during deserialization
ErrUnmarshal = errs.Class("Could not unmarshal bytes to item")
//ErrMissing indicates missing or empty information
ErrMissing = errs.Class("Required field is empty")
)
//SignableMessage is a protocol buffer with a certs and a signature
//Note that we assume proto.Message is a pointer receiver
type SignableMessage interface {
proto.Message
GetCerts() [][]byte
GetSignature() []byte
SetCerts([][]byte)
SetSignature([]byte)
}
//SignMessage adds the crypto-related aspects of signed message
func SignMessage(ctx context.Context, msg SignableMessage, id identity.FullIdentity) (err error) {
defer mon.Task()(&ctx)(&err)
if msg == nil {
return ErrMissing.New("message")
}
msg.SetSignature(nil)
msg.SetCerts(nil)
msgBytes, err := proto.Marshal(msg)
if err != nil {
return ErrMarshal.Wrap(err)
}
signature, err := pkcrypto.HashAndSign(id.Key, msgBytes)
if err != nil {
return ErrSign.Wrap(err)
}
msg.SetSignature(signature)
msg.SetCerts(id.RawChain())
return nil
}
//VerifyMsg checks the crypto-related aspects of signed message
func VerifyMsg(ctx context.Context, msg SignableMessage, signer storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
//setup
switch {
case msg == nil:
return ErrMissing.New("message")
case msg.GetSignature() == nil:
return ErrMissing.New("message signature")
case msg.GetCerts() == nil:
return ErrMissing.New("message certificates")
}
signature := msg.GetSignature()
certs := msg.GetCerts()
msg.SetSignature(nil)
msg.SetCerts(nil)
msgBytes, err := proto.Marshal(msg)
if err != nil {
return ErrMarshal.Wrap(err)
}
//check certs
if len(certs) < 2 {
return ErrVerify.New("Expected at least leaf and CA public keys")
}
err = peertls.VerifyPeerFunc(peertls.VerifyPeerCertChains)(certs, nil)
if err != nil {
return ErrVerify.Wrap(err)
}
leaf, err := pkcrypto.CertFromDER(certs[peertls.LeafIndex])
if err != nil {
return err
}
ca, err := pkcrypto.CertFromDER(certs[peertls.CAIndex])
if err != nil {
return err
}
// verify signature
if id, err := identity.NodeIDFromCert(ca); err != nil || id != signer {
return ErrSigner.New("%+v vs %+v", id, signer)
}
if err := pkcrypto.HashAndVerifySignature(leaf.PublicKey, msgBytes, signature); err != nil {
return ErrVerify.New("%+v", err)
}
//cleanup
msg.SetSignature(signature)
msg.SetCerts(certs)
return nil
}

View File

@ -1,80 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package bwagreement_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testidentity"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/satellite"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func TestBandwidthDBAgreement(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
upID, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
snID, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
require.NoError(t, testSaveOrder(ctx, t, db.BandwidthAgreement(), pb.BandwidthAction_PUT, "1", upID, snID))
require.Error(t, testSaveOrder(ctx, t, db.BandwidthAgreement(), pb.BandwidthAction_GET, "1", upID, snID))
require.NoError(t, testSaveOrder(ctx, t, db.BandwidthAgreement(), pb.BandwidthAction_GET, "2", upID, snID))
testGetTotals(ctx, t, db.BandwidthAgreement(), snID)
testGetUplinkStats(ctx, t, db.BandwidthAgreement(), upID)
})
}
func testSaveOrder(ctx context.Context, t *testing.T, b bwagreement.DB, action pb.BandwidthAction,
serialNum string, upID, snID *identity.FullIdentity) error {
rba := &pb.Order{
PayerAllocation: pb.OrderLimit{
Action: action,
SerialNumber: serialNum,
UplinkId: upID.ID,
},
Total: 1000,
StorageNodeId: snID.ID,
}
return b.SaveOrder(ctx, rba)
}
func testGetUplinkStats(ctx context.Context, t *testing.T, b bwagreement.DB, upID *identity.FullIdentity) {
stats, err := b.GetUplinkStats(ctx, time.Time{}, time.Now().UTC())
require.NoError(t, err)
var found int
for _, s := range stats {
if upID.ID == s.NodeID {
found++
require.Equal(t, int64(2000), s.TotalBytes)
require.Equal(t, 1, s.GetActionCount)
require.Equal(t, 1, s.PutActionCount)
require.Equal(t, 2, s.TotalTransactions)
}
}
require.Equal(t, 1, found)
}
func testGetTotals(ctx context.Context, t *testing.T, b bwagreement.DB, snID *identity.FullIdentity) {
totals, err := b.GetTotals(ctx, time.Time{}, time.Now().UTC())
require.NoError(t, err)
total := totals[snID.ID]
require.Len(t, total, 5)
require.Equal(t, int64(1000), total[pb.BandwidthAction_PUT])
require.Equal(t, int64(1000), total[pb.BandwidthAction_GET])
require.Equal(t, int64(0), total[pb.BandwidthAction_GET_AUDIT])
require.Equal(t, int64(0), total[pb.BandwidthAction_GET_REPAIR])
require.Equal(t, int64(0), total[pb.BandwidthAction_PUT_REPAIR])
}

View File

@ -1,248 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package bwagreement
import (
"context"
"crypto"
"io"
"time"
"github.com/gogo/protobuf/proto"
"github.com/zeebo/errs"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/dbutil/pgutil"
"storj.io/storj/internal/dbutil/sqliteutil"
"storj.io/storj/pkg/auth"
"storj.io/storj/pkg/certdb"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/pkcrypto"
"storj.io/storj/pkg/storj"
)
var (
// Error the default bwagreement errs class
Error = errs.Class("bwagreement error")
mon = monkit.Package()
)
// Config is a configuration struct that is everything you need to start an
// agreement receiver responsibility
type Config struct {
}
//UplinkStat contains information about an uplink's returned Orders
type UplinkStat struct {
NodeID storj.NodeID
TotalBytes int64
PutActionCount int
GetActionCount int
TotalTransactions int
}
//SavedOrder is information from an Order pertaining to accounting
type SavedOrder struct {
Serialnum string
StorageNodeID storj.NodeID
UplinkID storj.NodeID
Action int64
Total int64
CreatedAt time.Time
ExpiresAt time.Time
}
// DB stores orders for accounting purposes
type DB interface {
// SaveOrder saves an order for accounting
SaveOrder(context.Context, *pb.Order) error
// GetTotalsSince returns the sum of each bandwidth type after (exluding) a given date range
GetTotals(context.Context, time.Time, time.Time) (map[storj.NodeID][]int64, error)
//GetTotals returns stats about an uplink
GetUplinkStats(context.Context, time.Time, time.Time) ([]UplinkStat, error)
//GetExpired gets orders that are expired and were created before some time
GetExpired(context.Context, time.Time, time.Time) ([]SavedOrder, error)
//DeleteExpired deletes orders that are expired and were created before some time
DeleteExpired(context.Context, time.Time, time.Time) error
}
// Server is an implementation of the pb.BandwidthServer interface
type Server struct {
bwdb DB
certdb certdb.DB
pkey crypto.PublicKey
NodeID storj.NodeID
log *zap.Logger
}
// NewServer creates instance of Server
func NewServer(db DB, upldb certdb.DB, pkey crypto.PublicKey, log *zap.Logger, nodeID storj.NodeID) *Server {
// TODO: reorder arguments
return &Server{bwdb: db, certdb: upldb, pkey: pkey, log: log, NodeID: nodeID}
}
// Close closes resources
func (s *Server) Close() error { return nil }
// BandwidthAgreements receives and stores bandwidth agreements from storage nodes
func (s *Server) BandwidthAgreements(ctx context.Context, rba *pb.Order) (reply *pb.AgreementsSummary, err error) {
defer mon.Task()(&ctx)(&err)
s.log.Debug("Received Agreement...")
reply = &pb.AgreementsSummary{
Status: pb.AgreementsSummary_REJECTED,
}
pba := rba.PayerAllocation
//verify message content
pi, err := identity.PeerIdentityFromContext(ctx)
if err != nil || rba.StorageNodeId != pi.ID {
return reply, auth.ErrBadID.New("Storage Node ID: %v vs %v", rba.StorageNodeId, pi.ID)
}
//todo: use whitelist for uplinks?
if pba.SatelliteId != s.NodeID {
return reply, pb.ErrPayer.New("Satellite ID: %v vs %v", pba.SatelliteId, s.NodeID)
}
exp := time.Unix(pba.GetExpirationUnixSec(), 0).UTC()
if exp.Before(time.Now().UTC()) {
return reply, pb.ErrPayer.Wrap(auth.ErrExpired.New("%v vs %v", exp, time.Now().UTC()))
}
if err = s.verifySignature(ctx, rba); err != nil {
return reply, err
}
//save and return rersults
if err = s.bwdb.SaveOrder(ctx, rba); err != nil {
if pgutil.IsConstraintError(err) || sqliteutil.IsConstraintError(err) {
return reply, pb.ErrPayer.Wrap(auth.ErrSerial.Wrap(err))
}
reply.Status = pb.AgreementsSummary_FAIL
return reply, pb.ErrPayer.Wrap(err)
}
reply.Status = pb.AgreementsSummary_OK
s.log.Debug("Stored Agreement...")
return reply, nil
}
// Settlement receives and handles agreements.
func (s *Server) Settlement(client pb.Bandwidth_SettlementServer) (err error) {
ctx := client.Context()
defer mon.Task()(&ctx)(&err)
peer, err := identity.PeerIdentityFromContext(ctx)
if err != nil {
return status.Error(codes.Unauthenticated, err.Error())
}
formatError := func(err error) error {
if err == io.EOF {
return nil
}
return status.Error(codes.Unknown, err.Error())
}
s.log.Debug("Settlement", zap.Any("storage node ID", peer.ID))
for {
request, err := client.Recv()
if err != nil {
return formatError(err)
}
if request == nil || request.Allocation == nil {
return status.Error(codes.InvalidArgument, "allocation missing")
}
allocation := request.Allocation
payerAllocation := allocation.PayerAllocation
if allocation.StorageNodeId != peer.ID {
return status.Error(codes.Unauthenticated, "only specified storage node can settle allocation")
}
allocationExpiration := time.Unix(payerAllocation.GetExpirationUnixSec(), 0)
if allocationExpiration.Before(time.Now()) {
s.log.Debug("allocation expired", zap.String("serial", payerAllocation.SerialNumber), zap.Error(err))
err := client.Send(&pb.BandwidthSettlementResponse{
SerialNumber: payerAllocation.SerialNumber,
Status: pb.AgreementsSummary_REJECTED,
})
if err != nil {
return formatError(err)
}
}
if err = s.verifySignature(ctx, allocation); err != nil {
s.log.Debug("signature verification failed", zap.String("serial", payerAllocation.SerialNumber), zap.Error(err))
err := client.Send(&pb.BandwidthSettlementResponse{
SerialNumber: payerAllocation.SerialNumber,
Status: pb.AgreementsSummary_REJECTED,
})
if err != nil {
return formatError(err)
}
}
if err = s.bwdb.SaveOrder(ctx, allocation); err != nil {
s.log.Debug("saving order failed", zap.String("serial", payerAllocation.SerialNumber), zap.Error(err))
duplicateRequest := pgutil.IsConstraintError(err) || sqliteutil.IsConstraintError(err)
if duplicateRequest {
err := client.Send(&pb.BandwidthSettlementResponse{
SerialNumber: payerAllocation.SerialNumber,
Status: pb.AgreementsSummary_REJECTED,
})
if err != nil {
return formatError(err)
}
}
}
err = client.Send(&pb.BandwidthSettlementResponse{
SerialNumber: payerAllocation.SerialNumber,
Status: pb.AgreementsSummary_OK,
})
if err != nil {
return formatError(err)
}
}
}
func (s *Server) verifySignature(ctx context.Context, rba *pb.Order) (err error) {
defer mon.Task()(&ctx)(&err)
pba := rba.GetPayerAllocation()
// Get renter's public key from uplink agreement db
uplinkInfo, err := s.certdb.GetPublicKey(ctx, pba.UplinkId)
if err != nil {
return pb.ErrRenter.Wrap(auth.ErrVerify.New("Failed to unmarshal OrderLimit: %+v", err))
}
// verify Renter's (uplink) signature
rbad := *rba
rbad.SetSignature(nil)
rbad.SetCerts(nil)
rbadBytes, err := proto.Marshal(&rbad)
if err != nil {
return Error.New("marshalling error: %+v", err)
}
if err := pkcrypto.HashAndVerifySignature(uplinkInfo, rbadBytes, rba.GetSignature()); err != nil {
return pb.ErrRenter.Wrap(auth.ErrVerify.Wrap(err))
}
// verify Payer's (satellite) signature
pbad := pba
pbad.SetSignature(nil)
pbad.SetCerts(nil)
pbadBytes, err := proto.Marshal(&pbad)
if err != nil {
return Error.New("marshalling error: %+v", err)
}
if err := pkcrypto.HashAndVerifySignature(s.pkey, pbadBytes, pba.GetSignature()); err != nil {
return pb.ErrPayer.Wrap(auth.ErrVerify.Wrap(err))
}
return nil
}

View File

@ -1,345 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package bwagreement_test
import (
"context"
"crypto"
"crypto/tls"
"crypto/x509"
"net"
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testidentity"
"storj.io/storj/pkg/auth"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/bwagreement/testbwagreement"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/pkcrypto"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func TestBandwidthAgreement(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
testDatabase(ctx, t, db)
})
}
func getPeerContext(ctx context.Context, t *testing.T) (context.Context, storj.NodeID) {
ident, err := testidentity.NewTestIdentity(ctx)
if !assert.NoError(t, err) || !assert.NotNil(t, ident) {
t.Fatal(err)
}
grpcPeer := &peer.Peer{
Addr: &net.TCPAddr{IP: net.ParseIP("1.2.3.4"), Port: 5},
AuthInfo: credentials.TLSInfo{
State: tls.ConnectionState{
PeerCertificates: []*x509.Certificate{ident.Leaf, ident.CA},
},
},
}
nodeID, err := identity.NodeIDFromCert(ident.CA)
assert.NoError(t, err)
return peer.NewContext(ctx, grpcPeer), nodeID
}
func testDatabase(ctx context.Context, t *testing.T, db satellite.DB) {
upID, err := testidentity.NewTestIdentity(ctx)
assert.NoError(t, err)
satID, err := testidentity.NewTestIdentity(ctx)
assert.NoError(t, err)
satellite := bwagreement.NewServer(db.BandwidthAgreement(), db.CertDB(), satID.Leaf.PublicKey, zap.NewNop(), satID.ID)
{ // TestSameSerialNumberBandwidthAgreements
pbaFile1, err := testbwagreement.GenerateOrderLimit(ctx, pb.BandwidthAction_GET, satID, upID, time.Hour)
assert.NoError(t, err)
err = db.CertDB().SavePublicKey(ctx, pbaFile1.UplinkId, upID.Leaf.PublicKey)
assert.NoError(t, err)
ctxSN1, storageNode1 := getPeerContext(ctx, t)
rbaNode1, err := testbwagreement.GenerateOrder(ctx, pbaFile1, storageNode1, upID, 666)
assert.NoError(t, err)
ctxSN2, storageNode2 := getPeerContext(ctx, t)
rbaNode2, err := testbwagreement.GenerateOrder(ctx, pbaFile1, storageNode2, upID, 666)
assert.NoError(t, err)
/* More than one storage node can submit bwagreements with the same serial number.
Uplink would like to download a file from 2 storage nodes.
Uplink requests a OrderLimit from the satellite. One serial number for all storage nodes.
Uplink signes 2 Order for both storage node. */
{
reply, err := satellite.BandwidthAgreements(ctxSN1, rbaNode1)
assert.NoError(t, err)
assert.Equal(t, pb.AgreementsSummary_OK, reply.Status)
reply, err = satellite.BandwidthAgreements(ctxSN2, rbaNode2)
assert.NoError(t, err)
assert.Equal(t, pb.AgreementsSummary_OK, reply.Status)
}
/* Storage node can submit a second bwagreement with a different sequence value.
Uplink downloads another file. New OrderLimit with a new sequence. */
{
pbaFile2, err := testbwagreement.GenerateOrderLimit(ctx, pb.BandwidthAction_GET, satID, upID, time.Hour)
assert.NoError(t, err)
err = db.CertDB().SavePublicKey(ctx, pbaFile2.UplinkId, upID.Leaf.PublicKey)
assert.NoError(t, err)
rbaNode1, err := testbwagreement.GenerateOrder(ctx, pbaFile2, storageNode1, upID, 666)
assert.NoError(t, err)
reply, err := satellite.BandwidthAgreements(ctxSN1, rbaNode1)
assert.NoError(t, err)
assert.Equal(t, pb.AgreementsSummary_OK, reply.Status)
}
/* Storage nodes can't submit a second bwagreement with the same sequence. */
{
rbaNode1, err := testbwagreement.GenerateOrder(ctx, pbaFile1, storageNode1, upID, 666)
assert.NoError(t, err)
reply, err := satellite.BandwidthAgreements(ctxSN1, rbaNode1)
assert.True(t, auth.ErrSerial.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
/* Storage nodes can't submit the same bwagreement twice.
This test is kind of duplicate cause it will most likely trigger the same sequence error.
For safety we will try it anyway to make sure nothing strange will happen */
{
reply, err := satellite.BandwidthAgreements(ctxSN2, rbaNode2)
assert.True(t, auth.ErrSerial.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
}
{ // TestExpiredBandwidthAgreements
{ // storage nodes can submit a bwagreement that will expire in 30 seconds
pba, err := testbwagreement.GenerateOrderLimit(ctx, pb.BandwidthAction_GET, satID, upID, 30*time.Second)
assert.NoError(t, err)
err = db.CertDB().SavePublicKey(ctx, pba.UplinkId, upID.Leaf.PublicKey)
assert.NoError(t, err)
ctxSN1, storageNode1 := getPeerContext(ctx, t)
rba, err := testbwagreement.GenerateOrder(ctx, pba, storageNode1, upID, 666)
assert.NoError(t, err)
reply, err := satellite.BandwidthAgreements(ctxSN1, rba)
assert.NoError(t, err)
assert.Equal(t, pb.AgreementsSummary_OK, reply.Status)
}
{ // storage nodes can't submit a bwagreement that expires right now
pba, err := testbwagreement.GenerateOrderLimit(ctx, pb.BandwidthAction_GET, satID, upID, 0*time.Second)
assert.NoError(t, err)
err = db.CertDB().SavePublicKey(ctx, pba.UplinkId, upID.Leaf.PublicKey)
assert.NoError(t, err)
ctxSN1, storageNode1 := getPeerContext(ctx, t)
rba, err := testbwagreement.GenerateOrder(ctx, pba, storageNode1, upID, 666)
assert.NoError(t, err)
reply, err := satellite.BandwidthAgreements(ctxSN1, rba)
assert.True(t, auth.ErrExpired.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
{ // storage nodes can't submit a bwagreement that expires yesterday
pba, err := testbwagreement.GenerateOrderLimit(ctx, pb.BandwidthAction_GET, satID, upID, -23*time.Hour-55*time.Second)
assert.NoError(t, err)
err = db.CertDB().SavePublicKey(ctx, pba.UplinkId, upID.Leaf.PublicKey)
assert.NoError(t, err)
ctxSN1, storageNode1 := getPeerContext(ctx, t)
rba, err := testbwagreement.GenerateOrder(ctx, pba, storageNode1, upID, 666)
assert.NoError(t, err)
reply, err := satellite.BandwidthAgreements(ctxSN1, rba)
assert.True(t, auth.ErrExpired.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
}
{ // TestManipulatedBandwidthAgreements
pba, err := testbwagreement.GenerateOrderLimit(ctx, pb.BandwidthAction_GET, satID, upID, time.Hour)
if !assert.NoError(t, err) {
t.Fatal(err)
}
err = db.CertDB().SavePublicKey(ctx, pba.UplinkId, upID.Leaf.PublicKey)
assert.NoError(t, err)
ctxSN1, storageNode1 := getPeerContext(ctx, t)
rba, err := testbwagreement.GenerateOrder(ctx, pba, storageNode1, upID, 666)
assert.NoError(t, err)
// Storage node manipulates the bwagreement
rba.Total = 1337
// Generate a new keypair for self signing bwagreements
manipID, err := testidentity.NewTestIdentity(ctx)
assert.NoError(t, err)
manipCerts := manipID.RawChain()
manipPrivKey := manipID.Key
/* Storage node can't manipulate the bwagreement size (or any other field)
Satellite will verify Renter's Signature. */
{
manipRBA := *rba
// Using uplink signature
reply, err := callBWA(ctxSN1, t, satellite, rba.GetSignature(), &manipRBA, rba.GetCerts())
assert.True(t, auth.ErrVerify.Has(err) && pb.ErrRenter.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
/* Storage node can't sign the manipulated bwagreement
Satellite will verify Renter's Signature. */
{
manipRBA := *rba
manipSignature := GetSignature(t, &manipRBA, manipPrivKey)
assert.NoError(t, err)
// Using self created signature
reply, err := callBWA(ctxSN1, t, satellite, manipSignature, rba, rba.GetCerts())
assert.True(t, auth.ErrVerify.Has(err) && pb.ErrRenter.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
/* Storage node can't replace uplink Certs
Satellite will check uplink Certs against uplinkeId. */
{
manipRBA := *rba
manipSignature := GetSignature(t, &manipRBA, manipPrivKey)
// Using self created signature + public key
reply, err := callBWA(ctxSN1, t, satellite, manipSignature, &manipRBA, manipCerts)
assert.True(t, pb.ErrRenter.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
/* Storage node can't replace uplink NodeId
Satellite will verify the Payer's Signature. */
{
manipRBA := *rba
// Overwrite the uplinkId with our own keypair
manipRBA.PayerAllocation.UplinkId = manipID.ID
manipSignature := GetSignature(t, &manipRBA, manipPrivKey)
// Using self created signature + public key
reply, err := callBWA(ctxSN1, t, satellite, manipSignature, &manipRBA, manipCerts)
assert.True(t, auth.ErrVerify.Has(err) && pb.ErrRenter.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
/* Storage node can't self sign the OrderLimit.
Satellite will verify the Payer's Signature. */
{
manipRBA := *rba
// Overwrite the uplinkId with our own keypair
manipRBA.PayerAllocation.UplinkId = manipID.ID
manipSignature := GetSignature(t, &manipRBA.PayerAllocation, manipPrivKey)
manipRBA.PayerAllocation.Signature = manipSignature
manipSignature = GetSignature(t, &manipRBA, manipPrivKey)
// Using self created Payer and Renter bwagreement signatures
reply, err := callBWA(ctxSN1, t, satellite, manipSignature, &manipRBA, manipCerts)
assert.True(t, auth.ErrVerify.Has(err) && pb.ErrRenter.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
/* Storage node can't replace the satellite Certs.
Satellite will check satellite certs against satelliteId. */
{
manipRBA := *rba
// Overwrite the uplinkId with our own keypair
manipRBA.PayerAllocation.UplinkId = manipID.ID
manipSignature := GetSignature(t, &manipRBA.PayerAllocation, manipPrivKey)
manipRBA.PayerAllocation.Signature = manipSignature
manipRBA.PayerAllocation.Certs = manipCerts
manipSignature = GetSignature(t, &manipRBA, manipPrivKey)
// Using self created Payer and Renter bwagreement signatures
reply, err := callBWA(ctxSN1, t, satellite, manipSignature, &manipRBA, manipCerts)
assert.True(t, pb.ErrRenter.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
/* Storage node can't replace the satellite.
Satellite will verify the Satellite Id. */
{
manipRBA := *rba
// Overwrite the uplinkId and satelliteID with our own keypair
manipRBA.PayerAllocation.UplinkId = manipID.ID
manipRBA.PayerAllocation.SatelliteId = manipID.ID
manipSignature := GetSignature(t, &manipRBA.PayerAllocation, manipPrivKey)
manipRBA.PayerAllocation.Signature = manipSignature
manipRBA.PayerAllocation.Certs = manipCerts
manipSignature = GetSignature(t, &manipRBA, manipPrivKey)
// Using self created Payer and Renter bwagreement signatures
reply, err := callBWA(ctxSN1, t, satellite, manipSignature, &manipRBA, manipCerts)
assert.True(t, pb.ErrPayer.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
}
{ //TestInvalidBandwidthAgreements
ctxSN1, storageNode1 := getPeerContext(ctx, t)
ctxSN2, storageNode2 := getPeerContext(ctx, t)
pba, err := testbwagreement.GenerateOrderLimit(ctx, pb.BandwidthAction_GET, satID, upID, time.Hour)
assert.NoError(t, err)
err = db.CertDB().SavePublicKey(ctx, pba.UplinkId, upID.Leaf.PublicKey)
assert.NoError(t, err)
{ // Storage node sends an corrupted signuature to force a satellite crash
rba, err := testbwagreement.GenerateOrder(ctx, pba, storageNode1, upID, 666)
assert.NoError(t, err)
rba.Signature = []byte("invalid")
reply, err := satellite.BandwidthAgreements(ctxSN1, rba)
assert.Error(t, err)
assert.True(t, pb.ErrRenter.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
{ // Storage node sends an corrupted uplink Certs to force a crash
rba, err := testbwagreement.GenerateOrder(ctx, pba, storageNode2, upID, 666)
assert.NoError(t, err)
rba.PayerAllocation.Certs = nil
reply, err := callBWA(ctxSN2, t, satellite, rba.GetSignature(), rba, rba.GetCerts())
assert.True(t, auth.ErrVerify.Has(err) && pb.ErrRenter.Has(err), err.Error())
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
}
}
}
func callBWA(ctx context.Context, t *testing.T, sat *bwagreement.Server, signature []byte, rba *pb.Order, certs [][]byte) (*pb.AgreementsSummary, error) {
rba.SetCerts(certs)
rba.SetSignature(signature)
return sat.BandwidthAgreements(ctx, rba)
}
//GetSignature returns the signature of the signed message
func GetSignature(t *testing.T, msg auth.SignableMessage, privKey crypto.PrivateKey) []byte {
require.NotNil(t, msg)
oldSignature := msg.GetSignature()
certs := msg.GetCerts()
msg.SetSignature(nil)
msg.SetCerts(nil)
msgBytes, err := proto.Marshal(msg)
require.NoError(t, err)
signature, err := pkcrypto.HashAndSign(privKey, msgBytes)
require.NoError(t, err)
msg.SetSignature(oldSignature)
msg.SetCerts(certs)
return signature
}

View File

@ -1,50 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package testbwagreement
import (
"context"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/auth"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
)
var mon = monkit.Package()
//GenerateOrderLimit creates a signed OrderLimit from a BandwidthAction
func GenerateOrderLimit(ctx context.Context, action pb.BandwidthAction, satID *identity.FullIdentity, upID *identity.FullIdentity, expiration time.Duration) (_ *pb.OrderLimit, err error) {
defer mon.Task()(&ctx)(&err)
serialNum, err := uuid.New()
if err != nil {
return nil, err
}
pba := &pb.OrderLimit{
SatelliteId: satID.ID,
UplinkId: upID.ID,
ExpirationUnixSec: time.Now().Add(expiration).Unix(),
SerialNumber: serialNum.String(),
Action: action,
CreatedUnixSec: time.Now().Unix(),
}
return pba, auth.SignMessage(ctx, pba, *satID)
}
//GenerateOrder creates a signed Order from a OrderLimit
func GenerateOrder(ctx context.Context, pba *pb.OrderLimit, storageNodeID storj.NodeID, upID *identity.FullIdentity, total int64) (_ *pb.Order, err error) {
defer mon.Task()(&ctx)(&err)
rba := &pb.Order{
PayerAllocation: *pba,
StorageNodeId: storageNodeID,
Total: total,
}
// Combine Signature and Data for Order
return rba, auth.SignMessage(ctx, rba, *upID)
}

View File

@ -1,344 +0,0 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: bandwidth.proto
package pb
import (
context "context"
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
grpc "google.golang.org/grpc"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type AgreementsSummary_Status int32
const (
AgreementsSummary_FAIL AgreementsSummary_Status = 0
AgreementsSummary_OK AgreementsSummary_Status = 1
AgreementsSummary_REJECTED AgreementsSummary_Status = 2
)
var AgreementsSummary_Status_name = map[int32]string{
0: "FAIL",
1: "OK",
2: "REJECTED",
}
var AgreementsSummary_Status_value = map[string]int32{
"FAIL": 0,
"OK": 1,
"REJECTED": 2,
}
func (x AgreementsSummary_Status) String() string {
return proto.EnumName(AgreementsSummary_Status_name, int32(x))
}
func (AgreementsSummary_Status) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_ed768d1bfad2d961, []int{0, 0}
}
type AgreementsSummary struct {
Status AgreementsSummary_Status `protobuf:"varint,1,opt,name=status,proto3,enum=bandwidth.AgreementsSummary_Status" json:"status,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *AgreementsSummary) Reset() { *m = AgreementsSummary{} }
func (m *AgreementsSummary) String() string { return proto.CompactTextString(m) }
func (*AgreementsSummary) ProtoMessage() {}
func (*AgreementsSummary) Descriptor() ([]byte, []int) {
return fileDescriptor_ed768d1bfad2d961, []int{0}
}
func (m *AgreementsSummary) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_AgreementsSummary.Unmarshal(m, b)
}
func (m *AgreementsSummary) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_AgreementsSummary.Marshal(b, m, deterministic)
}
func (m *AgreementsSummary) XXX_Merge(src proto.Message) {
xxx_messageInfo_AgreementsSummary.Merge(m, src)
}
func (m *AgreementsSummary) XXX_Size() int {
return xxx_messageInfo_AgreementsSummary.Size(m)
}
func (m *AgreementsSummary) XXX_DiscardUnknown() {
xxx_messageInfo_AgreementsSummary.DiscardUnknown(m)
}
var xxx_messageInfo_AgreementsSummary proto.InternalMessageInfo
func (m *AgreementsSummary) GetStatus() AgreementsSummary_Status {
if m != nil {
return m.Status
}
return AgreementsSummary_FAIL
}
type BandwidthSettlementRequest struct {
Allocation *RenterBandwidthAllocation `protobuf:"bytes,1,opt,name=allocation,proto3" json:"allocation,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *BandwidthSettlementRequest) Reset() { *m = BandwidthSettlementRequest{} }
func (m *BandwidthSettlementRequest) String() string { return proto.CompactTextString(m) }
func (*BandwidthSettlementRequest) ProtoMessage() {}
func (*BandwidthSettlementRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_ed768d1bfad2d961, []int{1}
}
func (m *BandwidthSettlementRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_BandwidthSettlementRequest.Unmarshal(m, b)
}
func (m *BandwidthSettlementRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_BandwidthSettlementRequest.Marshal(b, m, deterministic)
}
func (m *BandwidthSettlementRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_BandwidthSettlementRequest.Merge(m, src)
}
func (m *BandwidthSettlementRequest) XXX_Size() int {
return xxx_messageInfo_BandwidthSettlementRequest.Size(m)
}
func (m *BandwidthSettlementRequest) XXX_DiscardUnknown() {
xxx_messageInfo_BandwidthSettlementRequest.DiscardUnknown(m)
}
var xxx_messageInfo_BandwidthSettlementRequest proto.InternalMessageInfo
func (m *BandwidthSettlementRequest) GetAllocation() *RenterBandwidthAllocation {
if m != nil {
return m.Allocation
}
return nil
}
type BandwidthSettlementResponse struct {
SerialNumber string `protobuf:"bytes,1,opt,name=serial_number,json=serialNumber,proto3" json:"serial_number,omitempty"`
Status AgreementsSummary_Status `protobuf:"varint,2,opt,name=status,proto3,enum=bandwidth.AgreementsSummary_Status" json:"status,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *BandwidthSettlementResponse) Reset() { *m = BandwidthSettlementResponse{} }
func (m *BandwidthSettlementResponse) String() string { return proto.CompactTextString(m) }
func (*BandwidthSettlementResponse) ProtoMessage() {}
func (*BandwidthSettlementResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_ed768d1bfad2d961, []int{2}
}
func (m *BandwidthSettlementResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_BandwidthSettlementResponse.Unmarshal(m, b)
}
func (m *BandwidthSettlementResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_BandwidthSettlementResponse.Marshal(b, m, deterministic)
}
func (m *BandwidthSettlementResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_BandwidthSettlementResponse.Merge(m, src)
}
func (m *BandwidthSettlementResponse) XXX_Size() int {
return xxx_messageInfo_BandwidthSettlementResponse.Size(m)
}
func (m *BandwidthSettlementResponse) XXX_DiscardUnknown() {
xxx_messageInfo_BandwidthSettlementResponse.DiscardUnknown(m)
}
var xxx_messageInfo_BandwidthSettlementResponse proto.InternalMessageInfo
func (m *BandwidthSettlementResponse) GetSerialNumber() string {
if m != nil {
return m.SerialNumber
}
return ""
}
func (m *BandwidthSettlementResponse) GetStatus() AgreementsSummary_Status {
if m != nil {
return m.Status
}
return AgreementsSummary_FAIL
}
func init() {
proto.RegisterEnum("bandwidth.AgreementsSummary_Status", AgreementsSummary_Status_name, AgreementsSummary_Status_value)
proto.RegisterType((*AgreementsSummary)(nil), "bandwidth.AgreementsSummary")
proto.RegisterType((*BandwidthSettlementRequest)(nil), "bandwidth.BandwidthSettlementRequest")
proto.RegisterType((*BandwidthSettlementResponse)(nil), "bandwidth.BandwidthSettlementResponse")
}
func init() { proto.RegisterFile("bandwidth.proto", fileDescriptor_ed768d1bfad2d961) }
var fileDescriptor_ed768d1bfad2d961 = []byte{
// 311 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x4f, 0x4b, 0xfb, 0x40,
0x14, 0x6c, 0x42, 0x09, 0xed, 0xfb, 0xf5, 0xa7, 0x71, 0xbd, 0x48, 0xf4, 0x20, 0x5b, 0x94, 0x82,
0x10, 0xa4, 0x1e, 0x3d, 0xb5, 0x5a, 0x41, 0x2b, 0x0a, 0x5b, 0x4f, 0x5e, 0x64, 0xd3, 0x3e, 0x34,
0x90, 0xec, 0xc6, 0xdd, 0x17, 0x44, 0x2f, 0x7e, 0x47, 0x3f, 0x91, 0xb0, 0xd1, 0xa4, 0xe0, 0x3f,
0x7a, 0x9d, 0x9d, 0x99, 0x37, 0x33, 0x2c, 0xac, 0x27, 0x52, 0x2d, 0x9e, 0xd2, 0x05, 0x3d, 0xc4,
0x85, 0xd1, 0xa4, 0x59, 0xb7, 0x06, 0xa2, 0xb0, 0x48, 0x71, 0x8e, 0x96, 0xb4, 0xc1, 0xea, 0x91,
0xbf, 0xc0, 0xc6, 0xe8, 0xde, 0x20, 0xe6, 0xa8, 0xc8, 0xce, 0xca, 0x3c, 0x97, 0xe6, 0x99, 0x1d,
0x43, 0x60, 0x49, 0x52, 0x69, 0xb7, 0xbc, 0x5d, 0x6f, 0xb0, 0x36, 0xec, 0xc7, 0x8d, 0xe7, 0x17,
0x76, 0x3c, 0x73, 0x54, 0xf1, 0x21, 0xe1, 0x03, 0x08, 0x2a, 0x84, 0x75, 0xa0, 0x7d, 0x36, 0x3a,
0xbf, 0x0c, 0x5b, 0x2c, 0x00, 0xff, 0x7a, 0x1a, 0x7a, 0xac, 0x07, 0x1d, 0x31, 0xb9, 0x98, 0x9c,
0xdc, 0x4c, 0x4e, 0x43, 0x9f, 0xa7, 0x10, 0x8d, 0x3f, 0x7d, 0x67, 0x48, 0x94, 0x39, 0x5f, 0x81,
0x8f, 0x25, 0x5a, 0x62, 0x53, 0x00, 0x99, 0x65, 0x7a, 0x2e, 0x29, 0xd5, 0xca, 0x05, 0xf9, 0x37,
0x3c, 0x88, 0x9b, 0x02, 0x46, 0x97, 0x84, 0x36, 0x16, 0xa8, 0x08, 0x4d, 0xed, 0x33, 0xaa, 0x25,
0x62, 0x49, 0xce, 0x5f, 0x61, 0xfb, 0xdb, 0x53, 0xb6, 0xd0, 0xca, 0x22, 0xeb, 0xc3, 0x7f, 0x8b,
0x26, 0x95, 0xd9, 0x9d, 0x2a, 0xf3, 0x04, 0x8d, 0x3b, 0xd7, 0x15, 0xbd, 0x0a, 0xbc, 0x72, 0xd8,
0xd2, 0x2a, 0xfe, 0xca, 0xab, 0x0c, 0xdf, 0x3c, 0xe8, 0xd6, 0x09, 0x58, 0x02, 0x9b, 0x4d, 0xe2,
0x5a, 0xca, 0x56, 0xa9, 0x17, 0xed, 0xfc, 0x76, 0x9e, 0xb7, 0x98, 0x04, 0x68, 0x9a, 0xb2, 0xbd,
0x25, 0xf6, 0xcf, 0xa3, 0x47, 0xfb, 0x7f, 0xd1, 0xaa, 0xc1, 0x78, 0x6b, 0xe0, 0x1d, 0x7a, 0xe3,
0xf6, 0xad, 0x5f, 0x24, 0x49, 0xe0, 0x7e, 0xd2, 0xd1, 0x7b, 0x00, 0x00, 0x00, 0xff, 0xff, 0x43,
0x1b, 0x7d, 0x0f, 0x79, 0x02, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// BandwidthClient is the client API for Bandwidth service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type BandwidthClient interface {
BandwidthAgreements(ctx context.Context, in *RenterBandwidthAllocation, opts ...grpc.CallOption) (*AgreementsSummary, error)
Settlement(ctx context.Context, opts ...grpc.CallOption) (Bandwidth_SettlementClient, error)
}
type bandwidthClient struct {
cc *grpc.ClientConn
}
func NewBandwidthClient(cc *grpc.ClientConn) BandwidthClient {
return &bandwidthClient{cc}
}
func (c *bandwidthClient) BandwidthAgreements(ctx context.Context, in *RenterBandwidthAllocation, opts ...grpc.CallOption) (*AgreementsSummary, error) {
out := new(AgreementsSummary)
err := c.cc.Invoke(ctx, "/bandwidth.Bandwidth/BandwidthAgreements", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *bandwidthClient) Settlement(ctx context.Context, opts ...grpc.CallOption) (Bandwidth_SettlementClient, error) {
stream, err := c.cc.NewStream(ctx, &_Bandwidth_serviceDesc.Streams[0], "/bandwidth.Bandwidth/Settlement", opts...)
if err != nil {
return nil, err
}
x := &bandwidthSettlementClient{stream}
return x, nil
}
type Bandwidth_SettlementClient interface {
Send(*BandwidthSettlementRequest) error
Recv() (*BandwidthSettlementResponse, error)
grpc.ClientStream
}
type bandwidthSettlementClient struct {
grpc.ClientStream
}
func (x *bandwidthSettlementClient) Send(m *BandwidthSettlementRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *bandwidthSettlementClient) Recv() (*BandwidthSettlementResponse, error) {
m := new(BandwidthSettlementResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// BandwidthServer is the server API for Bandwidth service.
type BandwidthServer interface {
BandwidthAgreements(context.Context, *RenterBandwidthAllocation) (*AgreementsSummary, error)
Settlement(Bandwidth_SettlementServer) error
}
func RegisterBandwidthServer(s *grpc.Server, srv BandwidthServer) {
s.RegisterService(&_Bandwidth_serviceDesc, srv)
}
func _Bandwidth_BandwidthAgreements_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RenterBandwidthAllocation)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BandwidthServer).BandwidthAgreements(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/bandwidth.Bandwidth/BandwidthAgreements",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BandwidthServer).BandwidthAgreements(ctx, req.(*RenterBandwidthAllocation))
}
return interceptor(ctx, in, info, handler)
}
func _Bandwidth_Settlement_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(BandwidthServer).Settlement(&bandwidthSettlementServer{stream})
}
type Bandwidth_SettlementServer interface {
Send(*BandwidthSettlementResponse) error
Recv() (*BandwidthSettlementRequest, error)
grpc.ServerStream
}
type bandwidthSettlementServer struct {
grpc.ServerStream
}
func (x *bandwidthSettlementServer) Send(m *BandwidthSettlementResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *bandwidthSettlementServer) Recv() (*BandwidthSettlementRequest, error) {
m := new(BandwidthSettlementRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _Bandwidth_serviceDesc = grpc.ServiceDesc{
ServiceName: "bandwidth.Bandwidth",
HandlerType: (*BandwidthServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "BandwidthAgreements",
Handler: _Bandwidth_BandwidthAgreements_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Settlement",
Handler: _Bandwidth_Settlement_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "bandwidth.proto",
}

View File

@ -1,33 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
syntax = "proto3";
option go_package = "pb";
package bandwidth;
import "piecestore.proto";
service Bandwidth {
rpc BandwidthAgreements(piecestoreroutes.RenterBandwidthAllocation) returns (AgreementsSummary) {}
rpc Settlement(stream BandwidthSettlementRequest) returns (stream BandwidthSettlementResponse) {}
}
message AgreementsSummary {
enum Status {
FAIL = 0;
OK = 1;
REJECTED = 2;
}
Status status = 1;
}
message BandwidthSettlementRequest {
piecestoreroutes.RenterBandwidthAllocation allocation = 1;
}
message BandwidthSettlementResponse {
string serial_number = 1;
AgreementsSummary.Status status = 2;
}

View File

@ -93,99 +93,6 @@
}
}
},
{
"protopath": "pkg:/:pb:/:bandwidth.proto",
"def": {
"enums": [
{
"name": "AgreementsSummary.Status",
"enum_fields": [
{
"name": "FAIL"
},
{
"name": "OK",
"integer": 1
},
{
"name": "REJECTED",
"integer": 2
}
]
}
],
"messages": [
{
"name": "AgreementsSummary",
"fields": [
{
"id": 1,
"name": "status",
"type": "Status"
}
]
},
{
"name": "BandwidthSettlementRequest",
"fields": [
{
"id": 1,
"name": "allocation",
"type": "piecestoreroutes.RenterBandwidthAllocation"
}
]
},
{
"name": "BandwidthSettlementResponse",
"fields": [
{
"id": 1,
"name": "serial_number",
"type": "string"
},
{
"id": 2,
"name": "status",
"type": "AgreementsSummary.Status"
}
]
}
],
"services": [
{
"name": "Bandwidth",
"rpcs": [
{
"name": "BandwidthAgreements",
"in_type": "piecestoreroutes.RenterBandwidthAllocation",
"out_type": "AgreementsSummary"
},
{
"name": "Settlement",
"in_type": "BandwidthSettlementRequest",
"out_type": "BandwidthSettlementResponse",
"in_streamed": true,
"out_streamed": true
}
]
}
],
"imports": [
{
"path": "piecestore.proto"
}
],
"package": {
"name": "bandwidth"
},
"options": [
{
"name": "go_package",
"value": "pb"
}
]
}
},
{
"protopath": "pkg:/:pb:/:certificate.proto",
"def": {

View File

@ -29,7 +29,6 @@ import (
"storj.io/storj/pkg/audit"
"storj.io/storj/pkg/auth/grpcauth"
"storj.io/storj/pkg/auth/signing"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/certdb"
"storj.io/storj/pkg/datarepair/checker"
"storj.io/storj/pkg/datarepair/irreparable"
@ -75,8 +74,6 @@ type DB interface {
// DropSchema drops the schema
DropSchema(schema string) error
// BandwidthAgreement returns database for storing bandwidth agreements
BandwidthAgreement() bwagreement.DB
// CertDB returns database for storing uplink's public key & ID
CertDB() certdb.DB
// OverlayCache returns database for caching overlay information
@ -111,7 +108,6 @@ type Config struct {
Discovery discovery.Config
Metainfo metainfo.Config
BwAgreement bwagreement.Config // TODO: decide whether to keep empty configs for consistency
Checker checker.Config
Repairer repairer.Config
@ -171,10 +167,6 @@ type Peer struct {
Endpoint *inspector.Endpoint
}
Agreements struct {
Endpoint *bwagreement.Server
}
Orders struct {
Endpoint *orders.Endpoint
Service *orders.Service
@ -415,13 +407,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve
pb.RegisterMetainfoServer(peer.Server.GRPC(), peer.Metainfo.Endpoint2)
}
{ // setup agreements
log.Debug("Setting up agreements")
bwServer := bwagreement.NewServer(peer.DB.BandwidthAgreement(), peer.DB.CertDB(), peer.Identity.Leaf.PublicKey, peer.Log.Named("agreements"), peer.Identity.ID)
peer.Agreements.Endpoint = bwServer
pb.RegisterBandwidthServer(peer.Server.GRPC(), peer.Agreements.Endpoint)
}
{ // setup datarepair
log.Debug("Setting up datarepair")
// TODO: simplify argument list somehow
@ -673,10 +658,6 @@ func (peer *Peer) Close() error {
errlist.Add(peer.Repair.Checker.Close())
}
if peer.Agreements.Endpoint != nil {
errlist.Add(peer.Agreements.Endpoint.Close())
}
if peer.Metainfo.Database != nil {
errlist.Add(peer.Metainfo.Database.Close())
}

View File

@ -1,124 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"fmt"
"time"
"github.com/zeebo/errs"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
dbx "storj.io/storj/satellite/satellitedb/dbx"
)
type bandwidthagreement struct {
db *dbx.DB
}
func (b *bandwidthagreement) SaveOrder(ctx context.Context, rba *pb.Order) (err error) {
defer mon.Task()(&ctx)(&err)
var saveOrderSQL = `INSERT INTO bwagreements ( serialnum, storage_node_id, uplink_id, action, total, created_at, expires_at ) VALUES ( ?, ?, ?, ?, ?, ?, ? )`
_, err = b.db.DB.ExecContext(ctx, b.db.Rebind(saveOrderSQL),
rba.PayerAllocation.SerialNumber+rba.StorageNodeId.String(),
rba.StorageNodeId,
rba.PayerAllocation.UplinkId,
int64(rba.PayerAllocation.Action),
rba.Total,
time.Now().UTC(),
time.Unix(rba.PayerAllocation.ExpirationUnixSec, 0),
)
return err
}
//GetTotals returns stats about an uplink
func (b *bandwidthagreement) GetUplinkStats(ctx context.Context, from, to time.Time) (stats []bwagreement.UplinkStat, err error) {
defer mon.Task()(&ctx)(&err)
var uplinkSQL = fmt.Sprintf(`SELECT uplink_id, SUM(total),
COUNT(CASE WHEN action = %d THEN total ELSE null END),
COUNT(CASE WHEN action = %d THEN total ELSE null END), COUNT(*)
FROM bwagreements WHERE created_at > ?
AND created_at <= ? GROUP BY uplink_id ORDER BY uplink_id`,
pb.BandwidthAction_PUT, pb.BandwidthAction_GET)
rows, err := b.db.DB.QueryContext(ctx, b.db.Rebind(uplinkSQL), from.UTC(), to.UTC())
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
stat := bwagreement.UplinkStat{}
err := rows.Scan(&stat.NodeID, &stat.TotalBytes, &stat.PutActionCount, &stat.GetActionCount, &stat.TotalTransactions)
if err != nil {
return stats, err
}
stats = append(stats, stat)
}
return stats, nil
}
//GetTotals returns the sum of each bandwidth type after (exluding) a given date range
func (b *bandwidthagreement) GetTotals(ctx context.Context, from, to time.Time) (bwa map[storj.NodeID][]int64, err error) {
defer mon.Task()(&ctx)(&err)
var getTotalsSQL = fmt.Sprintf(`SELECT storage_node_id,
SUM(CASE WHEN action = %d THEN total ELSE 0 END),
SUM(CASE WHEN action = %d THEN total ELSE 0 END),
SUM(CASE WHEN action = %d THEN total ELSE 0 END),
SUM(CASE WHEN action = %d THEN total ELSE 0 END),
SUM(CASE WHEN action = %d THEN total ELSE 0 END)
FROM bwagreements WHERE created_at > ? AND created_at <= ?
GROUP BY storage_node_id ORDER BY storage_node_id`, pb.BandwidthAction_PUT,
pb.BandwidthAction_GET, pb.BandwidthAction_GET_AUDIT,
pb.BandwidthAction_GET_REPAIR, pb.BandwidthAction_PUT_REPAIR)
rows, err := b.db.DB.QueryContext(ctx, b.db.Rebind(getTotalsSQL), from.UTC(), to.UTC())
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
totals := make(map[storj.NodeID][]int64)
for i := 0; rows.Next(); i++ {
var nodeID storj.NodeID
data := make([]int64, len(pb.BandwidthAction_value))
err := rows.Scan(&nodeID, &data[pb.BandwidthAction_PUT], &data[pb.BandwidthAction_GET],
&data[pb.BandwidthAction_GET_AUDIT], &data[pb.BandwidthAction_GET_REPAIR], &data[pb.BandwidthAction_PUT_REPAIR])
if err != nil {
return totals, err
}
totals[nodeID] = data
}
return totals, nil
}
//GetExpired gets orders that are expired and were created before some time
func (b *bandwidthagreement) GetExpired(ctx context.Context, before time.Time, expiredAt time.Time) (orders []bwagreement.SavedOrder, err error) {
defer mon.Task()(&ctx)(&err)
var getExpiredSQL = `SELECT serialnum, storage_node_id, uplink_id, action, total, created_at, expires_at
FROM bwagreements WHERE created_at < ? AND expires_at < ?`
rows, err := b.db.DB.QueryContext(ctx, b.db.Rebind(getExpiredSQL), before, expiredAt)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for i := 0; rows.Next(); i++ {
o := bwagreement.SavedOrder{}
err = rows.Scan(&o.Serialnum, &o.StorageNodeID, &o.UplinkID, &o.Action, &o.Total, &o.CreatedAt, &o.ExpiresAt)
if err != nil {
break
}
orders = append(orders, o)
}
return orders, err
}
//DeleteExpired deletes orders that are expired and were created before some time
func (b *bandwidthagreement) DeleteExpired(ctx context.Context, before time.Time, expiredAt time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
var deleteExpiredSQL = `DELETE FROM bwagreements WHERE created_at < ? AND expires_at < ?`
_, err = b.db.DB.ExecContext(ctx, b.db.Rebind(deleteExpiredSQL), before, expiredAt)
return err
}

View File

@ -11,7 +11,6 @@ import (
"storj.io/storj/internal/dbutil/pgutil"
"storj.io/storj/pkg/accounting"
"storj.io/storj/pkg/audit"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/certdb"
"storj.io/storj/pkg/datarepair/irreparable"
"storj.io/storj/pkg/datarepair/queue"
@ -93,11 +92,6 @@ func (db *DB) DropSchema(schema string) error {
return nil
}
// BandwidthAgreement is a getter for bandwidth agreement repository
func (db *DB) BandwidthAgreement() bwagreement.DB {
return &bandwidthagreement{db: db.db}
}
// CertDB is a getter for uplink's specific info like public key, id, etc...
func (db *DB) CertDB() certdb.DB {
return &certDB{db: db.db}

View File

@ -29,21 +29,6 @@ read one (
where pending_audits.node_id = ?
)
//--- bwagreement ---//
model bwagreement (
key serialnum
field serialnum text
field storage_node_id blob
field uplink_id blob
field action int64
field total int64
field created_at timestamp ( autoinsert )
field expires_at timestamp
)
//--- irreparableDB ---//
model irreparabledb (

View File

@ -327,16 +327,6 @@ CREATE TABLE bucket_usages (
audit_egress bigint NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE bwagreements (
serialnum text NOT NULL,
storage_node_id bytea NOT NULL,
uplink_id bytea NOT NULL,
action bigint NOT NULL,
total bigint NOT NULL,
created_at timestamp with time zone NOT NULL,
expires_at timestamp with time zone NOT NULL,
PRIMARY KEY ( serialnum )
);
CREATE TABLE certRecords (
publickey bytea NOT NULL,
id bytea NOT NULL,
@ -621,16 +611,6 @@ CREATE TABLE bucket_usages (
audit_egress INTEGER NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE bwagreements (
serialnum TEXT NOT NULL,
storage_node_id BLOB NOT NULL,
uplink_id BLOB NOT NULL,
action INTEGER NOT NULL,
total INTEGER NOT NULL,
created_at TIMESTAMP NOT NULL,
expires_at TIMESTAMP NOT NULL,
PRIMARY KEY ( serialnum )
);
CREATE TABLE certRecords (
publickey BLOB NOT NULL,
id BLOB NOT NULL,
@ -1705,154 +1685,6 @@ func (f BucketUsage_AuditEgress_Field) value() interface{} {
func (BucketUsage_AuditEgress_Field) _Column() string { return "audit_egress" }
type Bwagreement struct {
Serialnum string
StorageNodeId []byte
UplinkId []byte
Action int64
Total int64
CreatedAt time.Time
ExpiresAt time.Time
}
func (Bwagreement) _Table() string { return "bwagreements" }
type Bwagreement_Update_Fields struct {
}
type Bwagreement_Serialnum_Field struct {
_set bool
_null bool
_value string
}
func Bwagreement_Serialnum(v string) Bwagreement_Serialnum_Field {
return Bwagreement_Serialnum_Field{_set: true, _value: v}
}
func (f Bwagreement_Serialnum_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (Bwagreement_Serialnum_Field) _Column() string { return "serialnum" }
type Bwagreement_StorageNodeId_Field struct {
_set bool
_null bool
_value []byte
}
func Bwagreement_StorageNodeId(v []byte) Bwagreement_StorageNodeId_Field {
return Bwagreement_StorageNodeId_Field{_set: true, _value: v}
}
func (f Bwagreement_StorageNodeId_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (Bwagreement_StorageNodeId_Field) _Column() string { return "storage_node_id" }
type Bwagreement_UplinkId_Field struct {
_set bool
_null bool
_value []byte
}
func Bwagreement_UplinkId(v []byte) Bwagreement_UplinkId_Field {
return Bwagreement_UplinkId_Field{_set: true, _value: v}
}
func (f Bwagreement_UplinkId_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (Bwagreement_UplinkId_Field) _Column() string { return "uplink_id" }
type Bwagreement_Action_Field struct {
_set bool
_null bool
_value int64
}
func Bwagreement_Action(v int64) Bwagreement_Action_Field {
return Bwagreement_Action_Field{_set: true, _value: v}
}
func (f Bwagreement_Action_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (Bwagreement_Action_Field) _Column() string { return "action" }
type Bwagreement_Total_Field struct {
_set bool
_null bool
_value int64
}
func Bwagreement_Total(v int64) Bwagreement_Total_Field {
return Bwagreement_Total_Field{_set: true, _value: v}
}
func (f Bwagreement_Total_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (Bwagreement_Total_Field) _Column() string { return "total" }
type Bwagreement_CreatedAt_Field struct {
_set bool
_null bool
_value time.Time
}
func Bwagreement_CreatedAt(v time.Time) Bwagreement_CreatedAt_Field {
return Bwagreement_CreatedAt_Field{_set: true, _value: v}
}
func (f Bwagreement_CreatedAt_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (Bwagreement_CreatedAt_Field) _Column() string { return "created_at" }
type Bwagreement_ExpiresAt_Field struct {
_set bool
_null bool
_value time.Time
}
func Bwagreement_ExpiresAt(v time.Time) Bwagreement_ExpiresAt_Field {
return Bwagreement_ExpiresAt_Field{_set: true, _value: v}
}
func (f Bwagreement_ExpiresAt_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (Bwagreement_ExpiresAt_Field) _Column() string { return "expires_at" }
type CertRecord struct {
Publickey []byte
Id []byte
@ -7184,16 +7016,6 @@ func (obj *postgresImpl) deleteAll(ctx context.Context) (count int64, err error)
return 0, obj.makeErr(err)
}
__count, err = __res.RowsAffected()
if err != nil {
return 0, obj.makeErr(err)
}
count += __count
__res, err = obj.driver.Exec("DELETE FROM bwagreements;")
if err != nil {
return 0, obj.makeErr(err)
}
__count, err = __res.RowsAffected()
if err != nil {
return 0, obj.makeErr(err)
@ -10501,16 +10323,6 @@ func (obj *sqlite3Impl) deleteAll(ctx context.Context) (count int64, err error)
return 0, obj.makeErr(err)
}
__count, err = __res.RowsAffected()
if err != nil {
return 0, obj.makeErr(err)
}
count += __count
__res, err = obj.driver.Exec("DELETE FROM bwagreements;")
if err != nil {
return 0, obj.makeErr(err)
}
__count, err = __res.RowsAffected()
if err != nil {
return 0, obj.makeErr(err)

View File

@ -55,16 +55,6 @@ CREATE TABLE bucket_usages (
audit_egress bigint NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE bwagreements (
serialnum text NOT NULL,
storage_node_id bytea NOT NULL,
uplink_id bytea NOT NULL,
action bigint NOT NULL,
total bigint NOT NULL,
created_at timestamp with time zone NOT NULL,
expires_at timestamp with time zone NOT NULL,
PRIMARY KEY ( serialnum )
);
CREATE TABLE certRecords (
publickey bytea NOT NULL,
id bytea NOT NULL,

View File

@ -55,16 +55,6 @@ CREATE TABLE bucket_usages (
audit_egress INTEGER NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE bwagreements (
serialnum TEXT NOT NULL,
storage_node_id BLOB NOT NULL,
uplink_id BLOB NOT NULL,
action INTEGER NOT NULL,
total INTEGER NOT NULL,
created_at TIMESTAMP NOT NULL,
expires_at TIMESTAMP NOT NULL,
PRIMARY KEY ( serialnum )
);
CREATE TABLE certRecords (
publickey BLOB NOT NULL,
id BLOB NOT NULL,

View File

@ -16,7 +16,6 @@ import (
"storj.io/storj/internal/memory"
"storj.io/storj/pkg/accounting"
"storj.io/storj/pkg/audit"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/certdb"
"storj.io/storj/pkg/datarepair/irreparable"
"storj.io/storj/pkg/datarepair/queue"
@ -40,54 +39,6 @@ func newLocked(db satellite.DB) satellite.DB {
return &locked{&sync.Mutex{}, db}
}
// BandwidthAgreement returns database for storing bandwidth agreements
func (m *locked) BandwidthAgreement() bwagreement.DB {
m.Lock()
defer m.Unlock()
return &lockedBandwidthAgreement{m.Locker, m.db.BandwidthAgreement()}
}
// lockedBandwidthAgreement implements locking wrapper for bwagreement.DB
type lockedBandwidthAgreement struct {
sync.Locker
db bwagreement.DB
}
// DeleteExpired deletes orders that are expired and were created before some time
func (m *lockedBandwidthAgreement) DeleteExpired(ctx context.Context, a1 time.Time, a2 time.Time) error {
m.Lock()
defer m.Unlock()
return m.db.DeleteExpired(ctx, a1, a2)
}
// GetExpired gets orders that are expired and were created before some time
func (m *lockedBandwidthAgreement) GetExpired(ctx context.Context, a1 time.Time, a2 time.Time) ([]bwagreement.SavedOrder, error) {
m.Lock()
defer m.Unlock()
return m.db.GetExpired(ctx, a1, a2)
}
// GetTotalsSince returns the sum of each bandwidth type after (exluding) a given date range
func (m *lockedBandwidthAgreement) GetTotals(ctx context.Context, a1 time.Time, a2 time.Time) (map[storj.NodeID][]int64, error) {
m.Lock()
defer m.Unlock()
return m.db.GetTotals(ctx, a1, a2)
}
// GetTotals returns stats about an uplink
func (m *lockedBandwidthAgreement) GetUplinkStats(ctx context.Context, a1 time.Time, a2 time.Time) ([]bwagreement.UplinkStat, error) {
m.Lock()
defer m.Unlock()
return m.db.GetUplinkStats(ctx, a1, a2)
}
// SaveOrder saves an order for accounting
func (m *lockedBandwidthAgreement) SaveOrder(ctx context.Context, a1 *pb.RenterBandwidthAllocation) error {
m.Lock()
defer m.Unlock()
return m.db.SaveOrder(ctx, a1)
}
// CertDB returns database for storing uplink's public key & ID
func (m *locked) CertDB() certdb.DB {
m.Lock()

View File

@ -748,6 +748,13 @@ func (db *DB) PostgresMigration() *migrate.Migration {
)`,
},
},
{
Description: "Remove agreements table",
Version: 28,
Action: migrate.SQL{
`DROP TABLE bwagreements`,
},
},
},
}
}

View File

@ -0,0 +1,273 @@
CREATE TABLE accounting_rollups (
id bigserial NOT NULL,
node_id bytea NOT NULL,
start_time timestamp with time zone NOT NULL,
put_total bigint NOT NULL,
get_total bigint NOT NULL,
get_audit_total bigint NOT NULL,
get_repair_total bigint NOT NULL,
put_repair_total bigint NOT NULL,
at_rest_total double precision NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE accounting_timestamps (
name text NOT NULL,
value timestamp with time zone NOT NULL,
PRIMARY KEY ( name )
);
CREATE TABLE bucket_bandwidth_rollups (
bucket_name bytea NOT NULL,
project_id bytea NOT NULL,
interval_start timestamp NOT NULL,
interval_seconds integer NOT NULL,
action integer NOT NULL,
inline bigint NOT NULL,
allocated bigint NOT NULL,
settled bigint NOT NULL,
PRIMARY KEY ( bucket_name, project_id, interval_start, action )
);
CREATE TABLE bucket_storage_tallies (
bucket_name bytea NOT NULL,
project_id bytea NOT NULL,
interval_start timestamp NOT NULL,
inline bigint NOT NULL,
remote bigint NOT NULL,
remote_segments_count integer NOT NULL,
inline_segments_count integer NOT NULL,
object_count integer NOT NULL,
metadata_size bigint NOT NULL,
PRIMARY KEY ( bucket_name, project_id, interval_start )
);
CREATE TABLE bucket_usages (
id bytea NOT NULL,
bucket_id bytea NOT NULL,
rollup_end_time timestamp with time zone NOT NULL,
remote_stored_data bigint NOT NULL,
inline_stored_data bigint NOT NULL,
remote_segments integer NOT NULL,
inline_segments integer NOT NULL,
objects integer NOT NULL,
metadata_size bigint NOT NULL,
repair_egress bigint NOT NULL,
get_egress bigint NOT NULL,
audit_egress bigint NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE certRecords (
publickey bytea NOT NULL,
id bytea NOT NULL,
update_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE injuredsegments (
path text NOT NULL,
data bytea NOT NULL,
attempted timestamp,
PRIMARY KEY ( path )
);
CREATE TABLE irreparabledbs (
segmentpath bytea NOT NULL,
segmentdetail bytea NOT NULL,
pieces_lost_count bigint NOT NULL,
seg_damaged_unix_sec bigint NOT NULL,
repair_attempt_count bigint NOT NULL,
PRIMARY KEY ( segmentpath )
);
CREATE TABLE nodes (
id bytea NOT NULL,
address text NOT NULL,
last_ip text NOT NULL,
protocol integer NOT NULL,
type integer NOT NULL,
email text NOT NULL,
wallet text NOT NULL,
free_bandwidth bigint NOT NULL,
free_disk bigint NOT NULL,
major bigint NOT NULL,
minor bigint NOT NULL,
patch bigint NOT NULL,
hash text NOT NULL,
timestamp timestamp with time zone NOT NULL,
release boolean NOT NULL,
latency_90 bigint NOT NULL,
audit_success_count bigint NOT NULL,
total_audit_count bigint NOT NULL,
audit_success_ratio double precision NOT NULL,
uptime_success_count bigint NOT NULL,
total_uptime_count bigint NOT NULL,
uptime_ratio double precision NOT NULL,
created_at timestamp with time zone NOT NULL,
updated_at timestamp with time zone NOT NULL,
last_contact_success timestamp with time zone NOT NULL,
last_contact_failure timestamp with time zone NOT NULL,
contained boolean NOT NULL,
disqualified boolean NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE offers (
id serial NOT NULL,
name text NOT NULL,
description text NOT NULL,
award_credit_in_cents integer NOT NULL,
invitee_credit_in_cents integer NOT NULL,
award_credit_duration_days integer NOT NULL,
invitee_credit_duration_days integer NOT NULL,
redeemable_cap integer NOT NULL,
num_redeemed integer NOT NULL,
expires_at timestamp with time zone NOT NULL,
created_at timestamp with time zone NOT NULL,
status integer NOT NULL,
type integer NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE pending_audits (
node_id bytea NOT NULL,
piece_id bytea NOT NULL,
stripe_index bigint NOT NULL,
share_size bigint NOT NULL,
expected_share_hash bytea NOT NULL,
reverify_count bigint NOT NULL,
PRIMARY KEY ( node_id )
);
CREATE TABLE projects (
id bytea NOT NULL,
name text NOT NULL,
description text NOT NULL,
usage_limit bigint NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE registration_tokens (
secret bytea NOT NULL,
owner_id bytea,
project_limit integer NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( secret ),
UNIQUE ( owner_id )
);
CREATE TABLE reset_password_tokens (
secret bytea NOT NULL,
owner_id bytea NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( secret ),
UNIQUE ( owner_id )
);
CREATE TABLE serial_numbers (
id serial NOT NULL,
serial_number bytea NOT NULL,
bucket_id bytea NOT NULL,
expires_at timestamp NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE storagenode_bandwidth_rollups (
storagenode_id bytea NOT NULL,
interval_start timestamp NOT NULL,
interval_seconds integer NOT NULL,
action integer NOT NULL,
allocated bigint NOT NULL,
settled bigint NOT NULL,
PRIMARY KEY ( storagenode_id, interval_start, action )
);
CREATE TABLE storagenode_storage_tallies (
id bigserial NOT NULL,
node_id bytea NOT NULL,
interval_end_time timestamp with time zone NOT NULL,
data_total double precision NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE users (
id bytea NOT NULL,
full_name text NOT NULL,
short_name text,
email text NOT NULL,
password_hash bytea NOT NULL,
status integer NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE api_keys (
id bytea NOT NULL,
project_id bytea NOT NULL REFERENCES projects( id ) ON DELETE CASCADE,
head bytea NOT NULL,
name text NOT NULL,
secret bytea NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id ),
UNIQUE ( head ),
UNIQUE ( name, project_id )
);
CREATE TABLE project_members (
member_id bytea NOT NULL REFERENCES users( id ) ON DELETE CASCADE,
project_id bytea NOT NULL REFERENCES projects( id ) ON DELETE CASCADE,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( member_id, project_id )
);
CREATE TABLE used_serials (
serial_number_id integer NOT NULL REFERENCES serial_numbers( id ) ON DELETE CASCADE,
storage_node_id bytea NOT NULL,
PRIMARY KEY ( serial_number_id, storage_node_id )
);
CREATE INDEX bucket_name_project_id_interval_start_interval_seconds ON bucket_bandwidth_rollups ( bucket_name, project_id, interval_start, interval_seconds );
CREATE UNIQUE INDEX bucket_id_rollup ON bucket_usages ( bucket_id, rollup_end_time );
CREATE INDEX node_last_ip ON nodes ( last_ip );
CREATE UNIQUE INDEX serial_number ON serial_numbers ( serial_number );
CREATE INDEX serial_numbers_expires_at_index ON serial_numbers ( expires_at );
CREATE INDEX storagenode_id_interval_start_interval_seconds ON storagenode_bandwidth_rollups ( storagenode_id, interval_start, interval_seconds );
CREATE TABLE value_attributions (
bucket_id bytea NOT NULL,
partner_id bytea NOT NULL,
last_updated timestamp NOT NULL,
PRIMARY KEY ( bucket_id )
);
---
INSERT INTO "accounting_rollups"("id", "node_id", "start_time", "put_total", "get_total", "get_audit_total", "get_repair_total", "put_repair_total", "at_rest_total") VALUES (1, E'\\367M\\177\\251]t/\\022\\256\\214\\265\\025\\224\\204:\\217\\212\\0102<\\321\\374\\020&\\271Qc\\325\\261\\354\\246\\233'::bytea, '2019-02-09 00:00:00+00', 1000, 2000, 3000, 4000, 0, 5000);
INSERT INTO "accounting_timestamps" VALUES ('LastAtRestTally', '0001-01-01 00:00:00+00');
INSERT INTO "accounting_timestamps" VALUES ('LastRollup', '0001-01-01 00:00:00+00');
INSERT INTO "accounting_timestamps" VALUES ('LastBandwidthTally', '0001-01-01 00:00:00+00');
INSERT INTO "nodes"("id", "address", "last_ip", "protocol", "type", "email", "wallet", "free_bandwidth", "free_disk", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "audit_success_ratio", "uptime_success_count", "total_uptime_count", "uptime_ratio", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified") VALUES (E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001', '127.0.0.1:55516', '', 0, 4, '', '', -1, -1, 0, 1, 0, '', 'epoch', false, 0, 0, 5, 0, 0, 5, 0, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, false);
INSERT INTO "nodes"("id", "address", "last_ip", "protocol", "type", "email", "wallet", "free_bandwidth", "free_disk", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "audit_success_ratio", "uptime_success_count", "total_uptime_count", "uptime_ratio", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified") VALUES (E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n', '127.0.0.1:55518', '', 0, 4, '', '', -1, -1, 0, 1, 0, '', 'epoch', false, 0, 0, 0, 1, 3, 3, 1, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, false);
INSERT INTO "nodes"("id", "address", "last_ip", "protocol", "type", "email", "wallet", "free_bandwidth", "free_disk", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "audit_success_ratio", "uptime_success_count", "total_uptime_count", "uptime_ratio", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014', '127.0.0.1:55517', '', 0, 4, '', '', -1, -1, 0, 1, 0, '', 'epoch', false, 0, 0, 0, 1, 0, 0, 1, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, false);
INSERT INTO "projects"("id", "name", "description", "usage_limit", "created_at") VALUES (E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, 'ProjectName', 'projects description', 0, '2019-02-14 08:28:24.254934+00');
INSERT INTO "users"("id", "full_name", "short_name", "email", "password_hash", "status", "created_at") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 'Noahson', 'William', '1email1@ukr.net', E'some_readable_hash'::bytea, 1, '2019-02-14 08:28:24.614594+00');
INSERT INTO "projects"("id", "name", "description", "usage_limit", "created_at") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, 'projName1', 'Test project 1', 0, '2019-02-14 08:28:24.636949+00');
INSERT INTO "project_members"("member_id", "project_id", "created_at") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, '2019-02-14 08:28:24.677953+00');
INSERT INTO "irreparabledbs" ("segmentpath", "segmentdetail", "pieces_lost_count", "seg_damaged_unix_sec", "repair_attempt_count") VALUES ('\x49616d5365676d656e746b6579696e666f30', '\x49616d5365676d656e7464657461696c696e666f30', 10, 1550159554, 10);
INSERT INTO "injuredsegments" ("path", "data") VALUES ('0', '\x0a0130120100');
INSERT INTO "injuredsegments" ("path", "data") VALUES ('here''s/a/great/path', '\x0a136865726527732f612f67726561742f70617468120a0102030405060708090a');
INSERT INTO "injuredsegments" ("path", "data") VALUES ('yet/another/cool/path', '\x0a157965742f616e6f746865722f636f6f6c2f70617468120a0102030405060708090a');
INSERT INTO "injuredsegments" ("path", "data") VALUES ('so/many/iconic/paths/to/choose/from', '\x0a23736f2f6d616e792f69636f6e69632f70617468732f746f2f63686f6f73652f66726f6d120a0102030405060708090a');
INSERT INTO "certrecords" VALUES (E'0Y0\\023\\006\\007*\\206H\\316=\\002\\001\\006\\010*\\206H\\316=\\003\\001\\007\\003B\\000\\004\\360\\267\\227\\377\\253u\\222\\337Y\\324C:GQ\\010\\277v\\010\\315D\\271\\333\\337.\\203\\023=C\\343\\014T%6\\027\\362?\\214\\326\\017U\\334\\000\\260\\224\\260J\\221\\304\\331F\\304\\221\\236zF,\\325\\326l\\215\\306\\365\\200\\022', E'L\\301|\\200\\247}F|1\\320\\232\\037n\\335\\241\\206\\244\\242\\207\\204.\\253\\357\\326\\352\\033Dt\\202`\\022\\325', '2019-02-14 08:07:31.335028+00');
INSERT INTO "bucket_usages" ("id", "bucket_id", "rollup_end_time", "remote_stored_data", "inline_stored_data", "remote_segments", "inline_segments", "objects", "metadata_size", "repair_egress", "get_egress", "audit_egress") VALUES (E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001",'::bytea, E'\\366\\146\\032\\321\\316\\161\\070\\133\\302\\271",'::bytea, '2019-03-06 08:28:24.677953+00', 10, 11, 12, 13, 14, 15, 16, 17, 18);
INSERT INTO "registration_tokens" ("secret", "owner_id", "project_limit", "created_at") VALUES (E'\\070\\127\\144\\013\\332\\344\\102\\376\\306\\056\\303\\130\\106\\132\\321\\276\\321\\274\\170\\264\\054\\333\\221\\116\\154\\221\\335\\070\\220\\146\\344\\216'::bytea, null, 1, '2019-02-14 08:28:24.677953+00');
INSERT INTO "serial_numbers" ("id", "serial_number", "bucket_id", "expires_at") VALUES (1, E'0123456701234567'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014/testbucket'::bytea, '2019-03-06 08:28:24.677953+00');
INSERT INTO "used_serials" ("serial_number_id", "storage_node_id") VALUES (1, E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n');
INSERT INTO "storagenode_bandwidth_rollups" ("storagenode_id", "interval_start", "interval_seconds", "action", "allocated", "settled") VALUES (E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n', '2019-03-06 08:00:00.000000+00', 3600, 1, 1024, 2024);
INSERT INTO "storagenode_storage_tallies" VALUES (1, E'\\3510\\323\\225"~\\036<\\342\\330m\\0253Jhr\\246\\233K\\246#\\2303\\351\\256\\275j\\212UM\\362\\207', '2019-02-14 08:16:57.812849+00', 1000);
INSERT INTO "bucket_bandwidth_rollups" ("bucket_name", "project_id", "interval_start", "interval_seconds", "action", "inline", "allocated", "settled") VALUES (E'testbucket'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea,'2019-03-06 08:00:00.000000+00', 3600, 1, 1024, 2024, 3024);
INSERT INTO "bucket_storage_tallies" ("bucket_name", "project_id", "interval_start", "inline", "remote", "remote_segments_count", "inline_segments_count", "object_count", "metadata_size") VALUES (E'testbucket'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea,'2019-03-06 08:00:00.000000+00', 4024, 5024, 0, 0, 0, 0);
INSERT INTO "reset_password_tokens" ("secret", "owner_id", "created_at") VALUES (E'\\070\\127\\144\\013\\332\\344\\102\\376\\306\\056\\303\\130\\106\\132\\321\\276\\321\\274\\170\\264\\054\\333\\221\\116\\154\\221\\335\\070\\220\\146\\344\\216'::bytea, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, '2019-05-08 08:28:24.677953+00');
INSERT INTO "pending_audits" ("node_id", "piece_id", "stripe_index", "share_size", "expected_share_hash", "reverify_count") VALUES (E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001'::bytea, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 5, 1024, E'\\070\\127\\144\\013\\332\\344\\102\\376\\306\\056\\303\\130\\106\\132\\321\\276\\321\\274\\170\\264\\054\\333\\221\\116\\154\\221\\335\\070\\220\\146\\344\\216'::bytea, 1);
INSERT INTO "offers" ("id", "name", "description", "award_credit_in_cents", "invitee_credit_in_cents", "award_credit_duration_days", "invitee_credit_duration_days", "redeemable_cap", "expires_at", "created_at", "num_redeemed", "status", "type") VALUES (1, 'testOffer', 'Test offer 1', 0, 0, 14, 14, 50, '2019-03-14 08:28:24.636949+00', '2019-02-14 08:28:24.636949+00', 0, 0, 0);
INSERT INTO "api_keys" ("id", "project_id", "head", "name", "secret", "created_at") VALUES (E'\\334/\\302;\\225\\355O\\323\\276f\\247\\354/6\\241\\033'::bytea, E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, E'\\111\\142\\147\\304\\132\\375\\070\\163\\270\\160\\251\\370\\126\\063\\351\\037\\257\\071\\143\\375\\351\\320\\253\\232\\220\\260\\075\\173\\306\\307\\115\\136'::bytea, 'key 2', E'\\254\\011\\315\\333\\273\\365\\001\\071\\024\\154\\253\\332\\301\\216\\361\\074\\221\\367\\251\\231\\274\\333\\300\\367\\001\\272\\327\\111\\315\\123\\042\\016'::bytea, '2019-02-14 08:28:24.267934+00');
INSERT INTO "value_attributions" ("bucket_id", "partner_id", "last_updated") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea,'2019-02-14 08:07:31.028103+00');
-- NEW DATA --