022f5d2e14
* add cache, update cache w/piece create/delete * add service w/loop to cache to recalculate space used cache * add piecestore cache to other sn svcs to use * add table to persist the total space used * rm cache where not needed * rm stuff from sn svcs * start fixing tests, changes per comments * update commits * add unit tests * fix commiting before we write header bytes * fix cache create test * copy cache map, add started back to recalc * fix test * add test, update comments
219 lines
6.1 KiB
Go
219 lines
6.1 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package pieces_test
|
|
|
|
import (
|
|
"io"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zaptest"
|
|
|
|
"storj.io/storj/internal/memory"
|
|
"storj.io/storj/internal/testcontext"
|
|
"storj.io/storj/internal/testrand"
|
|
"storj.io/storj/pkg/pb"
|
|
"storj.io/storj/pkg/storj"
|
|
"storj.io/storj/storage"
|
|
"storj.io/storj/storage/filestore"
|
|
"storj.io/storj/storagenode/pieces"
|
|
)
|
|
|
|
func BenchmarkReadWrite(b *testing.B) {
|
|
ctx := testcontext.New(b)
|
|
defer ctx.Cleanup()
|
|
|
|
dir, err := filestore.NewDir(ctx.Dir("pieces"))
|
|
require.NoError(b, err)
|
|
blobs := filestore.New(zap.NewNop(), dir)
|
|
defer ctx.Check(blobs.Close)
|
|
|
|
store := pieces.NewStore(zap.NewNop(), blobs, nil, nil, nil)
|
|
|
|
// setup test parameters
|
|
const blockSize = int(256 * memory.KiB)
|
|
satelliteID := testrand.NodeID()
|
|
source := testrand.Bytes(30 * memory.MB)
|
|
|
|
b.Run("Write", func(b *testing.B) {
|
|
for i := 0; i < b.N; i++ {
|
|
pieceID := testrand.PieceID()
|
|
writer, err := store.Writer(ctx, satelliteID, pieceID)
|
|
require.NoError(b, err)
|
|
|
|
data := source
|
|
for len(data) > 0 {
|
|
n := blockSize
|
|
if n > len(data) {
|
|
n = len(data)
|
|
}
|
|
_, err = writer.Write(data[:n])
|
|
require.NoError(b, err)
|
|
data = data[n:]
|
|
}
|
|
|
|
require.NoError(b, writer.Commit(ctx, &pb.PieceHeader{}))
|
|
}
|
|
})
|
|
|
|
testPieceID := storj.PieceID{1}
|
|
{ // write a test piece
|
|
writer, err := store.Writer(ctx, satelliteID, testPieceID)
|
|
require.NoError(b, err)
|
|
_, err = writer.Write(source)
|
|
require.NoError(b, err)
|
|
require.NoError(b, writer.Commit(ctx, &pb.PieceHeader{}))
|
|
}
|
|
|
|
b.Run("Read", func(b *testing.B) {
|
|
for i := 0; i < b.N; i++ {
|
|
reader, err := store.Reader(ctx, satelliteID, testPieceID)
|
|
require.NoError(b, err)
|
|
|
|
data := make([]byte, blockSize)
|
|
for {
|
|
_, err := reader.Read(data)
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
require.NoError(b, err)
|
|
}
|
|
}
|
|
require.NoError(b, reader.Close())
|
|
}
|
|
})
|
|
}
|
|
|
|
func readAndWritePiece(t *testing.T, content []byte) {
|
|
ctx := testcontext.New(t)
|
|
defer ctx.Cleanup()
|
|
|
|
dir, err := filestore.NewDir(ctx.Dir("pieces"))
|
|
require.NoError(t, err)
|
|
blobs := filestore.New(zaptest.NewLogger(t), dir)
|
|
defer ctx.Check(blobs.Close)
|
|
|
|
store := pieces.NewStore(zaptest.NewLogger(t), blobs, nil, nil, nil)
|
|
|
|
// test parameters
|
|
satelliteID := testrand.NodeID()
|
|
pieceID := testrand.PieceID()
|
|
fakeHash := testrand.Bytes(32)
|
|
creationTime := time.Unix(1564362827, 18364029)
|
|
fakeSig := testrand.Bytes(32)
|
|
expirationTime := time.Unix(1595898827, 18364029)
|
|
|
|
// write a V1 format piece
|
|
w, err := store.Writer(ctx, satelliteID, pieceID)
|
|
require.NoError(t, err)
|
|
if len(content) > 0 {
|
|
_, err = w.Write(content)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// make sure w.Size() works
|
|
assert.Equal(t, int64(len(content)), w.Size())
|
|
|
|
// commit the writer with the piece header, and close it
|
|
err = w.Commit(ctx, &pb.PieceHeader{
|
|
Hash: fakeHash,
|
|
CreationTime: creationTime,
|
|
Signature: fakeSig,
|
|
OrderLimit: pb.OrderLimit{
|
|
PieceExpiration: expirationTime.UTC(),
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// open a reader
|
|
r, err := store.Reader(ctx, satelliteID, pieceID)
|
|
require.NoError(t, err)
|
|
defer ctx.Check(r.Close)
|
|
assert.Equal(t, filestore.MaxFormatVersionSupported, r.StorageFormatVersion())
|
|
|
|
// make sure r.Size() works
|
|
assert.Equal(t, int64(len(content)), r.Size())
|
|
|
|
// make sure seek-nowhere works as expected before piece header is read
|
|
pos, err := r.Seek(0, io.SeekCurrent)
|
|
require.NoError(t, err)
|
|
require.Equal(t, int64(0), pos)
|
|
|
|
// read piece header
|
|
header, err := r.GetPieceHeader()
|
|
require.NoError(t, err)
|
|
assert.Equal(t, fakeHash, header.Hash)
|
|
assert.Truef(t, header.CreationTime.Equal(creationTime),
|
|
"header.CreationTime = %s, but expected creationTime = %s", header.CreationTime, creationTime)
|
|
assert.Equal(t, fakeSig, header.Signature)
|
|
require.NotZero(t, header.OrderLimit.PieceExpiration)
|
|
assert.Truef(t, header.OrderLimit.PieceExpiration.Equal(expirationTime),
|
|
"*header.ExpirationTime = %s, but expected expirationTime = %s", header.OrderLimit.PieceExpiration, expirationTime)
|
|
assert.Equal(t, pb.OrderLimit{PieceExpiration: expirationTime.UTC()}, header.OrderLimit)
|
|
assert.Equal(t, filestore.FormatV1, storage.FormatVersion(header.FormatVersion))
|
|
|
|
// make sure seek-nowhere works as expected after piece header is read too
|
|
// (from the point of view of the piece store, the file position has not moved)
|
|
pos, err = r.Seek(0, io.SeekCurrent)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, int64(0), pos)
|
|
|
|
// read piece contents
|
|
bufSize := memory.MB.Int()
|
|
if len(content) < bufSize {
|
|
bufSize = len(content)
|
|
}
|
|
buf := make([]byte, bufSize)
|
|
bytesRead, err := r.Read(buf)
|
|
require.NoError(t, err)
|
|
require.Equal(t, bufSize, bytesRead)
|
|
require.Equal(t, content[:len(buf)], buf)
|
|
|
|
// GetPieceHeader should error here now
|
|
header, err = r.GetPieceHeader()
|
|
require.Error(t, err)
|
|
assert.Truef(t, pieces.Error.Has(err), "err is not a pieces.Error: %v", err)
|
|
assert.Nil(t, header)
|
|
|
|
// check file position again
|
|
pos, err = r.Seek(0, io.SeekCurrent)
|
|
require.NoError(t, err)
|
|
require.Equal(t, int64(bufSize), pos)
|
|
|
|
const miniReadSize = 256
|
|
if len(content) > int(pos+miniReadSize) {
|
|
// Continuing to read should be ok
|
|
bytesRead, err = r.Read(buf[:miniReadSize])
|
|
require.NoError(t, err)
|
|
require.Equal(t, miniReadSize, bytesRead)
|
|
require.Equal(t, content[int(memory.MB):int(memory.MB)+miniReadSize], buf[:miniReadSize])
|
|
|
|
// Perform a Seek that actually moves the file pointer
|
|
const startReadFrom = 11
|
|
pos, err = r.Seek(startReadFrom, io.SeekStart)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, int64(startReadFrom), pos)
|
|
|
|
// And make sure that Seek had an effect
|
|
bytesRead, err = r.Read(buf[:miniReadSize])
|
|
require.NoError(t, err)
|
|
require.Equal(t, miniReadSize, bytesRead)
|
|
require.Equal(t, content[startReadFrom:startReadFrom+miniReadSize], buf[:miniReadSize])
|
|
}
|
|
}
|
|
|
|
func TestReadWriteWithPieceHeader(t *testing.T) {
|
|
content := testrand.Bytes(30 * memory.MB)
|
|
readAndWritePiece(t, content)
|
|
}
|
|
|
|
func TestEmptyPiece(t *testing.T) {
|
|
var content [0]byte
|
|
readAndWritePiece(t, content[:])
|
|
}
|