92a2be2abd
As part of the Metainfo Refactoring, we need to make the Metainfo Loop working with both the current PointerDB and the new Metabase. Thus, the Metainfo Loop should pass to the Observer interface more specific Object and Segment types instead of pb.Pointer. After this change, there are still a couple of use cases that require access to the pb.Pointer (hence we have it as a field in the metainfo.Segment type): 1. Expired Deletion Service 2. Repair Service It would require additional refactoring in these two services before we are able to clean this. Change-Id: Ib3eb6b7507ed89d5ba745ffbb6b37524ef10ed9f
435 lines
15 KiB
Go
435 lines
15 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package tally_test
|
|
|
|
import (
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"storj.io/common/encryption"
|
|
"storj.io/common/memory"
|
|
"storj.io/common/pb"
|
|
"storj.io/common/storj"
|
|
"storj.io/common/testcontext"
|
|
"storj.io/common/testrand"
|
|
"storj.io/common/uuid"
|
|
"storj.io/storj/private/testplanet"
|
|
"storj.io/storj/private/teststorj"
|
|
"storj.io/storj/satellite/accounting"
|
|
"storj.io/storj/satellite/accounting/tally"
|
|
"storj.io/storj/satellite/metainfo/metabase"
|
|
)
|
|
|
|
func TestDeleteTalliesBefore(t *testing.T) {
|
|
tests := []struct {
|
|
eraseBefore time.Time
|
|
expectedRaws int
|
|
}{
|
|
{
|
|
eraseBefore: time.Now(),
|
|
expectedRaws: 1,
|
|
},
|
|
{
|
|
eraseBefore: time.Now().Add(24 * time.Hour),
|
|
expectedRaws: 0,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
test := tt
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
id := teststorj.NodeIDFromBytes([]byte{})
|
|
nodeData := make(map[storj.NodeID]float64)
|
|
nodeData[id] = float64(1000)
|
|
|
|
err := planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, time.Now(), nodeData)
|
|
require.NoError(t, err)
|
|
|
|
err = planet.Satellites[0].DB.StoragenodeAccounting().DeleteTalliesBefore(ctx, test.eraseBefore)
|
|
require.NoError(t, err)
|
|
|
|
raws, err := planet.Satellites[0].DB.StoragenodeAccounting().GetTallies(ctx)
|
|
require.NoError(t, err)
|
|
assert.Len(t, raws, test.expectedRaws)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestOnlyInline(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
planet.Satellites[0].Accounting.Tally.Loop.Pause()
|
|
uplink := planet.Uplinks[0]
|
|
|
|
// Setup: create data for the uplink to upload
|
|
expectedData := testrand.Bytes(1 * memory.KiB)
|
|
|
|
// Setup: get the expected size of the data that will be stored in pointer
|
|
// Since the data is small enough to be stored inline, when it is encrypted, we only
|
|
// add 16 bytes of encryption authentication overhead. No encryption block
|
|
// padding will be added since we are not chunking data that we store inline.
|
|
const encryptionAuthOverhead = 16 // bytes
|
|
expectedTotalBytes := len(expectedData) + encryptionAuthOverhead
|
|
|
|
// Setup: The data in this tally should match the pointer that the uplink.upload created
|
|
expectedBucketName := "testbucket"
|
|
expectedTally := &accounting.BucketTally{
|
|
BucketLocation: metabase.BucketLocation{
|
|
ProjectID: uplink.Projects[0].ID,
|
|
BucketName: expectedBucketName,
|
|
},
|
|
ObjectCount: 1,
|
|
InlineSegments: 1,
|
|
InlineBytes: int64(expectedTotalBytes),
|
|
MetadataSize: 113, // brittle, this is hardcoded since its too difficult to get this value progamatically
|
|
}
|
|
|
|
// Execute test: upload a file, then calculate at rest data
|
|
err := uplink.Upload(ctx, planet.Satellites[0], expectedBucketName, "test/path", expectedData)
|
|
assert.NoError(t, err)
|
|
|
|
// run multiple times to ensure we add tallies
|
|
for i := 0; i < 2; i++ {
|
|
obs := tally.NewObserver(planet.Satellites[0].Log.Named("observer"), time.Now())
|
|
err := planet.Satellites[0].Metainfo.Loop.Join(ctx, obs)
|
|
require.NoError(t, err)
|
|
|
|
now := time.Now().Add(time.Duration(i) * time.Second)
|
|
err = planet.Satellites[0].DB.ProjectAccounting().SaveTallies(ctx, now, obs.Bucket)
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, 1, len(obs.Bucket))
|
|
for _, actualTally := range obs.Bucket {
|
|
assert.Equal(t, expectedTally, actualTally)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestCalculateNodeAtRestData(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
tallySvc := planet.Satellites[0].Accounting.Tally
|
|
tallySvc.Loop.Pause()
|
|
uplink := planet.Uplinks[0]
|
|
|
|
// Setup: create 50KiB of data for the uplink to upload
|
|
expectedData := testrand.Bytes(50 * memory.KiB)
|
|
|
|
// TODO uplink currently hardcode block size so we need to use the same value in test
|
|
encryptionParameters := storj.EncryptionParameters{
|
|
CipherSuite: storj.EncAESGCM,
|
|
BlockSize: 29 * 256 * memory.B.Int32(),
|
|
}
|
|
expectedTotalBytes, err := encryption.CalcEncryptedSize(int64(len(expectedData)), encryptionParameters)
|
|
require.NoError(t, err)
|
|
|
|
// Execute test: upload a file, then calculate at rest data
|
|
expectedBucketName := "testbucket"
|
|
err = uplink.Upload(ctx, planet.Satellites[0], expectedBucketName, "test/path", expectedData)
|
|
require.NoError(t, err)
|
|
|
|
obs := tally.NewObserver(planet.Satellites[0].Log.Named("observer"), time.Now())
|
|
err = planet.Satellites[0].Metainfo.Loop.Join(ctx, obs)
|
|
require.NoError(t, err)
|
|
|
|
// Confirm the correct number of shares were stored
|
|
rs := satelliteRS(planet.Satellites[0])
|
|
if !correctRedundencyScheme(len(obs.Node), rs) {
|
|
t.Fatalf("expected between: %d and %d, actual: %d", rs.RepairShares, rs.TotalShares, len(obs.Node))
|
|
}
|
|
|
|
// Confirm the correct number of bytes were stored on each node
|
|
for _, actualTotalBytes := range obs.Node {
|
|
assert.Equal(t, expectedTotalBytes, int64(actualTotalBytes))
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestCalculateBucketAtRestData(t *testing.T) {
|
|
var testCases = []struct {
|
|
name string
|
|
project string
|
|
segmentIndex int64
|
|
bucketName string
|
|
objectName string
|
|
inline bool
|
|
last bool
|
|
}{
|
|
{"inline, same project, same bucket", "9656af6e-2d9c-42fa-91f2-bfd516a722d7", metabase.LastSegmentIndex, "mockBucketName", "mockObjectName", true, true},
|
|
{"remote, same project, same bucket", "9656af6e-2d9c-42fa-91f2-bfd516a722d7", 0, "mockBucketName", "mockObjectName1", false, false},
|
|
{"last segment, same project, different bucket", "9656af6e-2d9c-42fa-91f2-bfd516a722d7", metabase.LastSegmentIndex, "mockBucketName1", "mockObjectName2", false, true},
|
|
{"different project", "9656af6e-2d9c-42fa-91f2-bfd516a722d1", 0, "mockBucketName", "mockObjectName", false, false},
|
|
}
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
satellitePeer := planet.Satellites[0]
|
|
redundancyScheme := satelliteRS(satellitePeer)
|
|
expectedBucketTallies := make(map[metabase.BucketLocation]*accounting.BucketTally)
|
|
for _, tt := range testCases {
|
|
tt := tt // avoid scopelint error
|
|
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
projectID, err := uuid.FromString(tt.project)
|
|
require.NoError(t, err)
|
|
|
|
// setup: create a pointer and save it to pointerDB
|
|
pointer, err := makePointer(planet.StorageNodes, redundancyScheme, int64(20), tt.inline)
|
|
require.NoError(t, err)
|
|
|
|
metainfo := satellitePeer.Metainfo.Service
|
|
location := metabase.SegmentLocation{
|
|
ProjectID: projectID,
|
|
BucketName: tt.bucketName,
|
|
Index: tt.segmentIndex,
|
|
ObjectKey: metabase.ObjectKey(tt.objectName),
|
|
}
|
|
err = metainfo.Put(ctx, location.Encode(), pointer)
|
|
require.NoError(t, err)
|
|
|
|
bucketLocation := metabase.BucketLocation{
|
|
ProjectID: projectID,
|
|
BucketName: tt.bucketName,
|
|
}
|
|
newTally := addBucketTally(expectedBucketTallies[bucketLocation], tt.inline, tt.last)
|
|
newTally.BucketName = tt.bucketName
|
|
newTally.ProjectID = projectID
|
|
expectedBucketTallies[bucketLocation] = newTally
|
|
|
|
obs := tally.NewObserver(satellitePeer.Log.Named("observer"), time.Now())
|
|
err = satellitePeer.Metainfo.Loop.Join(ctx, obs)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedBucketTallies, obs.Bucket)
|
|
})
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestTallyIgnoresExpiredPointers(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
satellitePeer := planet.Satellites[0]
|
|
redundancyScheme := satelliteRS(satellitePeer)
|
|
|
|
projectID, err := uuid.FromString("9656af6e-2d9c-42fa-91f2-bfd516a722d7")
|
|
require.NoError(t, err)
|
|
bucket := "bucket"
|
|
|
|
// setup: create an expired pointer and save it to pointerDB
|
|
pointer, err := makePointer(planet.StorageNodes, redundancyScheme, int64(2), false)
|
|
require.NoError(t, err)
|
|
|
|
pointer.ExpirationDate = time.Now().Add(-24 * time.Hour)
|
|
|
|
metainfo := satellitePeer.Metainfo.Service
|
|
location := metabase.SegmentLocation{
|
|
ProjectID: projectID,
|
|
BucketName: bucket,
|
|
Index: metabase.LastSegmentIndex,
|
|
ObjectKey: metabase.ObjectKey("object/name"),
|
|
}
|
|
err = metainfo.Put(ctx, location.Encode(), pointer)
|
|
require.NoError(t, err)
|
|
|
|
obs := tally.NewObserver(satellitePeer.Log.Named("observer"), time.Now())
|
|
err = satellitePeer.Metainfo.Loop.Join(ctx, obs)
|
|
require.NoError(t, err)
|
|
|
|
// there should be no observed buckets because all of the pointers are expired
|
|
require.Equal(t, obs.Bucket, map[metabase.BucketLocation]*accounting.BucketTally{})
|
|
})
|
|
}
|
|
|
|
func TestTallyLiveAccounting(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
tally := planet.Satellites[0].Accounting.Tally
|
|
projectID := planet.Uplinks[0].Projects[0].ID
|
|
tally.Loop.Pause()
|
|
|
|
expectedData := testrand.Bytes(5 * memory.MB)
|
|
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
|
|
require.NoError(t, err)
|
|
|
|
key, err := planet.Satellites[0].Metainfo.Database.List(ctx, nil, 10)
|
|
require.NoError(t, err)
|
|
require.Len(t, key, 1)
|
|
|
|
ptr, err := planet.Satellites[0].Metainfo.Service.Get(ctx, metabase.SegmentKey(key[0]))
|
|
require.NoError(t, err)
|
|
require.NotNil(t, ptr)
|
|
|
|
segmentSize := ptr.GetSegmentSize()
|
|
|
|
tally.Loop.TriggerWait()
|
|
|
|
expectedSize := segmentSize
|
|
|
|
total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, projectID)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedSize, total)
|
|
|
|
for i := 0; i < 5; i++ {
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", fmt.Sprintf("test/path/%d", i), expectedData)
|
|
require.NoError(t, err)
|
|
|
|
tally.Loop.TriggerWait()
|
|
|
|
expectedSize += segmentSize
|
|
|
|
total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, projectID)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedSize, total)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestTallyEmptyProjectUpdatesLiveAccounting(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 2,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
planet.Satellites[0].Accounting.Tally.Loop.Pause()
|
|
|
|
project1 := planet.Uplinks[1].Projects[0].ID
|
|
|
|
data := testrand.Bytes(1 * memory.MB)
|
|
|
|
// we need an extra bucket with data for this test. If no buckets are found at all,
|
|
// the update block is skipped in tally
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "test", data)
|
|
require.NoError(t, err)
|
|
|
|
err = planet.Uplinks[1].Upload(ctx, planet.Satellites[0], "bucket", "test", data)
|
|
require.NoError(t, err)
|
|
|
|
planet.Satellites[0].Accounting.Tally.Loop.TriggerWait()
|
|
planet.Satellites[0].Accounting.Tally.Loop.Pause()
|
|
|
|
total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, project1)
|
|
require.NoError(t, err)
|
|
require.True(t, total >= int64(len(data)))
|
|
|
|
err = planet.Uplinks[1].DeleteObject(ctx, planet.Satellites[0], "bucket", "test")
|
|
require.NoError(t, err)
|
|
|
|
planet.Satellites[0].Accounting.Tally.Loop.TriggerWait()
|
|
|
|
p1Total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, project1)
|
|
require.NoError(t, err)
|
|
require.Zero(t, p1Total)
|
|
})
|
|
}
|
|
|
|
// addBucketTally creates a new expected bucket tally based on the
|
|
// pointer that was just created for the test case.
|
|
func addBucketTally(existingTally *accounting.BucketTally, inline, last bool) *accounting.BucketTally {
|
|
// if there is already an existing tally for this project and bucket, then
|
|
// add the new pointer data to the existing tally
|
|
if existingTally != nil {
|
|
existingTally.MetadataSize += int64(2)
|
|
existingTally.RemoteSegments++
|
|
existingTally.RemoteBytes += int64(20)
|
|
return existingTally
|
|
}
|
|
|
|
// if the pointer was inline, create a tally with inline info
|
|
if inline {
|
|
return &accounting.BucketTally{
|
|
ObjectCount: int64(1),
|
|
InlineSegments: int64(1),
|
|
InlineBytes: int64(20),
|
|
MetadataSize: int64(2),
|
|
}
|
|
}
|
|
|
|
// if the pointer was remote, create a tally with remote info
|
|
newRemoteTally := &accounting.BucketTally{
|
|
RemoteSegments: int64(1),
|
|
RemoteBytes: int64(20),
|
|
MetadataSize: int64(2),
|
|
}
|
|
|
|
if last {
|
|
newRemoteTally.ObjectCount++
|
|
}
|
|
|
|
return newRemoteTally
|
|
}
|
|
|
|
// makePointer creates a pointer.
|
|
func makePointer(storageNodes []*testplanet.StorageNode, rs storj.RedundancyScheme, segmentSize int64, inline bool) (*pb.Pointer, error) {
|
|
metadata, err := pb.Marshal(&pb.StreamMeta{NumberOfSegments: 1})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if inline {
|
|
inlinePointer := &pb.Pointer{
|
|
CreationDate: time.Now(),
|
|
Type: pb.Pointer_INLINE,
|
|
InlineSegment: make([]byte, segmentSize),
|
|
SegmentSize: segmentSize,
|
|
Metadata: metadata,
|
|
}
|
|
return inlinePointer, nil
|
|
}
|
|
|
|
pieces := make([]*pb.RemotePiece, rs.TotalShares)
|
|
for i := range pieces {
|
|
pieces[i] = &pb.RemotePiece{
|
|
PieceNum: int32(i),
|
|
NodeId: storageNodes[i].ID(),
|
|
}
|
|
}
|
|
|
|
return &pb.Pointer{
|
|
CreationDate: time.Now(),
|
|
Type: pb.Pointer_REMOTE,
|
|
Remote: &pb.RemoteSegment{
|
|
RootPieceId: storj.PieceID{0xFF},
|
|
Redundancy: &pb.RedundancyScheme{
|
|
Type: pb.RedundancyScheme_RS,
|
|
MinReq: int32(rs.RequiredShares),
|
|
Total: int32(rs.TotalShares),
|
|
RepairThreshold: int32(rs.RepairShares),
|
|
SuccessThreshold: int32(rs.OptimalShares),
|
|
ErasureShareSize: rs.ShareSize,
|
|
},
|
|
RemotePieces: pieces,
|
|
},
|
|
SegmentSize: segmentSize,
|
|
Metadata: metadata,
|
|
}, nil
|
|
}
|
|
|
|
func correctRedundencyScheme(shareCount int, uplinkRS storj.RedundancyScheme) bool {
|
|
// The shareCount should be a value between RequiredShares and TotalShares where
|
|
// RequiredShares is the min number of shares required to recover a segment and
|
|
// TotalShares is the number of shares to encode
|
|
return int(uplinkRS.RepairShares) <= shareCount && shareCount <= int(uplinkRS.TotalShares)
|
|
}
|
|
|
|
func satelliteRS(satellite *testplanet.Satellite) storj.RedundancyScheme {
|
|
return storj.RedundancyScheme{
|
|
RequiredShares: int16(satellite.Config.Metainfo.RS.MinThreshold),
|
|
RepairShares: int16(satellite.Config.Metainfo.RS.RepairThreshold),
|
|
OptimalShares: int16(satellite.Config.Metainfo.RS.SuccessThreshold),
|
|
TotalShares: int16(satellite.Config.Metainfo.RS.TotalThreshold),
|
|
ShareSize: satellite.Config.Metainfo.RS.ErasureShareSize.Int32(),
|
|
}
|
|
}
|