storagenode/satellite: support different piece hash algorithms

Change-Id: I3db321e79f12f3ebaa249e6c32fa37fd9615687e
Márton Elek 2022-08-01 11:30:33 +02:00 committed by Storj Robot
parent d23cfa6445
commit 4b1be6bf8e
11 changed files with 94 additions and 43 deletions


@ -39,6 +39,7 @@ import (
"storj.io/storj/storage"
"storj.io/storj/storagenode"
"storj.io/uplink/private/eestream"
"storj.io/uplink/private/piecestore"
)
// TestDataRepair does the following:
@ -49,14 +50,15 @@ import (
// - Shuts down several nodes, but keeps up a number equal to the minimum
// threshold
// - Downloads the data from the remaining nodes and checks that it's the same as the uploaded one.
func TestDataRepairInMemory(t *testing.T) {
testDataRepair(t, true)
}
func TestDataRepairToDisk(t *testing.T) {
testDataRepair(t, false)
func TestDataRepairInMemoryBlake(t *testing.T) {
testDataRepair(t, true, pb.PieceHashAlgorithm_BLAKE3)
}
func testDataRepair(t *testing.T, inMemoryRepair bool) {
func TestDataRepairToDiskSHA256(t *testing.T) {
testDataRepair(t, false, pb.PieceHashAlgorithm_SHA256)
}
func testDataRepair(t *testing.T, inMemoryRepair bool, hashAlgo pb.PieceHashAlgorithm) {
const (
RepairMaxExcessRateOptimalThreshold = 0.05
minThreshold = 3
@ -91,7 +93,8 @@ func testDataRepair(t *testing.T, inMemoryRepair bool) {
}
testData := testrand.Bytes(8 * memory.KiB)
err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData)
err := uplinkPeer.Upload(piecestore.WithPieceHashAlgo(ctx, hashAlgo), satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
segment, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
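The test's only hook into the new behavior is the context: piecestore.WithPieceHashAlgo stashes the requested algorithm so the uplink's upload path can announce it to storage nodes. A minimal sketch of how such a context helper could look — the key type and the getPieceHashAlgo accessor are illustrative assumptions, not the actual storj.io/uplink/private/piecestore code:

package piecestore

import (
	"context"

	"storj.io/common/pb"
)

// hashAlgoKey is a hypothetical unexported context key.
type hashAlgoKey struct{}

// WithPieceHashAlgo returns a context that carries the piece hash
// algorithm uploads should announce to storage nodes.
func WithPieceHashAlgo(ctx context.Context, algo pb.PieceHashAlgorithm) context.Context {
	return context.WithValue(ctx, hashAlgoKey{}, algo)
}

// getPieceHashAlgo reads the algorithm back, falling back to SHA256
// when the caller did not pick one.
func getPieceHashAlgo(ctx context.Context) pb.PieceHashAlgorithm {
	if algo, ok := ctx.Value(hashAlgoKey{}).(pb.PieceHashAlgorithm); ok {
		return algo
	}
	return pb.PieceHashAlgorithm_SHA256
}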


@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"hash"
"io"
"io/ioutil"
"sort"
@ -22,7 +23,6 @@ import (
"storj.io/common/errs2"
"storj.io/common/pb"
"storj.io/common/pkcrypto"
"storj.io/common/rpc"
"storj.io/common/signing"
"storj.io/common/storj"
@ -239,6 +239,31 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
return decodeReader, pieces, nil
}
// lazyHashWriter is an io.Writer that picks the hash algorithm lazily, just before the first write, once the downloader has learned it from the storage node.
type lazyHashWriter struct {
hasher hash.Hash
downloader *piecestore.Download
}
func (l *lazyHashWriter) Write(p []byte) (n int, err error) {
// hash is available only after receiving the first message.
if l.hasher == nil {
h, _ := l.downloader.GetHashAndLimit()
l.hasher = pb.NewHashFromAlgorithm(h.HashAlgorithm)
}
return l.hasher.Write(p)
}
// Sum delegates hash calculation to the real hash algorithm.
func (l *lazyHashWriter) Sum(b []byte) []byte {
if l.hasher == nil {
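// No message has arrived yet, so nothing was written and there is no digest to report.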
return []byte{}
}
return l.hasher.Sum(b)
}
var _ io.Writer = &lazyHashWriter{}
// downloadAndVerifyPiece downloads a piece from a storagenode,
// expects the original order limit to have the correct piece public key,
// and expects the hash of the data to match the signed hash provided by the storagenode.
@ -264,7 +289,9 @@ func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.Addr
}
defer func() { err = errs.Combine(err, downloader.Close()) }()
hashWriter := pkcrypto.NewHash()
hashWriter := &lazyHashWriter{
downloader: downloader,
}
downloadReader := io.TeeReader(downloader, hashWriter)
var downloadedPieceSize int64
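With the TeeReader in place, every byte read from the node also feeds the lazy hasher, so once the download finishes the accumulated digest can be compared against the node's signed hash. A hedged sketch of that final comparison — verifyDownloadedHash is an illustrative stand-in for the fuller checks ECRepairer performs:

package repairer

import (
	"bytes"

	"storj.io/common/pb"
)

// sumOnly is the slice of lazyHashWriter needed for verification.
type sumOnly interface {
	Sum(b []byte) []byte
}

// verifyDownloadedHash compares the digest accumulated during the
// download with the signed hash the storage node sent alongside it.
func verifyDownloadedHash(hashWriter sumOnly, signed *pb.PieceHash) bool {
	return bytes.Equal(hashWriter.Sum(nil), signed.Hash)
}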


@ -604,7 +604,7 @@ func TestCacheCreateDeleteAndTrash(t *testing.T) {
for _, ref := range refs {
blob, err := cache.Create(ctx, ref, int64(4096))
require.NoError(t, err)
blobWriter, err := pieces.NewWriter(zaptest.NewLogger(t), blob, cache, satelliteID)
blobWriter, err := pieces.NewWriter(zaptest.NewLogger(t), blob, cache, satelliteID, pb.PieceHashAlgorithm_SHA256)
require.NoError(t, err)
_, err = blobWriter.Write(pieceContent)
require.NoError(t, err)


@ -61,7 +61,7 @@ func TestDeleter(t *testing.T) {
pieceID := testrand.PieceID()
data := testrand.Bytes(memory.KB)
w, err := store.Writer(ctx, satelliteID, pieceID)
w, err := store.Writer(ctx, satelliteID, pieceID, pb.PieceHashAlgorithm_SHA256)
require.NoError(t, err)
_, err = w.Write(data)
require.NoError(t, err)


@ -14,7 +14,6 @@ import (
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/pkcrypto"
"storj.io/common/storj"
"storj.io/storj/storage"
"storj.io/storj/storage/filestore"
@ -73,7 +72,7 @@ type Writer struct {
}
// NewWriter creates a new writer for storage.BlobWriter.
func NewWriter(log *zap.Logger, blobWriter storage.BlobWriter, blobs storage.Blobs, satellite storj.NodeID) (*Writer, error) {
func NewWriter(log *zap.Logger, blobWriter storage.BlobWriter, blobs storage.Blobs, satellite storj.NodeID, hashAlgorithm pb.PieceHashAlgorithm) (*Writer, error) {
w := &Writer{log: log}
if blobWriter.StorageFormatVersion() >= filestore.FormatV1 {
// We skip past the reserved header area for now- we want the header to be at the
@ -90,7 +89,9 @@ func NewWriter(log *zap.Logger, blobWriter storage.BlobWriter, blobs storage.Blo
}
}
w.blob = blobWriter
w.hash = pkcrypto.NewHash()
w.hash = pb.NewHashFromAlgorithm(hashAlgorithm)
w.blobs = blobs
w.satellite = satellite
return w, nil
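pb.NewHashFromAlgorithm (from storj.io/common) turns the protobuf enum into a concrete hash.Hash. A plausible shape of that dispatch, assuming the BLAKE3 implementation comes from a library such as github.com/zeebo/blake3 — the real function lives in storj.io/common/pb and may differ:

package sketch

import (
	"crypto/sha256"
	"hash"

	"github.com/zeebo/blake3"

	"storj.io/common/pb"
)

// newHashFromAlgorithm mirrors what pb.NewHashFromAlgorithm is expected
// to do: map the wire-level enum onto a streaming hash implementation.
func newHashFromAlgorithm(algo pb.PieceHashAlgorithm) hash.Hash {
	switch algo {
	case pb.PieceHashAlgorithm_BLAKE3:
		return blake3.New()
	default:
		// SHA256 remains the default, so callers that never pick an
		// algorithm keep their previous behavior.
		return sha256.New()
	}
}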


@ -43,7 +43,7 @@ func BenchmarkReadWrite(b *testing.B) {
b.Run("Write", func(b *testing.B) {
for i := 0; i < b.N; i++ {
pieceID := testrand.PieceID()
writer, err := store.Writer(ctx, satelliteID, pieceID)
writer, err := store.Writer(ctx, satelliteID, pieceID, pb.PieceHashAlgorithm_SHA256)
require.NoError(b, err)
data := source
@ -63,7 +63,7 @@ func BenchmarkReadWrite(b *testing.B) {
testPieceID := storj.PieceID{1}
{ // write a test piece
writer, err := store.Writer(ctx, satelliteID, testPieceID)
writer, err := store.Writer(ctx, satelliteID, testPieceID, pb.PieceHashAlgorithm_SHA256)
require.NoError(b, err)
_, err = writer.Write(source)
require.NoError(b, err)
@ -110,7 +110,7 @@ func readAndWritePiece(t *testing.T, content []byte) {
expirationTime := time.Unix(1595898827, 18364029)
// write a V1 format piece
w, err := store.Writer(ctx, satelliteID, pieceID)
w, err := store.Writer(ctx, satelliteID, pieceID, pb.PieceHashAlgorithm_SHA256)
require.NoError(t, err)
if len(content) > 0 {
_, err = w.Write(content)


@ -208,7 +208,7 @@ func (store *Store) VerifyStorageDir(ctx context.Context, id storj.NodeID) error
}
// Writer returns a new piece writer.
func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Writer, err error) {
func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, hashAlgorithm pb.PieceHashAlgorithm) (_ *Writer, err error) {
defer mon.Task()(&ctx)(&err)
blobWriter, err := store.blobs.Create(ctx, storage.BlobRef{
Namespace: satellite.Bytes(),
@ -218,7 +218,7 @@ func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID
return nil, Error.Wrap(err)
}
writer, err := NewWriter(store.log.Named("blob-writer"), blobWriter, store.blobs, satellite)
writer, err := NewWriter(store.log.Named("blob-writer"), blobWriter, store.blobs, satellite, hashAlgorithm)
return writer, Error.Wrap(err)
}
@ -226,7 +226,7 @@ func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID
// This is meant to be used externally only in test situations (thus the StoreForTest receiver
// type).
func (store StoreForTest) WriterForFormatVersion(ctx context.Context, satellite storj.NodeID,
pieceID storj.PieceID, formatVersion storage.FormatVersion) (_ *Writer, err error) {
pieceID storj.PieceID, formatVersion storage.FormatVersion, hashAlgorithm pb.PieceHashAlgorithm) (_ *Writer, err error) {
defer mon.Task()(&ctx)(&err)
@ -252,7 +252,7 @@ func (store StoreForTest) WriterForFormatVersion(ctx context.Context, satellite
if err != nil {
return nil, Error.Wrap(err)
}
writer, err := NewWriter(store.log.Named("blob-writer"), blobWriter, store.blobs, satellite)
writer, err := NewWriter(store.log.Named("blob-writer"), blobWriter, store.blobs, satellite, hashAlgorithm)
return writer, Error.Wrap(err)
}
@ -413,7 +413,7 @@ func (store *Store) MigrateV0ToV1(ctx context.Context, satelliteID storj.NodeID,
}
defer func() { err = errs.Combine(err, r.Close()) }()
w, err := store.Writer(ctx, satelliteID, pieceID)
w, err := store.Writer(ctx, satelliteID, pieceID, pb.PieceHashAlgorithm_SHA256)
if err != nil {
return err
}
@ -476,11 +476,12 @@ func (store *Store) GetHashAndLimit(ctx context.Context, satellite storj.NodeID,
return pb.PieceHash{}, pb.OrderLimit{}, Error.Wrap(err)
}
pieceHash := pb.PieceHash{
PieceId: pieceID,
Hash: header.GetHash(),
PieceSize: reader.Size(),
Timestamp: header.GetCreationTime(),
Signature: header.GetSignature(),
PieceId: pieceID,
Hash: header.GetHash(),
HashAlgorithm: header.GetHashAlgorithm(),
PieceSize: reader.Size(),
Timestamp: header.GetCreationTime(),
Signature: header.GetSignature(),
}
return pieceHash, header.OrderLimit, nil
}
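Because the V1 piece header now records the algorithm, a stored piece can later be re-verified with the same hash it was written with. A sketch of that reader-side check, assuming SHA256 is the enum's zero value so headers written before this change still verify:

package sketch

import (
	"bytes"

	"storj.io/common/pb"
)

// verifyStoredPiece recomputes the digest of a piece's content with the
// algorithm recorded in its header and compares it to the stored hash.
func verifyStoredPiece(header *pb.PieceHeader, content []byte) bool {
	h := pb.NewHashFromAlgorithm(header.GetHashAlgorithm())
	_, _ = h.Write(content)
	return bytes.Equal(h.Sum(nil), header.GetHash())
}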


@ -53,7 +53,7 @@ func TestPieces(t *testing.T) {
source := testrand.Bytes(8000)
{ // write data
writer, err := store.Writer(ctx, satelliteID, pieceID)
writer, err := store.Writer(ctx, satelliteID, pieceID, pb.PieceHashAlgorithm_SHA256)
require.NoError(t, err)
n, err := io.Copy(writer, bytes.NewReader(source))
@ -123,7 +123,7 @@ func TestPieces(t *testing.T) {
{ // write cancel
cancelledPieceID := storj.NewPieceID()
writer, err := store.Writer(ctx, satelliteID, cancelledPieceID)
writer, err := store.Writer(ctx, satelliteID, cancelledPieceID, pb.PieceHashAlgorithm_SHA256)
require.NoError(t, err)
n, err := io.Copy(writer, bytes.NewReader(source))
@ -144,7 +144,7 @@ func TestPieces(t *testing.T) {
func writeAPiece(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, data []byte, atTime time.Time, expireTime *time.Time, formatVersion storage.FormatVersion) {
tStore := &pieces.StoreForTest{store}
writer, err := tStore.WriterForFormatVersion(ctx, satelliteID, pieceID, formatVersion)
writer, err := tStore.WriterForFormatVersion(ctx, satelliteID, pieceID, formatVersion, pb.PieceHashAlgorithm_SHA256)
require.NoError(t, err)
_, err = writer.Write(data)
@ -341,7 +341,7 @@ func TestTrashAndRestore(t *testing.T) {
}
for _, file := range piece.files {
w, err := tStore.WriterForFormatVersion(ctx, satellite.satelliteID, piece.pieceID, file.formatVer)
w, err := tStore.WriterForFormatVersion(ctx, satellite.satelliteID, piece.pieceID, file.formatVer, pb.PieceHashAlgorithm_SHA256)
require.NoError(t, err)
_, err = w.Write(file.data)
@ -560,7 +560,7 @@ func TestPieceVersionMigrate(t *testing.T) {
// write as a v0 piece
tStore := &pieces.StoreForTest{store}
writer, err := tStore.WriterForFormatVersion(ctx, satelliteID, pieceID, filestore.FormatV0)
writer, err := tStore.WriterForFormatVersion(ctx, satelliteID, pieceID, filestore.FormatV0, pb.PieceHashAlgorithm_SHA256)
require.NoError(t, err)
_, err = writer.Write(data)
require.NoError(t, err)


@ -227,6 +227,7 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
return rpcstatus.Error(rpcstatus.InvalidArgument, "expected order limit as the first message")
}
limit := message.Limit
hashAlgorithm := message.HashAlgorithm
if limit.Action != pb.PieceAction_PUT && limit.Action != pb.PieceAction_PUT_REPAIR {
return rpcstatus.Errorf(rpcstatus.InvalidArgument, "expected put or put repair action got %v", limit.Action)
@ -300,7 +301,7 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
zap.Int64("Available Space", availableSpace))
mon.Counter("upload_started_count").Inc(1)
pieceWriter, err = endpoint.store.Writer(ctx, limit.SatelliteId, limit.PieceId)
pieceWriter, err = endpoint.store.Writer(ctx, limit.SatelliteId, limit.PieceId, hashAlgorithm)
if err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
@ -387,6 +388,10 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
}
if message.Done != nil {
if message.Done.HashAlgorithm != hashAlgorithm {
return rpcstatus.Wrap(rpcstatus.Internal, errs.New("Hash algorithm in the first and last upload message are different %s %s", hashAlgorithm, message.Done.HashAlgorithm))
}
calculatedHash := pieceWriter.Hash()
if err := endpoint.VerifyPieceHash(ctx, limit, message.Done, calculatedHash); err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
@ -399,10 +404,11 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
{
info := &pb.PieceHeader{
Hash: calculatedHash,
CreationTime: message.Done.Timestamp,
Signature: message.Done.GetSignature(),
OrderLimit: *limit,
Hash: calculatedHash,
HashAlgorithm: hashAlgorithm,
CreationTime: message.Done.Timestamp,
Signature: message.Done.GetSignature(),
OrderLimit: *limit,
}
if err := pieceWriter.Commit(ctx, info); err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
@ -417,10 +423,11 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
}
storageNodeHash, err := signing.SignPieceHash(ctx, endpoint.signer, &pb.PieceHash{
PieceId: limit.PieceId,
Hash: calculatedHash,
PieceSize: pieceWriter.Size(),
Timestamp: time.Now(),
PieceId: limit.PieceId,
Hash: calculatedHash,
HashAlgorithm: hashAlgorithm,
PieceSize: pieceWriter.Size(),
Timestamp: time.Now(),
})
if err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
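On the wire, the algorithm is pinned twice: the first upload message announces it next to the order limit, and the final Done message must repeat it or the node rejects the upload. A condensed sketch of the stream the endpoint expects — the message type is assumed to be pb.PieceUploadRequest, while the Limit, HashAlgorithm, and Done fields are the ones the endpoint reads above:

package sketch

import "storj.io/common/pb"

// uploadMessages sketches the upload stream's shape from the client side.
func uploadMessages(limit *pb.OrderLimit, done *pb.PieceHash) []*pb.PieceUploadRequest {
	return []*pb.PieceUploadRequest{
		// First message: the order limit plus the announced algorithm.
		{Limit: limit, HashAlgorithm: pb.PieceHashAlgorithm_BLAKE3},
		// ... chunk messages carrying the piece payload would follow ...
		// Last message: the signed hash; done.HashAlgorithm must repeat
		// the announced value or the node returns an error.
		{Done: done},
	}
}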


@ -104,6 +104,7 @@ func TestUpload(t *testing.T) {
contentLength memory.Size
action pb.PieceAction
err string
hashAlgo pb.PieceHashAlgorithm
}{
{ // should successfully store data
pieceID: storj.PieceID{1},
@ -123,7 +124,14 @@ func TestUpload(t *testing.T) {
action: pb.PieceAction_GET,
err: "expected put or put repair action got GET",
},
{ // different piece hash
pieceID: storj.PieceID{2},
contentLength: 1 * memory.KiB,
action: pb.PieceAction_PUT,
hashAlgo: pb.PieceHashAlgorithm_BLAKE3,
},
} {
client.UploadHashAlgo = tt.hashAlgo
data := testrand.Bytes(tt.contentLength)
serialNumber := testrand.SerialNumber()
@ -149,7 +157,11 @@ func TestUpload(t *testing.T) {
} else {
require.NoError(t, err)
expectedHash := pkcrypto.SHA256Hash(data)
hasher := pb.NewHashFromAlgorithm(tt.hashAlgo)
_, err = hasher.Write(data)
require.NoError(t, err)
expectedHash := hasher.Sum([]byte{})
assert.Equal(t, expectedHash, pieceHash.Hash)
signee := signing.SignerFromFullIdentity(planet.StorageNodes[0].Identity)


@ -71,7 +71,7 @@ func TestRetainPieces(t *testing.T) {
// Write file for all satellites
for _, satelliteID := range []storj.NodeID{satellite0.ID, satellite1.ID} {
now := time.Now()
w, err := testStore.WriterForFormatVersion(ctx, satelliteID, id, formatVer)
w, err := testStore.WriterForFormatVersion(ctx, satelliteID, id, formatVer, pb.PieceHashAlgorithm_SHA256)
require.NoError(t, err)
_, err = w.Write(testrand.Bytes(size))