Use Nodes array from pdb.Get (#578)

* Use Nodes array from pdb.Get

* fix problems with captplanet tests

* better comments
This commit is contained in:
Michal Niewrzal 2018-11-06 18:03:11 +01:00 committed by GitHub
parent de46a999bc
commit deb015970d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 46 additions and 34 deletions

View File

@ -83,7 +83,7 @@ func main() {
} }
// Example Get // Example Get
getRes, err := client.Get(ctx, path) getRes, _, err := client.Get(ctx, path)
if err != nil { if err != nil {
logger.Error("couldn't GET pointer from db", zap.Error(err)) logger.Error("couldn't GET pointer from db", zap.Error(err))

View File

@ -72,7 +72,7 @@ func (cursor *Cursor) NextStripe(ctx context.Context) (stripe *Stripe, err error
} }
// get pointer info // get pointer info
pointer, err := cursor.pointers.Get(ctx, path) pointer, _, err := cursor.pointers.Get(ctx, path)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -47,7 +47,7 @@ type ListItem struct {
// Client services offerred for the interface // Client services offerred for the interface
type Client interface { type Client interface {
Put(ctx context.Context, path storj.Path, pointer *pb.Pointer) error Put(ctx context.Context, path storj.Path, pointer *pb.Pointer) error
Get(ctx context.Context, path storj.Path) (*pb.Pointer, error) Get(ctx context.Context, path storj.Path) (*pb.Pointer, []*pb.Node, error)
List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
Delete(ctx context.Context, path storj.Path) error Delete(ctx context.Context, path storj.Path) error
@ -96,21 +96,21 @@ func (pdb *PointerDB) Put(ctx context.Context, path storj.Path, pointer *pb.Poin
} }
// Get is the interface to make a GET request, needs PATH and APIKey // Get is the interface to make a GET request, needs PATH and APIKey
func (pdb *PointerDB) Get(ctx context.Context, path storj.Path) (pointer *pb.Pointer, err error) { func (pdb *PointerDB) Get(ctx context.Context, path storj.Path) (pointer *pb.Pointer, nodes []*pb.Node, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
res, err := pdb.grpcClient.Get(ctx, &pb.GetRequest{Path: path}) res, err := pdb.grpcClient.Get(ctx, &pb.GetRequest{Path: path})
if err != nil { if err != nil {
if status.Code(err) == codes.NotFound { if status.Code(err) == codes.NotFound {
return nil, storage.ErrKeyNotFound.Wrap(err) return nil, nil, storage.ErrKeyNotFound.Wrap(err)
} }
return nil, Error.Wrap(err) return nil, nil, Error.Wrap(err)
} }
pdb.pba = res.GetPba() pdb.pba = res.GetPba()
pdb.authorization = res.GetAuthorization() pdb.authorization = res.GetAuthorization()
return res.GetPointer(), nil return res.GetPointer(), res.GetNodes(), nil
} }
// List is the interface to make a LIST request, needs StartingPathKey, Limit, and APIKey // List is the interface to make a LIST request, needs StartingPathKey, Limit, and APIKey

View File

@ -144,7 +144,7 @@ func TestGet(t *testing.T) {
err = proto.Unmarshal(byteData, ptr) err = proto.Unmarshal(byteData, ptr)
assert.NoError(t, err) assert.NoError(t, err)
getResponse := pb.GetResponse{Pointer: ptr} getResponse := pb.GetResponse{Pointer: ptr, Nodes: []*pb.Node{}}
errTag := fmt.Sprintf("Test case #%d", i) errTag := fmt.Sprintf("Test case #%d", i)
@ -153,13 +153,15 @@ func TestGet(t *testing.T) {
gc.EXPECT().Get(gomock.Any(), &getRequest).Return(&getResponse, tt.err) gc.EXPECT().Get(gomock.Any(), &getRequest).Return(&getResponse, tt.err)
pointer, err := pdb.Get(ctx, tt.path) pointer, nodes, err := pdb.Get(ctx, tt.path)
if err != nil { if err != nil {
assert.True(t, strings.Contains(err.Error(), tt.errString), errTag) assert.True(t, strings.Contains(err.Error(), tt.errString), errTag)
assert.Nil(t, pointer) assert.Nil(t, pointer)
assert.Nil(t, nodes)
} else { } else {
assert.NotNil(t, pointer) assert.NotNil(t, pointer)
assert.NotNil(t, nodes)
assert.NoError(t, err, errTag) assert.NoError(t, err, errTag)
} }
} }

View File

@ -48,11 +48,12 @@ func (mr *MockClientMockRecorder) Delete(arg0, arg1 interface{}) *gomock.Call {
} }
// Get mocks base method // Get mocks base method
func (m *MockClient) Get(arg0 context.Context, arg1 string) (*pb.Pointer, error) { func (m *MockClient) Get(arg0 context.Context, arg1 string) (*pb.Pointer, []*pb.Node, error) {
ret := m.ctrl.Call(m, "Get", arg0, arg1) ret := m.ctrl.Call(m, "Get", arg0, arg1)
ret0, _ := ret[0].(*pb.Pointer) ret0, _ := ret[0].(*pb.Pointer)
ret1, _ := ret[1].(error) ret1, _ := ret[1].([]*pb.Node)
return ret0, ret1 ret2, _ := ret[2].(error)
return ret0, ret1, ret2
} }
// Get indicates an expected call of Get // Get indicates an expected call of Get

View File

@ -13,7 +13,6 @@ import (
"github.com/vivint/infectious" "github.com/vivint/infectious"
"go.uber.org/zap" "go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2" monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/dht" "storj.io/storj/pkg/dht"
"storj.io/storj/pkg/eestream" "storj.io/storj/pkg/eestream"
"storj.io/storj/pkg/node" "storj.io/storj/pkg/node"
@ -73,7 +72,7 @@ func NewSegmentStore(oc overlay.Client, ec ecclient.Client, pdb pdbclient.Client
func (s *segmentStore) Meta(ctx context.Context, path storj.Path) (meta Meta, err error) { func (s *segmentStore) Meta(ctx context.Context, path storj.Path) (meta Meta, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
pr, err := s.pdb.Get(ctx, path) pr, _, err := s.pdb.Get(ctx, path)
if err != nil { if err != nil {
return Meta{}, Error.Wrap(err) return Meta{}, Error.Wrap(err)
} }
@ -193,7 +192,7 @@ func (s *segmentStore) makeRemotePointer(nodes []*pb.Node, pieceID client.PieceI
func (s *segmentStore) Get(ctx context.Context, path storj.Path) (rr ranger.Ranger, meta Meta, err error) { func (s *segmentStore) Get(ctx context.Context, path storj.Path) (rr ranger.Ranger, meta Meta, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
pr, err := s.pdb.Get(ctx, path) pr, nodes, err := s.pdb.Get(ctx, path)
if err != nil { if err != nil {
return nil, Meta{}, Error.Wrap(err) return nil, Meta{}, Error.Wrap(err)
} }
@ -201,9 +200,13 @@ func (s *segmentStore) Get(ctx context.Context, path storj.Path) (rr ranger.Rang
if pr.GetType() == pb.Pointer_REMOTE { if pr.GetType() == pb.Pointer_REMOTE {
seg := pr.GetRemote() seg := pr.GetRemote()
pid := client.PieceID(seg.GetPieceId()) pid := client.PieceID(seg.GetPieceId())
nodes, err := s.lookupNodes(ctx, seg)
if err != nil { // fall back if nodes are not available
return nil, Meta{}, Error.Wrap(err) if nodes == nil {
nodes, err = s.lookupNodes(ctx, seg)
if err != nil {
return nil, Meta{}, Error.Wrap(err)
}
} }
es, err := makeErasureScheme(pr.GetRemote().GetRedundancy()) es, err := makeErasureScheme(pr.GetRemote().GetRedundancy())
@ -251,7 +254,7 @@ func makeErasureScheme(rs *pb.RedundancyScheme) (eestream.ErasureScheme, error)
func (s *segmentStore) Delete(ctx context.Context, path storj.Path) (err error) { func (s *segmentStore) Delete(ctx context.Context, path storj.Path) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
pr, err := s.pdb.Get(ctx, path) pr, nodes, err := s.pdb.Get(ctx, path)
if err != nil { if err != nil {
return Error.Wrap(err) return Error.Wrap(err)
} }
@ -259,9 +262,13 @@ func (s *segmentStore) Delete(ctx context.Context, path storj.Path) (err error)
if pr.GetType() == pb.Pointer_REMOTE { if pr.GetType() == pb.Pointer_REMOTE {
seg := pr.GetRemote() seg := pr.GetRemote()
pid := client.PieceID(seg.PieceId) pid := client.PieceID(seg.PieceId)
nodes, err := s.lookupNodes(ctx, seg)
if err != nil { // fall back if nodes are not available
return Error.Wrap(err) if nodes == nil {
nodes, err = s.lookupNodes(ctx, seg)
if err != nil {
return Error.Wrap(err)
}
} }
authorization := s.pdb.SignedMessage() authorization := s.pdb.SignedMessage()
@ -281,7 +288,7 @@ func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces [
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
//Read the segment's pointer's info from the PointerDB //Read the segment's pointer's info from the PointerDB
pr, err := s.pdb.Get(ctx, path) pr, originalNodes, err := s.pdb.Get(ctx, path)
if err != nil { if err != nil {
return Error.Wrap(err) return Error.Wrap(err)
} }
@ -293,10 +300,13 @@ func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces [
seg := pr.GetRemote() seg := pr.GetRemote()
pid := client.PieceID(seg.GetPieceId()) pid := client.PieceID(seg.GetPieceId())
// Get the list of remote pieces from the pointer // fall back if nodes are not available
originalNodes, err := s.lookupNodes(ctx, seg) if originalNodes == nil {
if err != nil { // Get the list of remote pieces from the pointer
return Error.Wrap(err) originalNodes, err = s.lookupNodes(ctx, seg)
if err != nil {
return Error.Wrap(err)
}
} }
// get the nodes list that needs to be excluded // get the nodes list that needs to be excluded

View File

@ -12,7 +12,6 @@ import (
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"storj.io/storj/pkg/eestream" "storj.io/storj/pkg/eestream"
mock_eestream "storj.io/storj/pkg/eestream/mocks" mock_eestream "storj.io/storj/pkg/eestream/mocks"
mock_overlay "storj.io/storj/pkg/overlay/mocks" mock_overlay "storj.io/storj/pkg/overlay/mocks"
@ -72,7 +71,7 @@ func TestSegmentStoreMeta(t *testing.T) {
calls := []*gomock.Call{ calls := []*gomock.Call{
mockPDB.EXPECT().Get( mockPDB.EXPECT().Get(
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
).Return(tt.returnPointer, nil), ).Return(tt.returnPointer, nil, nil),
} }
gomock.InOrder(calls...) gomock.InOrder(calls...)
@ -219,7 +218,7 @@ func TestSegmentStoreGetInline(t *testing.T) {
ExpirationDate: someTime, ExpirationDate: someTime,
Size: tt.size, Size: tt.size,
Metadata: tt.metadata, Metadata: tt.metadata,
}, nil), }, nil, nil),
} }
gomock.InOrder(calls...) gomock.InOrder(calls...)
@ -282,7 +281,7 @@ func TestSegmentStoreRepairRemote(t *testing.T) {
ExpirationDate: someTime, ExpirationDate: someTime,
Size: tt.size, Size: tt.size,
Metadata: tt.metadata, Metadata: tt.metadata,
}, nil), }, nil, nil),
mockOC.EXPECT().BulkLookup(gomock.Any(), gomock.Any()), mockOC.EXPECT().BulkLookup(gomock.Any(), gomock.Any()),
mockOC.EXPECT().Choose(gomock.Any(), gomock.Any()).Return(tt.newNodes, nil), mockOC.EXPECT().Choose(gomock.Any(), gomock.Any()).Return(tt.newNodes, nil),
mockPDB.EXPECT().SignedMessage(), mockPDB.EXPECT().SignedMessage(),
@ -355,7 +354,7 @@ func TestSegmentStoreGetRemote(t *testing.T) {
ExpirationDate: someTime, ExpirationDate: someTime,
Size: tt.size, Size: tt.size,
Metadata: tt.metadata, Metadata: tt.metadata,
}, nil), }, nil, nil),
mockOC.EXPECT().BulkLookup(gomock.Any(), gomock.Any()), mockOC.EXPECT().BulkLookup(gomock.Any(), gomock.Any()),
mockPDB.EXPECT().SignedMessage(), mockPDB.EXPECT().SignedMessage(),
mockPDB.EXPECT().PayerBandwidthAllocation(), mockPDB.EXPECT().PayerBandwidthAllocation(),
@ -409,7 +408,7 @@ func TestSegmentStoreDeleteInline(t *testing.T) {
ExpirationDate: someTime, ExpirationDate: someTime,
Size: tt.size, Size: tt.size,
Metadata: tt.metadata, Metadata: tt.metadata,
}, nil), }, nil, nil),
mockPDB.EXPECT().Delete( mockPDB.EXPECT().Delete(
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
), ),
@ -469,7 +468,7 @@ func TestSegmentStoreDeleteRemote(t *testing.T) {
ExpirationDate: someTime, ExpirationDate: someTime,
Size: tt.size, Size: tt.size,
Metadata: tt.metadata, Metadata: tt.metadata,
}, nil), }, nil, nil),
mockOC.EXPECT().BulkLookup(gomock.Any(), gomock.Any()), mockOC.EXPECT().BulkLookup(gomock.Any(), gomock.Any()),
mockPDB.EXPECT().SignedMessage(), mockPDB.EXPECT().SignedMessage(),
mockEC.EXPECT().Delete( mockEC.EXPECT().Delete(