satellite/repair: migrate to new repair_queue table

We want to use StreamID/Position to identify injured
segment. As it is hard to alter existing injuredsegments
table we are adding a new table that will replace existing
one. Old table will be dropped later.

Change-Id: I0d3b06522645013178b6678c19378ebafe485c49
This commit is contained in:
Michał Niewrzał 2021-06-17 17:05:04 +02:00
parent 0ec3867ec0
commit d53aacc058
22 changed files with 1311 additions and 389 deletions

View File

@ -516,11 +516,11 @@ func cmdQDiag(cmd *cobra.Command, args []string) (err error) {
// initialize the table header (fields)
const padding = 3
w := tabwriter.NewWriter(os.Stdout, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug)
fmt.Fprintln(w, "Path\tLost Pieces\t")
fmt.Fprintln(w, "Segment StreamID\tSegment Position\tSegment Health\t")
// populate the row fields
for _, v := range list {
fmt.Fprint(w, v.GetPath(), "\t", v.GetLostPieces(), "\t")
fmt.Fprint(w, v.StreamID.String(), "\t", v.Position.Encode(), "\t", v.SegmentHealth, "\t")
}
// display the data

View File

@ -17,6 +17,7 @@ import (
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/repair/queue"
)
func TestBilling_DownloadWithoutExpansionFactor(t *testing.T) {
@ -260,13 +261,11 @@ func TestBilling_AuditRepairTraffic(t *testing.T) {
}
// trigger repair
loc := metabase.SegmentLocation{
ProjectID: objectsBefore[0].ProjectID,
BucketName: objectsBefore[0].BucketName,
ObjectKey: objectsBefore[0].ObjectKey,
Position: metabase.SegmentPosition{Index: 0},
queueSegment := queue.InjuredSegment{
StreamID: objectsBefore[0].StreamID,
Position: metabase.SegmentPosition{Index: 0},
}
_, err = satelliteSys.Repairer.SegmentRepairer.Repair(ctx, string(loc.Encode()))
_, err = satelliteSys.Repairer.SegmentRepairer.Repair(ctx, &queueSegment)
require.NoError(t, err)
// get the only metainfo record (our upload)

View File

@ -1,104 +0,0 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: datarepair.proto
package internalpb
import (
fmt "fmt"
math "math"
time "time"
proto "github.com/gogo/protobuf/proto"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
var _ = time.Kitchen
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
// InjuredSegment is the queue item used for the data repair queue.
type InjuredSegment struct {
Path []byte `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
LostPieces []int32 `protobuf:"varint,2,rep,packed,name=lost_pieces,json=lostPieces,proto3" json:"lost_pieces,omitempty"`
InsertedTime time.Time `protobuf:"bytes,3,opt,name=inserted_time,json=insertedTime,proto3,stdtime" json:"inserted_time"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *InjuredSegment) Reset() { *m = InjuredSegment{} }
func (m *InjuredSegment) String() string { return proto.CompactTextString(m) }
func (*InjuredSegment) ProtoMessage() {}
func (*InjuredSegment) Descriptor() ([]byte, []int) {
return fileDescriptor_b1b08e6fe9398aa6, []int{0}
}
func (m *InjuredSegment) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_InjuredSegment.Unmarshal(m, b)
}
func (m *InjuredSegment) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_InjuredSegment.Marshal(b, m, deterministic)
}
func (m *InjuredSegment) XXX_Merge(src proto.Message) {
xxx_messageInfo_InjuredSegment.Merge(m, src)
}
func (m *InjuredSegment) XXX_Size() int {
return xxx_messageInfo_InjuredSegment.Size(m)
}
func (m *InjuredSegment) XXX_DiscardUnknown() {
xxx_messageInfo_InjuredSegment.DiscardUnknown(m)
}
var xxx_messageInfo_InjuredSegment proto.InternalMessageInfo
func (m *InjuredSegment) GetPath() []byte {
if m != nil {
return m.Path
}
return nil
}
func (m *InjuredSegment) GetLostPieces() []int32 {
if m != nil {
return m.LostPieces
}
return nil
}
func (m *InjuredSegment) GetInsertedTime() time.Time {
if m != nil {
return m.InsertedTime
}
return time.Time{}
}
func init() {
proto.RegisterType((*InjuredSegment)(nil), "satellite.repair.InjuredSegment")
}
func init() { proto.RegisterFile("datarepair.proto", fileDescriptor_b1b08e6fe9398aa6) }
var fileDescriptor_b1b08e6fe9398aa6 = []byte{
// 230 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x3c, 0x8f, 0x4d, 0x4e, 0xc3, 0x30,
0x10, 0x85, 0x31, 0x05, 0x84, 0xdc, 0x82, 0x2a, 0xaf, 0xa2, 0x6c, 0x12, 0x81, 0x90, 0xb2, 0xb2,
0x25, 0xb8, 0x41, 0x77, 0xdd, 0xa1, 0xc0, 0x8a, 0x4d, 0xe5, 0x90, 0xc1, 0xb8, 0x72, 0x3c, 0x96,
0x3d, 0xbd, 0x47, 0x8e, 0xc5, 0x29, 0xe0, 0x2a, 0x28, 0x8e, 0xd2, 0xdd, 0x9b, 0xf7, 0xe6, 0xe7,
0x1b, 0xbe, 0xed, 0x35, 0xe9, 0x08, 0x41, 0xdb, 0x28, 0x43, 0x44, 0x42, 0xb1, 0x4d, 0x9a, 0xc0,
0x39, 0x4b, 0x20, 0x67, 0xbf, 0xe4, 0x06, 0x0d, 0xce, 0x69, 0x59, 0x19, 0x44, 0xe3, 0x40, 0xe5,
0xaa, 0x3b, 0x7d, 0x29, 0xb2, 0x03, 0x24, 0xd2, 0x43, 0x98, 0x1b, 0x1e, 0x46, 0xc6, 0xef, 0xf7,
0xfe, 0x78, 0x8a, 0xd0, 0xbf, 0x81, 0x19, 0xc0, 0x93, 0x10, 0xfc, 0x2a, 0x68, 0xfa, 0x2e, 0x58,
0xcd, 0x9a, 0x4d, 0x9b, 0xb5, 0xa8, 0xf8, 0xda, 0x61, 0xa2, 0x43, 0xb0, 0xf0, 0x09, 0xa9, 0xb8,
0xac, 0x57, 0xcd, 0x75, 0xcb, 0x27, 0xeb, 0x35, 0x3b, 0x62, 0xcf, 0xef, 0xac, 0x4f, 0x10, 0x09,
0xfa, 0xc3, 0x74, 0xa3, 0x58, 0xd5, 0xac, 0x59, 0x3f, 0x97, 0x72, 0x06, 0x90, 0x0b, 0x80, 0x7c,
0x5f, 0x00, 0x76, 0xb7, 0x3f, 0xbf, 0xd5, 0xc5, 0xf8, 0x57, 0xb1, 0x76, 0xb3, 0x8c, 0x4e, 0xe1,
0xee, 0xe9, 0xe3, 0x31, 0x11, 0xc6, 0xa3, 0xb4, 0xa8, 0xb2, 0x50, 0xe7, 0x17, 0x95, 0xf5, 0x04,
0xd1, 0x6b, 0x17, 0xba, 0xee, 0x26, 0xaf, 0x7c, 0xf9, 0x0f, 0x00, 0x00, 0xff, 0xff, 0x1a, 0xcb,
0xa5, 0x58, 0x13, 0x01, 0x00, 0x00,
}

View File

@ -1,17 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
syntax = "proto3";
option go_package = "storj.io/storj/satellite/internalpb";
import "gogo.proto";
import "google/protobuf/timestamp.proto";
package satellite.repair;
// InjuredSegment is the queue item used for the data repair queue.
message InjuredSegment {
bytes path = 1;
repeated int32 lost_pieces = 2;
google.protobuf.Timestamp inserted_time = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}

View File

@ -1,41 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package internalpb
import (
"database/sql/driver"
proto "github.com/gogo/protobuf/proto"
"github.com/zeebo/errs"
"storj.io/common/pb"
)
var scanError = errs.Class("Protobuf Scanner")
var valueError = errs.Class("Protobuf Valuer")
// scan automatically converts database []byte to proto.Messages.
func scan(msg proto.Message, value interface{}) error {
bytes, ok := value.([]byte)
if !ok {
return scanError.New("%t was %t, expected []bytes", msg, value)
}
return scanError.Wrap(pb.Unmarshal(bytes, msg))
}
// value automatically converts proto.Messages to database []byte.
func value(msg proto.Message) (driver.Value, error) {
value, err := pb.Marshal(msg)
return value, valueError.Wrap(err)
}
// Scan implements the Scanner interface.
func (n *InjuredSegment) Scan(value interface{}) error {
return scan(n, value)
}
// Value implements the driver Valuer interface.
func (n InjuredSegment) Value() (driver.Value, error) {
return value(&n)
}

View File

@ -15,7 +15,6 @@ import (
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metaloop"
"storj.io/storj/satellite/overlay"
@ -275,7 +274,6 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metaloop
mon.FloatVal("checker_segment_health").Observe(segmentHealth) //mon:locked
stats.segmentHealth.Observe(segmentHealth)
key := segment.Location.Encode()
// we repair when the number of healthy pieces is less than or equal to the repair threshold and is greater or equal to
// minimum required pieces in redundancy
// except for the case when the repair and success thresholds are the same (a case usually seen during testing)
@ -284,11 +282,12 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metaloop
stats.injuredSegmentHealth.Observe(segmentHealth)
obs.monStats.remoteSegmentsNeedingRepair++
stats.iterationAggregates.remoteSegmentsNeedingRepair++
alreadyInserted, err := obs.repairQueue.Insert(ctx, &internalpb.InjuredSegment{
Path: key,
LostPieces: missingPieces,
InsertedTime: time.Now().UTC(),
}, segmentHealth)
alreadyInserted, err := obs.repairQueue.Insert(ctx, &queue.InjuredSegment{
StreamID: segment.StreamID,
Position: segment.Position,
UpdatedAt: time.Now().UTC(),
SegmentHealth: segmentHealth,
})
if err != nil {
obs.log.Error("error adding injured segment to queue", zap.Error(err))
return nil

View File

@ -4,7 +4,6 @@
package checker_test
import (
"bytes"
"context"
"fmt"
"testing"
@ -15,6 +14,7 @@ import (
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/metabase"
)
@ -54,7 +54,7 @@ func TestIdentifyInjuredSegments(t *testing.T) {
// add pointer that needs repair
expectedLocation.ObjectKey = metabase.ObjectKey("b-0")
insertSegment(ctx, t, planet, rs, expectedLocation, createLostPieces(planet, rs), time.Time{})
b0StreamID := insertSegment(ctx, t, planet, rs, expectedLocation, createLostPieces(planet, rs), time.Time{})
// add pointer that is unhealthy, but is expired
expectedLocation.ObjectKey = metabase.ObjectKey("b-1")
@ -75,12 +75,7 @@ func TestIdentifyInjuredSegments(t *testing.T) {
err = repairQueue.Delete(ctx, injuredSegment)
require.NoError(t, err)
expectedLocation.ObjectKey = "b-0"
require.Equal(t, string(expectedLocation.Encode()), string(injuredSegment.Path))
require.Equal(t, int(rs.OptimalShares-rs.RequiredShares), len(injuredSegment.LostPieces))
for _, lostPiece := range injuredSegment.LostPieces {
require.True(t, int32(rs.RequiredShares) <= lostPiece && lostPiece < int32(rs.OptimalShares), fmt.Sprintf("%v", lostPiece))
}
require.Equal(t, b0StreamID, injuredSegment.StreamID)
_, err = repairQueue.Select(ctx)
require.Error(t, err)
@ -204,9 +199,11 @@ func TestCleanRepairQueue(t *testing.T) {
insertSegment(ctx, t, planet, rs, expectedLocation, createPieces(planet, rs), time.Time{})
}
unhealthyCount := 5
unhealthyIDs := make(map[uuid.UUID]struct{})
for i := 0; i < unhealthyCount; i++ {
expectedLocation.ObjectKey = metabase.ObjectKey(fmt.Sprintf("unhealthy-%d", i))
insertSegment(ctx, t, planet, rs, expectedLocation, createLostPieces(planet, rs), time.Time{})
unhealthyStreamID := insertSegment(ctx, t, planet, rs, expectedLocation, createLostPieces(planet, rs), time.Time{})
unhealthyIDs[unhealthyStreamID] = struct{}{}
}
// suspend enough nodes to make healthy pointers unhealthy
@ -247,9 +244,11 @@ func TestCleanRepairQueue(t *testing.T) {
segs, err := repairQueue.SelectN(ctx, count)
require.NoError(t, err)
require.Equal(t, len(unhealthyIDs), len(segs))
for _, s := range segs {
require.True(t, bytes.Contains(s.GetPath(), []byte("unhealthy")))
_, ok := unhealthyIDs[s.StreamID]
require.True(t, ok)
}
})
}
@ -282,7 +281,7 @@ func createLostPieces(planet *testplanet.Planet, rs storj.RedundancyScheme) meta
return pieces
}
func insertSegment(ctx context.Context, t *testing.T, planet *testplanet.Planet, rs storj.RedundancyScheme, location metabase.SegmentLocation, pieces metabase.Pieces, expire time.Time) {
func insertSegment(ctx context.Context, t *testing.T, planet *testplanet.Planet, rs storj.RedundancyScheme, location metabase.SegmentLocation, pieces metabase.Pieces, expire time.Time) uuid.UUID {
var expiresAt *time.Time
if !expire.IsZero() {
expiresAt = &expire
@ -332,4 +331,6 @@ func insertSegment(ctx context.Context, t *testing.T, planet *testplanet.Planet,
ObjectStream: obj,
})
require.NoError(t, err)
return obj.StreamID
}

View File

@ -7,27 +7,40 @@ import (
"context"
"time"
"storj.io/storj/satellite/internalpb"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
)
// InjuredSegment contains information about segment which
// should be repaired.
type InjuredSegment struct {
StreamID uuid.UUID
Position metabase.SegmentPosition
SegmentHealth float64
AttemptedAt *time.Time
UpdatedAt time.Time
InsertedAt time.Time
}
// RepairQueue implements queueing for segments that need repairing.
// Implementation can be found at satellite/satellitedb/repairqueue.go.
//
// architecture: Database
type RepairQueue interface {
// Insert adds an injured segment.
Insert(ctx context.Context, s *internalpb.InjuredSegment, segmentHealth float64) (alreadyInserted bool, err error)
Insert(ctx context.Context, s *InjuredSegment) (alreadyInserted bool, err error)
// Select gets an injured segment.
Select(ctx context.Context) (*internalpb.InjuredSegment, error)
Select(ctx context.Context) (*InjuredSegment, error)
// Delete removes an injured segment.
Delete(ctx context.Context, s *internalpb.InjuredSegment) error
Delete(ctx context.Context, s *InjuredSegment) error
// Clean removes all segments last updated before a certain time
Clean(ctx context.Context, before time.Time) (deleted int64, err error)
// SelectN lists limit amount of injured segments.
SelectN(ctx context.Context, limit int) ([]internalpb.InjuredSegment, error)
SelectN(ctx context.Context, limit int) ([]InjuredSegment, error)
// Count counts the number of segments in the repair queue.
Count(ctx context.Context) (count int, err error)
// TestingSetAttemptedTime sets attempted time for a repairpath.
TestingSetAttemptedTime(ctx context.Context, repairpath []byte, t time.Time) (rowsAffected int64, err error)
// TestingSetAttemptedTime sets attempted time for a segment.
TestingSetAttemptedTime(ctx context.Context, streamID uuid.UUID, position metabase.SegmentPosition, t time.Time) (rowsAffected int64, err error)
}

View File

@ -5,7 +5,6 @@ package queue_test
import (
"math/rand"
"strconv"
"testing"
"time"
@ -14,10 +13,13 @@ import (
"go.uber.org/zap/zaptest"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/private/dbutil/pgtest"
"storj.io/private/dbutil/tempdb"
"storj.io/storj/satellite"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/repair/queue"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/storage"
@ -28,14 +30,15 @@ func TestUntilEmpty(t *testing.T) {
repairQueue := db.RepairQueue()
// insert a bunch of segments
pathsMap := make(map[string]int)
idsMap := make(map[uuid.UUID]int)
for i := 0; i < 20; i++ {
path := "/path/" + strconv.Itoa(i)
injuredSeg := &internalpb.InjuredSegment{Path: []byte(path)}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, 10)
injuredSeg := &queue.InjuredSegment{
StreamID: testrand.UUID(),
}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg)
require.NoError(t, err)
require.False(t, alreadyInserted)
pathsMap[path] = 0
idsMap[injuredSeg.StreamID] = 0
}
// select segments until no more are returned, and we should get each one exactly once
@ -45,10 +48,10 @@ func TestUntilEmpty(t *testing.T) {
require.True(t, storage.ErrEmptyQueue.Has(err))
break
}
pathsMap[string(injuredSeg.Path)]++
idsMap[injuredSeg.StreamID]++
}
for _, selectCount := range pathsMap {
for _, selectCount := range idsMap {
assert.Equal(t, selectCount, 1)
}
})
@ -58,48 +61,52 @@ func TestOrder(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
repairQueue := db.RepairQueue()
nullPath := []byte("/path/null")
recentRepairPath := []byte("/path/recent")
oldRepairPath := []byte("/path/old")
olderRepairPath := []byte("/path/older")
nullID := testrand.UUID()
recentID := testrand.UUID()
oldID := testrand.UUID()
olderID := testrand.UUID()
for _, path := range [][]byte{oldRepairPath, recentRepairPath, nullPath, olderRepairPath} {
injuredSeg := &internalpb.InjuredSegment{Path: path}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, 10)
for _, streamID := range []uuid.UUID{oldID, recentID, nullID, olderID} {
injuredSeg := &queue.InjuredSegment{
StreamID: streamID,
SegmentHealth: 10,
}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg)
require.NoError(t, err)
require.False(t, alreadyInserted)
}
updateList := []struct {
path []byte
attempted time.Time
streamID uuid.UUID
attemptedAt time.Time
}{
{recentRepairPath, time.Now()},
{oldRepairPath, time.Now().Add(-7 * time.Hour)},
{olderRepairPath, time.Now().Add(-8 * time.Hour)},
{recentID, time.Now()},
{oldID, time.Now().Add(-7 * time.Hour)},
{olderID, time.Now().Add(-8 * time.Hour)},
}
for _, item := range updateList {
rowsAffected, err := db.RepairQueue().TestingSetAttemptedTime(ctx, item.path, item.attempted)
rowsAffected, err := db.RepairQueue().TestingSetAttemptedTime(ctx,
item.streamID, metabase.SegmentPosition{}, item.attemptedAt)
require.NoError(t, err)
require.EqualValues(t, 1, rowsAffected)
}
// path with attempted = null should be selected first
// segment with attempted = null should be selected first
injuredSeg, err := repairQueue.Select(ctx)
require.NoError(t, err)
assert.Equal(t, string(nullPath), string(injuredSeg.Path))
assert.Equal(t, nullID, injuredSeg.StreamID)
// path with attempted = 8 hours ago should be selected next
// segment with attempted = 8 hours ago should be selected next
injuredSeg, err = repairQueue.Select(ctx)
require.NoError(t, err)
assert.Equal(t, string(olderRepairPath), string(injuredSeg.Path))
assert.Equal(t, olderID, injuredSeg.StreamID)
// path with attempted = 7 hours ago should be selected next
// segment with attempted = 7 hours ago should be selected next
injuredSeg, err = repairQueue.Select(ctx)
require.NoError(t, err)
assert.Equal(t, string(oldRepairPath), string(injuredSeg.Path))
assert.Equal(t, oldID, injuredSeg.StreamID)
// queue should be considered "empty" now
// segment should be considered "empty" now
injuredSeg, err = repairQueue.Select(ctx)
assert.True(t, storage.ErrEmptyQueue.Has(err))
assert.Nil(t, injuredSeg)
@ -128,29 +135,29 @@ func testorderHealthyPieces(t *testing.T, connStr string) {
repairQueue := db.RepairQueue()
// we insert (path, segmentHealth, lastAttempted) as follows:
// ("path/a", 6, now-8h)
// ("path/b", 7, now)
// ("path/c", 8, null)
// ("path/d", 9, null)
// ("path/e", 9, now-7h)
// ("path/f", 9, now-8h)
// ("path/g", 10, null)
// ("path/h", 10, now-8h)
// ("a", 6, now-8h)
// ("b", 7, now)
// ("c", 8, null)
// ("d", 9, null)
// ("e", 9, now-7h)
// ("f", 9, now-8h)
// ("g", 10, null)
// ("h", 10, now-8h)
// insert the 8 segments according to the plan above
injuredSegList := []struct {
path []byte
streamID uuid.UUID
segmentHealth float64
attempted time.Time
}{
{[]byte("path/a"), 6, time.Now().Add(-8 * time.Hour)},
{[]byte("path/b"), 7, time.Now()},
{[]byte("path/c"), 8, time.Time{}},
{[]byte("path/d"), 9, time.Time{}},
{[]byte("path/e"), 9, time.Now().Add(-7 * time.Hour)},
{[]byte("path/f"), 9, time.Now().Add(-8 * time.Hour)},
{[]byte("path/g"), 10, time.Time{}},
{[]byte("path/h"), 10, time.Now().Add(-8 * time.Hour)},
{uuid.UUID{'a'}, 6, time.Now().Add(-8 * time.Hour)},
{uuid.UUID{'b'}, 7, time.Now()},
{uuid.UUID{'c'}, 8, time.Time{}},
{uuid.UUID{'d'}, 9, time.Time{}},
{uuid.UUID{'e'}, 9, time.Now().Add(-7 * time.Hour)},
{uuid.UUID{'f'}, 9, time.Now().Add(-8 * time.Hour)},
{uuid.UUID{'g'}, 10, time.Time{}},
{uuid.UUID{'h'}, 10, time.Now().Add(-8 * time.Hour)},
}
// shuffle list since select order should not depend on insert order
rand.Seed(time.Now().UnixNano())
@ -159,14 +166,17 @@ func testorderHealthyPieces(t *testing.T, connStr string) {
})
for _, item := range injuredSegList {
// first, insert the injured segment
injuredSeg := &internalpb.InjuredSegment{Path: item.path}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, item.segmentHealth)
injuredSeg := &queue.InjuredSegment{
StreamID: item.streamID,
SegmentHealth: item.segmentHealth,
}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg)
require.NoError(t, err)
require.False(t, alreadyInserted)
// next, if applicable, update the "attempted at" timestamp
if !item.attempted.IsZero() {
rowsAffected, err := db.RepairQueue().TestingSetAttemptedTime(ctx, item.path, item.attempted)
rowsAffected, err := db.RepairQueue().TestingSetAttemptedTime(ctx, item.streamID, metabase.SegmentPosition{}, item.attempted)
require.NoError(t, err)
require.EqualValues(t, 1, rowsAffected)
}
@ -177,21 +187,21 @@ func testorderHealthyPieces(t *testing.T, connStr string) {
// (excluding segments that have been attempted in the past six hours)
// we do not expect to see segments that have been attempted in the past hour
// therefore, the order of selection should be:
// "path/a", "path/c", "path/d", "path/f", "path/e", "path/g", "path/h"
// "path/b" will not be selected because it was attempted recently
// "a", "c", "d", "f", "e", "g", "h"
// "b" will not be selected because it was attempted recently
for _, nextPath := range []string{
"path/a",
"path/c",
"path/d",
"path/f",
"path/e",
"path/g",
"path/h",
for _, nextID := range []uuid.UUID{
{'a'},
{'c'},
{'d'},
{'f'},
{'e'},
{'g'},
{'h'},
} {
injuredSeg, err := repairQueue.Select(ctx)
require.NoError(t, err)
assert.Equal(t, nextPath, string(injuredSeg.Path))
assert.Equal(t, nextID, injuredSeg.StreamID)
}
// queue should be considered "empty" now
@ -205,23 +215,28 @@ func TestOrderOverwrite(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
repairQueue := db.RepairQueue()
// insert "path/a" with segment segment health 10
// insert "path/b" with segment segment health 9
// re-insert "path/a" with segment segment health 8
// when we select, expect "path/a" first since after the re-insert, it is the least durable segment.
// insert segment A with segment health 10
// insert segment B with segment health 9
// re-insert segment A with segment segment health 8
// when we select, expect segment A first since after the re-insert, it is the least durable segment.
segmentA := uuid.UUID{1}
segmentB := uuid.UUID{2}
// insert the 8 segments according to the plan above
injuredSegList := []struct {
path []byte
streamID uuid.UUID
segmentHealth float64
}{
{[]byte("path/a"), 10},
{[]byte("path/b"), 9},
{[]byte("path/a"), 8},
{segmentA, 10},
{segmentB, 9},
{segmentA, 8},
}
for i, item := range injuredSegList {
injuredSeg := &internalpb.InjuredSegment{Path: item.path}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, item.segmentHealth)
injuredSeg := &queue.InjuredSegment{
StreamID: item.streamID,
SegmentHealth: item.segmentHealth,
}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg)
require.NoError(t, err)
if i == 2 {
require.True(t, alreadyInserted)
@ -230,13 +245,13 @@ func TestOrderOverwrite(t *testing.T) {
}
}
for _, nextPath := range []string{
"path/a",
"path/b",
for _, nextStreamID := range []uuid.UUID{
segmentA,
segmentB,
} {
injuredSeg, err := repairQueue.Select(ctx)
require.NoError(t, err)
assert.Equal(t, nextPath, string(injuredSeg.Path))
assert.Equal(t, nextStreamID, injuredSeg.StreamID)
}
// queue should be considered "empty" now
@ -251,15 +266,14 @@ func TestCount(t *testing.T) {
repairQueue := db.RepairQueue()
// insert a bunch of segments
pathsMap := make(map[string]int)
numSegments := 20
for i := 0; i < numSegments; i++ {
path := "/path/" + strconv.Itoa(i)
injuredSeg := &internalpb.InjuredSegment{Path: []byte(path)}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, 10)
injuredSeg := &queue.InjuredSegment{
StreamID: testrand.UUID(),
}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg)
require.NoError(t, err)
require.False(t, alreadyInserted)
pathsMap[path] = 0
}
count, err := repairQueue.Count(ctx)

View File

@ -5,17 +5,18 @@ package queue_test
import (
"sort"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/common/errs2"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/satellite"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/repair/queue"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/storage"
)
@ -24,18 +25,26 @@ func TestInsertSelect(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
q := db.RepairQueue()
seg := &internalpb.InjuredSegment{
Path: []byte("abc"),
LostPieces: []int32{int32(1), int32(3)},
seg := &queue.InjuredSegment{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: uint32(testrand.Intn(1000)),
Index: uint32(testrand.Intn(1000)),
},
SegmentHealth: 0.4,
}
alreadyInserted, err := q.Insert(ctx, seg, 10)
alreadyInserted, err := q.Insert(ctx, seg)
require.NoError(t, err)
require.False(t, alreadyInserted)
s, err := q.Select(ctx)
require.NoError(t, err)
err = q.Delete(ctx, s)
require.NoError(t, err)
require.True(t, pb.Equal(s, seg))
require.Equal(t, seg.StreamID, s.StreamID)
require.Equal(t, seg.Position, s.Position)
require.Equal(t, seg.SegmentHealth, s.SegmentHealth)
require.WithinDuration(t, time.Now(), s.InsertedAt, 5*time.Second)
require.NotZero(t, s.UpdatedAt)
})
}
@ -43,14 +52,18 @@ func TestInsertDuplicate(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
q := db.RepairQueue()
seg := &internalpb.InjuredSegment{
Path: []byte("abc"),
LostPieces: []int32{int32(1), int32(3)},
seg := &queue.InjuredSegment{
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{
Part: uint32(testrand.Intn(1000)),
Index: uint32(testrand.Intn(1000)),
},
SegmentHealth: 10,
}
alreadyInserted, err := q.Insert(ctx, seg, 10)
alreadyInserted, err := q.Insert(ctx, seg)
require.NoError(t, err)
require.False(t, alreadyInserted)
alreadyInserted, err = q.Insert(ctx, seg, 10)
alreadyInserted, err = q.Insert(ctx, seg)
require.NoError(t, err)
require.True(t, alreadyInserted)
})
@ -71,13 +84,13 @@ func TestSequential(t *testing.T) {
q := db.RepairQueue()
const N = 20
var addSegs []*internalpb.InjuredSegment
var addSegs []*queue.InjuredSegment
for i := 0; i < N; i++ {
seg := &internalpb.InjuredSegment{
Path: []byte(strconv.Itoa(i)),
LostPieces: []int32{int32(i)},
seg := &queue.InjuredSegment{
StreamID: uuid.UUID{byte(i)},
SegmentHealth: 6,
}
alreadyInserted, err := q.Insert(ctx, seg, 10)
alreadyInserted, err := q.Insert(ctx, seg)
require.NoError(t, err)
require.False(t, alreadyInserted)
addSegs = append(addSegs, seg)
@ -87,20 +100,15 @@ func TestSequential(t *testing.T) {
require.NoError(t, err)
require.Len(t, list, N)
sort.SliceStable(list, func(i, j int) bool { return list[i].LostPieces[0] < list[j].LostPieces[0] })
for i := 0; i < N; i++ {
require.True(t, pb.Equal(addSegs[i], &list[i]))
}
// TODO: fix out of order issue
for i := 0; i < N; i++ {
s, err := q.Select(ctx)
require.NoError(t, err)
err = q.Delete(ctx, s)
require.NoError(t, err)
expected := s.LostPieces[0]
require.True(t, pb.Equal(addSegs[expected], s))
require.Equal(t, addSegs[i].StreamID, s.StreamID)
require.Equal(t, addSegs[i].Position, s.Position)
require.Equal(t, addSegs[i].SegmentHealth, s.SegmentHealth)
}
})
}
@ -109,17 +117,23 @@ func TestParallel(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
q := db.RepairQueue()
const N = 20
entries := make(chan *internalpb.InjuredSegment, N)
entries := make(chan *queue.InjuredSegment, N)
expectedSegments := make([]*queue.InjuredSegment, N)
for i := 0; i < N; i++ {
expectedSegments[i] = &queue.InjuredSegment{
StreamID: testrand.UUID(),
SegmentHealth: float64(i),
}
}
var inserts errs2.Group
// Add to queue concurrently
for i := 0; i < N; i++ {
i := i
inserts.Go(func() error {
_, err := q.Insert(ctx, &internalpb.InjuredSegment{
Path: []byte(strconv.Itoa(i)),
LostPieces: []int32{int32(i)},
}, 10)
alreadyInserted, err := q.Insert(ctx, expectedSegments[i])
require.False(t, alreadyInserted)
return err
})
}
@ -147,19 +161,25 @@ func TestParallel(t *testing.T) {
require.Empty(t, remove.Wait(), "unexpected queue.Select/Delete errors")
close(entries)
var items []*internalpb.InjuredSegment
var items []*queue.InjuredSegment
for segment := range entries {
items = append(items, segment)
}
sort.Slice(items, func(i, k int) bool {
return items[i].LostPieces[0] < items[k].LostPieces[0]
return items[i].SegmentHealth < items[k].SegmentHealth
})
// check if the enqueued and dequeued elements match
for i := 0; i < N; i++ {
require.Equal(t, items[i].LostPieces[0], int32(i))
require.Equal(t, expectedSegments[i].StreamID, items[i].StreamID)
require.Equal(t, expectedSegments[i].Position, items[i].Position)
require.Equal(t, expectedSegments[i].SegmentHealth, items[i].SegmentHealth)
}
count, err := q.Count(ctx)
require.NoError(t, err)
require.Zero(t, count)
})
}
@ -167,29 +187,26 @@ func TestClean(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
q := db.RepairQueue()
seg1 := &internalpb.InjuredSegment{
Path: []byte("seg1"),
LostPieces: []int32{int32(1), int32(3)},
seg1 := &queue.InjuredSegment{
StreamID: testrand.UUID(),
}
seg2 := &internalpb.InjuredSegment{
Path: []byte("seg2"),
LostPieces: []int32{int32(1), int32(3)},
seg2 := &queue.InjuredSegment{
StreamID: testrand.UUID(),
}
seg3 := &internalpb.InjuredSegment{
Path: []byte("seg3"),
LostPieces: []int32{int32(1), int32(3)},
seg3 := &queue.InjuredSegment{
StreamID: testrand.UUID(),
}
timeBeforeInsert1 := time.Now()
segmentHealth := 1.3
_, err := q.Insert(ctx, seg1, segmentHealth)
_, err := q.Insert(ctx, seg1)
require.NoError(t, err)
_, err = q.Insert(ctx, seg2, segmentHealth)
_, err = q.Insert(ctx, seg2)
require.NoError(t, err)
_, err = q.Insert(ctx, seg3, segmentHealth)
_, err = q.Insert(ctx, seg3)
require.NoError(t, err)
count, err := q.Count(ctx)
@ -208,11 +225,12 @@ func TestClean(t *testing.T) {
// seg1 "becomes healthy", so do not update it
// seg2 stays at the same health
_, err = q.Insert(ctx, seg2, segmentHealth)
_, err = q.Insert(ctx, seg2)
require.NoError(t, err)
// seg3 has a lower health
_, err = q.Insert(ctx, seg3, segmentHealth-0.1)
seg3.SegmentHealth = segmentHealth - 0.1
_, err = q.Insert(ctx, seg3)
require.NoError(t, err)
count, err = q.Count(ctx)

View File

@ -21,6 +21,7 @@ import (
"storj.io/common/uuid"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/checker"
@ -73,6 +74,10 @@ func testDataRepair(t *testing.T, inMemoryRepair bool) {
satellite.Repair.Checker.Loop.Pause()
satellite.Repair.Repairer.Loop.Pause()
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Orders.Sender.Pause()
}
testData := testrand.Bytes(8 * memory.KiB)
err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
@ -154,6 +159,31 @@ func testDataRepair(t *testing.T, inMemoryRepair bool) {
nodesToKillForMinThreshold--
}
}
{
// test that while repair, order limits without specified bucket are counted correctly
// for storage node repair bandwidth usage and the storage nodes will be paid for that
require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Orders.SendOrders(ctx, time.Now().Add(24*time.Hour))
}
repairSettled := make(map[storj.NodeID]uint64)
err = satellite.DB.StoragenodeAccounting().GetBandwidthSince(ctx, time.Time{}, func(c context.Context, sbr *accounting.StoragenodeBandwidthRollup) error {
if sbr.Action == uint(pb.PieceAction_GET_REPAIR) {
repairSettled[sbr.NodeID] += sbr.Settled
}
return nil
})
require.NoError(t, err)
require.Equal(t, minThreshold, len(repairSettled))
for _, value := range repairSettled {
// TODO verify node ids
require.NotZero(t, value)
}
}
// we should be able to download data without any of the original nodes
newData, err := uplinkPeer.Download(ctx, satellite, "testbucket", "test/path")
require.NoError(t, err)

View File

@ -62,7 +62,7 @@ func (ec *ECRepairer) dialPiecestore(ctx context.Context, n storj.NodeURL) (*pie
// After downloading a piece, the ECRepairer will verify the hash and original order limit for that piece.
// If verification fails, another piece will be downloaded until we reach the minimum required or run out of order limits.
// If piece hash verification fails, it will return all failed node IDs.
func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64, path storj.Path) (_ io.ReadCloser, failedPieces []*pb.RemotePiece, err error) {
func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, failedPieces []*pb.RemotePiece, err error) {
defer mon.Task()(&ctx)(&err)
if len(limits) != es.TotalCount() {
@ -148,7 +148,6 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
if successfulPieces < es.RequiredCount() {
mon.Meter("download_failed_not_enough_pieces_repair").Mark(1) //mon:locked
return nil, failedPieces, &irreparableError{
path: path,
piecesAvailable: int32(successfulPieces),
piecesRequired: int32(es.RequiredCount()),
}
@ -287,7 +286,7 @@ func verifyOrderLimitSignature(ctx context.Context, satellite signing.Signee, li
// Repair takes a provided segment, encodes it with the provided redundancy strategy,
// and uploads the pieces in need of repair to new nodes provided by order limits.
func (ec *ECRepairer) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, timeout time.Duration, path storj.Path, successfulNeeded int) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
func (ec *ECRepairer) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, timeout time.Duration, successfulNeeded int) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
defer mon.Task()(&ctx)(&err)
pieceCount := len(limits)
@ -318,7 +317,7 @@ func (ec *ECRepairer) Repair(ctx context.Context, limits []*pb.AddressedOrderLim
for i, addressedLimit := range limits {
go func(i int, addressedLimit *pb.AddressedOrderLimit) {
hash, err := ec.putPiece(psCtx, ctx, addressedLimit, privateKey, readers[i], path)
hash, err := ec.putPiece(psCtx, ctx, addressedLimit, privateKey, readers[i])
infos <- info{i: i, err: err, hash: hash}
}(i, addressedLimit)
}
@ -408,7 +407,7 @@ func (ec *ECRepairer) Repair(ctx context.Context, limits []*pb.AddressedOrderLim
return successfulNodes, successfulHashes, nil
}
func (ec *ECRepairer) putPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, data io.ReadCloser, path storj.Path) (hash *pb.PieceHash, err error) {
func (ec *ECRepairer) putPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, data io.ReadCloser) (hash *pb.PieceHash, err error) {
defer mon.Task()(&ctx)(&err)
nodeName := "nil"

View File

@ -14,7 +14,6 @@ import (
"storj.io/common/memory"
"storj.io/common/sync2"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/repair/queue"
"storj.io/storj/storage"
)
@ -150,14 +149,14 @@ func (service *Service) process(ctx context.Context) (err error) {
return nil
}
func (service *Service) worker(ctx context.Context, seg *internalpb.InjuredSegment) (err error) {
func (service *Service) worker(ctx context.Context, seg *queue.InjuredSegment) (err error) {
defer mon.Task()(&ctx)(&err)
workerStartTime := service.nowFn().UTC()
service.log.Debug("Limiter running repair on segment")
// note that shouldDelete is used even in the case where err is not null
shouldDelete, err := service.repairer.Repair(ctx, string(seg.GetPath()))
shouldDelete, err := service.repairer.Repair(ctx, seg)
if shouldDelete {
if err != nil {
service.log.Error("unexpected error repairing segment!", zap.Error(err))
@ -179,7 +178,7 @@ func (service *Service) worker(ctx context.Context, seg *internalpb.InjuredSegme
timeForRepair := repairedTime.Sub(workerStartTime)
mon.FloatVal("time_for_repair").Observe(timeForRepair.Seconds()) //mon:locked
insertedTime := seg.GetInsertedTime()
insertedTime := seg.InsertedAt
// do not send metrics if segment was added before the InsertedTime field was added
if !insertedTime.IsZero() {
timeSinceQueued := workerStartTime.Sub(insertedTime)

View File

@ -22,6 +22,7 @@ import (
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/checker"
"storj.io/storj/satellite/repair/queue"
"storj.io/uplink/private/eestream"
)
@ -39,7 +40,6 @@ var (
// which are hopefully transient (e.g. too many pieces unavailable). The segment should be added
// to the irreparableDB.
type irreparableError struct {
path storj.Path
piecesAvailable int32
piecesRequired int32
}
@ -104,22 +104,15 @@ func NewSegmentRepairer(
// Repair retrieves an at-risk segment and repairs and stores lost pieces on new nodes
// note that shouldDelete is used even in the case where err is not null
// note that it will update audit status as failed for nodes that failed piece hash verification during repair downloading.
func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (shouldDelete bool, err error) {
defer mon.Task()(&ctx, path)(&err)
func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue.InjuredSegment) (shouldDelete bool, err error) {
defer mon.Task()(&ctx, queueSegment.StreamID.String(), queueSegment.Position.Encode())(&err)
// TODO extend InjuredSegment with StreamID/Position and replace path
segmentLocation, err := metabase.ParseSegmentKey(metabase.SegmentKey(path))
if err != nil {
return false, metainfoGetError.Wrap(err)
}
// TODO we should replace GetSegmentByLocation with GetSegmentByPosition when
// we refactor the repair queue to store metabase.SegmentPosition instead of storj.Path.
segment, err := repairer.metabase.GetSegmentByLocation(ctx, metabase.GetSegmentByLocation{
SegmentLocation: segmentLocation,
segment, err := repairer.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
if metabase.ErrSegmentNotFound.Has(err) {
mon.Meter("repair_unnecessary").Mark(1) //mon:locked
mon.Meter("segment_deleted_before_repair").Mark(1) //mon:locked
repairer.log.Debug("segment was deleted")
@ -167,7 +160,8 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
stats.repairerNodesUnavailable.Mark(1)
repairer.log.Warn("irreparable segment",
zap.String("path", path),
zap.String("StreamID", queueSegment.StreamID.String()),
zap.Uint64("Position", queueSegment.Position.Encode()),
zap.Int("piecesAvailable", numHealthy),
zap.Int16("piecesRequired", segment.Redundancy.RequiredShares),
)
@ -221,10 +215,8 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
}
}
bucket := segmentLocation.Bucket()
// Create the order limits for the GET_REPAIR action
getOrderLimits, getPrivateKey, err := repairer.orders.CreateGetRepairOrderLimits(ctx, bucket, segment, healthyPieces)
getOrderLimits, getPrivateKey, err := repairer.orders.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, healthyPieces)
if err != nil {
return false, orderLimitFailureError.New("could not create GET_REPAIR order limits: %w", err)
}
@ -260,13 +252,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
}
// Create the order limits for the PUT_REPAIR action
putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, bucket, segment, getOrderLimits, newNodes, repairer.multiplierOptimalThreshold)
putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, getOrderLimits, newNodes, repairer.multiplierOptimalThreshold)
if err != nil {
return false, orderLimitFailureError.New("could not create PUT_REPAIR order limits: %w", err)
}
// Download the segment using just the healthy pieces
segmentReader, pbFailedPieces, err := repairer.ec.Get(ctx, getOrderLimits, getPrivateKey, redundancy, int64(segment.EncryptedSize), path)
segmentReader, pbFailedPieces, err := repairer.ec.Get(ctx, getOrderLimits, getPrivateKey, redundancy, int64(segment.EncryptedSize))
// Populate node IDs that failed piece hashes verification
var failedNodeIDs storj.NodeIDList
@ -305,7 +297,8 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
stats.repairTooManyNodesFailed.Mark(1)
repairer.log.Warn("irreparable segment",
zap.String("path", irreparableErr.path),
zap.String("StreamID", queueSegment.StreamID.String()),
zap.Uint64("Position", queueSegment.Position.Encode()),
zap.Int32("piecesAvailable", irreparableErr.piecesAvailable),
zap.Int32("piecesRequired", irreparableErr.piecesRequired),
)
@ -317,7 +310,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
defer func() { err = errs.Combine(err, segmentReader.Close()) }()
// Upload the repaired pieces
successfulNodes, _, err := repairer.ec.Repair(ctx, putLimits, putPrivateKey, redundancy, segmentReader, repairer.timeout, path, minSuccessfulNeeded)
successfulNodes, _, err := repairer.ec.Repair(ctx, putLimits, putPrivateKey, redundancy, segmentReader, repairer.timeout, minSuccessfulNeeded)
if err != nil {
return false, repairPutError.Wrap(err)
}
@ -388,7 +381,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
err = repairer.metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
StreamID: segment.StreamID,
Position: segmentLocation.Position,
Position: segment.Position,
OldPieces: segment.Pieces,
NewRedundancy: segment.Redundancy,

View File

@ -304,7 +304,7 @@ read one (
)
//--- repairqueue ---//
// TODO drop it later
model injuredsegment (
key path
@ -334,6 +334,30 @@ model injuredsegment (
delete injuredsegment ( where injuredsegment.updated_at < ? )
model repair_queue (
table repair_queue
key stream_id position
field stream_id blob
field position uint64
field attempted_at timestamp (updatable, nullable)
field updated_at timestamp ( updatable, default current_timestamp )
field inserted_at timestamp ( default current_timestamp )
field segment_health float64 (default 1)
index (
fields updated_at
)
index (
name repair_queue_num_healthy_pieces_attempted_at_index
fields segment_health attempted_at
)
)
delete repair_queue ( where repair_queue.updated_at < ? )
//--- satellite console ---//
model user (

View File

@ -591,6 +591,15 @@ CREATE TABLE registration_tokens (
PRIMARY KEY ( secret ),
UNIQUE ( owner_id )
);
CREATE TABLE repair_queue (
stream_id bytea NOT NULL,
position bigint NOT NULL,
attempted_at timestamp with time zone,
updated_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
inserted_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
segment_health double precision NOT NULL DEFAULT 1,
PRIMARY KEY ( stream_id, position )
);
CREATE TABLE reputations (
id bytea NOT NULL,
audit_success_count bigint NOT NULL DEFAULT 0,
@ -829,6 +838,8 @@ CREATE INDEX node_last_ip ON nodes ( last_net ) ;
CREATE INDEX nodes_dis_unk_off_exit_fin_last_success_index ON nodes ( disqualified, unknown_audit_suspended, offline_suspended, exit_finished_at, last_contact_success ) ;
CREATE INDEX nodes_type_last_cont_success_free_disk_ma_mi_patch_vetted_partial_index ON nodes ( type, last_contact_success, free_disk, major, minor, patch, vetted_at ) WHERE nodes.disqualified is NULL AND nodes.unknown_audit_suspended is NULL AND nodes.exit_initiated_at is NULL AND nodes.release = true AND nodes.last_net != '' ;
CREATE INDEX nodes_dis_unk_aud_exit_init_rel_type_last_cont_success_stored_index ON nodes ( disqualified, unknown_audit_suspended, exit_initiated_at, release, type, last_contact_success ) WHERE nodes.disqualified is NULL AND nodes.unknown_audit_suspended is NULL AND nodes.exit_initiated_at is NULL AND nodes.release = true ;
CREATE INDEX repair_queue_updated_at_index ON repair_queue ( updated_at ) ;
CREATE INDEX repair_queue_num_healthy_pieces_attempted_at_index ON repair_queue ( segment_health, attempted_at ) ;
CREATE INDEX storagenode_bandwidth_rollups_interval_start_index ON storagenode_bandwidth_rollups ( interval_start ) ;
CREATE INDEX storagenode_bandwidth_rollup_archives_interval_start_index ON storagenode_bandwidth_rollup_archives ( interval_start ) ;
CREATE INDEX storagenode_payments_node_id_period_index ON storagenode_payments ( node_id, period ) ;
@ -1205,6 +1216,15 @@ CREATE TABLE registration_tokens (
PRIMARY KEY ( secret ),
UNIQUE ( owner_id )
);
CREATE TABLE repair_queue (
stream_id bytea NOT NULL,
position bigint NOT NULL,
attempted_at timestamp with time zone,
updated_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
inserted_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
segment_health double precision NOT NULL DEFAULT 1,
PRIMARY KEY ( stream_id, position )
);
CREATE TABLE reputations (
id bytea NOT NULL,
audit_success_count bigint NOT NULL DEFAULT 0,
@ -1443,6 +1463,8 @@ CREATE INDEX node_last_ip ON nodes ( last_net ) ;
CREATE INDEX nodes_dis_unk_off_exit_fin_last_success_index ON nodes ( disqualified, unknown_audit_suspended, offline_suspended, exit_finished_at, last_contact_success ) ;
CREATE INDEX nodes_type_last_cont_success_free_disk_ma_mi_patch_vetted_partial_index ON nodes ( type, last_contact_success, free_disk, major, minor, patch, vetted_at ) WHERE nodes.disqualified is NULL AND nodes.unknown_audit_suspended is NULL AND nodes.exit_initiated_at is NULL AND nodes.release = true AND nodes.last_net != '' ;
CREATE INDEX nodes_dis_unk_aud_exit_init_rel_type_last_cont_success_stored_index ON nodes ( disqualified, unknown_audit_suspended, exit_initiated_at, release, type, last_contact_success ) WHERE nodes.disqualified is NULL AND nodes.unknown_audit_suspended is NULL AND nodes.exit_initiated_at is NULL AND nodes.release = true ;
CREATE INDEX repair_queue_updated_at_index ON repair_queue ( updated_at ) ;
CREATE INDEX repair_queue_num_healthy_pieces_attempted_at_index ON repair_queue ( segment_health, attempted_at ) ;
CREATE INDEX storagenode_bandwidth_rollups_interval_start_index ON storagenode_bandwidth_rollups ( interval_start ) ;
CREATE INDEX storagenode_bandwidth_rollup_archives_interval_start_index ON storagenode_bandwidth_rollup_archives ( interval_start ) ;
CREATE INDEX storagenode_payments_node_id_period_index ON storagenode_payments ( node_id, period ) ;
@ -6334,6 +6356,156 @@ func (f RegistrationToken_CreatedAt_Field) value() interface{} {
func (RegistrationToken_CreatedAt_Field) _Column() string { return "created_at" }
type RepairQueue struct {
StreamId []byte
Position uint64
AttemptedAt *time.Time
UpdatedAt time.Time
InsertedAt time.Time
SegmentHealth float64
}
func (RepairQueue) _Table() string { return "repair_queue" }
type RepairQueue_Create_Fields struct {
AttemptedAt RepairQueue_AttemptedAt_Field
UpdatedAt RepairQueue_UpdatedAt_Field
InsertedAt RepairQueue_InsertedAt_Field
SegmentHealth RepairQueue_SegmentHealth_Field
}
type RepairQueue_Update_Fields struct {
AttemptedAt RepairQueue_AttemptedAt_Field
UpdatedAt RepairQueue_UpdatedAt_Field
}
type RepairQueue_StreamId_Field struct {
_set bool
_null bool
_value []byte
}
func RepairQueue_StreamId(v []byte) RepairQueue_StreamId_Field {
return RepairQueue_StreamId_Field{_set: true, _value: v}
}
func (f RepairQueue_StreamId_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (RepairQueue_StreamId_Field) _Column() string { return "stream_id" }
type RepairQueue_Position_Field struct {
_set bool
_null bool
_value uint64
}
func RepairQueue_Position(v uint64) RepairQueue_Position_Field {
return RepairQueue_Position_Field{_set: true, _value: v}
}
func (f RepairQueue_Position_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (RepairQueue_Position_Field) _Column() string { return "position" }
type RepairQueue_AttemptedAt_Field struct {
_set bool
_null bool
_value *time.Time
}
func RepairQueue_AttemptedAt(v time.Time) RepairQueue_AttemptedAt_Field {
return RepairQueue_AttemptedAt_Field{_set: true, _value: &v}
}
func RepairQueue_AttemptedAt_Raw(v *time.Time) RepairQueue_AttemptedAt_Field {
if v == nil {
return RepairQueue_AttemptedAt_Null()
}
return RepairQueue_AttemptedAt(*v)
}
func RepairQueue_AttemptedAt_Null() RepairQueue_AttemptedAt_Field {
return RepairQueue_AttemptedAt_Field{_set: true, _null: true}
}
func (f RepairQueue_AttemptedAt_Field) isnull() bool { return !f._set || f._null || f._value == nil }
func (f RepairQueue_AttemptedAt_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (RepairQueue_AttemptedAt_Field) _Column() string { return "attempted_at" }
type RepairQueue_UpdatedAt_Field struct {
_set bool
_null bool
_value time.Time
}
func RepairQueue_UpdatedAt(v time.Time) RepairQueue_UpdatedAt_Field {
return RepairQueue_UpdatedAt_Field{_set: true, _value: v}
}
func (f RepairQueue_UpdatedAt_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (RepairQueue_UpdatedAt_Field) _Column() string { return "updated_at" }
type RepairQueue_InsertedAt_Field struct {
_set bool
_null bool
_value time.Time
}
func RepairQueue_InsertedAt(v time.Time) RepairQueue_InsertedAt_Field {
return RepairQueue_InsertedAt_Field{_set: true, _value: v}
}
func (f RepairQueue_InsertedAt_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (RepairQueue_InsertedAt_Field) _Column() string { return "inserted_at" }
type RepairQueue_SegmentHealth_Field struct {
_set bool
_null bool
_value float64
}
func RepairQueue_SegmentHealth(v float64) RepairQueue_SegmentHealth_Field {
return RepairQueue_SegmentHealth_Field{_set: true, _value: v}
}
func (f RepairQueue_SegmentHealth_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (RepairQueue_SegmentHealth_Field) _Column() string { return "segment_health" }
type Reputation struct {
Id []byte
AuditSuccessCount int64
@ -15725,6 +15897,33 @@ func (obj *pgxImpl) Delete_Injuredsegment_By_UpdatedAt_Less(ctx context.Context,
}
func (obj *pgxImpl) Delete_RepairQueue_By_UpdatedAt_Less(ctx context.Context,
repair_queue_updated_at_less RepairQueue_UpdatedAt_Field) (
count int64, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("DELETE FROM repair_queue WHERE repair_queue.updated_at < ?")
var __values []interface{}
__values = append(__values, repair_queue_updated_at_less.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__res, err := obj.driver.ExecContext(ctx, __stmt, __values...)
if err != nil {
return 0, obj.makeErr(err)
}
count, err = __res.RowsAffected()
if err != nil {
return 0, obj.makeErr(err)
}
return count, nil
}
func (obj *pgxImpl) Delete_User_By_Id(ctx context.Context,
user_id User_Id_Field) (
deleted bool, err error) {
@ -16319,6 +16518,16 @@ func (obj *pgxImpl) deleteAll(ctx context.Context) (count int64, err error) {
return 0, obj.makeErr(err)
}
__count, err = __res.RowsAffected()
if err != nil {
return 0, obj.makeErr(err)
}
count += __count
__res, err = obj.driver.ExecContext(ctx, "DELETE FROM repair_queue;")
if err != nil {
return 0, obj.makeErr(err)
}
__count, err = __res.RowsAffected()
if err != nil {
return 0, obj.makeErr(err)
@ -21753,6 +21962,33 @@ func (obj *pgxcockroachImpl) Delete_Injuredsegment_By_UpdatedAt_Less(ctx context
}
func (obj *pgxcockroachImpl) Delete_RepairQueue_By_UpdatedAt_Less(ctx context.Context,
repair_queue_updated_at_less RepairQueue_UpdatedAt_Field) (
count int64, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("DELETE FROM repair_queue WHERE repair_queue.updated_at < ?")
var __values []interface{}
__values = append(__values, repair_queue_updated_at_less.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__res, err := obj.driver.ExecContext(ctx, __stmt, __values...)
if err != nil {
return 0, obj.makeErr(err)
}
count, err = __res.RowsAffected()
if err != nil {
return 0, obj.makeErr(err)
}
return count, nil
}
func (obj *pgxcockroachImpl) Delete_User_By_Id(ctx context.Context,
user_id User_Id_Field) (
deleted bool, err error) {
@ -22347,6 +22583,16 @@ func (obj *pgxcockroachImpl) deleteAll(ctx context.Context) (count int64, err er
return 0, obj.makeErr(err)
}
__count, err = __res.RowsAffected()
if err != nil {
return 0, obj.makeErr(err)
}
count += __count
__res, err = obj.driver.ExecContext(ctx, "DELETE FROM repair_queue;")
if err != nil {
return 0, obj.makeErr(err)
}
__count, err = __res.RowsAffected()
if err != nil {
return 0, obj.makeErr(err)
@ -23344,6 +23590,17 @@ func (rx *Rx) Delete_Project_By_Id(ctx context.Context,
return tx.Delete_Project_By_Id(ctx, project_id)
}
func (rx *Rx) Delete_RepairQueue_By_UpdatedAt_Less(ctx context.Context,
repair_queue_updated_at_less RepairQueue_UpdatedAt_Field) (
count int64, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Delete_RepairQueue_By_UpdatedAt_Less(ctx, repair_queue_updated_at_less)
}
func (rx *Rx) Delete_ResetPasswordToken_By_Secret(ctx context.Context,
reset_password_token_secret ResetPasswordToken_Secret_Field) (
deleted bool, err error) {
@ -24552,6 +24809,10 @@ type Methods interface {
project_id Project_Id_Field) (
deleted bool, err error)
Delete_RepairQueue_By_UpdatedAt_Less(ctx context.Context,
repair_queue_updated_at_less RepairQueue_UpdatedAt_Field) (
count int64, err error)
Delete_ResetPasswordToken_By_Secret(ctx context.Context,
reset_password_token_secret ResetPasswordToken_Secret_Field) (
deleted bool, err error)

View File

@ -271,6 +271,15 @@ CREATE TABLE registration_tokens (
PRIMARY KEY ( secret ),
UNIQUE ( owner_id )
);
CREATE TABLE repair_queue (
stream_id bytea NOT NULL,
position bigint NOT NULL,
attempted_at timestamp with time zone,
updated_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
inserted_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
segment_health double precision NOT NULL DEFAULT 1,
PRIMARY KEY ( stream_id, position )
);
CREATE TABLE reputations (
id bytea NOT NULL,
audit_success_count bigint NOT NULL DEFAULT 0,
@ -509,6 +518,8 @@ CREATE INDEX node_last_ip ON nodes ( last_net ) ;
CREATE INDEX nodes_dis_unk_off_exit_fin_last_success_index ON nodes ( disqualified, unknown_audit_suspended, offline_suspended, exit_finished_at, last_contact_success ) ;
CREATE INDEX nodes_type_last_cont_success_free_disk_ma_mi_patch_vetted_partial_index ON nodes ( type, last_contact_success, free_disk, major, minor, patch, vetted_at ) WHERE nodes.disqualified is NULL AND nodes.unknown_audit_suspended is NULL AND nodes.exit_initiated_at is NULL AND nodes.release = true AND nodes.last_net != '' ;
CREATE INDEX nodes_dis_unk_aud_exit_init_rel_type_last_cont_success_stored_index ON nodes ( disqualified, unknown_audit_suspended, exit_initiated_at, release, type, last_contact_success ) WHERE nodes.disqualified is NULL AND nodes.unknown_audit_suspended is NULL AND nodes.exit_initiated_at is NULL AND nodes.release = true ;
CREATE INDEX repair_queue_updated_at_index ON repair_queue ( updated_at ) ;
CREATE INDEX repair_queue_num_healthy_pieces_attempted_at_index ON repair_queue ( segment_health, attempted_at ) ;
CREATE INDEX storagenode_bandwidth_rollups_interval_start_index ON storagenode_bandwidth_rollups ( interval_start ) ;
CREATE INDEX storagenode_bandwidth_rollup_archives_interval_start_index ON storagenode_bandwidth_rollup_archives ( interval_start ) ;
CREATE INDEX storagenode_payments_node_id_period_index ON storagenode_payments ( node_id, period ) ;

View File

@ -271,6 +271,15 @@ CREATE TABLE registration_tokens (
PRIMARY KEY ( secret ),
UNIQUE ( owner_id )
);
CREATE TABLE repair_queue (
stream_id bytea NOT NULL,
position bigint NOT NULL,
attempted_at timestamp with time zone,
updated_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
inserted_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
segment_health double precision NOT NULL DEFAULT 1,
PRIMARY KEY ( stream_id, position )
);
CREATE TABLE reputations (
id bytea NOT NULL,
audit_success_count bigint NOT NULL DEFAULT 0,
@ -509,6 +518,8 @@ CREATE INDEX node_last_ip ON nodes ( last_net ) ;
CREATE INDEX nodes_dis_unk_off_exit_fin_last_success_index ON nodes ( disqualified, unknown_audit_suspended, offline_suspended, exit_finished_at, last_contact_success ) ;
CREATE INDEX nodes_type_last_cont_success_free_disk_ma_mi_patch_vetted_partial_index ON nodes ( type, last_contact_success, free_disk, major, minor, patch, vetted_at ) WHERE nodes.disqualified is NULL AND nodes.unknown_audit_suspended is NULL AND nodes.exit_initiated_at is NULL AND nodes.release = true AND nodes.last_net != '' ;
CREATE INDEX nodes_dis_unk_aud_exit_init_rel_type_last_cont_success_stored_index ON nodes ( disqualified, unknown_audit_suspended, exit_initiated_at, release, type, last_contact_success ) WHERE nodes.disqualified is NULL AND nodes.unknown_audit_suspended is NULL AND nodes.exit_initiated_at is NULL AND nodes.release = true ;
CREATE INDEX repair_queue_updated_at_index ON repair_queue ( updated_at ) ;
CREATE INDEX repair_queue_num_healthy_pieces_attempted_at_index ON repair_queue ( segment_health, attempted_at ) ;
CREATE INDEX storagenode_bandwidth_rollups_interval_start_index ON storagenode_bandwidth_rollups ( interval_start ) ;
CREATE INDEX storagenode_bandwidth_rollup_archives_interval_start_index ON storagenode_bandwidth_rollup_archives ( interval_start ) ;
CREATE INDEX storagenode_payments_node_id_period_index ON storagenode_payments ( node_id, period ) ;

View File

@ -1509,6 +1509,24 @@ func (db *satelliteDB) PostgresMigration() *migrate.Migration {
`ALTER TABLE users ADD COLUMN paid_tier bool NOT NULL DEFAULT false;`,
},
},
{
DB: &db.migrationDB,
Description: "add repair_queue table, replacement for injuredsegments table",
Version: 165,
Action: migrate.SQL{
`CREATE TABLE repair_queue (
stream_id bytea NOT NULL,
position bigint NOT NULL,
attempted_at timestamp with time zone,
updated_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
inserted_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
segment_health double precision NOT NULL DEFAULT 1,
PRIMARY KEY ( stream_id, position )
)`,
`CREATE INDEX repair_queue_updated_at_index ON repair_queue ( updated_at )`,
`CREATE INDEX repair_queue_num_healthy_pieces_attempted_at_index ON repair_queue ( segment_health, attempted_at NULLS FIRST)`,
},
},
// NB: after updating testdata in `testdata`, run
// `go generate` to update `migratez.go`.
},

View File

@ -13,7 +13,7 @@ func (db *satelliteDB) testMigration() *migrate.Migration {
{
DB: &db.migrationDB,
Description: "Testing setup",
Version: 164,
Version: 165,
Action: migrate.SQL{`-- AUTOGENERATED BY storj.io/dbx
-- DO NOT EDIT
CREATE TABLE accounting_rollups (
@ -287,6 +287,15 @@ CREATE TABLE registration_tokens (
PRIMARY KEY ( secret ),
UNIQUE ( owner_id )
);
CREATE TABLE repair_queue (
stream_id bytea NOT NULL,
position bigint NOT NULL,
attempted_at timestamp with time zone,
updated_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
inserted_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
segment_health double precision NOT NULL DEFAULT 1,
PRIMARY KEY ( stream_id, position )
);
CREATE TABLE reputations (
id bytea NOT NULL,
audit_success_count bigint NOT NULL DEFAULT 0,
@ -436,7 +445,7 @@ CREATE TABLE users (
partner_id bytea,
created_at timestamp with time zone NOT NULL,
project_limit integer NOT NULL DEFAULT 0,
paid_tier boolean NOT NULL DEFAULT false,
paid_tier boolean NOT NULL DEFAULT false,
position text,
company_name text,
company_size integer,
@ -525,6 +534,8 @@ CREATE INDEX node_last_ip ON nodes ( last_net ) ;
CREATE INDEX nodes_dis_unk_off_exit_fin_last_success_index ON nodes ( disqualified, unknown_audit_suspended, offline_suspended, exit_finished_at, last_contact_success ) ;
CREATE INDEX nodes_type_last_cont_success_free_disk_ma_mi_patch_vetted_partial_index ON nodes ( type, last_contact_success, free_disk, major, minor, patch, vetted_at ) WHERE nodes.disqualified is NULL AND nodes.unknown_audit_suspended is NULL AND nodes.exit_initiated_at is NULL AND nodes.release = true AND nodes.last_net != '' ;
CREATE INDEX nodes_dis_unk_aud_exit_init_rel_type_last_cont_success_stored_index ON nodes ( disqualified, unknown_audit_suspended, exit_initiated_at, release, type, last_contact_success ) WHERE nodes.disqualified is NULL AND nodes.unknown_audit_suspended is NULL AND nodes.exit_initiated_at is NULL AND nodes.release = true ;
CREATE INDEX repair_queue_updated_at_index ON repair_queue ( updated_at ) ;
CREATE INDEX repair_queue_num_healthy_pieces_attempted_at_index ON repair_queue ( segment_health, attempted_at ) ;
CREATE INDEX storagenode_bandwidth_rollups_interval_start_index ON storagenode_bandwidth_rollups ( interval_start ) ;
CREATE INDEX storagenode_bandwidth_rollup_archives_interval_start_index ON storagenode_bandwidth_rollup_archives ( interval_start ) ;
CREATE INDEX storagenode_payments_node_id_period_index ON storagenode_payments ( node_id, period ) ;

View File

@ -11,8 +11,10 @@ import (
"github.com/zeebo/errs"
"storj.io/common/uuid"
"storj.io/private/dbutil"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/repair/queue"
"storj.io/storj/satellite/satellitedb/dbx"
"storj.io/storj/storage"
)
@ -24,7 +26,7 @@ type repairQueue struct {
db *satelliteDB
}
func (r *repairQueue) Insert(ctx context.Context, seg *internalpb.InjuredSegment, segmentHealth float64) (alreadyInserted bool, err error) {
func (r *repairQueue) Insert(ctx context.Context, seg *queue.InjuredSegment) (alreadyInserted bool, err error) {
defer mon.Task()(&ctx)(&err)
// insert if not exists, or update healthy count if does exist
var query string
@ -35,14 +37,14 @@ func (r *repairQueue) Insert(ctx context.Context, seg *internalpb.InjuredSegment
switch r.db.impl {
case dbutil.Postgres:
query = `
INSERT INTO injuredsegments
INSERT INTO repair_queue
(
path, data, segment_health
stream_id, position, segment_health
)
VALUES (
$1, $2, $3
)
ON CONFLICT (path)
ON CONFLICT (stream_id, position)
DO UPDATE
SET segment_health=$3, updated_at=current_timestamp
RETURNING (xmax != 0) AS alreadyInserted
@ -50,16 +52,17 @@ func (r *repairQueue) Insert(ctx context.Context, seg *internalpb.InjuredSegment
case dbutil.Cockroach:
query = `
WITH updater AS (
UPDATE injuredsegments SET segment_health = $3, updated_at = current_timestamp WHERE path = $1
UPDATE repair_queue SET segment_health = $3, updated_at = current_timestamp
WHERE stream_id = $1 AND position = $2
RETURNING *
)
INSERT INTO injuredsegments (path, data, segment_health)
INSERT INTO repair_queue (stream_id, position, segment_health)
SELECT $1, $2, $3
WHERE NOT EXISTS (SELECT * FROM updater)
RETURNING false
`
}
rows, err := r.db.QueryContext(ctx, query, seg.Path, seg, segmentHealth)
rows, err := r.db.QueryContext(ctx, query, seg.StreamID, seg.Position.Encode(), seg.SegmentHealth)
if err != nil {
return false, err
}
@ -77,59 +80,72 @@ func (r *repairQueue) Insert(ctx context.Context, seg *internalpb.InjuredSegment
return alreadyInserted, rows.Err()
}
func (r *repairQueue) Select(ctx context.Context) (seg *internalpb.InjuredSegment, err error) {
func (r *repairQueue) Select(ctx context.Context) (seg *queue.InjuredSegment, err error) {
defer mon.Task()(&ctx)(&err)
segment := queue.InjuredSegment{}
switch r.db.impl {
case dbutil.Cockroach:
err = r.db.QueryRowContext(ctx, `
UPDATE injuredsegments SET attempted = now()
WHERE attempted IS NULL OR attempted < now() - interval '6 hours'
UPDATE repair_queue SET attempted_at = now()
WHERE attempted_at IS NULL OR attempted_at < now() - interval '6 hours'
ORDER BY segment_health ASC, attempted_at NULLS FIRST
LIMIT 1
RETURNING data`).Scan(&seg)
RETURNING stream_id, position, attempted_at, updated_at, inserted_at, segment_health
`).Scan(&segment.StreamID, &segment.Position, &segment.AttemptedAt,
&segment.UpdatedAt, &segment.InsertedAt, &segment.SegmentHealth)
case dbutil.Postgres:
err = r.db.QueryRowContext(ctx, `
UPDATE injuredsegments SET attempted = now() WHERE path = (
SELECT path FROM injuredsegments
WHERE attempted IS NULL OR attempted < now() - interval '6 hours'
ORDER BY segment_health ASC, attempted NULLS FIRST FOR UPDATE SKIP LOCKED LIMIT 1
) RETURNING data`).Scan(&seg)
UPDATE repair_queue SET attempted_at = now() WHERE (stream_id, position) = (
SELECT stream_id, position FROM repair_queue
WHERE attempted_at IS NULL OR attempted_at < now() - interval '6 hours'
ORDER BY segment_health ASC, attempted_at NULLS FIRST FOR UPDATE SKIP LOCKED LIMIT 1
) RETURNING stream_id, position, attempted_at, updated_at, inserted_at, segment_health
`).Scan(&segment.StreamID, &segment.Position, &segment.AttemptedAt,
&segment.UpdatedAt, &segment.InsertedAt, &segment.SegmentHealth)
default:
return seg, errs.New("unhandled database: %v", r.db.impl)
}
if errors.Is(err, sql.ErrNoRows) {
err = storage.ErrEmptyQueue.New("")
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, storage.ErrEmptyQueue.New("")
}
return nil, err
}
return seg, err
return &segment, err
}
func (r *repairQueue) Delete(ctx context.Context, seg *internalpb.InjuredSegment) (err error) {
func (r *repairQueue) Delete(ctx context.Context, seg *queue.InjuredSegment) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = r.db.ExecContext(ctx, r.db.Rebind(`DELETE FROM injuredsegments WHERE path = ?`), seg.Path)
_, err = r.db.ExecContext(ctx, r.db.Rebind(`DELETE FROM repair_queue WHERE stream_id = ? AND position = ?`), seg.StreamID, seg.Position.Encode())
return Error.Wrap(err)
}
func (r *repairQueue) Clean(ctx context.Context, before time.Time) (deleted int64, err error) {
defer mon.Task()(&ctx)(&err)
n, err := r.db.Delete_Injuredsegment_By_UpdatedAt_Less(ctx, dbx.Injuredsegment_UpdatedAt(before))
n, err := r.db.Delete_RepairQueue_By_UpdatedAt_Less(ctx, dbx.RepairQueue_UpdatedAt(before))
return n, Error.Wrap(err)
}
func (r *repairQueue) SelectN(ctx context.Context, limit int) (segs []internalpb.InjuredSegment, err error) {
func (r *repairQueue) SelectN(ctx context.Context, limit int) (segs []queue.InjuredSegment, err error) {
defer mon.Task()(&ctx)(&err)
if limit <= 0 || limit > RepairQueueSelectLimit {
limit = RepairQueueSelectLimit
}
// TODO: strictly enforce order-by or change tests
rows, err := r.db.QueryContext(ctx, r.db.Rebind(`SELECT data FROM injuredsegments LIMIT ?`), limit)
rows, err := r.db.QueryContext(ctx,
r.db.Rebind(`SELECT stream_id, position, attempted_at, updated_at, segment_health
FROM repair_queue LIMIT ?`), limit,
)
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var seg internalpb.InjuredSegment
err = rows.Scan(&seg)
var seg queue.InjuredSegment
err = rows.Scan(&seg.StreamID, &seg.Position, &seg.AttemptedAt,
&seg.UpdatedAt, &seg.SegmentHealth)
if err != nil {
return segs, Error.Wrap(err)
}
@ -143,15 +159,20 @@ func (r *repairQueue) Count(ctx context.Context) (count int, err error) {
defer mon.Task()(&ctx)(&err)
// Count every segment regardless of how recently repair was last attempted
err = r.db.QueryRowContext(ctx, r.db.Rebind(`SELECT COUNT(*) as count FROM injuredsegments`)).Scan(&count)
err = r.db.QueryRowContext(ctx, r.db.Rebind(`SELECT COUNT(*) as count FROM repair_queue`)).Scan(&count)
return count, Error.Wrap(err)
}
// TestingSetAttemptedTime sets attempted time for a repairpath.
func (r *repairQueue) TestingSetAttemptedTime(ctx context.Context, repairpath []byte, t time.Time) (rowsAffected int64, err error) {
// TestingSetAttemptedTime sets attempted time for a segment.
func (r *repairQueue) TestingSetAttemptedTime(ctx context.Context, streamID uuid.UUID,
position metabase.SegmentPosition, t time.Time) (rowsAffected int64, err error) {
defer mon.Task()(&ctx)(&err)
res, err := r.db.ExecContext(ctx, r.db.Rebind(`UPDATE injuredsegments SET attempted = ? WHERE path = ?`), t, repairpath)
res, err := r.db.ExecContext(ctx,
r.db.Rebind(`UPDATE repair_queue SET attempted_at = ? WHERE stream_id = ? AND position = ?`),
t, streamID, position.Encode(),
)
if err != nil {
return 0, Error.Wrap(err)
}

View File

@ -0,0 +1,662 @@
-- AUTOGENERATED BY storj.io/dbx
-- DO NOT EDIT
CREATE TABLE accounting_rollups (
node_id bytea NOT NULL,
start_time timestamp with time zone NOT NULL,
put_total bigint NOT NULL,
get_total bigint NOT NULL,
get_audit_total bigint NOT NULL,
get_repair_total bigint NOT NULL,
put_repair_total bigint NOT NULL,
at_rest_total double precision NOT NULL,
PRIMARY KEY ( node_id, start_time )
);
CREATE TABLE accounting_timestamps (
name text NOT NULL,
value timestamp with time zone NOT NULL,
PRIMARY KEY ( name )
);
CREATE TABLE audit_histories (
node_id bytea NOT NULL,
history bytea NOT NULL,
PRIMARY KEY ( node_id )
);
CREATE TABLE bucket_bandwidth_rollups (
bucket_name bytea NOT NULL,
project_id bytea NOT NULL,
interval_start timestamp with time zone NOT NULL,
interval_seconds integer NOT NULL,
action integer NOT NULL,
inline bigint NOT NULL,
allocated bigint NOT NULL,
settled bigint NOT NULL,
PRIMARY KEY ( bucket_name, project_id, interval_start, action )
);
CREATE TABLE bucket_bandwidth_rollup_archives (
bucket_name bytea NOT NULL,
project_id bytea NOT NULL,
interval_start timestamp with time zone NOT NULL,
interval_seconds integer NOT NULL,
action integer NOT NULL,
inline bigint NOT NULL,
allocated bigint NOT NULL,
settled bigint NOT NULL,
PRIMARY KEY ( bucket_name, project_id, interval_start, action )
);
CREATE TABLE bucket_storage_tallies (
bucket_name bytea NOT NULL,
project_id bytea NOT NULL,
interval_start timestamp with time zone NOT NULL,
inline bigint NOT NULL,
remote bigint NOT NULL,
remote_segments_count integer NOT NULL,
inline_segments_count integer NOT NULL,
object_count integer NOT NULL,
metadata_size bigint NOT NULL,
PRIMARY KEY ( bucket_name, project_id, interval_start )
);
CREATE TABLE coinpayments_transactions (
id text NOT NULL,
user_id bytea NOT NULL,
address text NOT NULL,
amount bytea NOT NULL,
received bytea NOT NULL,
status integer NOT NULL,
key text NOT NULL,
timeout integer NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE coupons (
id bytea NOT NULL,
user_id bytea NOT NULL,
amount bigint NOT NULL,
description text NOT NULL,
type integer NOT NULL,
status integer NOT NULL,
duration bigint NOT NULL,
billing_periods bigint,
coupon_code_name text,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE coupon_codes (
id bytea NOT NULL,
name text NOT NULL,
amount bigint NOT NULL,
description text NOT NULL,
type integer NOT NULL,
billing_periods bigint,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id ),
UNIQUE ( name )
);
CREATE TABLE coupon_usages (
coupon_id bytea NOT NULL,
amount bigint NOT NULL,
status integer NOT NULL,
period timestamp with time zone NOT NULL,
PRIMARY KEY ( coupon_id, period )
);
CREATE TABLE graceful_exit_progress (
node_id bytea NOT NULL,
bytes_transferred bigint NOT NULL,
pieces_transferred bigint NOT NULL DEFAULT 0,
pieces_failed bigint NOT NULL DEFAULT 0,
updated_at timestamp with time zone NOT NULL,
uses_segment_transfer_queue boolean NOT NULL DEFAULT false,
PRIMARY KEY ( node_id )
);
CREATE TABLE graceful_exit_segment_transfer_queue (
node_id bytea NOT NULL,
stream_id bytea NOT NULL,
position bigint NOT NULL,
piece_num integer NOT NULL,
root_piece_id bytea,
durability_ratio double precision NOT NULL,
queued_at timestamp with time zone NOT NULL,
requested_at timestamp with time zone,
last_failed_at timestamp with time zone,
last_failed_code integer,
failed_count integer,
finished_at timestamp with time zone,
order_limit_send_count integer NOT NULL DEFAULT 0,
PRIMARY KEY ( node_id, stream_id, position, piece_num )
);
CREATE TABLE graceful_exit_transfer_queue (
node_id bytea NOT NULL,
path bytea NOT NULL,
piece_num integer NOT NULL,
root_piece_id bytea,
durability_ratio double precision NOT NULL,
queued_at timestamp with time zone NOT NULL,
requested_at timestamp with time zone,
last_failed_at timestamp with time zone,
last_failed_code integer,
failed_count integer,
finished_at timestamp with time zone,
order_limit_send_count integer NOT NULL DEFAULT 0,
PRIMARY KEY ( node_id, path, piece_num )
);
CREATE TABLE injuredsegments (
path bytea NOT NULL,
data bytea NOT NULL,
attempted timestamp with time zone,
updated_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
segment_health double precision NOT NULL DEFAULT 1,
PRIMARY KEY ( path )
);
CREATE TABLE irreparabledbs (
segmentpath bytea NOT NULL,
segmentdetail bytea NOT NULL,
pieces_lost_count bigint NOT NULL,
seg_damaged_unix_sec bigint NOT NULL,
repair_attempt_count bigint NOT NULL,
PRIMARY KEY ( segmentpath )
);
CREATE TABLE nodes (
id bytea NOT NULL,
address text NOT NULL DEFAULT '',
last_net text NOT NULL,
last_ip_port text,
protocol integer NOT NULL DEFAULT 0,
type integer NOT NULL DEFAULT 0,
email text NOT NULL,
wallet text NOT NULL,
wallet_features text NOT NULL DEFAULT '',
free_disk bigint NOT NULL DEFAULT -1,
piece_count bigint NOT NULL DEFAULT 0,
major bigint NOT NULL DEFAULT 0,
minor bigint NOT NULL DEFAULT 0,
patch bigint NOT NULL DEFAULT 0,
hash text NOT NULL DEFAULT '',
timestamp timestamp with time zone NOT NULL DEFAULT '0001-01-01 00:00:00+00',
release boolean NOT NULL DEFAULT false,
latency_90 bigint NOT NULL DEFAULT 0,
audit_success_count bigint NOT NULL DEFAULT 0,
total_audit_count bigint NOT NULL DEFAULT 0,
vetted_at timestamp with time zone,
created_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
updated_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
last_contact_success timestamp with time zone NOT NULL DEFAULT 'epoch',
last_contact_failure timestamp with time zone NOT NULL DEFAULT 'epoch',
contained boolean NOT NULL DEFAULT false,
disqualified timestamp with time zone,
suspended timestamp with time zone,
unknown_audit_suspended timestamp with time zone,
offline_suspended timestamp with time zone,
under_review timestamp with time zone,
online_score double precision NOT NULL DEFAULT 1,
audit_reputation_alpha double precision NOT NULL DEFAULT 1,
audit_reputation_beta double precision NOT NULL DEFAULT 0,
unknown_audit_reputation_alpha double precision NOT NULL DEFAULT 1,
unknown_audit_reputation_beta double precision NOT NULL DEFAULT 0,
exit_initiated_at timestamp with time zone,
exit_loop_completed_at timestamp with time zone,
exit_finished_at timestamp with time zone,
exit_success boolean NOT NULL DEFAULT false,
PRIMARY KEY ( id )
);
CREATE TABLE node_api_versions (
id bytea NOT NULL,
api_version integer NOT NULL,
created_at timestamp with time zone NOT NULL,
updated_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE offers (
id serial NOT NULL,
name text NOT NULL,
description text NOT NULL,
award_credit_in_cents integer NOT NULL DEFAULT 0,
invitee_credit_in_cents integer NOT NULL DEFAULT 0,
award_credit_duration_days integer,
invitee_credit_duration_days integer,
redeemable_cap integer,
expires_at timestamp with time zone NOT NULL,
created_at timestamp with time zone NOT NULL,
status integer NOT NULL,
type integer NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE peer_identities (
node_id bytea NOT NULL,
leaf_serial_number bytea NOT NULL,
chain bytea NOT NULL,
updated_at timestamp with time zone NOT NULL,
PRIMARY KEY ( node_id )
);
CREATE TABLE pending_audits (
node_id bytea NOT NULL,
piece_id bytea NOT NULL,
stripe_index bigint NOT NULL,
share_size bigint NOT NULL,
expected_share_hash bytea NOT NULL,
reverify_count bigint NOT NULL,
path bytea NOT NULL,
PRIMARY KEY ( node_id )
);
CREATE TABLE projects (
id bytea NOT NULL,
name text NOT NULL,
description text NOT NULL,
usage_limit bigint,
bandwidth_limit bigint,
rate_limit integer,
max_buckets integer,
partner_id bytea,
owner_id bytea NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE project_bandwidth_daily_rollups (
project_id bytea NOT NULL,
interval_day date NOT NULL,
egress_allocated bigint NOT NULL,
egress_settled bigint NOT NULL,
egress_dead bigint NOT NULL DEFAULT 0,
PRIMARY KEY ( project_id, interval_day )
);
CREATE TABLE project_bandwidth_rollups (
project_id bytea NOT NULL,
interval_month date NOT NULL,
egress_allocated bigint NOT NULL,
PRIMARY KEY ( project_id, interval_month )
);
CREATE TABLE registration_tokens (
secret bytea NOT NULL,
owner_id bytea,
project_limit integer NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( secret ),
UNIQUE ( owner_id )
);
CREATE TABLE repair_queue (
stream_id bytea NOT NULL,
position bigint NOT NULL,
attempted_at timestamp with time zone,
updated_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
inserted_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
segment_health double precision NOT NULL DEFAULT 1,
PRIMARY KEY ( stream_id, position )
);
CREATE TABLE reputations (
id bytea NOT NULL,
audit_success_count bigint NOT NULL DEFAULT 0,
total_audit_count bigint NOT NULL DEFAULT 0,
vetted_at timestamp with time zone,
created_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
updated_at timestamp with time zone NOT NULL DEFAULT current_timestamp,
contained boolean NOT NULL DEFAULT false,
disqualified timestamp with time zone,
suspended timestamp with time zone,
unknown_audit_suspended timestamp with time zone,
offline_suspended timestamp with time zone,
under_review timestamp with time zone,
online_score double precision NOT NULL DEFAULT 1,
audit_history bytea NOT NULL,
audit_reputation_alpha double precision NOT NULL DEFAULT 1,
audit_reputation_beta double precision NOT NULL DEFAULT 0,
unknown_audit_reputation_alpha double precision NOT NULL DEFAULT 1,
unknown_audit_reputation_beta double precision NOT NULL DEFAULT 0,
PRIMARY KEY ( id )
);
CREATE TABLE reset_password_tokens (
secret bytea NOT NULL,
owner_id bytea NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( secret ),
UNIQUE ( owner_id )
);
CREATE TABLE revocations (
revoked bytea NOT NULL,
api_key_id bytea NOT NULL,
PRIMARY KEY ( revoked )
);
CREATE TABLE segment_pending_audits (
node_id bytea NOT NULL,
stream_id bytea NOT NULL,
position bigint NOT NULL,
piece_id bytea NOT NULL,
stripe_index bigint NOT NULL,
share_size bigint NOT NULL,
expected_share_hash bytea NOT NULL,
reverify_count bigint NOT NULL,
PRIMARY KEY ( node_id )
);
CREATE TABLE storagenode_bandwidth_rollups (
storagenode_id bytea NOT NULL,
interval_start timestamp with time zone NOT NULL,
interval_seconds integer NOT NULL,
action integer NOT NULL,
allocated bigint DEFAULT 0,
settled bigint NOT NULL,
PRIMARY KEY ( storagenode_id, interval_start, action )
);
CREATE TABLE storagenode_bandwidth_rollup_archives (
storagenode_id bytea NOT NULL,
interval_start timestamp with time zone NOT NULL,
interval_seconds integer NOT NULL,
action integer NOT NULL,
allocated bigint DEFAULT 0,
settled bigint NOT NULL,
PRIMARY KEY ( storagenode_id, interval_start, action )
);
CREATE TABLE storagenode_bandwidth_rollups_phase2 (
storagenode_id bytea NOT NULL,
interval_start timestamp with time zone NOT NULL,
interval_seconds integer NOT NULL,
action integer NOT NULL,
allocated bigint DEFAULT 0,
settled bigint NOT NULL,
PRIMARY KEY ( storagenode_id, interval_start, action )
);
CREATE TABLE storagenode_payments (
id bigserial NOT NULL,
created_at timestamp with time zone NOT NULL,
node_id bytea NOT NULL,
period text NOT NULL,
amount bigint NOT NULL,
receipt text,
notes text,
PRIMARY KEY ( id )
);
CREATE TABLE storagenode_paystubs (
period text NOT NULL,
node_id bytea NOT NULL,
created_at timestamp with time zone NOT NULL,
codes text NOT NULL,
usage_at_rest double precision NOT NULL,
usage_get bigint NOT NULL,
usage_put bigint NOT NULL,
usage_get_repair bigint NOT NULL,
usage_put_repair bigint NOT NULL,
usage_get_audit bigint NOT NULL,
comp_at_rest bigint NOT NULL,
comp_get bigint NOT NULL,
comp_put bigint NOT NULL,
comp_get_repair bigint NOT NULL,
comp_put_repair bigint NOT NULL,
comp_get_audit bigint NOT NULL,
surge_percent bigint NOT NULL,
held bigint NOT NULL,
owed bigint NOT NULL,
disposed bigint NOT NULL,
paid bigint NOT NULL,
distributed bigint NOT NULL,
PRIMARY KEY ( period, node_id )
);
CREATE TABLE storagenode_storage_tallies (
node_id bytea NOT NULL,
interval_end_time timestamp with time zone NOT NULL,
data_total double precision NOT NULL,
PRIMARY KEY ( interval_end_time, node_id )
);
CREATE TABLE stripe_customers (
user_id bytea NOT NULL,
customer_id text NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( user_id ),
UNIQUE ( customer_id )
);
CREATE TABLE stripecoinpayments_invoice_project_records (
id bytea NOT NULL,
project_id bytea NOT NULL,
storage double precision NOT NULL,
egress bigint NOT NULL,
objects bigint NOT NULL,
period_start timestamp with time zone NOT NULL,
period_end timestamp with time zone NOT NULL,
state integer NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id ),
UNIQUE ( project_id, period_start, period_end )
);
CREATE TABLE stripecoinpayments_tx_conversion_rates (
tx_id text NOT NULL,
rate bytea NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( tx_id )
);
CREATE TABLE users (
id bytea NOT NULL,
email text NOT NULL,
normalized_email text NOT NULL,
full_name text NOT NULL,
short_name text,
password_hash bytea NOT NULL,
status integer NOT NULL,
partner_id bytea,
created_at timestamp with time zone NOT NULL,
project_limit integer NOT NULL DEFAULT 0,
paid_tier boolean NOT NULL DEFAULT false,
position text,
company_name text,
company_size integer,
working_on text,
is_professional boolean NOT NULL DEFAULT false,
employee_count text,
have_sales_contact boolean NOT NULL DEFAULT false,
PRIMARY KEY ( id )
);
CREATE TABLE value_attributions (
project_id bytea NOT NULL,
bucket_name bytea NOT NULL,
partner_id bytea NOT NULL,
last_updated timestamp with time zone NOT NULL,
PRIMARY KEY ( project_id, bucket_name )
);
CREATE TABLE api_keys (
id bytea NOT NULL,
project_id bytea NOT NULL REFERENCES projects( id ) ON DELETE CASCADE,
head bytea NOT NULL,
name text NOT NULL,
secret bytea NOT NULL,
partner_id bytea,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id ),
UNIQUE ( head ),
UNIQUE ( name, project_id )
);
CREATE TABLE bucket_metainfos (
id bytea NOT NULL,
project_id bytea NOT NULL REFERENCES projects( id ),
name bytea NOT NULL,
partner_id bytea,
path_cipher integer NOT NULL,
created_at timestamp with time zone NOT NULL,
default_segment_size integer NOT NULL,
default_encryption_cipher_suite integer NOT NULL,
default_encryption_block_size integer NOT NULL,
default_redundancy_algorithm integer NOT NULL,
default_redundancy_share_size integer NOT NULL,
default_redundancy_required_shares integer NOT NULL,
default_redundancy_repair_shares integer NOT NULL,
default_redundancy_optimal_shares integer NOT NULL,
default_redundancy_total_shares integer NOT NULL,
PRIMARY KEY ( id ),
UNIQUE ( project_id, name )
);
CREATE TABLE project_members (
member_id bytea NOT NULL REFERENCES users( id ) ON DELETE CASCADE,
project_id bytea NOT NULL REFERENCES projects( id ) ON DELETE CASCADE,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( member_id, project_id )
);
CREATE TABLE stripecoinpayments_apply_balance_intents (
tx_id text NOT NULL REFERENCES coinpayments_transactions( id ) ON DELETE CASCADE,
state integer NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( tx_id )
);
CREATE TABLE user_credits (
id serial NOT NULL,
user_id bytea NOT NULL REFERENCES users( id ) ON DELETE CASCADE,
offer_id integer NOT NULL REFERENCES offers( id ),
referred_by bytea REFERENCES users( id ) ON DELETE SET NULL,
type text NOT NULL,
credits_earned_in_cents integer NOT NULL,
credits_used_in_cents integer NOT NULL,
expires_at timestamp with time zone NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id ),
UNIQUE ( id, offer_id )
);
CREATE INDEX accounting_rollups_start_time_index ON accounting_rollups ( start_time ) ;
CREATE INDEX bucket_bandwidth_rollups_project_id_action_interval_index ON bucket_bandwidth_rollups ( project_id, action, interval_start ) ;
CREATE INDEX bucket_bandwidth_rollups_action_interval_project_id_index ON bucket_bandwidth_rollups ( action, interval_start, project_id ) ;
CREATE INDEX bucket_bandwidth_rollups_archive_project_id_action_interval_index ON bucket_bandwidth_rollup_archives ( project_id, action, interval_start ) ;
CREATE INDEX bucket_bandwidth_rollups_archive_action_interval_project_id_index ON bucket_bandwidth_rollup_archives ( action, interval_start, project_id ) ;
CREATE INDEX bucket_storage_tallies_project_id_interval_start_index ON bucket_storage_tallies ( project_id, interval_start ) ;
CREATE INDEX graceful_exit_transfer_queue_nid_dr_qa_fa_lfa_index ON graceful_exit_transfer_queue ( node_id, durability_ratio, queued_at, finished_at, last_failed_at ) ;
CREATE INDEX graceful_exit_segment_transfer_nid_dr_qa_fa_lfa_index ON graceful_exit_segment_transfer_queue ( node_id, durability_ratio, queued_at, finished_at, last_failed_at ) ;
CREATE INDEX injuredsegments_attempted_index ON injuredsegments ( attempted ) ;
CREATE INDEX injuredsegments_segment_health_index ON injuredsegments ( segment_health ) ;
CREATE INDEX injuredsegments_updated_at_index ON injuredsegments ( updated_at ) ;
CREATE INDEX injuredsegments_num_healthy_pieces_attempted_index ON injuredsegments ( segment_health, attempted ) ;
CREATE INDEX node_last_ip ON nodes ( last_net ) ;
CREATE INDEX nodes_dis_unk_off_exit_fin_last_success_index ON nodes ( disqualified, unknown_audit_suspended, offline_suspended, exit_finished_at, last_contact_success ) ;
CREATE INDEX nodes_type_last_cont_success_free_disk_ma_mi_patch_vetted_partial_index ON nodes ( type, last_contact_success, free_disk, major, minor, patch, vetted_at ) WHERE nodes.disqualified is NULL AND nodes.unknown_audit_suspended is NULL AND nodes.exit_initiated_at is NULL AND nodes.release = true AND nodes.last_net != '' ;
CREATE INDEX nodes_dis_unk_aud_exit_init_rel_type_last_cont_success_stored_index ON nodes ( disqualified, unknown_audit_suspended, exit_initiated_at, release, type, last_contact_success ) WHERE nodes.disqualified is NULL AND nodes.unknown_audit_suspended is NULL AND nodes.exit_initiated_at is NULL AND nodes.release = true ;
CREATE INDEX repair_queue_updated_at_index ON repair_queue ( updated_at ) ;
CREATE INDEX repair_queue_num_healthy_pieces_attempted_at_index ON repair_queue ( segment_health, attempted_at ) ;
CREATE INDEX storagenode_bandwidth_rollups_interval_start_index ON storagenode_bandwidth_rollups ( interval_start ) ;
CREATE INDEX storagenode_bandwidth_rollup_archives_interval_start_index ON storagenode_bandwidth_rollup_archives ( interval_start ) ;
CREATE INDEX storagenode_payments_node_id_period_index ON storagenode_payments ( node_id, period ) ;
CREATE INDEX storagenode_paystubs_node_id_index ON storagenode_paystubs ( node_id ) ;
CREATE INDEX storagenode_storage_tallies_node_id_index ON storagenode_storage_tallies ( node_id ) ;
CREATE UNIQUE INDEX credits_earned_user_id_offer_id ON user_credits ( id, offer_id ) ;
INSERT INTO "offers" ("id", "name", "description", "award_credit_in_cents", "invitee_credit_in_cents", "expires_at", "created_at", "status", "type", "award_credit_duration_days", "invitee_credit_duration_days") VALUES (1, 'Default referral offer', 'Is active when no other active referral offer', 300, 600, '2119-03-14 08:28:24.636949+00', '2019-07-14 08:28:24.636949+00', 1, 2, 365, 14);
INSERT INTO "offers" ("id", "name", "description", "award_credit_in_cents", "invitee_credit_in_cents", "expires_at", "created_at", "status", "type", "award_credit_duration_days", "invitee_credit_duration_days") VALUES (2, 'Default free credit offer', 'Is active when no active free credit offer', 0, 300, '2119-03-14 08:28:24.636949+00', '2019-07-14 08:28:24.636949+00', 1, 1, NULL, 14);
-- MAIN DATA --
INSERT INTO "accounting_rollups"("node_id", "start_time", "put_total", "get_total", "get_audit_total", "get_repair_total", "put_repair_total", "at_rest_total") VALUES (E'\\367M\\177\\251]t/\\022\\256\\214\\265\\025\\224\\204:\\217\\212\\0102<\\321\\374\\020&\\271Qc\\325\\261\\354\\246\\233'::bytea, '2019-02-09 00:00:00+00', 3000, 6000, 9000, 12000, 0, 15000);
INSERT INTO "accounting_timestamps" VALUES ('LastAtRestTally', '0001-01-01 00:00:00+00');
INSERT INTO "accounting_timestamps" VALUES ('LastRollup', '0001-01-01 00:00:00+00');
INSERT INTO "accounting_timestamps" VALUES ('LastBandwidthTally', '0001-01-01 00:00:00+00');
INSERT INTO "nodes"("id", "address", "last_net", "protocol", "type", "email", "wallet", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "suspended", "audit_reputation_alpha", "audit_reputation_beta", "unknown_audit_reputation_alpha", "unknown_audit_reputation_beta", "exit_success", "online_score") VALUES (E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001', '127.0.0.1:55516', '', 0, 4, '', '', -1, 0, 0, 1, 0, '', 'epoch', false, 0, 0, 5, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, NULL, 50, 0, 1, 0, false, 1);
INSERT INTO "nodes"("id", "address", "last_net", "protocol", "type", "email", "wallet", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "suspended", "audit_reputation_alpha", "audit_reputation_beta", "unknown_audit_reputation_alpha", "unknown_audit_reputation_beta", "exit_success", "online_score") VALUES (E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n', '127.0.0.1:55518', '', 0, 4, '', '', -1, 0, 0, 1, 0, '', 'epoch', false, 0, 0, 0, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, NULL, 50, 0, 1, 0, false, 1);
INSERT INTO "nodes"("id", "address", "last_net", "protocol", "type", "email", "wallet", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "suspended", "audit_reputation_alpha", "audit_reputation_beta", "unknown_audit_reputation_alpha", "unknown_audit_reputation_beta", "exit_success", "online_score") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014', '127.0.0.1:55517', '', 0, 4, '', '', -1, 0, 0, 1, 0, '', 'epoch', false, 0, 0, 0, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, NULL, 50, 0, 1, 0, false, 1);
INSERT INTO "nodes"("id", "address", "last_net", "protocol", "type", "email", "wallet", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "suspended", "audit_reputation_alpha", "audit_reputation_beta", "unknown_audit_reputation_alpha", "unknown_audit_reputation_beta", "exit_success", "online_score") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\015', '127.0.0.1:55519', '', 0, 4, '', '', -1, 0, 0, 1, 0, '', 'epoch', false, 0, 1, 2, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, NULL, 50, 0, 1, 0, false, 1);
INSERT INTO "nodes"("id", "address", "last_net", "protocol", "type", "email", "wallet", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "suspended", "audit_reputation_alpha", "audit_reputation_beta", "unknown_audit_reputation_alpha", "unknown_audit_reputation_beta", "exit_success", "vetted_at", "online_score") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\016', '127.0.0.1:55520', '', 0, 4, '', '', -1, 0, 0, 1, 0, '', 'epoch', false, 0, 300, 400, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, NULL, 300, 0, 1, 0, false, '2020-03-18 12:00:00.000000+00', 1);
INSERT INTO "nodes"("id", "address", "last_net", "protocol", "type", "email", "wallet", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "suspended", "audit_reputation_alpha", "audit_reputation_beta", "unknown_audit_reputation_alpha", "unknown_audit_reputation_beta", "exit_success", "online_score") VALUES (E'\\154\\313\\233\\074\\327\\177\\136\\070\\346\\001', '127.0.0.1:55516', '', 0, 4, '', '', -1, 0, 0, 1, 0, '', 'epoch', false, 0, 0, 5, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, NULL, 50, 0, 75, 25, false, 1);
INSERT INTO "nodes"("id", "address", "last_net", "last_ip_port", "protocol", "type", "email", "wallet", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "suspended", "audit_reputation_alpha", "audit_reputation_beta", "unknown_audit_reputation_alpha", "unknown_audit_reputation_beta", "exit_success", "online_score") VALUES (E'\\154\\313\\233\\074\\327\\177\\136\\070\\346\\002', '127.0.0.1:55516', '127.0.0.0', '127.0.0.1:55516', 0, 4, '', '', -1, 0, 0, 1, 0, '', 'epoch', false, 0, 0, 5, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, NULL, 50, 0, 75, 25, false, 1);
INSERT INTO "nodes"("id", "address", "last_net", "protocol", "type", "email", "wallet", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "suspended", "audit_reputation_alpha", "audit_reputation_beta", "unknown_audit_reputation_alpha", "unknown_audit_reputation_beta", "exit_success", "online_score") VALUES (E'\\363\\341\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\016', '127.0.0.1:55516', '', 0, 4, '', '', -1, 0, 0, 1, 0, '', 'epoch', false, 0, 0, 5, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, NULL, 50, 0, 1, 0, false, 1);
INSERT INTO "nodes"("id", "address", "last_net", "protocol", "type", "email", "wallet", "wallet_features", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "suspended", "audit_reputation_alpha", "audit_reputation_beta", "unknown_audit_reputation_alpha", "unknown_audit_reputation_beta", "exit_success", "online_score") VALUES (E'\\362\\341\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\016', '127.0.0.1:55516', '', 0, 4, '', '', '', -1, 0, 0, 1, 0, '', 'epoch', false, 0, 0, 5, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, NULL, 50, 0, 1, 0, false, 1);
INSERT INTO "users"("id", "full_name", "short_name", "email", "normalized_email", "password_hash", "status", "partner_id", "created_at", "is_professional", "project_limit", "paid_tier") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 'Noahson', 'William', '1email1@mail.test', '1EMAIL1@MAIL.TEST', E'some_readable_hash'::bytea, 1, NULL, '2019-02-14 08:28:24.614594+00', false, 10, false);
INSERT INTO "users"("id", "full_name", "short_name", "email", "normalized_email", "password_hash", "status", "partner_id", "created_at", "position", "company_name", "working_on", "company_size", "is_professional", "employee_count", "project_limit", "have_sales_contact") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\304\\313\\206\\311",'::bytea, 'Ian', 'Pires', '3email3@mail.test', '3EMAIL3@MAIL.TEST', E'some_readable_hash'::bytea, 2, NULL, '2020-03-18 10:28:24.614594+00', 'engineer', 'storj', 'data storage', 51, true, '1-50', 10, true);
INSERT INTO "users"("id", "full_name", "short_name", "email", "normalized_email", "password_hash", "status", "partner_id", "created_at", "position", "company_name", "working_on", "company_size", "is_professional", "employee_count", "project_limit") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\205\\312",'::bytea, 'Campbell', 'Wright', '4email4@mail.test', '4EMAIL4@MAIL.TEST', E'some_readable_hash'::bytea, 2, NULL, '2020-07-17 10:28:24.614594+00', 'engineer', 'storj', 'data storage', 82, true, '1-50', 10);
INSERT INTO "users"("id", "full_name", "short_name", "email", "normalized_email", "password_hash", "status", "partner_id", "created_at", "position", "company_name", "working_on", "company_size", "is_professional", "project_limit", "paid_tier") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\205\\311",'::bytea, 'Thierry', 'Berg', '2email2@mail.test', '2EMAIL2@MAIL.TEST', E'some_readable_hash'::bytea, 2, NULL, '2020-05-16 10:28:24.614594+00', 'engineer', 'storj', 'data storage', 55, true, 10, false);
INSERT INTO "projects"("id", "name", "description", "usage_limit", "bandwidth_limit", "max_buckets", "partner_id", "owner_id", "created_at") VALUES (E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, 'ProjectName', 'projects description', 5e11, 5e11, NULL, NULL, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, '2019-02-14 08:28:24.254934+00');
INSERT INTO "projects"("id", "name", "description", "usage_limit", "bandwidth_limit", "max_buckets", "partner_id", "owner_id", "created_at") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, 'projName1', 'Test project 1', 5e11, 5e11, NULL, NULL, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, '2019-02-14 08:28:24.636949+00');
INSERT INTO "project_members"("member_id", "project_id", "created_at") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, '2019-02-14 08:28:24.677953+00');
INSERT INTO "project_members"("member_id", "project_id", "created_at") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, '2019-02-13 08:28:24.677953+00');
INSERT INTO "irreparabledbs" ("segmentpath", "segmentdetail", "pieces_lost_count", "seg_damaged_unix_sec", "repair_attempt_count") VALUES ('\x49616d5365676d656e746b6579696e666f30', '\x49616d5365676d656e7464657461696c696e666f30', 10, 1550159554, 10);
INSERT INTO "registration_tokens" ("secret", "owner_id", "project_limit", "created_at") VALUES (E'\\070\\127\\144\\013\\332\\344\\102\\376\\306\\056\\303\\130\\106\\132\\321\\276\\321\\274\\170\\264\\054\\333\\221\\116\\154\\221\\335\\070\\220\\146\\344\\216'::bytea, null, 1, '2019-02-14 08:28:24.677953+00');
INSERT INTO "storagenode_bandwidth_rollups" ("storagenode_id", "interval_start", "interval_seconds", "action", "allocated", "settled") VALUES (E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n', '2019-03-06 08:00:00.000000' AT TIME ZONE current_setting('TIMEZONE'), 3600, 1, 1024, 2024);
INSERT INTO "storagenode_storage_tallies" VALUES (E'\\3510\\323\\225"~\\036<\\342\\330m\\0253Jhr\\246\\233K\\246#\\2303\\351\\256\\275j\\212UM\\362\\207', '2019-02-14 08:16:57.812849+00', 1000);
INSERT INTO "bucket_bandwidth_rollups" ("bucket_name", "project_id", "interval_start", "interval_seconds", "action", "inline", "allocated", "settled") VALUES (E'testbucket'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea,'2019-03-06 08:00:00.000000' AT TIME ZONE current_setting('TIMEZONE'), 3600, 1, 1024, 2024, 3024);
INSERT INTO "bucket_storage_tallies" ("bucket_name", "project_id", "interval_start", "inline", "remote", "remote_segments_count", "inline_segments_count", "object_count", "metadata_size") VALUES (E'testbucket'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea,'2019-03-06 08:00:00.000000' AT TIME ZONE current_setting('TIMEZONE'), 4024, 5024, 0, 0, 0, 0);
INSERT INTO "bucket_bandwidth_rollups" ("bucket_name", "project_id", "interval_start", "interval_seconds", "action", "inline", "allocated", "settled") VALUES (E'testbucket'::bytea, E'\\170\\160\\157\\370\\274\\366\\113\\364\\272\\235\\301\\243\\321\\102\\321\\136'::bytea,'2019-03-06 08:00:00.000000' AT TIME ZONE current_setting('TIMEZONE'), 3600, 1, 1024, 2024, 3024);
INSERT INTO "bucket_storage_tallies" ("bucket_name", "project_id", "interval_start", "inline", "remote", "remote_segments_count", "inline_segments_count", "object_count", "metadata_size") VALUES (E'testbucket'::bytea, E'\\170\\160\\157\\370\\274\\366\\113\\364\\272\\235\\301\\243\\321\\102\\321\\136'::bytea,'2019-03-06 08:00:00.000000' AT TIME ZONE current_setting('TIMEZONE'), 4024, 5024, 0, 0, 0, 0);
INSERT INTO "reset_password_tokens" ("secret", "owner_id", "created_at") VALUES (E'\\070\\127\\144\\013\\332\\344\\102\\376\\306\\056\\303\\130\\106\\132\\321\\276\\321\\274\\170\\264\\054\\333\\221\\116\\154\\221\\335\\070\\220\\146\\344\\216'::bytea, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, '2019-05-08 08:28:24.677953+00');
INSERT INTO "api_keys" ("id", "project_id", "head", "name", "secret", "partner_id", "created_at") VALUES (E'\\334/\\302;\\225\\355O\\323\\276f\\247\\354/6\\241\\033'::bytea, E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, E'\\111\\142\\147\\304\\132\\375\\070\\163\\270\\160\\251\\370\\126\\063\\351\\037\\257\\071\\143\\375\\351\\320\\253\\232\\220\\260\\075\\173\\306\\307\\115\\136'::bytea, 'key 2', E'\\254\\011\\315\\333\\273\\365\\001\\071\\024\\154\\253\\332\\301\\216\\361\\074\\221\\367\\251\\231\\274\\333\\300\\367\\001\\272\\327\\111\\315\\123\\042\\016'::bytea, NULL, '2019-02-14 08:28:24.267934+00');
INSERT INTO "value_attributions" ("project_id", "bucket_name", "partner_id", "last_updated") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, E''::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea,'2019-02-14 08:07:31.028103+00');
INSERT INTO "user_credits" ("id", "user_id", "offer_id", "referred_by", "credits_earned_in_cents", "credits_used_in_cents", "type", "expires_at", "created_at") VALUES (1, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 1, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 200, 0, 'invalid', '2019-10-01 08:28:24.267934+00', '2019-06-01 08:28:24.267934+00');
INSERT INTO "bucket_metainfos" ("id", "project_id", "name", "partner_id", "created_at", "path_cipher", "default_segment_size", "default_encryption_cipher_suite", "default_encryption_block_size", "default_redundancy_algorithm", "default_redundancy_share_size", "default_redundancy_required_shares", "default_redundancy_repair_shares", "default_redundancy_optimal_shares", "default_redundancy_total_shares") VALUES (E'\\334/\\302;\\225\\355O\\323\\276f\\247\\354/6\\241\\033'::bytea, E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, E'testbucketuniquename'::bytea, NULL, '2019-06-14 08:28:24.677953+00', 1, 65536, 1, 8192, 1, 4096, 4, 6, 8, 10);
INSERT INTO "pending_audits" ("node_id", "piece_id", "stripe_index", "share_size", "expected_share_hash", "reverify_count", "path") VALUES (E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001'::bytea, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 5, 1024, E'\\070\\127\\144\\013\\332\\344\\102\\376\\306\\056\\303\\130\\106\\132\\321\\276\\321\\274\\170\\264\\054\\333\\221\\116\\154\\221\\335\\070\\220\\146\\344\\216'::bytea, 1, 'not null');
INSERT INTO "peer_identities" VALUES (E'\\334/\\302;\\225\\355O\\323\\276f\\247\\354/6\\241\\033'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, '2019-02-14 08:07:31.335028+00');
INSERT INTO "graceful_exit_progress" ("node_id", "bytes_transferred", "pieces_transferred", "pieces_failed", "updated_at", "uses_segment_transfer_queue") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\016', 1000000000000000, 0, 0, '2019-09-12 10:07:31.028103+00', false);
INSERT INTO "graceful_exit_transfer_queue" ("node_id", "path", "piece_num", "durability_ratio", "queued_at", "requested_at", "last_failed_at", "last_failed_code", "failed_count", "finished_at", "order_limit_send_count") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\016', E'f8419768-5baa-4901-b3ba-62808013ec45/s0/test3/\\240\\243\\223n \\334~b}\\2624)\\250m\\201\\202\\235\\276\\361\\3304\\323\\352\\311\\361\\353;\\326\\311', 8, 1.0, '2019-09-12 10:07:31.028103+00', '2019-09-12 10:07:32.028103+00', null, null, 0, '2019-09-12 10:07:33.028103+00', 0);
INSERT INTO "graceful_exit_transfer_queue" ("node_id", "path", "piece_num", "durability_ratio", "queued_at", "requested_at", "last_failed_at", "last_failed_code", "failed_count", "finished_at", "order_limit_send_count") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\016', E'f8419768-5baa-4901-b3ba-62808013ec45/s0/test3/\\240\\243\\223n \\334~b}\\2624)\\250m\\201\\202\\235\\276\\361\\3304\\323\\352\\311\\361\\353;\\326\\312', 8, 1.0, '2019-09-12 10:07:31.028103+00', '2019-09-12 10:07:32.028103+00', null, null, 0, '2019-09-12 10:07:33.028103+00', 0);
INSERT INTO "stripe_customers" ("user_id", "customer_id", "created_at") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 'stripe_id', '2019-06-01 08:28:24.267934+00');
INSERT INTO "graceful_exit_transfer_queue" ("node_id", "path", "piece_num", "durability_ratio", "queued_at", "requested_at", "last_failed_at", "last_failed_code", "failed_count", "finished_at", "order_limit_send_count") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\016', E'f8419768-5baa-4901-b3ba-62808013ec45/s0/test3/\\240\\243\\223n \\334~b}\\2624)\\250m\\201\\202\\235\\276\\361\\3304\\323\\352\\311\\361\\353;\\326\\311', 9, 1.0, '2019-09-12 10:07:31.028103+00', '2019-09-12 10:07:32.028103+00', null, null, 0, '2019-09-12 10:07:33.028103+00', 0);
INSERT INTO "graceful_exit_transfer_queue" ("node_id", "path", "piece_num", "durability_ratio", "queued_at", "requested_at", "last_failed_at", "last_failed_code", "failed_count", "finished_at", "order_limit_send_count") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\016', E'f8419768-5baa-4901-b3ba-62808013ec45/s0/test3/\\240\\243\\223n \\334~b}\\2624)\\250m\\201\\202\\235\\276\\361\\3304\\323\\352\\311\\361\\353;\\326\\312', 9, 1.0, '2019-09-12 10:07:31.028103+00', '2019-09-12 10:07:32.028103+00', null, null, 0, '2019-09-12 10:07:33.028103+00', 0);
INSERT INTO "stripecoinpayments_invoice_project_records"("id", "project_id", "storage", "egress", "objects", "period_start", "period_end", "state", "created_at") VALUES (E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, E'\\021\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, 0, 0, 0, '2019-06-01 08:28:24.267934+00', '2019-06-01 08:28:24.267934+00', 0, '2019-06-01 08:28:24.267934+00');
INSERT INTO "graceful_exit_transfer_queue" ("node_id", "path", "piece_num", "root_piece_id", "durability_ratio", "queued_at", "requested_at", "last_failed_at", "last_failed_code", "failed_count", "finished_at", "order_limit_send_count") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\016', E'f8419768-5baa-4901-b3ba-62808013ec45/s0/test3/\\240\\243\\223n \\334~b}\\2624)\\250m\\201\\202\\235\\276\\361\\3304\\323\\352\\311\\361\\353;\\326\\311', 10, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 1.0, '2019-09-12 10:07:31.028103+00', '2019-09-12 10:07:32.028103+00', null, null, 0, '2019-09-12 10:07:33.028103+00', 0);
INSERT INTO "stripecoinpayments_tx_conversion_rates" ("tx_id", "rate", "created_at") VALUES ('tx_id', E'\\363\\311\\033w\\222\\303Ci,'::bytea, '2019-06-01 08:28:24.267934+00');
INSERT INTO "coinpayments_transactions" ("id", "user_id", "address", "amount", "received", "status", "key", "timeout", "created_at") VALUES ('tx_id', E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 'address', E'\\363\\311\\033w'::bytea, E'\\363\\311\\033w'::bytea, 1, 'key', 60, '2019-06-01 08:28:24.267934+00');
INSERT INTO "storagenode_bandwidth_rollups" ("storagenode_id", "interval_start", "interval_seconds", "action", "settled") VALUES (E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n', '2020-01-11 08:00:00.000000' AT TIME ZONE current_setting('TIMEZONE'), 3600, 1, 2024);
INSERT INTO "coupons" ("id", "user_id", "amount", "description", "type", "status", "duration", "billing_periods", "created_at") VALUES (E'\\362\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 50, 'description', 0, 0, 2, 2, '2019-06-01 08:28:24.267934+00');
INSERT INTO "coupons" ("id", "user_id", "amount", "description", "type", "status", "duration", "billing_periods", "created_at") VALUES (E'\\362\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\012'::bytea, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 50, 'description', 0, 0, 2, 2, '2019-06-01 08:28:24.267934+00');
INSERT INTO "coupons" ("id", "user_id", "amount", "description", "type", "status", "duration", "billing_periods", "created_at") VALUES (E'\\362\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\015'::bytea, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 50, 'description', 0, 0, 2, 2, '2019-06-01 08:28:24.267934+00');
INSERT INTO "coupon_usages" ("coupon_id", "amount", "status", "period") VALUES (E'\\362\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, 22, 0, '2019-06-01 09:28:24.267934+00');
INSERT INTO "coupon_codes" ("id", "name", "amount", "description", "type", "billing_periods", "created_at") VALUES (E'\\362\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, 'STORJ50', 50, '$50 for your first 5 months', 0, NULL, '2019-06-01 08:28:24.267934+00');
INSERT INTO "coupon_codes" ("id", "name", "amount", "description", "type", "billing_periods", "created_at") VALUES (E'\\362\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\015'::bytea, 'STORJ75', 75, '$75 for your first 5 months', 0, 2, '2019-06-01 08:28:24.267934+00');
INSERT INTO "stripecoinpayments_apply_balance_intents" ("tx_id", "state", "created_at") VALUES ('tx_id', 0, '2019-06-01 08:28:24.267934+00');
INSERT INTO "projects"("id", "name", "description", "usage_limit", "bandwidth_limit", "max_buckets", "rate_limit", "partner_id", "owner_id", "created_at") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\347'::bytea, 'projName1', 'Test project 1', 5e11, 5e11, NULL, 2000000, NULL, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, '2020-01-15 08:28:24.636949+00');
INSERT INTO "injuredsegments" ("path", "data", "segment_health", "updated_at") VALUES ('0', '\x0a0130120100', 1.0, '2020-09-01 00:00:00.000000+00');
INSERT INTO "injuredsegments" ("path", "data", "segment_health", "updated_at") VALUES ('here''s/a/great/path', '\x0a136865726527732f612f67726561742f70617468120a0102030405060708090a', 1.0, '2020-09-01 00:00:00.000000+00');
INSERT INTO "injuredsegments" ("path", "data", "segment_health", "updated_at") VALUES ('yet/another/cool/path', '\x0a157965742f616e6f746865722f636f6f6c2f70617468120a0102030405060708090a', 1.0, '2020-09-01 00:00:00.000000+00');
INSERT INTO "injuredsegments" ("path", "data", "segment_health", "updated_at") VALUES ('/this/is/a/new/path', '\x0a23736f2f6d616e792f69636f6e69632f70617468732f746f2f63686f6f73652f66726f6d120a0102030405060708090a', 1.0, '2020-09-01 00:00:00.000000+00');
INSERT INTO "injuredsegments" ("path", "data", "segment_health", "updated_at") VALUES ('/some/path/1/23/4', '\x0a23736f2f6d618e792f69636f6e69632f70617468732f746f2f63686f6f73652f66726f6d120a0102030405060708090a', 0.2, '2020-09-01 00:00:00.000000+00');
INSERT INTO "project_bandwidth_rollups"("project_id", "interval_month", egress_allocated) VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\347'::bytea, '2020-04-01', 10000);
INSERT INTO "project_bandwidth_daily_rollups"("project_id", "interval_day", egress_allocated, egress_settled, egress_dead) VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\347'::bytea, '2021-04-22', 10000, 5000, 0);
INSERT INTO "projects"("id", "name", "description", "usage_limit", "bandwidth_limit", "max_buckets","rate_limit", "partner_id", "owner_id", "created_at") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\345'::bytea, 'egress101', 'High Bandwidth Project', 5e11, 5e11, NULL, 2000000, NULL, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, '2020-05-15 08:46:24.000000+00');
INSERT INTO "storagenode_paystubs"("period", "node_id", "created_at", "codes", "usage_at_rest", "usage_get", "usage_put", "usage_get_repair", "usage_put_repair", "usage_get_audit", "comp_at_rest", "comp_get", "comp_put", "comp_get_repair", "comp_put_repair", "comp_get_audit", "surge_percent", "held", "owed", "disposed", "paid", "distributed") VALUES ('2020-01', '\xf2a3b4c4dfdf7221310382fd5db5aa73e1d227d6df09734ec4e5305000000000', '2020-04-07T20:14:21.479141Z', '', 1327959864508416, 294054066688, 159031363328, 226751, 0, 836608, 2861984, 5881081, 0, 226751, 0, 8, 300, 0, 26909472, 0, 26909472, 0);
INSERT INTO "nodes"("id", "address", "last_net", "protocol", "type", "email", "wallet", "free_disk", "piece_count", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "created_at", "updated_at", "last_contact_success", "last_contact_failure", "contained", "disqualified", "suspended", "audit_reputation_alpha", "audit_reputation_beta", "unknown_audit_reputation_alpha", "unknown_audit_reputation_beta", "exit_success", "unknown_audit_suspended", "offline_suspended", "under_review") VALUES (E'\\153\\313\\233\\074\\327\\255\\136\\070\\346\\001', '127.0.0.1:55516', '', 0, 4, '', '', -1, 0, 0, 1, 0, '', 'epoch', false, 0, 0, 5, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch', false, NULL, NULL, 50, 0, 1, 0, false, '2019-02-14 08:07:31.108963+00', '2019-02-14 08:07:31.108963+00', '2019-02-14 08:07:31.108963+00');
INSERT INTO "audit_histories" ("node_id", "history") VALUES (E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001', '\x0a23736f2f6d616e792f69636f6e69632f70617468732f746f2f63686f6f73652f66726f6d120a0102030405060708090a');
INSERT INTO "node_api_versions"("id", "api_version", "created_at", "updated_at") VALUES (E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001', 1, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00');
INSERT INTO "node_api_versions"("id", "api_version", "created_at", "updated_at") VALUES (E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n', 2, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00');
INSERT INTO "node_api_versions"("id", "api_version", "created_at", "updated_at") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014', 3, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00');
INSERT INTO "projects"("id", "name", "description", "usage_limit", "bandwidth_limit", "rate_limit", "partner_id", "owner_id", "created_at", "max_buckets") VALUES (E'300\\273|\\342N\\347\\347\\363\\342\\363\\371>+F\\256\\263'::bytea, 'egress102', 'High Bandwidth Project 2', 5e11, 5e11, 2000000, NULL, E'265\\343U\\303\\312\\312\\363\\311\\033w\\222\\303Ci",'::bytea, '2020-05-15 08:46:24.000000+00', 1000);
INSERT INTO "projects"("id", "name", "description", "usage_limit", "bandwidth_limit", "rate_limit", "partner_id", "owner_id", "created_at", "max_buckets") VALUES (E'300\\273|\\342N\\347\\347\\363\\342\\363\\371>+F\\255\\244'::bytea, 'egress103', 'High Bandwidth Project 3', 5e11, 5e11, 2000000, NULL, E'265\\343U\\303\\312\\312\\363\\311\\033w\\222\\303Ci",'::bytea, '2020-05-15 08:46:24.000000+00', 1000);
INSERT INTO "projects"("id", "name", "description", "usage_limit", "bandwidth_limit", "rate_limit", "partner_id", "owner_id", "created_at", "max_buckets") VALUES (E'300\\273|\\342N\\347\\347\\363\\342\\363\\371>+F\\253\\231'::bytea, 'Limit Test 1', 'This project is above the default', 50000000001, 50000000001, 2000000, NULL, E'265\\343U\\303\\312\\312\\363\\311\\033w\\222\\303Ci",'::bytea, '2020-10-14 10:10:10.000000+00', 101);
INSERT INTO "projects"("id", "name", "description", "usage_limit", "bandwidth_limit", "rate_limit", "partner_id", "owner_id", "created_at", "max_buckets") VALUES (E'300\\273|\\342N\\347\\347\\363\\342\\363\\371>+F\\252\\230'::bytea, 'Limit Test 2', 'This project is below the default', 5e11, 5e11, 2000000, NULL, E'265\\343U\\303\\312\\312\\363\\311\\033w\\222\\303Ci",'::bytea, '2020-10-14 10:10:11.000000+00', NULL);
INSERT INTO "storagenode_bandwidth_rollups_phase2" ("storagenode_id", "interval_start", "interval_seconds", "action", "allocated", "settled") VALUES (E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n', '2019-03-06 08:00:00.000000' AT TIME ZONE current_setting('TIMEZONE'), 3600, 1, 1024, 2024);
INSERT INTO "storagenode_bandwidth_rollup_archives" ("storagenode_id", "interval_start", "interval_seconds", "action", "allocated", "settled") VALUES (E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n', '2019-03-06 08:00:00.000000' AT TIME ZONE current_setting('TIMEZONE'), 3600, 1, 1024, 2024);
INSERT INTO "bucket_bandwidth_rollup_archives" ("bucket_name", "project_id", "interval_start", "interval_seconds", "action", "inline", "allocated", "settled") VALUES (E'testbucket'::bytea, E'\\170\\160\\157\\370\\274\\366\\113\\364\\272\\235\\301\\243\\321\\102\\321\\136'::bytea,'2019-03-06 08:00:00.000000' AT TIME ZONE current_setting('TIMEZONE'), 3600, 1, 1024, 2024, 3024);
INSERT INTO "storagenode_paystubs"("period", "node_id", "created_at", "codes", "usage_at_rest", "usage_get", "usage_put", "usage_get_repair", "usage_put_repair", "usage_get_audit", "comp_at_rest", "comp_get", "comp_put", "comp_get_repair", "comp_put_repair", "comp_get_audit", "surge_percent", "held", "owed", "disposed", "paid", "distributed") VALUES ('2020-12', '\x1111111111111111111111111111111111111111111111111111111111111111', '2020-04-07T20:14:21.479141Z', '', 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 117);
INSERT INTO "storagenode_payments"("id", "created_at", "period", "node_id", "amount") VALUES (1, '2020-04-07T20:14:21.479141Z', '2020-12', '\x1111111111111111111111111111111111111111111111111111111111111111', 117);
INSERT INTO "reputations"("id", "audit_success_count", "total_audit_count", "created_at", "updated_at", "contained", "disqualified", "suspended", "audit_reputation_alpha", "audit_reputation_beta", "unknown_audit_reputation_alpha", "unknown_audit_reputation_beta", "unknown_audit_suspended", "offline_suspended", "under_review", "audit_history") VALUES (E'\\153\\313\\233\\074\\327\\255\\136\\070\\346\\001', 5, 2, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', false, NULL, NULL, 50, 0, 1, 0, '2019-02-14 08:07:31.108963+00', '2019-02-14 08:07:31.108963+00', '2019-02-14 08:07:31.108963+00', '\x0a23736f2f6d616e792f69636f6e69632f70617468732f746f2f63686f6f73652f66726f6d120a0102030405060708090a');
INSERT INTO "graceful_exit_segment_transfer_queue" ("node_id", "stream_id", "position", "piece_num", "durability_ratio", "queued_at", "requested_at", "last_failed_at", "last_failed_code", "failed_count", "finished_at", "order_limit_send_count") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\016', E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 10 , 8, 1.0, '2019-09-12 10:07:31.028103+00', '2019-09-12 10:07:32.028103+00', null, null, 0, '2019-09-12 10:07:33.028103+00', 0);
INSERT INTO "segment_pending_audits" ("node_id", "piece_id", "stripe_index", "share_size", "expected_share_hash", "reverify_count", "stream_id", position) VALUES (E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001'::bytea, E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 5, 1024, E'\\070\\127\\144\\013\\332\\344\\102\\376\\306\\056\\303\\130\\106\\132\\321\\276\\321\\274\\170\\264\\054\\333\\221\\116\\154\\221\\335\\070\\220\\146\\344\\216'::bytea, 1, '\x010101', 1);
INSERT INTO "users"("id", "full_name", "short_name", "email", "normalized_email", "password_hash", "status", "partner_id", "created_at", "is_professional", "project_limit", "paid_tier") VALUES (E'\\363\\311\\033w\\222\\303Ci\\266\\342U\\303\\312\\204",'::bytea, 'Noahson', 'William', '100email1@mail.test', '100EMAIL1@MAIL.TEST', E'some_readable_hash'::bytea, 1, NULL, '2019-02-14 08:28:24.614594+00', false, 10, true);
-- NEW DATA --
INSERT INTO "repair_queue" ("stream_id", "position", "attempted_at", "segment_health", "updated_at", "inserted_at") VALUES ('\x01', 1, null, 1, '2020-09-01 00:00:00.000000+00', '2021-09-01 00:00:00.000000+00');