segment/{metabase,repair}: add dedicated methods on metabase.Pieces

This change adds dedicated methods on metabase.Pieces to be able to add, remove pieces and also to check duplicates.

Change-Id: I21aaeff40c017c2ebe1cc85a864ae546754769cc
This commit is contained in:
Clement Sam 2021-07-28 06:43:24 +00:00
parent 80854ea921
commit f06e7c5f60
4 changed files with 512 additions and 80 deletions

View File

@ -7,7 +7,6 @@ import (
"context"
"database/sql"
"io"
"sort"
"sync"
"time"
@ -969,18 +968,14 @@ func (endpoint *Endpoint) GracefulExitFeasibility(ctx context.Context, req *pb.G
func (endpoint *Endpoint) UpdatePiecesCheckDuplicates(ctx context.Context, segment metabase.Segment, toAdd, toRemove metabase.Pieces, checkDuplicates bool) (err error) {
defer mon.Task()(&ctx)(&err)
// put all existing pieces to a map
pieceMap := make(map[uint16]metabase.Piece)
nodePieceMap := make(map[storj.NodeID]struct{})
for _, piece := range segment.Pieces {
pieceMap[piece.Number] = piece
if checkDuplicates {
nodePieceMap[piece.StorageNode] = struct{}{}
}
}
// Return an error if the segment already has a piece for this node
if checkDuplicates {
// put all existing pieces to a map
nodePieceMap := make(map[storj.NodeID]struct{})
for _, piece := range segment.Pieces {
nodePieceMap[piece.StorageNode] = struct{}{}
}
for _, piece := range toAdd {
_, ok := nodePieceMap[piece.StorageNode]
if ok {
@ -989,37 +984,12 @@ func (endpoint *Endpoint) UpdatePiecesCheckDuplicates(ctx context.Context, segme
nodePieceMap[piece.StorageNode] = struct{}{}
}
}
// remove the toRemove pieces from the map
// only if all piece number, node id
for _, piece := range toRemove {
if piece == (metabase.Piece{}) {
continue
}
existing := pieceMap[piece.Number]
if existing != (metabase.Piece{}) && existing.StorageNode == piece.StorageNode {
delete(pieceMap, piece.Number)
}
}
// add the toAdd pieces to the map
for _, piece := range toAdd {
if piece == (metabase.Piece{}) {
continue
}
_, exists := pieceMap[piece.Number]
if exists {
return Error.New("piece to add already exists (piece no: %d)", piece.Number)
}
pieceMap[piece.Number] = piece
pieces, err := segment.Pieces.Update(toAdd, toRemove)
if err != nil {
return Error.Wrap(err)
}
// copy the pieces from the map back to the segment, sorted by piece number
pieces := make(metabase.Pieces, 0, len(pieceMap))
for _, piece := range pieceMap {
pieces = append(pieces, piece)
}
sort.Sort(pieces)
err = endpoint.metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
StreamID: segment.StreamID,
Position: segment.Position,

View File

@ -364,3 +364,63 @@ func (p Pieces) Less(i, j int) bool { return p[i].Number < p[j].Number }
// Swap swaps the pieces with indexes i and j.
func (p Pieces) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// Add adds the specified pieces and returns the updated Pieces.
func (p Pieces) Add(piecesToAdd Pieces) (Pieces, error) {
return p.Update(piecesToAdd, nil)
}
// Remove removes the specified pieces from the original pieces
// and returns the updated Pieces.
func (p Pieces) Remove(piecesToRemove Pieces) (Pieces, error) {
if len(p) == 0 {
return Pieces{}, ErrInvalidRequest.New("pieces missing")
}
return p.Update(nil, piecesToRemove)
}
// Update adds piecesToAdd pieces and removes piecesToRemove pieces from
// the original pieces struct and returns the updated Pieces.
//
// It removes the piecesToRemove only if all piece number, node id match.
//
// When adding a piece, it checks if the piece already exists using the piece Number
// If a piece already exists, it returns an empty pieces struct and an error.
func (p Pieces) Update(piecesToAdd, piecesToRemove Pieces) (Pieces, error) {
pieceMap := make(map[uint16]Piece)
for _, piece := range p {
pieceMap[piece.Number] = piece
}
// remove the piecesToRemove from the map
// only if all piece number, node id match
for _, piece := range piecesToRemove {
if piece == (Piece{}) {
continue
}
existing := pieceMap[piece.Number]
if existing != (Piece{}) && existing.StorageNode == piece.StorageNode {
delete(pieceMap, piece.Number)
}
}
// add the piecesToAdd to the map
for _, piece := range piecesToAdd {
if piece == (Piece{}) {
continue
}
_, exists := pieceMap[piece.Number]
if exists {
return Pieces{}, Error.New("piece to add already exists (piece no: %d)", piece.Number)
}
pieceMap[piece.Number] = piece
}
newPieces := make(Pieces, 0, len(pieceMap))
for _, piece := range pieceMap {
newPieces = append(newPieces, piece)
}
sort.Sort(newPieces)
return newPieces, nil
}

View File

@ -274,3 +274,445 @@ func TestPiecesEqual(t *testing.T) {
require.Equal(t, tt.equal, tt.source.Equal(tt.target))
}
}
func TestPiecesAdd(t *testing.T) {
node0 := testrand.NodeID()
node1 := testrand.NodeID()
node2 := testrand.NodeID()
node3 := testrand.NodeID()
tests := []struct {
name string
pieces metabase.Pieces
piecesToAdd metabase.Pieces
want metabase.Pieces
wantErr string
}{
{
name: "piece exists",
pieces: metabase.Pieces{
metabase.Piece{
Number: 0,
StorageNode: node0,
},
metabase.Piece{
Number: 1,
StorageNode: node1,
},
},
piecesToAdd: metabase.Pieces{
metabase.Piece{
Number: 1,
StorageNode: node1,
},
},
wantErr: "metabase: piece to add already exists (piece no: 1)",
want: metabase.Pieces{},
},
{
name: "pieces added",
pieces: metabase.Pieces{
metabase.Piece{
Number: 0,
StorageNode: node0,
},
metabase.Piece{
Number: 3,
StorageNode: node3,
},
},
piecesToAdd: metabase.Pieces{
metabase.Piece{
Number: 2,
StorageNode: node2,
},
metabase.Piece{
Number: 1,
StorageNode: node1,
},
},
wantErr: "",
want: metabase.Pieces{
metabase.Piece{
Number: 0,
StorageNode: node0,
},
metabase.Piece{
Number: 1,
StorageNode: node1,
},
metabase.Piece{
Number: 2,
StorageNode: node2,
},
metabase.Piece{
Number: 3,
StorageNode: node3,
},
},
},
{
name: "adding new pieces to empty piece",
pieces: metabase.Pieces{},
piecesToAdd: metabase.Pieces{
metabase.Piece{
Number: 1,
StorageNode: node1,
},
metabase.Piece{
Number: 0,
StorageNode: node0,
},
},
wantErr: "",
want: metabase.Pieces{
metabase.Piece{
Number: 0,
StorageNode: node0,
},
metabase.Piece{
Number: 1,
StorageNode: node1,
},
},
},
{
name: "adding empty piece",
pieces: metabase.Pieces{
metabase.Piece{
Number: 0,
StorageNode: node0,
},
metabase.Piece{
Number: 1,
StorageNode: node1,
},
},
piecesToAdd: metabase.Pieces{},
wantErr: "",
want: metabase.Pieces{
metabase.Piece{
Number: 0,
StorageNode: node0,
},
metabase.Piece{
Number: 1,
StorageNode: node1,
},
},
},
{
name: "adding empty piece to empty pieces",
pieces: metabase.Pieces{},
piecesToAdd: metabase.Pieces{},
wantErr: "",
want: metabase.Pieces{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.NotNil(t, tt.pieces, tt.name)
got, err := tt.pieces.Add(tt.piecesToAdd)
if tt.wantErr != "" {
require.EqualError(t, err, tt.wantErr, tt.name)
} else {
require.NoError(t, err, tt.name)
}
require.Equal(t, got, tt.want, tt.name)
})
}
}
func TestPiecesRemove(t *testing.T) {
node0 := testrand.NodeID()
node1 := testrand.NodeID()
node2 := testrand.NodeID()
node3 := testrand.NodeID()
tests := []struct {
name string
pieces metabase.Pieces
piecesToRemove metabase.Pieces
want metabase.Pieces
wantErr string
}{
{
name: "piece missing",
pieces: metabase.Pieces{},
piecesToRemove: metabase.Pieces{
metabase.Piece{
Number: 1,
StorageNode: node1,
},
},
wantErr: "metabase: invalid request: pieces missing",
want: metabase.Pieces{},
},
{
name: "piecesToRemove struct is empty",
pieces: metabase.Pieces{
metabase.Piece{
Number: 1,
StorageNode: node1,
},
},
piecesToRemove: metabase.Pieces{},
wantErr: "",
want: metabase.Pieces{
metabase.Piece{
Number: 1,
StorageNode: node1,
},
},
},
{
name: "both pieces and piecesToRemove struct are empty",
pieces: metabase.Pieces{},
piecesToRemove: metabase.Pieces{},
wantErr: "metabase: invalid request: pieces missing",
want: metabase.Pieces{},
},
{
name: "pieces removed",
pieces: metabase.Pieces{
metabase.Piece{
Number: 0,
StorageNode: node0,
},
metabase.Piece{
Number: 1,
StorageNode: node1,
},
metabase.Piece{
Number: 2,
StorageNode: node2,
},
metabase.Piece{
Number: 3,
StorageNode: node3,
},
},
piecesToRemove: metabase.Pieces{
metabase.Piece{
Number: 2,
StorageNode: node2,
},
metabase.Piece{
Number: 1,
StorageNode: node1,
},
},
wantErr: "",
want: metabase.Pieces{
metabase.Piece{
Number: 0,
StorageNode: node0,
},
metabase.Piece{
Number: 3,
StorageNode: node3,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.NotNil(t, tt.pieces, tt.name)
got, err := tt.pieces.Remove(tt.piecesToRemove)
if tt.wantErr != "" {
require.EqualError(t, err, tt.wantErr, tt.name)
} else {
require.NoError(t, err, tt.name)
}
require.Equal(t, got, tt.want, tt.name)
})
}
}
func TestPiecesUpdate(t *testing.T) {
node0 := testrand.NodeID()
node1 := testrand.NodeID()
node2 := testrand.NodeID()
node3 := testrand.NodeID()
tests := []struct {
name string
pieces metabase.Pieces
piecesToAdd metabase.Pieces
piecesToRemove metabase.Pieces
want metabase.Pieces
wantErr string
}{
{
name: "add and remove pieces",
pieces: metabase.Pieces{
metabase.Piece{
Number: 0,
StorageNode: node0,
},
metabase.Piece{
Number: 1,
StorageNode: node1,
},
metabase.Piece{
Number: 2,
StorageNode: node2,
},
},
piecesToRemove: metabase.Pieces{
metabase.Piece{
Number: 0,
StorageNode: node0,
},
},
piecesToAdd: metabase.Pieces{
metabase.Piece{
Number: 3,
StorageNode: node3,
},
},
wantErr: "",
want: metabase.Pieces{
metabase.Piece{
Number: 1,
StorageNode: node1,
},
metabase.Piece{
Number: 2,
StorageNode: node2,
},
metabase.Piece{
Number: 3,
StorageNode: node3,
},
},
},
{
name: "add pieces only",
pieces: metabase.Pieces{
metabase.Piece{
Number: 1,
StorageNode: node1,
},
metabase.Piece{
Number: 2,
StorageNode: node2,
},
},
piecesToRemove: metabase.Pieces{},
piecesToAdd: metabase.Pieces{
metabase.Piece{
Number: 0,
StorageNode: node0,
},
},
wantErr: "",
want: metabase.Pieces{
metabase.Piece{
Number: 0,
StorageNode: node0,
},
metabase.Piece{
Number: 1,
StorageNode: node1,
},
metabase.Piece{
Number: 2,
StorageNode: node2,
},
},
},
{
name: "remove pieces only",
pieces: metabase.Pieces{
metabase.Piece{
Number: 1,
StorageNode: node1,
},
metabase.Piece{
Number: 2,
StorageNode: node2,
},
},
piecesToRemove: metabase.Pieces{
metabase.Piece{
Number: 2,
StorageNode: node2,
},
},
piecesToAdd: metabase.Pieces{},
wantErr: "",
want: metabase.Pieces{
metabase.Piece{
Number: 1,
StorageNode: node1,
},
},
},
{
name: "both piecesToAdd and piecesToRemove are empty",
pieces: metabase.Pieces{
metabase.Piece{
Number: 1,
StorageNode: node1,
},
metabase.Piece{
Number: 2,
StorageNode: node2,
},
},
piecesToRemove: metabase.Pieces{},
piecesToAdd: metabase.Pieces{},
wantErr: "",
want: metabase.Pieces{
metabase.Piece{
Number: 1,
StorageNode: node1,
},
metabase.Piece{
Number: 2,
StorageNode: node2,
},
},
},
{
name: "updating empty pieces",
pieces: metabase.Pieces{},
piecesToRemove: metabase.Pieces{
metabase.Piece{
Number: 1,
StorageNode: node1,
},
},
piecesToAdd: metabase.Pieces{
metabase.Piece{
Number: 0,
StorageNode: node1,
},
},
wantErr: "",
want: metabase.Pieces{
metabase.Piece{
Number: 0,
StorageNode: node1,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.NotNil(t, tt.pieces, tt.name)
got, err := tt.pieces.Update(tt.piecesToAdd, tt.piecesToRemove)
if tt.wantErr != "" {
require.EqualError(t, err, tt.wantErr, tt.name)
} else {
require.NoError(t, err, tt.name)
}
require.Equal(t, got, tt.want, tt.name)
})
}
}

View File

@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"math"
"sort"
"time"
"github.com/zeebo/errs"
@ -383,7 +382,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
// add pieces that failed piece hashes verification to the removal list
toRemove = append(toRemove, failedPieces...)
newPieces, err := updatePieces(segment.Pieces, repairedPieces, toRemove)
newPieces, err := segment.Pieces.Update(repairedPieces, toRemove)
if err != nil {
return false, repairPutError.Wrap(err)
}
@ -430,45 +429,6 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
return true, nil
}
func updatePieces(orignalPieces, toAddPieces, toRemovePieces metabase.Pieces) (metabase.Pieces, error) {
pieceMap := make(map[uint16]metabase.Piece)
for _, piece := range orignalPieces {
pieceMap[piece.Number] = piece
}
// remove the toRemove pieces from the map
// only if all piece number, node id match
for _, piece := range toRemovePieces {
if piece == (metabase.Piece{}) {
continue
}
existing := pieceMap[piece.Number]
if existing != (metabase.Piece{}) && existing.StorageNode == piece.StorageNode {
delete(pieceMap, piece.Number)
}
}
// add the pieces to the map
for _, piece := range toAddPieces {
if piece == (metabase.Piece{}) {
continue
}
_, exists := pieceMap[piece.Number]
if exists {
return metabase.Pieces{}, Error.New("piece to add already exists (piece no: %d)", piece.Number)
}
pieceMap[piece.Number] = piece
}
newPieces := make(metabase.Pieces, 0, len(pieceMap))
for _, piece := range pieceMap {
newPieces = append(newPieces, piece)
}
sort.Sort(newPieces)
return newPieces, nil
}
func (repairer *SegmentRepairer) getStatsByRS(redundancy *pb.RedundancyScheme) *stats {
rsString := getRSString(repairer.loadRedundancy(redundancy))
return repairer.statsCollector.getStatsByRS(rsString)