diff --git a/pkg/piecestore/rpc/client/client.go b/pkg/piecestore/rpc/client/client.go index 0cb0f4132..cf886b638 100644 --- a/pkg/piecestore/rpc/client/client.go +++ b/pkg/piecestore/rpc/client/client.go @@ -4,13 +4,11 @@ package client import ( - "crypto/rand" "fmt" "io" "log" "time" - "github.com/mr-tron/base58/base58" "golang.org/x/net/context" "google.golang.org/grpc" @@ -26,14 +24,6 @@ type PSClient interface { CloseConn() error } -// PieceID - Id for piece -type PieceID string - -// String -- Get String from PieceID -func (id PieceID) String() string { - return string(id) -} - // Client -- Struct Info needed for protobuf api calls type Client struct { route pb.PieceStoreRoutesClient @@ -95,19 +85,3 @@ func (client *Client) Delete(ctx context.Context, id PieceID) error { log.Printf("Route summary : %v", reply) return nil } - -func (id PieceID) IsValid() bool { - return len(id) >= 20 -} - -// NewPieceID creates a PieceID -func NewPieceID() PieceID { - b := make([]byte, 32) - - _, err := rand.Read(b) - if err != nil { - panic(err) - } - - return PieceID(base58.Encode(b)) -} diff --git a/pkg/piecestore/rpc/client/pieceid.go b/pkg/piecestore/rpc/client/pieceid.go new file mode 100644 index 000000000..c4b4f03cd --- /dev/null +++ b/pkg/piecestore/rpc/client/pieceid.go @@ -0,0 +1,49 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package client + +import ( + "crypto/hmac" + "crypto/rand" + "crypto/sha512" + + "github.com/mr-tron/base58/base58" +) + +// PieceID is the unique identifier for pieces +type PieceID string + +// NewPieceID creates a PieceID +func NewPieceID() PieceID { + b := make([]byte, 32) + + _, err := rand.Read(b) + if err != nil { + panic(err) + } + + return PieceID(base58.Encode(b)) +} + +// String representation of the PieceID +func (id PieceID) String() string { + return string(id) +} + +// IsValid checks if the current PieceID is valid +func (id PieceID) IsValid() bool { + return len(id) >= 20 +} + +// Derive a new PieceID from the current PieceID and the given secret +func (id PieceID) Derive(secret []byte) PieceID { + mac := hmac.New(sha512.New, secret) + mac.Write([]byte(id)) + h := mac.Sum(nil) + // Trim the hash if greater than 32 bytes + if len(h) > 32 { + h = h[:32] + } + return PieceID(base58.Encode(h)) +} diff --git a/pkg/piecestore/rpc/client/client_test.go b/pkg/piecestore/rpc/client/pieceid_test.go similarity index 57% rename from pkg/piecestore/rpc/client/client_test.go rename to pkg/piecestore/rpc/client/pieceid_test.go index 4a32c2858..02b967ae3 100644 --- a/pkg/piecestore/rpc/client/client_test.go +++ b/pkg/piecestore/rpc/client/pieceid_test.go @@ -6,7 +6,10 @@ package client import ( "testing" + "github.com/mr-tron/base58/base58" "github.com/stretchr/testify/assert" + + "storj.io/storj/pkg/kademlia" ) func TestNewPieceID(t *testing.T) { @@ -22,6 +25,17 @@ func TestNewPieceID(t *testing.T) { }) } -func TestMain(m *testing.M) { - m.Run() +func TestDerivePieceID(t *testing.T) { + pid := NewPieceID() + nid, err := kademlia.NewID() + assert.NoError(t, err) + + did := pid.Derive(nid.Bytes()) + assert.NotEqual(t, pid, did) + + did2 := pid.Derive(nid.Bytes()) + assert.Equal(t, did, did2) + + _, err = base58.Decode(did.String()) + assert.NoError(t, err) } diff --git a/pkg/storage/ec/client.go b/pkg/storage/ec/client.go index ffd62c3cd..fe6801a8a 100644 --- a/pkg/storage/ec/client.go +++ b/pkg/storage/ec/client.go @@ -69,18 +69,19 @@ func (ec *ecClient) Put(ctx context.Context, nodes []proto.Node, rs eestream.Red errs := make(chan error, len(readers)) for i, n := range nodes { go func(i int, n proto.Node) { + derivedPieceID := pieceID.Derive([]byte(n.GetId())) ps, err := ec.d.dial(ctx, n) if err != nil { - zap.S().Errorf("Failed putting piece %s to node %s: %v", - pieceID, n.GetId(), err) + zap.S().Errorf("Failed putting piece %s -> %s to node %s: %v", + pieceID, derivedPieceID, n.GetId(), err) errs <- err return } - err = ps.Put(ctx, pieceID, readers[i], expiration) + err = ps.Put(ctx, derivedPieceID, readers[i], expiration) ps.CloseConn() if err != nil { - zap.S().Errorf("Failed putting piece %s to node %s: %v", - pieceID, n.GetId(), err) + zap.S().Errorf("Failed putting piece %s -> %s to node %s: %v", + pieceID, derivedPieceID, n.GetId(), err) } errs <- err }(i, n) @@ -110,19 +111,20 @@ func (ec *ecClient) Get(ctx context.Context, nodes []proto.Node, es eestream.Era ch := make(chan rangerInfo, len(nodes)) for i, n := range nodes { go func(i int, n proto.Node) { + derivedPieceID := pieceID.Derive([]byte(n.GetId())) ps, err := ec.d.dial(ctx, n) if err != nil { - zap.S().Errorf("Failed getting piece %s from node %s: %v", - pieceID, n.GetId(), err) + zap.S().Errorf("Failed getting piece %s -> %s from node %s: %v", + pieceID, derivedPieceID, n.GetId(), err) ch <- rangerInfo{i: i, rr: nil, err: err} return } - rr, err := ps.Get(ctx, pieceID, size) + rr, err := ps.Get(ctx, derivedPieceID, size) // no ps.CloseConn() here, the connection will be closed by // the caller using RangeCloser.Close if err != nil { - zap.S().Errorf("Failed getting piece %s from node %s: %v", - pieceID, n.GetId(), err) + zap.S().Errorf("Failed getting piece %s -> %s from node %s: %v", + pieceID, derivedPieceID, n.GetId(), err) } ch <- rangerInfo{i: i, rr: rr, err: err} }(i, n) @@ -141,18 +143,19 @@ func (ec *ecClient) Delete(ctx context.Context, nodes []proto.Node, pieceID clie errs := make(chan error, len(nodes)) for _, n := range nodes { go func(n proto.Node) { + derivedPieceID := pieceID.Derive([]byte(n.GetId())) ps, err := ec.d.dial(ctx, n) if err != nil { - zap.S().Errorf("Failed deleting piece %s from node %s: %v", - pieceID, n.GetId(), err) + zap.S().Errorf("Failed deleting piece %s -> %s from node %s: %v", + pieceID, derivedPieceID, n.GetId(), err) errs <- err return } - err = ps.Delete(ctx, pieceID) + err = ps.Delete(ctx, derivedPieceID) ps.CloseConn() if err != nil { - zap.S().Errorf("Failed deleting piece %s from node %s: %v", - pieceID, n.GetId(), err) + zap.S().Errorf("Failed deleting piece %s -> %s from node %s: %v", + pieceID, derivedPieceID, n.GetId(), err) } errs <- err }(n) diff --git a/pkg/storage/ec/client_test.go b/pkg/storage/ec/client_test.go index 456ed2c57..a81cf35d1 100644 --- a/pkg/storage/ec/client_test.go +++ b/pkg/storage/ec/client_test.go @@ -146,9 +146,10 @@ func TestPut(t *testing.T) { m := make(map[proto.Node]client.PSClient, len(tt.nodes)) for _, n := range tt.nodes { if errs[n] != ErrDialFailed && tt.mbm >= 0 { + derivedID := id.Derive([]byte(n.GetId())) ps := NewMockPSClient(ctrl) gomock.InOrder( - ps.EXPECT().Put(gomock.Any(), id, gomock.Any(), ttl).Return(errs[n]), + ps.EXPECT().Put(gomock.Any(), derivedID, gomock.Any(), ttl).Return(errs[n]), ps.EXPECT().CloseConn().Return(nil), ) m[n] = ps @@ -218,8 +219,9 @@ func TestGet(t *testing.T) { m := make(map[proto.Node]client.PSClient, len(tt.nodes)) for _, n := range tt.nodes { if errs[n] != ErrDialFailed { + derivedID := id.Derive([]byte(n.GetId())) ps := NewMockPSClient(ctrl) - ps.EXPECT().Get(gomock.Any(), id, int64(size)).Return( + ps.EXPECT().Get(gomock.Any(), derivedID, int64(size)).Return( ranger.NopCloser(ranger.ByteRanger(nil)), errs[n]) m[n] = ps } @@ -274,9 +276,10 @@ func TestDelete(t *testing.T) { m := make(map[proto.Node]client.PSClient, len(tt.nodes)) for _, n := range tt.nodes { if errs[n] != ErrDialFailed { + derivedID := id.Derive([]byte(n.GetId())) ps := NewMockPSClient(ctrl) gomock.InOrder( - ps.EXPECT().Delete(gomock.Any(), id).Return(errs[n]), + ps.EXPECT().Delete(gomock.Any(), derivedID).Return(errs[n]), ps.EXPECT().CloseConn().Return(nil), ) m[n] = ps