storage node collector (#1913)

Egon Elbre 2019-05-08 14:11:59 +03:00 committed by GitHub
parent 934cde90ca
commit a2b61fd67c
15 changed files with 462 additions and 81 deletions

View File

@@ -6,6 +6,7 @@ package sync2
import (
"context"
"sync"
"sync/atomic"
"time"
"golang.org/x/sync/errgroup"
@@ -19,7 +20,10 @@ type Cycle struct {
ticker *time.Ticker
control chan interface{}
stop chan struct{}
stopsent int64
stopping chan struct{}
stopped chan struct{}
init sync.Once
}
@@ -28,7 +32,6 @@ type (
// cycle control messages
cyclePause struct{}
cycleContinue struct{}
cycleStop struct{}
cycleChangeInterval struct{ Interval time.Duration }
cycleTrigger struct{ done chan struct{} }
)
@@ -47,7 +50,8 @@ func (cycle *Cycle) SetInterval(interval time.Duration) {
func (cycle *Cycle) initialize() {
cycle.init.Do(func() {
cycle.stop = make(chan struct{})
cycle.stopped = make(chan struct{})
cycle.stopping = make(chan struct{})
cycle.control = make(chan interface{})
})
}
@@ -65,7 +69,7 @@ func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ct
// When `fn` is not fast enough, it may skip some of those executions.
func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error) error {
cycle.initialize()
defer close(cycle.stop)
defer close(cycle.stopped)
currentInterval := cycle.interval
cycle.ticker = time.NewTicker(currentInterval)
@@ -79,8 +83,6 @@ func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error)
// handle control messages
switch message := message.(type) {
case cycleStop:
return nil
case cycleChangeInterval:
currentInterval = message.Interval
@@ -109,6 +111,9 @@ func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error)
}
}
case <-cycle.stopping:
return nil
case <-ctx.Done():
// context finished, stop the cycle
return ctx.Err()
@@ -125,7 +130,7 @@ func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error)
// Close closes all resources associated with it.
func (cycle *Cycle) Close() {
cycle.Stop()
<-cycle.stop
<-cycle.stopped
close(cycle.control)
}
@@ -134,13 +139,16 @@ func (cycle *Cycle) sendControl(message interface{}) {
cycle.initialize()
select {
case cycle.control <- message:
case <-cycle.stop:
case <-cycle.stopped:
}
}
// Stop stops the cycle permanently
func (cycle *Cycle) Stop() {
cycle.sendControl(cycleStop{})
cycle.initialize()
if atomic.CompareAndSwapInt64(&cycle.stopsent, 0, 1) {
close(cycle.stopping)
}
}
// ChangeInterval allows changing the ticker interval after it has started.
@@ -173,6 +181,6 @@ func (cycle *Cycle) TriggerWait() {
cycle.sendControl(cycleTrigger{done})
select {
case <-done:
case <-cycle.stop:
case <-cycle.stopped:
}
}
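The Stop rewrite above replaces the old cycleStop control message with an idempotent channel close: the atomic.CompareAndSwapInt64 on stopsent guarantees close(stopping) runs exactly once, so repeated or concurrent Stop calls can no longer panic on a double close or block against an already-stopped Run loop. A minimal standalone sketch of that close-once pattern (an illustration mirroring the names in the diff, not the package code itself):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stopper mirrors the stopsent/stopping pair from sync2.Cycle.
type stopper struct {
	stopsent int64
	stopping chan struct{}
}

// Stop is safe to call any number of times from any goroutine:
// only the winning CompareAndSwap closes the channel.
func (s *stopper) Stop() {
	if atomic.CompareAndSwapInt64(&s.stopsent, 0, 1) {
		close(s.stopping)
	}
}

func main() {
	s := &stopper{stopping: make(chan struct{})}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // concurrent Stops, as in TestCycle_MultipleStops below
		wg.Add(1)
		go func() { defer wg.Done(); s.Stop() }()
	}
	wg.Wait()
	<-s.stopping // receiving from a closed channel returns immediately
	fmt.Println("stopped exactly once")
}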

View File

@@ -87,3 +87,43 @@ func TestCycle_Basic(t *testing.T) {
})
}
}
func TestCycle_MultipleStops(t *testing.T) {
t.Parallel()
cycle := sync2.NewCycle(time.Second)
defer cycle.Close()
ctx := context.Background()
var group errgroup.Group
var count int64
cycle.Start(ctx, &group, func(ctx context.Context) error {
atomic.AddInt64(&count, 1)
return nil
})
go cycle.Stop()
cycle.Stop()
cycle.Stop()
}
func TestCycle_StopCancelled(t *testing.T) {
t.Parallel()
cycle := sync2.NewCycle(time.Second)
defer cycle.Close()
ctx, cancel := context.WithCancel(context.Background())
cancel()
var group errgroup.Group
var count int64
cycle.Start(ctx, &group, func(ctx context.Context) error {
atomic.AddInt64(&count, 1)
return nil
})
cycle.Stop()
cycle.Stop()
}

View File

@@ -52,6 +52,7 @@ import (
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/collector"
"storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/piecestore"
"storj.io/storj/storagenode/storagenodedb"
@@ -603,12 +604,12 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatelliteIDs []strin
AllocatedBandwidth: memory.TB,
KBucketRefreshInterval: time.Hour,
AgreementSenderCheckInterval: time.Hour,
CollectorInterval: time.Hour,
SatelliteIDRestriction: true,
WhitelistedSatelliteIDs: strings.Join(whitelistedSatelliteIDs, ","),
},
Collector: collector.Config{
Interval: time.Minute,
},
Storage2: piecestore.Config{
Sender: orders.SenderConfig{
Interval: time.Hour,

View File

@@ -149,6 +149,11 @@ func (uplink *Uplink) DialPiecestore(ctx context.Context, destination Peer) (*pi
// Upload data to specific satellite
func (uplink *Uplink) Upload(ctx context.Context, satellite *satellite.Peer, bucket string, path storj.Path, data []byte) error {
return uplink.UploadWithExpiration(ctx, satellite, bucket, path, data, time.Time{})
}
// UploadWithExpiration uploads data to a specific satellite with an expiration time
func (uplink *Uplink) UploadWithExpiration(ctx context.Context, satellite *satellite.Peer, bucket string, path storj.Path, data []byte, expiration time.Time) error {
config := uplink.GetConfig(satellite)
metainfo, streams, err := config.GetMetainfo(ctx, uplink.Identity)
if err != nil {
@@ -174,6 +179,7 @@ func (uplink *Uplink) Upload(ctx context.Context, satellite *satellite.Peer, buc
createInfo := storj.CreateObject{
RedundancyScheme: redScheme,
EncryptionScheme: encScheme,
Expires: expiration,
}
obj, err := metainfo.CreateObject(ctx, bucket, path, &createInfo)
if err != nil {
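UploadWithExpiration threads an absolute expiration time.Time into storj.CreateObject; the plain Upload above passes the zero value time.Time{}, meaning no expiration. Usage, as in the collector test later in this commit (ctx, planet, and expectedData come from the testplanet harness):

// upload data that storage nodes may delete after 8 days
err := planet.Uplinks[0].UploadWithExpiration(ctx,
	planet.Satellites[0], "testbucket", "test/path",
	expectedData, time.Now().Add(8*24*time.Hour))
require.NoError(t, err)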

View File

@@ -1,19 +1,26 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
// Package collector implements expired piece deletion from the storage node.
package collector
import (
"context"
"time"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/sync2"
"storj.io/storj/storagenode/pieces"
)
var mon = monkit.Package()
// Config defines parameters for storage node Collector.
type Config struct {
Interval time.Duration
Interval time.Duration `help:"how frequently expired pieces are collected" default:"1h0m0s"`
}
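The default tag value is an ordinary Go duration string; for reference, this is how such a value parses (standard library only, independent of whatever config loader consumes the tag):

d, err := time.ParseDuration("1h0m0s")
if err != nil {
	panic(err)
}
fmt.Println(d == time.Hour) // true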
// Service implements collecting expired pieces on the storage node.
@@ -21,13 +28,84 @@ type Service struct {
log *zap.Logger
pieces *pieces.Store
pieceinfos pieces.DB
Loop sync2.Cycle
}
// NewService creates a new collector service.
func NewService(log *zap.Logger, pieces *pieces.Store, pieceinfos pieces.DB) *Service {
func NewService(log *zap.Logger, pieces *pieces.Store, pieceinfos pieces.DB, config Config) *Service {
return &Service{
log: log,
pieces: pieces,
pieceinfos: pieceinfos,
Loop: *sync2.NewCycle(config.Interval),
}
}
// Run runs the collector service
func (service *Service) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return service.Loop.Run(ctx, func(ctx context.Context) error {
err := service.Collect(ctx, time.Now())
if err != nil {
service.log.Error("error during collecting pieces: ", zap.Error(err))
}
return nil
})
}
// Close stops the collector service.
func (service *Service) Close() (err error) {
service.Loop.Close()
return nil
}
// Collect collects pieces that have expired by now.
func (service *Service) Collect(ctx context.Context, now time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
const maxBatches = 100
const batchSize = 1000
var count int64
var bytes int64
defer func() {
if count > 0 {
service.log.Info("collect", zap.Int64("count", count), zap.Stringer("size", memory.Size(bytes)))
}
}()
for k := 0; k < maxBatches; k++ {
infos, err := service.pieceinfos.GetExpired(ctx, now, batchSize)
if err != nil {
return err
}
if len(infos) == 0 {
return nil
}
for _, expired := range infos {
err := service.pieces.Delete(ctx, expired.SatelliteID, expired.PieceID)
if err != nil {
errfailed := service.pieceinfos.DeleteFailed(ctx, expired.SatelliteID, expired.PieceID, now)
if errfailed != nil {
service.log.Error("unable to update piece info", zap.Stringer("satellite id", expired.SatelliteID), zap.Stringer("piece id", expired.PieceID), zap.Error(errfailed))
}
service.log.Error("unable to delete piece", zap.Stringer("satellite id", expired.SatelliteID), zap.Stringer("piece id", expired.PieceID), zap.Error(err))
continue
}
err = service.pieceinfos.Delete(ctx, expired.SatelliteID, expired.PieceID)
if err != nil {
service.log.Error("unable to delete piece info", zap.Stringer("satellite id", expired.SatelliteID), zap.Stringer("piece id", expired.PieceID), zap.Error(err))
continue
}
count++
bytes += expired.PieceSize
}
}
return nil
}
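Collect is bounded at maxBatches * batchSize = 100 * 1000 = 100,000 pieces per invocation, so a single cycle tick cannot hold the database indefinitely; anything left over waits for the next interval. It can also be driven manually with a simulated clock, as the test below does (a sketch; service is a node's *collector.Service):

// pause the loop so timer ticks don't race the manual call,
// then collect as if 16 days have passed
service.Loop.Pause()
err := service.Collect(ctx, time.Now().Add(16*24*time.Hour))
require.NoError(t, err)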

View File

@@ -0,0 +1,69 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package collector_test
import (
"crypto/rand"
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
)
func TestCollector(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
for _, storageNode := range planet.StorageNodes {
// stop collector, so we can run it manually
storageNode.Collector.Loop.Pause()
// stop order sender because we will stop satellite later
storageNode.Storage2.Sender.Loop.Pause()
}
expectedData := make([]byte, 100*memory.KiB)
_, err := rand.Read(expectedData)
require.NoError(t, err)
// upload some data that expires in 8 days
err = planet.Uplinks[0].UploadWithExpiration(ctx,
planet.Satellites[0], "testbucket", "test/path",
expectedData, time.Now().Add(8*24*time.Hour))
require.NoError(t, err)
// stop planet to prevent audits
require.NoError(t, planet.StopPeer(planet.Satellites[0]))
collections := 0
// imagine we are 16 days in the future
for _, storageNode := range planet.StorageNodes {
pieceinfos := storageNode.DB.PieceInfo()
// verify that we actually have some data on storage nodes
used, err := pieceinfos.SpaceUsed(ctx)
require.NoError(t, err)
if used == 0 {
// this storage node didn't get picked for storing data
continue
}
// collect all the data
err = storageNode.Collector.Collect(ctx, time.Now().Add(16*24*time.Hour))
require.NoError(t, err)
// verify that we deleted everything
used, err = pieceinfos.SpaceUsed(ctx)
require.NoError(t, err)
require.Equal(t, int64(0), used)
collections++
}
require.NotZero(t, collections)
})
}

View File

@@ -116,6 +116,12 @@ func (service *Service) Run(ctx context.Context) (err error) {
})
}
// Close stops the monitor service.
func (service *Service) Close() (err error) {
service.Loop.Close()
return nil
}
func (service *Service) updateNodeInformation(ctx context.Context) error {
usedSpace, err := service.usedSpace(ctx)
if err != nil {

View File

@@ -204,6 +204,6 @@ func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orde
// Close stops the sending service.
func (sender *Sender) Close() error {
sender.Loop.Stop()
sender.Loop.Close()
return nil
}
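Loop.Stop only signals the cycle to exit; Loop.Close (per the sync2 changes above) also waits on the stopped channel for Run to return and then releases the control channel, so Close no longer returns while the send loop is mid-iteration. The intended lifecycle, sketched with the Cycle API from this commit (send is a placeholder callback; ctx and error handling elided to the caller):

cycle := sync2.NewCycle(time.Hour)
var group errgroup.Group
cycle.Start(ctx, &group, send) // runs send once per tick
// ... on shutdown:
cycle.Close()       // Stop, then wait for Run to return
return group.Wait() // collect the loop's error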

View File

@@ -23,6 +23,7 @@ import (
"storj.io/storj/pkg/transport"
"storj.io/storj/storage"
"storj.io/storj/storagenode/bandwidth"
"storj.io/storj/storagenode/collector"
"storj.io/storj/storagenode/inspector"
"storj.io/storj/storagenode/monitor"
"storj.io/storj/storagenode/orders"
@@ -57,8 +58,10 @@ type Config struct {
Server server.Config
Kademlia kademlia.Config
Storage piecestore.OldConfig
Storage2 piecestore.Config
// TODO: flatten storage config and only keep the new one
Storage piecestore.OldConfig
Storage2 piecestore.Config
Collector collector.Config
Version version.Config
}
@@ -91,6 +94,7 @@ type Peer struct {
}
Storage2 struct {
// TODO: lift things outside of it to organize better
Trust *trust.Pool
Store *pieces.Store
Endpoint *piecestore.Endpoint
@@ -98,6 +102,8 @@ type Peer struct {
Monitor *monitor.Service
Sender *orders.Sender
}
Collector *collector.Service
}
// New creates a new Storage Node.
@@ -181,7 +187,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver
pb.RegisterKadInspectorServer(peer.Server.PrivateGRPC(), peer.Kademlia.Inspector)
}
{ // setup storage 2
{ // setup storage
trustAllSatellites := !config.Storage.SatelliteIDRestriction
peer.Storage2.Trust, err = trust.NewPool(peer.Kademlia.Service, trustAllSatellites, config.Storage.WhitelistedSatelliteIDs)
if err != nil {
@@ -237,6 +243,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver
)
}
peer.Collector = collector.NewService(peer.Log.Named("collector"), peer.Storage2.Store, peer.DB.PieceInfo(), config.Collector)
return peer, nil
}
@@ -247,18 +255,24 @@ func (peer *Peer) Run(ctx context.Context) error {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Version.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Kademlia.Service.Bootstrap(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Kademlia.Service.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Collector.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.Sender.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.Monitor.Run(ctx))
})
group.Go(func() error {
// TODO: move the message into Server instead
// Don't change the format of this comment, it is used to figure out the node id.
@@ -283,6 +297,17 @@ func (peer *Peer) Close() error {
}
// close services in reverse initialization order
if peer.Storage2.Monitor != nil {
errlist.Add(peer.Storage2.Monitor.Close())
}
if peer.Storage2.Sender != nil {
errlist.Add(peer.Storage2.Sender.Close())
}
if peer.Collector != nil {
errlist.Add(peer.Collector.Close())
}
if peer.Kademlia.Service != nil {
errlist.Add(peer.Kademlia.Service.Close())
}
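Services close in reverse initialization order: Monitor, Sender, and Collector were wired up last in New, so they go down first, before anything they depend on. The shape of that pattern as a standalone sketch (a hypothetical io.Closer slice, not the peer code; the peer aggregates every error via errlist rather than keeping only the first):

// closeInReverse closes last-constructed services first and
// keeps the first error encountered.
func closeInReverse(closers []io.Closer) (first error) {
	for i := len(closers) - 1; i >= 0; i-- {
		if err := closers[i].Close(); err != nil && first == nil {
			first = err
		}
	}
	return first
}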

View File

@@ -122,11 +122,25 @@ func TestPieceInfo(t *testing.T) {
require.NoError(t, err)
require.Empty(t, cmp.Diff(info1, info1loaded, cmp.Comparer(pb.Equal)))
// getting expired pieces
exp := time.Now().Add(time.Hour * 24)
infoexp, err := pieceinfos.GetExpired(ctx, exp)
// getting no expired pieces
expired, err := pieceinfos.GetExpired(ctx, now.Add(-10*time.Hour), 10)
assert.NoError(t, err)
assert.NotEmpty(t, infoexp)
assert.Len(t, expired, 0)
// getting expired pieces
exp := now.Add(8 * 24 * time.Hour)
expired, err = pieceinfos.GetExpired(ctx, exp, 10)
assert.NoError(t, err)
assert.Len(t, expired, 3)
// mark info0 deletion as a failure
err = pieceinfos.DeleteFailed(ctx, info0.SatelliteID, info0.PieceID, exp)
assert.NoError(t, err)
// this shouldn't return info0
expired, err = pieceinfos.GetExpired(ctx, exp, 10)
assert.NoError(t, err)
assert.Len(t, expired, 2)
// deleting
err = pieceinfos.Delete(ctx, info0.SatelliteID, info0.PieceID)
@@ -135,7 +149,10 @@ func TestPieceInfo(t *testing.T) {
require.NoError(t, err)
// deleting expired pieces
err = pieceinfos.DeleteExpired(ctx, exp, info2.SatelliteID, info2.PieceID)
err = pieceinfos.Delete(ctx, info2.SatelliteID, info2.PieceID)
require.NoError(t, err)
// duplicate deletion
err = pieceinfos.Delete(ctx, info2.SatelliteID, info2.PieceID)
require.NoError(t, err)
// getting after delete

View File

@@ -38,6 +38,13 @@ type Info struct {
Uplink *identity.PeerIdentity
}
// ExpiredInfo contains a fully namespaced piece id and its size
type ExpiredInfo struct {
SatelliteID storj.NodeID
PieceID storj.PieceID
PieceSize int64
}
// DB stores meta information about a piece, the actual piece is stored in storage.Blobs
type DB interface {
// Add inserts Info to the database.
@@ -46,12 +53,12 @@ type DB interface {
Get(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (*Info, error)
// Delete deletes Info about a piece.
Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error
// DeleteFailed marks piece deletion from disk failed
DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, failedAt time.Time) error
// SpaceUsed calculates disk space used by all pieces
SpaceUsed(ctx context.Context) (int64, error)
// GetExpired gets pieces that expired before the given time, up to limit
GetExpired(ctx context.Context, expiredAt time.Time) ([]Info, error)
// DeleteExpired deletes pieces that are expired
DeleteExpired(ctx context.Context, expiredAt time.Time, satelliteID storj.NodeID, pieceID storj.PieceID) error
GetExpired(ctx context.Context, expiredAt time.Time, limit int64) ([]ExpiredInfo, error)
}
// Store implements storing pieces onto a blob storage implementation.

View File

@@ -47,9 +47,6 @@ type OldConfig struct {
AllocatedDiskSpace memory.Size `user:"true" help:"total allocated disk space in bytes" default:"1TB"`
AllocatedBandwidth memory.Size `user:"true" help:"total allocated bandwidth in bytes" default:"500GiB"`
KBucketRefreshInterval time.Duration `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"`
AgreementSenderCheckInterval time.Duration `help:"duration between agreement checks" default:"1h0m0s"`
CollectorInterval time.Duration `help:"interval to check for expired pieces" default:"1h0m0s"`
}
// Config defines parameters for piecestore endpoint.

View File

@@ -170,6 +170,13 @@ func (db *InfoDB) Migration() *migrate.Migration {
`UPDATE pieceinfo SET piece_expiration = '2019-05-09 00:00:00.000000+00:00'`,
},
},
{
Description: "Add tracking of deletion failures.",
Version: 2,
Action: migrate.SQL{
`ALTER TABLE pieceinfo ADD COLUMN deletion_failed_at TIMESTAMP`,
},
},
},
}
}
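Version 2 adds deletion_failed_at as a nullable TIMESTAMP, so every existing row simply reads as NULL (never failed) with no backfill step. A quick sanity query against the migrated schema (a sketch, assuming db is an open *sql.DB handle on the info database):

// rows with a NULL deletion_failed_at have never failed a delete
var failures int64
err := db.QueryRow(
	`SELECT count(*) FROM pieceinfo WHERE deletion_failed_at IS NOT NULL`,
).Scan(&failures)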

View File

@@ -39,11 +39,11 @@ func (db *pieceinfo) Add(ctx context.Context, info *pieces.Info) error {
defer db.locked()()
_, err = db.db.Exec(`
_, err = db.db.ExecContext(ctx, db.Rebind(`
INSERT INTO
pieceinfo(satellite_id, piece_id, piece_size, piece_expiration, uplink_piece_hash, uplink_cert_id)
VALUES (?,?,?,?,?,?)
`, info.SatelliteID, info.PieceID, info.PieceSize, info.PieceExpiration, uplinkPieceHash, certid)
`), info.SatelliteID, info.PieceID, info.PieceSize, info.PieceExpiration, uplinkPieceHash, certid)
return ErrInfo.Wrap(err)
}
@@ -58,12 +58,12 @@ func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID
var uplinkIdentity []byte
db.mu.Lock()
err := db.db.QueryRow(`
err := db.db.QueryRowContext(ctx, db.Rebind(`
SELECT piece_size, piece_expiration, uplink_piece_hash, certificate.peer_identity
FROM pieceinfo
INNER JOIN certificate ON pieceinfo.uplink_cert_id = certificate.cert_id
WHERE satellite_id = ? AND piece_id = ?
`, satelliteID, pieceID).Scan(&info.PieceSize, &info.PieceExpiration, &uplinkPieceHash, &uplinkIdentity)
`), satelliteID, pieceID).Scan(&info.PieceSize, &info.PieceExpiration, &uplinkPieceHash, &uplinkIdentity)
db.mu.Unlock()
if err != nil {
@@ -88,64 +88,67 @@ func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID
func (db *pieceinfo) Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error {
defer db.locked()()
_, err := db.db.Exec(`DELETE FROM pieceinfo WHERE satellite_id = ? AND piece_id = ?`, satelliteID, pieceID)
_, err := db.db.ExecContext(ctx, db.Rebind(`
DELETE FROM pieceinfo
WHERE satellite_id = ?
AND piece_id = ?
`), satelliteID, pieceID)
return ErrInfo.Wrap(err)
}
// DeleteFailed marks piece as a failed deletion.
func (db *pieceinfo) DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, now time.Time) error {
defer db.locked()()
_, err := db.db.ExecContext(ctx, db.Rebind(`
UPDATE pieceinfo
SET deletion_failed_at = ?
WHERE satellite_id = ?
AND piece_id = ?
`), now, satelliteID, pieceID)
return ErrInfo.Wrap(err)
}
// GetExpired gets piece information identities that are expired.
func (db *pieceinfo) GetExpired(ctx context.Context, expiredAt time.Time, limit int64) (infos []pieces.ExpiredInfo, err error) {
defer db.locked()()
rows, err := db.db.QueryContext(ctx, db.Rebind(`
SELECT satellite_id, piece_id, piece_size
FROM pieceinfo
WHERE piece_expiration < ? AND ((deletion_failed_at IS NULL) OR deletion_failed_at <> ?)
ORDER BY satellite_id
LIMIT ?
`), expiredAt, expiredAt, limit)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
info := pieces.ExpiredInfo{}
err = rows.Scan(&info.SatelliteID, &info.PieceID, &info.PieceSize)
if err != nil {
return infos, ErrInfo.Wrap(err)
}
infos = append(infos, info)
}
return infos, nil
}
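Note the second bind parameter in the WHERE clause: a row whose deletion_failed_at equals this pass's timestamp is skipped, so a piece that failed to delete is not retried within the same Collect call, while any later pass (a different timestamp) returns it again. A sketch of those semantics against the pieces.DB interface from this commit (db implements pieces.DB; satelliteID and pieceID are hypothetical ids for an already-expired piece):

t1 := time.Now()
_ = db.DeleteFailed(ctx, satelliteID, pieceID, t1)    // disk delete failed at t1
skip, _ := db.GetExpired(ctx, t1, 10)                 // excludes the piece: failed at this pass
retry, _ := db.GetExpired(ctx, t1.Add(time.Hour), 10) // a later pass returns it again
fmt.Println(len(skip), len(retry))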
// SpaceUsed calculates disk space used by all pieces
func (db *pieceinfo) SpaceUsed(ctx context.Context) (int64, error) {
defer db.locked()()
var sum *int64
err := db.db.QueryRow(`SELECT SUM(piece_size) FROM pieceinfo;`).Scan(&sum)
err := db.db.QueryRowContext(ctx, db.Rebind(`
SELECT SUM(piece_size)
FROM pieceinfo
`)).Scan(&sum)
if err == sql.ErrNoRows || sum == nil {
return 0, nil
}
return *sum, err
}
// GetExpired gets orders that are expired and were created before some time
func (db *pieceinfo) GetExpired(ctx context.Context, expiredAt time.Time) (info []pieces.Info, err error) {
var getExpiredSQL = `SELECT satellite_id, piece_id, piece_size, piece_expiration, uplink_piece_hash, certificate.peer_identity
FROM pieceinfo
INNER JOIN certificate ON pieceinfo.uplink_cert_id = certificate.cert_id
WHERE piece_expiration < ? ORDER BY satellite_id `
rows, err := db.db.QueryContext(ctx, db.Rebind(getExpiredSQL), expiredAt)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
pi := pieces.Info{}
var uplinkPieceHash []byte
var uplinkIdentity []byte
err = rows.Scan(&pi.SatelliteID, &pi.PieceID, &pi.PieceSize, &pi.PieceExpiration, &uplinkPieceHash, &uplinkIdentity)
if err != nil {
return info, ErrInfo.Wrap(err)
}
pi.UplinkPieceHash = &pb.PieceHash{}
err = proto.Unmarshal(uplinkPieceHash, pi.UplinkPieceHash)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
pi.Uplink, err = decodePeerIdentity(uplinkIdentity)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
info = append(info, pi)
}
return info, nil
}
// DeleteExpired deletes expired piece information.
func (db *pieceinfo) DeleteExpired(ctx context.Context, expiredAt time.Time, satelliteID storj.NodeID, pieceID storj.PieceID) error {
defer db.locked()()
_, err := db.db.Exec(`DELETE FROM pieceinfo WHERE piece_expiration < ? AND satellite_id = ? AND piece_id = ?`, expiredAt, satelliteID, pieceID)
return ErrInfo.Wrap(err)
}

View File

@@ -0,0 +1,117 @@
-- table for keeping serials that need to be verified against
CREATE TABLE used_serial (
satellite_id BLOB NOT NULL,
serial_number BLOB NOT NULL,
expiration TIMESTAMP NOT NULL
);
-- primary key on satellite id and serial number
CREATE UNIQUE INDEX pk_used_serial ON used_serial(satellite_id, serial_number);
-- expiration index to allow fast deletion
CREATE INDEX idx_used_serial ON used_serial(expiration);
-- certificate table for storing uplink/satellite certificates
CREATE TABLE certificate (
cert_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
node_id BLOB NOT NULL,
peer_identity BLOB UNIQUE NOT NULL
);
-- table for storing piece meta info
CREATE TABLE pieceinfo (
satellite_id BLOB NOT NULL,
piece_id BLOB NOT NULL,
piece_size BIGINT NOT NULL,
piece_expiration TIMESTAMP,
uplink_piece_hash BLOB NOT NULL,
uplink_cert_id INTEGER NOT NULL,
deletion_failed_at TIMESTAMP,
FOREIGN KEY(uplink_cert_id) REFERENCES certificate(cert_id)
);
-- primary key by satellite id and piece id
CREATE UNIQUE INDEX pk_pieceinfo ON pieceinfo(satellite_id, piece_id);
-- table for storing bandwidth usage
CREATE TABLE bandwidth_usage (
satellite_id BLOB NOT NULL,
action INTEGER NOT NULL,
amount BIGINT NOT NULL,
created_at TIMESTAMP NOT NULL
);
CREATE INDEX idx_bandwidth_usage_satellite ON bandwidth_usage(satellite_id);
CREATE INDEX idx_bandwidth_usage_created ON bandwidth_usage(created_at);
-- table for storing all unsent orders
CREATE TABLE unsent_order (
satellite_id BLOB NOT NULL,
serial_number BLOB NOT NULL,
order_limit_serialized BLOB NOT NULL,
order_serialized BLOB NOT NULL,
order_limit_expiration TIMESTAMP NOT NULL,
uplink_cert_id INTEGER NOT NULL,
FOREIGN KEY(uplink_cert_id) REFERENCES certificate(cert_id)
);
CREATE UNIQUE INDEX idx_orders ON unsent_order(satellite_id, serial_number);
-- table for storing all sent orders
CREATE TABLE order_archive (
satellite_id BLOB NOT NULL,
serial_number BLOB NOT NULL,
order_limit_serialized BLOB NOT NULL,
order_serialized BLOB NOT NULL,
uplink_cert_id INTEGER NOT NULL,
status INTEGER NOT NULL,
archived_at TIMESTAMP NOT NULL,
FOREIGN KEY(uplink_cert_id) REFERENCES certificate(cert_id)
);
CREATE INDEX idx_order_archive_satellite ON order_archive(satellite_id);
CREATE INDEX idx_order_archive_status ON order_archive(status);
INSERT INTO used_serial VALUES(X'0693a8529105f5ff763e30b6f58ead3fe7a4f93f32b4b298073c01b2b39fa76e',X'18283dd3cec0a5abf6112e903549bdff','2019-04-01 18:58:53.3169599+03:00');
INSERT INTO used_serial VALUES(X'976a6bbcfcec9d96d847f8642c377d5f23c118187fb0ca21e9e1c5a9fbafa5f7',X'18283dd3cec0a5abf6112e903549bdff','2019-04-01 18:58:53.3169599+03:00');
INSERT INTO certificate VALUES(1,X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',X'3082016230820108a003020102021100c33fe521df34530b97db93000404a190300a06082a8648ce3d0403023010310e300c060355040a130553746f726a3022180f30303031303130313030303030305a180f30303031303130313030303030305a3010310e300c060355040a130553746f726a3059301306072a8648ce3d020106082a8648ce3d03010703420004bff703807b8d8357dd2371124c31e19ef68b39dbc44d25b32d843324027e7c2b2387f3b46f973d2e0919e1864dc06c313e5d71df13279dfc73c510cc49c26946a33f303d300e0603551d0f0101ff0404030205a0301d0603551d250416301406082b0601050507030106082b06010505070302300c0603551d130101ff04023000300a06082a8648ce3d0403020348003045022100b97d54c84ce8d1673db96a3ac2073b39ec2abd0e7d04447fff864a4fedf0c72c022031c8e620dc8941f62034abfa43faa5305ee4be345c9518e86074d0c54f76a6383082015b30820101a003020102021100c7e57be609bdba51c2bf85aa24eb472b300a06082a8648ce3d0403023010310e300c060355040a130553746f726a3022180f30303031303130313030303030305a180f30303031303130313030303030305a3010310e300c060355040a130553746f726a3059301306072a8648ce3d020106082a8648ce3d030107034200044b3b89f6502a7ae97fcc639033859b1f6c160e070f350eff15df2d415d7b5b1cdb1458d63c453eebe45493b8b1ec697c2a4f01dd534e5b8e09cb653fd7770a9aa3383036300e0603551d0f0101ff04040302020430130603551d25040c300a06082b06010505070301300f0603551d130101ff040530030101ff300a06082a8648ce3d0403020348003045022100daf71e6ac3f4b23b7a41124d920755fc838d242174206826b02a288026e1f60802200de61e08af44121deec4805385143f1a4138e7dc7bb6d5b89971bec9cd7e49333082015a30820100a0030201020210773700aea87b629f5a1a28895cce3ef1300a06082a8648ce3d0403023010310e300c060355040a130553746f726a3022180f30303031303130313030303030305a180f30303031303130313030303030305a3010310e300c060355040a130553746f726a3059301306072a8648ce3d020106082a8648ce3d03010703420004cfd64f1621b3fc8629283cf876f667f341d8a25e7fe7d692aee61e5eef843f49805c15328c0c105b4a3820216712c1643e3bc6160384706fe2facb2d2fa6df01a3383036300e0603551d0f0101ff04040302020430130603551d25040c300a06082b06010505070301300f0603551d130101ff040530030101ff300a06082a8648ce3d040302034800304502202fa033fb085d71eae63266a25c39d0a2951e5a9aaa97718f127feb1f28a931d6022100d70f446ea3d7439bbfa0cf8e0dfd530649ac37d35f9c9b18d48d80dcd284beaf');
INSERT INTO certificate VALUES(2,X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',X'3082016230820107a003020102021014b88821c7656cb81c018becec7890d9300a06082a8648ce3d0403023010310e300c060355040a130553746f726a3022180f30303031303130313030303030305a180f30303031303130313030303030305a3010310e300c060355040a130553746f726a3059301306072a8648ce3d020106082a8648ce3d030107034200048a0de5abc8fe7ef79268c6d3537a7ae6e5de8c9d9c6d2e7d905e53451cbc937dc30ec8bf122d2b1da76d37789fa7b4cabeacb8ca1198e9c2a3c2beb9d0989767a33f303d300e0603551d0f0101ff0404030205a0301d0603551d250416301406082b0601050507030106082b06010505070302300c0603551d130101ff04023000300a06082a8648ce3d04030203490030460221008acdfd5b518203817a68baca94214ba67599499e4f3f37a263c3fc21b8aa199b0221008a4f49fdd95d6eb005b4abb2af8cef504a5dbb9117e6282402c16304b11e1ee53082015b30820101a003020102021100fdfc8b0889977076db13fb8c8aafa0df300a06082a8648ce3d0403023010310e300c060355040a130553746f726a3022180f30303031303130313030303030305a180f30303031303130313030303030305a3010310e300c060355040a130553746f726a3059301306072a8648ce3d020106082a8648ce3d03010703420004d2b8b6fb4adbf0ab2aef7524bfed63969eb4d47cc4c97715cea6d02708101fd392a6c1415302876c3924635e3c6652b38ffd4157f21a3b0563bb1a23e497405fa3383036300e0603551d0f0101ff04040302020430130603551d25040c300a06082b06010505070301300f0603551d130101ff040530030101ff300a06082a8648ce3d0403020348003045022028657adc5655ef62371aa197e0f8b2abfa99204e7cc248ea48c8708ff37e7b37022100cfbd362c4dc028e875fb2c3d6fd4397c679d6360e08e79a6694f48c520a91bd53082015a30820100a0030201020210773700aea87b629f5a1a28895cce3ef1300a06082a8648ce3d0403023010310e300c060355040a130553746f726a3022180f30303031303130313030303030305a180f30303031303130313030303030305a3010310e300c060355040a130553746f726a3059301306072a8648ce3d020106082a8648ce3d03010703420004cfd64f1621b3fc8629283cf876f667f341d8a25e7fe7d692aee61e5eef843f49805c15328c0c105b4a3820216712c1643e3bc6160384706fe2facb2d2fa6df01a3383036300e0603551d0f0101ff04040302020430130603551d25040c300a06082b06010505070301300f0603551d130101ff040530030101ff300a06082a8648ce3d040302034800304502202fa033fb085d71eae63266a25c39d0a2951e5a9aaa97718f127feb1f28a931d6022100d70f446ea3d7439bbfa0cf8e0dfd530649ac37d35f9c9b18d48d80dcd284beaf');
INSERT INTO unsent_order VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',X'1eddef484b4c03f01332279032796972',X'0a101eddef484b4c03f0133227903279697212202b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf410001a201968996e7ef170a402fdfd88b6753df792c063c07c555905ffac9cd3cbd1c00022200ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac30002a20d00cf14f3c68b56321ace04902dec0484eb6f9098b22b31c6b3f82db249f191630643802420c08dfeb88e50510a8c1a5b9034a0c08dfeb88e50510a8c1a5b9035246304402204df59dc6f5d1bb7217105efbc9b3604d19189af37a81efbf16258e5d7db5549e02203bb4ead16e6e7f10f658558c22b59c3339911841e8dbaae6e2dea821f7326894',X'0a101eddef484b4c03f0133227903279697210321a47304502206d4c106ddec88140414bac5979c95bdea7de2e0ecc5be766e08f7d5ea36641a7022100e932ff858f15885ffa52d07e260c2c25d3861810ea6157956c1793ad0c906284','2019-04-01 16:01:35.9254586+00:00',1);
INSERT INTO pieceinfo VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',X'd5e757fd8d207d1c46583fb58330f803dc961b71147308ff75ff1e72a0df6b0b',123,'2019-05-09 00:00:00.000000+00:00',X'0a20d5e757fd8d207d1c46583fb58330f803dc961b71147308ff75ff1e72a0df6b0b120501020304051a47304502201c16d76ecd9b208f7ad9f1edf66ce73dce50da6bde6bbd7d278415099a727421022100ca730450e7f6506c2647516f6e20d0641e47c8270f58dde2bb07d1f5a3a45673',1,NULL);
INSERT INTO pieceinfo VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',X'd5e757fd8d207d1c46583fb58330f803dc961b71147308ff75ff1e72a0df6b0b',123,'2019-05-09 00:00:00.000000+00:00',X'0a20d5e757fd8d207d1c46583fb58330f803dc961b71147308ff75ff1e72a0df6b0b120501020304051a483046022100e623cf4705046e2c04d5b42d5edbecb81f000459713ad460c691b3361817adbf022100993da2a5298bb88de6c35b2e54009d1bf306cda5d441c228aa9eaf981ceb0f3d',2,NULL);
INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',0,0,'2019-04-01 18:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',0,0,'2019-04-01 20:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',1,1,'2019-04-01 18:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',1,1,'2019-04-01 20:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',2,2,'2019-04-01 18:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',2,2,'2019-04-01 20:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',3,3,'2019-04-01 18:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',3,3,'2019-04-01 20:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',4,4,'2019-04-01 18:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',4,4,'2019-04-01 20:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',5,5,'2019-04-01 18:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',5,5,'2019-04-01 20:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',6,6,'2019-04-01 18:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',6,6,'2019-04-01 20:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',1,1,'2019-04-01 18:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',1,1,'2019-04-01 20:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',2,2,'2019-04-01 18:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',2,2,'2019-04-01 20:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',3,3,'2019-04-01 18:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',3,3,'2019-04-01 20:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',4,4,'2019-04-01 18:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',4,4,'2019-04-01 20:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',5,5,'2019-04-01 18:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',5,5,'2019-04-01 20:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',6,6,'2019-04-01 18:51:24.1074772+03:00');
INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',6,6,'2019-04-01 20:51:24.1074772+03:00');
INSERT INTO order_archive VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',X'62180593328b8ff3c9f97565fdfd305d',X'0a1062180593328b8ff3c9f97565fdfd305d12202b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf410001a201968996e7ef170a402fdfd88b6753df792c063c07c555905ffac9cd3cbd1c00022200ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac30002a2077003db64dfd50c5bdc84daf28bcef97f140d302c3e5bfd002bcc7ac04e1273430643802420c08fce688e50510a0ffe7ff014a0c08fce688e50510a0ffe7ff0152473045022100943d90068a1b1e6879b16a6ed8cdf0237005de09f61cddab884933fefd9692bf0220417a74f2e59523d962e800a1b06618f0113039d584e28aae37737e4a71555966',X'0a1062180593328b8ff3c9f97565fdfd305d10321a47304502200f4d97f03ad2d87501f68bfcf0525ec518aebf817cf56aa5eeaea53d01b153a102210096e60cf4b594837b43b5c841d283e4b72c9a09207d64bdd4665c700dc2e0a4a2',1,1,'2019-04-01 18:51:24.5374893+03:00');