// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
syntax = "proto3";
option go_package = "";
import "gogo.proto";
import "google/protobuf/timestamp.proto";
package repair;
// InjuredSegment is the queue item used for the data repair queue.
message InjuredSegment {
bytes path = 1;
repeated int32 lost_pieces = 2;
google.protobuf.Timestamp inserted_time = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];

// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
//go:generate go run gen.go
// Package internalpb contains proto definitions for satellite internal tools.
package internalpb

// +build ignore
//go:generate go run gen.go
package main
import (
@ -18,7 +16,7 @@ import (
)

var (
var (
mainpkg = flag.String("pkg", "", "main package name")
mainpkg = flag.String("pkg", "", "main package name")
protoc = flag.String("protoc", "protoc", "protoc compiler")
@ -67,10 +65,10 @@ func main() {
commonPb := os.Getenv("STORJ_COMMON_PB")
if commonPb == "" {
commonPb = "../../common/pb"
commonPb = "../../../common/pb"
overrideImports := ",Mgoogle/protobuf/"
overrideImports := ",Mgoogle/protobuf/"
args := []string{
args := []string{
		"--drpc_out=plugins=drpc,paths=source_relative" + overrideImports + ":.",

@ -207,36 +207,36 @@ func init() {
func init() { proto.RegisterFile("metainfo_sat.proto", fileDescriptor_47c60bd892d94aaf) }
// See LICENSE for copying information.
syntax = "proto3";
syntax = "proto3";
option go_package = "";
package metainfo;

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package internalpb
import (
proto ""
var scanError = errs.Class("Protobuf Scanner")
var valueError = errs.Class("Protobuf Valuer")
// scan automatically converts database []byte to proto.Messages.
func scan(msg proto.Message, value interface{}) error {
bytes, ok := value.([]byte)
if !ok {
return scanError.New("%t was %t, expected []bytes", msg, value)
return scanError.Wrap(pb.Unmarshal(bytes, msg))
// value automatically converts proto.Messages to database []byte.
func value(msg proto.Message) (driver.Value, error) {
value, err := pb.Marshal(msg)
return value, valueError.Wrap(err)
// Scan implements the Scanner interface.
func (n *InjuredSegment) Scan(value interface{}) error {
return scan(n, value)
// Value implements the driver Valuer interface.
func (n InjuredSegment) Value() (driver.Value, error) {
return value(&n)

@ -204,7 +205,7 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin
// If the segment is suddenly entirely healthy again, we don't need to repair and we don't need to
// keep it in the irreparabledb queue either.
if numHealthy >= redundancy.MinReq && numHealthy <= repairThreshold && numHealthy < redundancy.SuccessThreshold {
_, err = checker.repairQueue.Insert(ctx, &pb.InjuredSegment{
_, err = checker.repairQueue.Insert(ctx, &internalpb.InjuredSegment{
Path: key,
LostPieces: missingPieces,
InsertedTime: time.Now().UTC(),
@ -308,7 +309,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
// except for the case when the repair and success thresholds are the same (a case usually seen during testing)
if numHealthy >= required && numHealthy <= repairThreshold && numHealthy < successThreshold {
alreadyInserted, err := obs.repairQueue.Insert(ctx, &pb.InjuredSegment{
alreadyInserted, err := obs.repairQueue.Insert(ctx, &internalpb.InjuredSegment{
Path: key,
LostPieces: missingPieces,
InsertedTime: time.Now().UTC(),

// RepairQueue implements queueing for segments that need repairing.
@ -16,15 +16,15 @@ import (
// architecture: Database
type RepairQueue interface {
// Insert adds an injured segment.
Insert(ctx context.Context, s *pb.InjuredSegment, numHealthy int) (alreadyInserted bool, err error)
Insert(ctx context.Context, s *internalpb.InjuredSegment, numHealthy int) (alreadyInserted bool, err error)
// Select gets an injured segment.
Select(ctx context.Context) (*pb.InjuredSegment, error)
Select(ctx context.Context) (*internalpb.InjuredSegment, error)
// Delete removes an injured segment.
Delete(ctx context.Context, s *pb.InjuredSegment) error
Delete(ctx context.Context, s *internalpb.InjuredSegment) error
// Clean removes all segments last updated before a certain time
Clean(ctx context.Context, before time.Time) (deleted int64, err error)
// SelectN lists limit amount of injured segments.
SelectN(ctx context.Context, limit int) ([]pb.InjuredSegment, error)
SelectN(ctx context.Context, limit int) ([]internalpb.InjuredSegment, error)
// Count counts the number of segments in the repair queue.
Count(ctx context.Context) (count int, err error)

@ -29,7 +29,7 @@ func TestUntilEmpty(t *testing.T) {
pathsMap := make(map[string]int)
for i := 0; i < 20; i++ {
path := "/path/" + strconv.Itoa(i)
injuredSeg := &pb.InjuredSegment{Path: []byte(path)}
injuredSeg := &internalpb.InjuredSegment{Path: []byte(path)}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, 10)
require.NoError(t, err)
require.False(t, alreadyInserted)
@ -62,7 +62,7 @@ func TestOrder(t *testing.T) {
olderRepairPath := []byte("/path/older")
for _, path := range [][]byte{oldRepairPath, recentRepairPath, nullPath, olderRepairPath} {
injuredSeg := &pb.InjuredSegment{Path: path}
injuredSeg := &internalpb.InjuredSegment{Path: path}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, 10)
require.NoError(t, err)
require.False(t, alreadyInserted)
@ -157,7 +157,7 @@ func TestOrderHealthyPieces(t *testing.T) {
for _, item := range injuredSegList {
// first, insert the injured segment
injuredSeg := &pb.InjuredSegment{Path: item.path}
injuredSeg := &internalpb.InjuredSegment{Path: item.path}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg,
require.NoError(t, err)
require.False(t, alreadyInserted)
@ -221,7 +221,7 @@ func TestOrderOverwrite(t *testing.T) {
{[]byte("path/a"), 8},
for i, item := range injuredSegList {
injuredSeg := &pb.InjuredSegment{Path: item.path}
injuredSeg := &internalpb.InjuredSegment{Path: item.path}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg,
require.NoError(t, err)
if i == 2 {
@ -256,7 +256,7 @@ func TestCount(t *testing.T) {
numSegments := 20
for i := 0; i < numSegments; i++ {
path := "/path/" + strconv.Itoa(i)
injuredSeg := &pb.InjuredSegment{Path: []byte(path)}
injuredSeg := &internalpb.InjuredSegment{Path: []byte(path)}
alreadyInserted, err := repairQueue.Insert(ctx, injuredSeg, 10)
require.NoError(t, err)
require.False(t, alreadyInserted)

@ -23,7 +24,7 @@ func TestInsertSelect(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
q := db.RepairQueue()
seg := &pb.InjuredSegment{
seg := &internalpb.InjuredSegment{
Path: []byte("abc"),
LostPieces: []int32{int32(1), int32(3)},
@ -42,7 +43,7 @@ func TestInsertDuplicate(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
q := db.RepairQueue()
seg := &pb.InjuredSegment{
seg := &internalpb.InjuredSegment{
Path: []byte("abc"),
LostPieces: []int32{int32(1), int32(3)},
@ -70,9 +71,9 @@ func TestSequential(t *testing.T) {
q := db.RepairQueue()
const N = 20
var addSegs []*pb.InjuredSegment
var addSegs []*internalpb.InjuredSegment
for i := 0; i < N; i++ {
seg := &pb.InjuredSegment{
seg := &internalpb.InjuredSegment{
Path: []byte(strconv.Itoa(i)),
LostPieces: []int32{int32(i)},
@ -108,14 +109,14 @@ func TestParallel(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
q := db.RepairQueue()
const N = 20
entries := make(chan *pb.InjuredSegment, N)
entries := make(chan *internalpb.InjuredSegment, N)
var inserts errs2.Group
// Add to queue concurrently
for i := 0; i < N; i++ {
i := i
inserts.Go(func() error {
_, err := q.Insert(ctx, &pb.InjuredSegment{
_, err := q.Insert(ctx, &internalpb.InjuredSegment{
Path: []byte(strconv.Itoa(i)),
LostPieces: []int32{int32(i)},
}, 10)
@ -146,7 +147,7 @@ func TestParallel(t *testing.T) {
require.Empty(t, remove.Wait(), "unexpected queue.Select/Delete errors")
var items []*pb.InjuredSegment
var items []*internalpb.InjuredSegment
for segment := range entries {
items = append(items, segment)
@ -166,15 +167,15 @@ func TestClean(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
q := db.RepairQueue()
seg1 := &pb.InjuredSegment{
seg1 := &internalpb.InjuredSegment{
Path: []byte("seg1"),
LostPieces: []int32{int32(1), int32(3)},
seg2 := &pb.InjuredSegment{
seg2 := &internalpb.InjuredSegment{
Path: []byte("seg2"),
LostPieces: []int32{int32(1), int32(3)},
seg3 := &pb.InjuredSegment{
seg3 := &internalpb.InjuredSegment{
Path: []byte("seg3"),
LostPieces: []int32{int32(1), int32(3)},

@ -149,7 +150,7 @@ func (service *Service) process(ctx context.Context) (err error) {
	return nil
}
return nil
func (service *Service) worker(ctx context.Context, seg *pb.InjuredSegment) (err error) {
func (service *Service) worker(ctx context.Context, seg *internalpb.InjuredSegment) (err error) {
defer mon.Task()(&ctx)(&err)
workerStartTime := time.Now().UTC()

@ -24,7 +24,7 @@ type repairQueue struct {
	db *satelliteDB
}
db *satelliteDB
func (r *repairQueue) Insert(ctx context.Context, seg *pb.InjuredSegment, numHealthy int) (alreadyInserted bool, err error) {
func (r *repairQueue) Insert(ctx context.Context, seg *internalpb.InjuredSegment, numHealthy int) (alreadyInserted bool, err error) {
defer mon.Task()(&ctx)(&err)
// insert if not exists, or update healthy count if does exist
var query string
@ -77,7 +77,7 @@ func (r *repairQueue) Insert(ctx context.Context, seg *pb.InjuredSegment, numHea
	return alreadyInserted, rows.Err()
}
return alreadyInserted, rows.Err()
func (r *repairQueue) Select(ctx context.Context) (seg *pb.InjuredSegment, err error) {
func (r *repairQueue) Select(ctx context.Context) (seg *internalpb.InjuredSegment, err error) {
defer mon.Task()(&ctx)(&err)
switch r.db.implementation {
case dbutil.Cockroach:
@ -103,7 +103,7 @@ func (r *repairQueue) Select(ctx context.Context) (seg *pb.InjuredSegment, err e
	return seg, err
}
return seg, err
func (r *repairQueue) Delete(ctx context.Context, seg *pb.InjuredSegment) (err error) {
func (r *repairQueue) Delete(ctx context.Context, seg *internalpb.InjuredSegment) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = r.db.ExecContext(ctx, r.db.Rebind(`DELETE FROM injuredsegments WHERE path = ?`), seg.Path)
return Error.Wrap(err)
@ -115,7 +115,7 @@ func (r *repairQueue) Clean(ctx context.Context, before time.Time) (deleted int6
	return n, Error.Wrap(err)
}
return n, Error.Wrap(err)
func (r *repairQueue) SelectN(ctx context.Context, limit int) (segs []pb.InjuredSegment, err error) {
func (r *repairQueue) SelectN(ctx context.Context, limit int) (segs []internalpb.InjuredSegment, err error) {
defer mon.Task()(&ctx)(&err)
if limit <= 0 || limit > RepairQueueSelectLimit {
limit = RepairQueueSelectLimit
@ -128,7 +128,7 @@ func (r *repairQueue) SelectN(ctx context.Context, limit int) (segs []pb.Injured
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var seg pb.InjuredSegment
var seg internalpb.InjuredSegment
err = rows.Scan(&seg)
if err != nil {
return segs, Error.Wrap(err)