diff --git a/pkg/audit/reverify_test.go b/pkg/audit/reverify_test.go index 9a8553304..4b9c33678 100644 --- a/pkg/audit/reverify_test.go +++ b/pkg/audit/reverify_test.go @@ -59,7 +59,7 @@ func TestReverifySuccess(t *testing.T) { pieces := stripe.Segment.GetRemote().GetRemotePieces() rootPieceID := stripe.Segment.GetRemote().RootPieceId - limit, err := orders.CreateAuditOrderLimit(ctx, planet.Satellites[0].Identity.PeerIdentity(), bucketID, pieces[0].NodeId, rootPieceID, shareSize) + limit, err := orders.CreateAuditOrderLimit(ctx, planet.Satellites[0].Identity.PeerIdentity(), bucketID, pieces[0].NodeId, pieces[0].PieceNum, rootPieceID, shareSize) require.NoError(t, err) share, err := audits.Verifier.GetShare(ctx, limit, stripe.Index, shareSize, int(pieces[0].PieceNum)) @@ -126,7 +126,7 @@ func TestReverifyFailMissingShare(t *testing.T) { pieces := stripe.Segment.GetRemote().GetRemotePieces() rootPieceID := stripe.Segment.GetRemote().RootPieceId - limit, err := orders.CreateAuditOrderLimit(ctx, planet.Satellites[0].Identity.PeerIdentity(), bucketID, pieces[0].NodeId, rootPieceID, shareSize) + limit, err := orders.CreateAuditOrderLimit(ctx, planet.Satellites[0].Identity.PeerIdentity(), bucketID, pieces[0].NodeId, pieces[0].PieceNum, rootPieceID, shareSize) require.NoError(t, err) share, err := audits.Verifier.GetShare(ctx, limit, stripe.Index, shareSize, int(pieces[0].PieceNum)) @@ -145,9 +145,9 @@ func TestReverifyFailMissingShare(t *testing.T) { require.NoError(t, err) // delete the piece from the first node - nodeID := stripe.Segment.GetRemote().GetRemotePieces()[0].NodeId - pieceID := stripe.Segment.GetRemote().RootPieceId.Derive(nodeID) - node := getStorageNode(planet, nodeID) + piece := stripe.Segment.GetRemote().GetRemotePieces()[0] + pieceID := stripe.Segment.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum) + node := getStorageNode(planet, piece.NodeId) err = node.Storage2.Store.Delete(ctx, planet.Satellites[0].ID(), pieceID) require.NoError(t, err) diff --git a/pkg/audit/verifier.go b/pkg/audit/verifier.go index 2d88a6eed..03e9a90e9 100644 --- a/pkg/audit/verifier.go +++ b/pkg/audit/verifier.go @@ -310,7 +310,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, stripe *Stripe) (report containedInSegment++ go func(pending *PendingAudit, piece *pb.RemotePiece) { - limit, err := verifier.orders.CreateAuditOrderLimit(ctx, verifier.auditor, createBucketID(stripe.SegmentPath), pending.NodeID, pending.PieceID, pending.ShareSize) + limit, err := verifier.orders.CreateAuditOrderLimit(ctx, verifier.auditor, createBucketID(stripe.SegmentPath), pending.NodeID, piece.PieceNum, pending.PieceID, pending.ShareSize) if err != nil { if overlay.ErrNodeDisqualified.Has(err) { _, errDelete := verifier.containment.Delete(ctx, piece.NodeId) diff --git a/pkg/audit/verifier_test.go b/pkg/audit/verifier_test.go index 391afc957..d205fd768 100644 --- a/pkg/audit/verifier_test.go +++ b/pkg/audit/verifier_test.go @@ -396,9 +396,9 @@ func TestVerifierMissingPiece(t *testing.T) { require.NoError(t, err) // delete the piece from the first node - nodeID := stripe.Segment.GetRemote().GetRemotePieces()[0].NodeId - pieceID := stripe.Segment.GetRemote().RootPieceId.Derive(nodeID) - node := getStorageNode(planet, nodeID) + piece := stripe.Segment.GetRemote().GetRemotePieces()[0] + pieceID := stripe.Segment.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum) + node := getStorageNode(planet, piece.NodeId) err = node.Storage2.Store.Delete(ctx, planet.Satellites[0].ID(), pieceID) require.NoError(t, err) diff --git a/pkg/storj/pieceid.go b/pkg/storj/pieceid.go index 5691e6526..bee444757 100644 --- a/pkg/storj/pieceid.go +++ b/pkg/storj/pieceid.go @@ -9,6 +9,7 @@ import ( "crypto/sha512" "database/sql/driver" "encoding/base32" + "encoding/binary" "encoding/json" "github.com/zeebo/errs" @@ -66,11 +67,14 @@ func (id PieceID) String() string { return pieceIDEncoding.EncodeToString(id.Byt // Bytes returns bytes of the piece ID func (id PieceID) Bytes() []byte { return id[:] } -// Derive a new PieceID from the current piece ID and the given storage node ID -func (id PieceID) Derive(storagenodeID NodeID) PieceID { +// Derive a new PieceID from the current piece ID, the given storage node ID and piece number +func (id PieceID) Derive(storagenodeID NodeID, pieceNum int32) PieceID { // TODO: should the secret / content be swapped? mac := hmac.New(sha512.New, id.Bytes()) _, _ = mac.Write(storagenodeID.Bytes()) // on hash.Hash write never returns an error + num := make([]byte, 4) + binary.BigEndian.PutUint32(num, uint32(pieceNum)) + _, _ = mac.Write(num) // on hash.Hash write never returns an error var derived PieceID copy(derived[:], mac.Sum(nil)) return derived diff --git a/pkg/storj/pieceid_test.go b/pkg/storj/pieceid_test.go index 5fc22f23e..67956377d 100644 --- a/pkg/storj/pieceid_test.go +++ b/pkg/storj/pieceid_test.go @@ -49,17 +49,19 @@ func TestPieceID_Derive(t *testing.T) { n0 := testidentity.MustPregeneratedIdentity(0, storj.LatestIDVersion()).ID n1 := testidentity.MustPregeneratedIdentity(1, storj.LatestIDVersion()).ID - assert.NotEqual(t, a.Derive(n0), a.Derive(n1), "a(n0) != a(n1)") - assert.NotEqual(t, b.Derive(n0), b.Derive(n1), "b(n0) != b(n1)") - assert.NotEqual(t, a.Derive(n0), b.Derive(n0), "a(n0) != b(n0)") - assert.NotEqual(t, a.Derive(n1), b.Derive(n1), "a(n1) != b(n1)") + assert.NotEqual(t, a.Derive(n0, 0), a.Derive(n1, 0), "a(n0, 0) != a(n1, 0)") + assert.NotEqual(t, b.Derive(n0, 0), b.Derive(n1, 0), "b(n0, 0) != b(n1, 0)") + assert.NotEqual(t, a.Derive(n0, 0), b.Derive(n0, 0), "a(n0, 0) != b(n0, 0)") + assert.NotEqual(t, a.Derive(n1, 0), b.Derive(n1, 0), "a(n1, 0) != b(n1, 0)") + + assert.NotEqual(t, a.Derive(n0, 0), a.Derive(n0, 1), "a(n0, 0) != a(n0, 1)") // idempotent - assert.Equal(t, a.Derive(n0), a.Derive(n0), "a(n0)") - assert.Equal(t, a.Derive(n1), a.Derive(n1), "a(n1)") + assert.Equal(t, a.Derive(n0, 0), a.Derive(n0, 0), "a(n0, 0)") + assert.Equal(t, a.Derive(n1, 0), a.Derive(n1, 0), "a(n1, 0)") - assert.Equal(t, b.Derive(n0), b.Derive(n0), "b(n0)") - assert.Equal(t, b.Derive(n1), b.Derive(n1), "b(n1)") + assert.Equal(t, b.Derive(n0, 0), b.Derive(n0, 0), "b(n0, 0)") + assert.Equal(t, b.Derive(n1, 0), b.Derive(n1, 0), "b(n1, 0)") } func TestPieceID_MarshalJSON(t *testing.T) { diff --git a/satellite/metainfo/validation.go b/satellite/metainfo/validation.go index 9cb85c009..677980b0e 100644 --- a/satellite/metainfo/validation.go +++ b/satellite/metainfo/validation.go @@ -203,7 +203,7 @@ func (endpoint *Endpoint) validateCommitSegment(ctx context.Context, req *pb.Seg if limit == nil { return Error.New("invalid no order limit for piece") } - derivedPieceID := remote.RootPieceId.Derive(piece.NodeId) + derivedPieceID := remote.RootPieceId.Derive(piece.NodeId, piece.PieceNum) if limit.PieceId.IsZero() || limit.PieceId != derivedPieceID { return Error.New("invalid order limit piece id") } diff --git a/satellite/orders/service.go b/satellite/orders/service.go index 629393d83..7693ae059 100644 --- a/satellite/orders/service.go +++ b/satellite/orders/service.go @@ -170,7 +170,7 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, uplink *identi SatelliteAddress: service.satelliteAddress, UplinkId: uplink.ID, StorageNodeId: piece.NodeId, - PieceId: rootPieceID.Derive(piece.NodeId), + PieceId: rootPieceID.Derive(piece.NodeId, piece.PieceNum), Action: pb.PieceAction_GET, Limit: pieceSize, PieceExpiration: expiration, @@ -238,7 +238,7 @@ func (service *Service) CreatePutOrderLimits(ctx context.Context, uplink *identi SatelliteAddress: service.satelliteAddress, UplinkId: uplink.ID, StorageNodeId: node.Id, - PieceId: rootPieceID.Derive(node.Id), + PieceId: rootPieceID.Derive(node.Id, pieceNum), Action: pb.PieceAction_PUT, Limit: maxPieceSize, PieceExpiration: expiration, @@ -323,7 +323,7 @@ func (service *Service) CreateDeleteOrderLimits(ctx context.Context, uplink *ide SatelliteAddress: service.satelliteAddress, UplinkId: uplink.ID, StorageNodeId: piece.NodeId, - PieceId: rootPieceID.Derive(piece.NodeId), + PieceId: rootPieceID.Derive(piece.NodeId, piece.PieceNum), Action: pb.PieceAction_DELETE, Limit: 0, PieceExpiration: expiration, @@ -412,7 +412,7 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, auditor *ide SatelliteAddress: service.satelliteAddress, UplinkId: auditor.ID, StorageNodeId: piece.NodeId, - PieceId: rootPieceID.Derive(piece.NodeId), + PieceId: rootPieceID.Derive(piece.NodeId, piece.PieceNum), Action: pb.PieceAction_GET_AUDIT, Limit: int64(shareSize), PieceExpiration: expiration, @@ -452,7 +452,8 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, auditor *ide } // CreateAuditOrderLimit creates an order limit for auditing a single the piece from a pointer. -func (service *Service) CreateAuditOrderLimit(ctx context.Context, auditor *identity.PeerIdentity, bucketID []byte, nodeID storj.NodeID, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, err error) { +func (service *Service) CreateAuditOrderLimit(ctx context.Context, auditor *identity.PeerIdentity, bucketID []byte, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, err error) { + // TODO reduce number of params ? defer mon.Task()(&ctx)(&err) // convert orderExpiration from duration to timestamp orderExpirationTime := time.Now().UTC().Add(service.orderExpiration) @@ -485,7 +486,7 @@ func (service *Service) CreateAuditOrderLimit(ctx context.Context, auditor *iden SatelliteAddress: service.satelliteAddress, UplinkId: auditor.ID, StorageNodeId: nodeID, - PieceId: rootPieceID.Derive(nodeID), + PieceId: rootPieceID.Derive(nodeID, pieceNum), Action: pb.PieceAction_GET_AUDIT, Limit: int64(shareSize), OrderCreation: time.Now(), @@ -569,7 +570,7 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, repairer SatelliteAddress: service.satelliteAddress, UplinkId: repairer.ID, StorageNodeId: piece.NodeId, - PieceId: rootPieceID.Derive(piece.NodeId), + PieceId: rootPieceID.Derive(piece.NodeId, piece.PieceNum), Action: pb.PieceAction_GET_REPAIR, Limit: pieceSize, PieceExpiration: expiration, @@ -633,13 +634,13 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, repairer } limits := make([]*pb.AddressedOrderLimit, totalPieces) - var pieceNum int + var pieceNum int32 for _, node := range newNodes { - for pieceNum < totalPieces && getOrderLimits[pieceNum] != nil { + for int(pieceNum) < totalPieces && getOrderLimits[pieceNum] != nil { pieceNum++ } - if pieceNum >= totalPieces { // should not happen + if int(pieceNum) >= totalPieces { // should not happen return nil, Error.New("piece num greater than total pieces: %d >= %d", pieceNum, totalPieces) } @@ -649,7 +650,7 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, repairer SatelliteAddress: service.satelliteAddress, UplinkId: repairer.ID, StorageNodeId: node.Id, - PieceId: rootPieceID.Derive(node.Id), + PieceId: rootPieceID.Derive(node.Id, pieceNum), Action: pb.PieceAction_PUT_REPAIR, Limit: pieceSize, PieceExpiration: expiration,