Kaloyan Raev 92a2be2abd satellite/metainfo: get away from using pb.Pointer in Metainfo Loop
As part of the Metainfo Refactoring, we need to make the Metainfo Loop
work with both the current PointerDB and the new Metabase. Thus, the
Metainfo Loop should pass to the Observer interface more specific Object
and Segment types instead of pb.Pointer.

After this change, there are still a couple of use cases that require
access to the pb.Pointer (hence we have it as a field in the
metainfo.Segment type):
1. Expired Deletion Service
2. Repair Service

It would require additional refactoring in these two services before we
are able to clean this up.

Change-Id: Ib3eb6b7507ed89d5ba745ffbb6b37524ef10ed9f
2020-10-27 13:06:47 +00:00

435 lines
15 KiB

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package tally_test
import (
func TestDeleteTalliesBefore(t *testing.T) {
tests := []struct {
eraseBefore time.Time
expectedRaws int
eraseBefore: time.Now(),
expectedRaws: 1,
eraseBefore: time.Now().Add(24 * time.Hour),
expectedRaws: 0,
for _, tt := range tests {
test := tt
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
id := teststorj.NodeIDFromBytes([]byte{})
nodeData := make(map[storj.NodeID]float64)
nodeData[id] = float64(1000)
err := planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, time.Now(), nodeData)
require.NoError(t, err)
err = planet.Satellites[0].DB.StoragenodeAccounting().DeleteTalliesBefore(ctx, test.eraseBefore)
require.NoError(t, err)
raws, err := planet.Satellites[0].DB.StoragenodeAccounting().GetTallies(ctx)
require.NoError(t, err)
assert.Len(t, raws, test.expectedRaws)
func TestOnlyInline(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplink := planet.Uplinks[0]
// Setup: create data for the uplink to upload
expectedData := testrand.Bytes(1 * memory.KiB)
// Setup: get the expected size of the data that will be stored in pointer
// Since the data is small enough to be stored inline, when it is encrypted, we only
// add 16 bytes of encryption authentication overhead. No encryption block
// padding will be added since we are not chunking data that we store inline.
const encryptionAuthOverhead = 16 // bytes
expectedTotalBytes := len(expectedData) + encryptionAuthOverhead
// Setup: The data in this tally should match the pointer that the uplink.upload created
expectedBucketName := "testbucket"
expectedTally := &accounting.BucketTally{
BucketLocation: metabase.BucketLocation{
ProjectID: uplink.Projects[0].ID,
BucketName: expectedBucketName,
ObjectCount: 1,
InlineSegments: 1,
InlineBytes: int64(expectedTotalBytes),
MetadataSize: 113, // brittle, this is hardcoded since its too difficult to get this value progamatically
// Execute test: upload a file, then calculate at rest data
err := uplink.Upload(ctx, planet.Satellites[0], expectedBucketName, "test/path", expectedData)
assert.NoError(t, err)
// run multiple times to ensure we add tallies
for i := 0; i < 2; i++ {
obs := tally.NewObserver(planet.Satellites[0].Log.Named("observer"), time.Now())
err := planet.Satellites[0].Metainfo.Loop.Join(ctx, obs)
require.NoError(t, err)
now := time.Now().Add(time.Duration(i) * time.Second)
err = planet.Satellites[0].DB.ProjectAccounting().SaveTallies(ctx, now, obs.Bucket)
require.NoError(t, err)
assert.Equal(t, 1, len(obs.Bucket))
for _, actualTally := range obs.Bucket {
assert.Equal(t, expectedTally, actualTally)
func TestCalculateNodeAtRestData(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
tallySvc := planet.Satellites[0].Accounting.Tally
uplink := planet.Uplinks[0]
// Setup: create 50KiB of data for the uplink to upload
expectedData := testrand.Bytes(50 * memory.KiB)
// TODO uplink currently hardcode block size so we need to use the same value in test
encryptionParameters := storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 29 * 256 * memory.B.Int32(),
expectedTotalBytes, err := encryption.CalcEncryptedSize(int64(len(expectedData)), encryptionParameters)
require.NoError(t, err)
// Execute test: upload a file, then calculate at rest data
expectedBucketName := "testbucket"
err = uplink.Upload(ctx, planet.Satellites[0], expectedBucketName, "test/path", expectedData)
require.NoError(t, err)
obs := tally.NewObserver(planet.Satellites[0].Log.Named("observer"), time.Now())
err = planet.Satellites[0].Metainfo.Loop.Join(ctx, obs)
require.NoError(t, err)
// Confirm the correct number of shares were stored
rs := satelliteRS(planet.Satellites[0])
if !correctRedundencyScheme(len(obs.Node), rs) {
t.Fatalf("expected between: %d and %d, actual: %d", rs.RepairShares, rs.TotalShares, len(obs.Node))
// Confirm the correct number of bytes were stored on each node
for _, actualTotalBytes := range obs.Node {
assert.Equal(t, expectedTotalBytes, int64(actualTotalBytes))
func TestCalculateBucketAtRestData(t *testing.T) {
var testCases = []struct {
name string
project string
segmentIndex int64
bucketName string
objectName string
inline bool
last bool
{"inline, same project, same bucket", "9656af6e-2d9c-42fa-91f2-bfd516a722d7", metabase.LastSegmentIndex, "mockBucketName", "mockObjectName", true, true},
{"remote, same project, same bucket", "9656af6e-2d9c-42fa-91f2-bfd516a722d7", 0, "mockBucketName", "mockObjectName1", false, false},
{"last segment, same project, different bucket", "9656af6e-2d9c-42fa-91f2-bfd516a722d7", metabase.LastSegmentIndex, "mockBucketName1", "mockObjectName2", false, true},
{"different project", "9656af6e-2d9c-42fa-91f2-bfd516a722d1", 0, "mockBucketName", "mockObjectName", false, false},
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellitePeer := planet.Satellites[0]
redundancyScheme := satelliteRS(satellitePeer)
expectedBucketTallies := make(map[metabase.BucketLocation]*accounting.BucketTally)
for _, tt := range testCases {
tt := tt // avoid scopelint error
t.Run(, func(t *testing.T) {
projectID, err := uuid.FromString(tt.project)
require.NoError(t, err)
// setup: create a pointer and save it to pointerDB
pointer, err := makePointer(planet.StorageNodes, redundancyScheme, int64(20), tt.inline)
require.NoError(t, err)
metainfo := satellitePeer.Metainfo.Service
location := metabase.SegmentLocation{
ProjectID: projectID,
BucketName: tt.bucketName,
Index: tt.segmentIndex,
ObjectKey: metabase.ObjectKey(tt.objectName),
err = metainfo.Put(ctx, location.Encode(), pointer)
require.NoError(t, err)
bucketLocation := metabase.BucketLocation{
ProjectID: projectID,
BucketName: tt.bucketName,
newTally := addBucketTally(expectedBucketTallies[bucketLocation], tt.inline, tt.last)
newTally.BucketName = tt.bucketName
newTally.ProjectID = projectID
expectedBucketTallies[bucketLocation] = newTally
obs := tally.NewObserver(satellitePeer.Log.Named("observer"), time.Now())
err = satellitePeer.Metainfo.Loop.Join(ctx, obs)
require.NoError(t, err)
require.Equal(t, expectedBucketTallies, obs.Bucket)
func TestTallyIgnoresExpiredPointers(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellitePeer := planet.Satellites[0]
redundancyScheme := satelliteRS(satellitePeer)
projectID, err := uuid.FromString("9656af6e-2d9c-42fa-91f2-bfd516a722d7")
require.NoError(t, err)
bucket := "bucket"
// setup: create an expired pointer and save it to pointerDB
pointer, err := makePointer(planet.StorageNodes, redundancyScheme, int64(2), false)
require.NoError(t, err)
pointer.ExpirationDate = time.Now().Add(-24 * time.Hour)
metainfo := satellitePeer.Metainfo.Service
location := metabase.SegmentLocation{
ProjectID: projectID,
BucketName: bucket,
Index: metabase.LastSegmentIndex,
ObjectKey: metabase.ObjectKey("object/name"),
err = metainfo.Put(ctx, location.Encode(), pointer)
require.NoError(t, err)
obs := tally.NewObserver(satellitePeer.Log.Named("observer"), time.Now())
err = satellitePeer.Metainfo.Loop.Join(ctx, obs)
require.NoError(t, err)
// there should be no observed buckets because all of the pointers are expired
require.Equal(t, obs.Bucket, map[metabase.BucketLocation]*accounting.BucketTally{})
func TestTallyLiveAccounting(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
tally := planet.Satellites[0].Accounting.Tally
projectID := planet.Uplinks[0].Projects[0].ID
expectedData := testrand.Bytes(5 * memory.MB)
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
require.NoError(t, err)
key, err := planet.Satellites[0].Metainfo.Database.List(ctx, nil, 10)
require.NoError(t, err)
require.Len(t, key, 1)
ptr, err := planet.Satellites[0].Metainfo.Service.Get(ctx, metabase.SegmentKey(key[0]))
require.NoError(t, err)
require.NotNil(t, ptr)
segmentSize := ptr.GetSegmentSize()
expectedSize := segmentSize
total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, projectID)
require.NoError(t, err)
require.Equal(t, expectedSize, total)
for i := 0; i < 5; i++ {
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", fmt.Sprintf("test/path/%d", i), expectedData)
require.NoError(t, err)
expectedSize += segmentSize
total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, projectID)
require.NoError(t, err)
require.Equal(t, expectedSize, total)
func TestTallyEmptyProjectUpdatesLiveAccounting(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 2,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
project1 := planet.Uplinks[1].Projects[0].ID
data := testrand.Bytes(1 * memory.MB)
// we need an extra bucket with data for this test. If no buckets are found at all,
// the update block is skipped in tally
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "test", data)
require.NoError(t, err)
err = planet.Uplinks[1].Upload(ctx, planet.Satellites[0], "bucket", "test", data)
require.NoError(t, err)
total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, project1)
require.NoError(t, err)
require.True(t, total >= int64(len(data)))
err = planet.Uplinks[1].DeleteObject(ctx, planet.Satellites[0], "bucket", "test")
require.NoError(t, err)
p1Total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, project1)
require.NoError(t, err)
require.Zero(t, p1Total)
// addBucketTally creates a new expected bucket tally based on the
// pointer that was just created for the test case.
func addBucketTally(existingTally *accounting.BucketTally, inline, last bool) *accounting.BucketTally {
// if there is already an existing tally for this project and bucket, then
// add the new pointer data to the existing tally
if existingTally != nil {
existingTally.MetadataSize += int64(2)
existingTally.RemoteBytes += int64(20)
return existingTally
// if the pointer was inline, create a tally with inline info
if inline {
return &accounting.BucketTally{
ObjectCount: int64(1),
InlineSegments: int64(1),
InlineBytes: int64(20),
MetadataSize: int64(2),
// if the pointer was remote, create a tally with remote info
newRemoteTally := &accounting.BucketTally{
RemoteSegments: int64(1),
RemoteBytes: int64(20),
MetadataSize: int64(2),
if last {
return newRemoteTally
// makePointer creates a pointer.
func makePointer(storageNodes []*testplanet.StorageNode, rs storj.RedundancyScheme, segmentSize int64, inline bool) (*pb.Pointer, error) {
metadata, err := pb.Marshal(&pb.StreamMeta{NumberOfSegments: 1})
if err != nil {
return nil, err
if inline {
inlinePointer := &pb.Pointer{
CreationDate: time.Now(),
Type: pb.Pointer_INLINE,
InlineSegment: make([]byte, segmentSize),
SegmentSize: segmentSize,
Metadata: metadata,
return inlinePointer, nil
pieces := make([]*pb.RemotePiece, rs.TotalShares)
for i := range pieces {
pieces[i] = &pb.RemotePiece{
PieceNum: int32(i),
NodeId: storageNodes[i].ID(),
return &pb.Pointer{
CreationDate: time.Now(),
Type: pb.Pointer_REMOTE,
Remote: &pb.RemoteSegment{
RootPieceId: storj.PieceID{0xFF},
Redundancy: &pb.RedundancyScheme{
Type: pb.RedundancyScheme_RS,
MinReq: int32(rs.RequiredShares),
Total: int32(rs.TotalShares),
RepairThreshold: int32(rs.RepairShares),
SuccessThreshold: int32(rs.OptimalShares),
ErasureShareSize: rs.ShareSize,
RemotePieces: pieces,
SegmentSize: segmentSize,
Metadata: metadata,
}, nil
func correctRedundencyScheme(shareCount int, uplinkRS storj.RedundancyScheme) bool {
// The shareCount should be a value between RequiredShares and TotalShares where
// RequiredShares is the min number of shares required to recover a segment and
// TotalShares is the number of shares to encode
return int(uplinkRS.RepairShares) <= shareCount && shareCount <= int(uplinkRS.TotalShares)
func satelliteRS(satellite *testplanet.Satellite) storj.RedundancyScheme {
return storj.RedundancyScheme{
RequiredShares: int16(satellite.Config.Metainfo.RS.MinThreshold),
RepairShares: int16(satellite.Config.Metainfo.RS.RepairThreshold),
OptimalShares: int16(satellite.Config.Metainfo.RS.SuccessThreshold),
TotalShares: int16(satellite.Config.Metainfo.RS.TotalThreshold),
ShareSize: satellite.Config.Metainfo.RS.ErasureShareSize.Int32(),