storj/satellite/metainfo/config.go
Michał Niewrzał 3fc0d2a83e satellite/metainfo: add testing method from multipart-upload branch
We wanto have single uplink branch for standard and multipart-upload satellite but some tests are using helper methods from multipart. This change adds methods used by uplink test.

Change-Id: I82352ed56674ff7e8743b58061ba594018e78e3b
2021-01-26 09:13:12 +00:00

245 lines
8.2 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/storj/private/dbutil"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/satellite/metainfo/objectdeletion"
"storj.io/storj/satellite/metainfo/piecedeletion"
"storj.io/storj/storage"
"storj.io/storj/storage/cockroachkv"
"storj.io/storj/storage/postgreskv"
"storj.io/uplink/private/storage/meta"
)
const (
// BoltPointerBucket is the string representing the bucket used for `PointerEntries` in BoltDB.
BoltPointerBucket = "pointers"
)
// RSConfig is a configuration struct that keeps details about default
// redundancy strategy information.
//
// Can be used as a flag.
type RSConfig struct {
ErasureShareSize memory.Size
Min int
Repair int
Success int
Total int
}
// Type implements pflag.Value.
func (RSConfig) Type() string { return "metainfo.RSConfig" }
// String is required for pflag.Value.
func (rs *RSConfig) String() string {
return fmt.Sprintf("%d/%d/%d/%d-%s",
rs.Min,
rs.Repair,
rs.Success,
rs.Total,
rs.ErasureShareSize.String())
}
// Set sets the value from a string in the format k/m/o/n-size (min/repair/optimal/total-erasuresharesize).
func (rs *RSConfig) Set(s string) error {
// Split on dash. Expect two items. First item is RS numbers. Second item is memory.Size.
info := strings.Split(s, "-")
if len(info) != 2 {
return Error.New("Invalid default RS config (expect format k/m/o/n-ShareSize, got %s)", s)
}
rsNumbersString := info[0]
shareSizeString := info[1]
// Attempt to parse "-size" part of config.
shareSizeInt, err := memory.ParseString(shareSizeString)
if err != nil {
return Error.New("Invalid share size in RS config: '%s', %w", shareSizeString, err)
}
shareSize := memory.Size(shareSizeInt)
// Split on forward slash. Expect exactly four positive non-decreasing integers.
rsNumbers := strings.Split(rsNumbersString, "/")
if len(rsNumbers) != 4 {
return Error.New("Invalid default RS numbers (wrong size, expect 4): %s", rsNumbersString)
}
minValue := 1
values := []int{}
for _, nextValueString := range rsNumbers {
nextValue, err := strconv.Atoi(nextValueString)
if err != nil {
return Error.New("Invalid default RS numbers (should all be valid integers): %s, %w", rsNumbersString, err)
}
if nextValue < minValue {
return Error.New("Invalid default RS numbers (should be non-decreasing): %s", rsNumbersString)
}
values = append(values, nextValue)
minValue = nextValue
}
rs.ErasureShareSize = shareSize
rs.Min = values[0]
rs.Repair = values[1]
rs.Success = values[2]
rs.Total = values[3]
return nil
}
// RateLimiterConfig is a configuration struct for endpoint rate limiting.
type RateLimiterConfig struct {
Enabled bool `help:"whether rate limiting is enabled." releaseDefault:"true" devDefault:"true"`
Rate float64 `help:"request rate per project per second." releaseDefault:"1000" devDefault:"100"`
CacheCapacity int `help:"number of projects to cache." releaseDefault:"10000" devDefault:"10"`
CacheExpiration time.Duration `help:"how long to cache the projects limiter." releaseDefault:"10m" devDefault:"10s"`
}
// ProjectLimitConfig is a configuration struct for default project limits.
type ProjectLimitConfig struct {
MaxBuckets int `help:"max bucket count for a project." default:"100"`
DefaultMaxUsage memory.Size `help:"the default storage usage limit" default:"500.00GB"`
DefaultMaxBandwidth memory.Size `help:"the default bandwidth usage limit" default:"500.00GB"`
}
// Config is a configuration struct that is everything you need to start a metainfo.
type Config struct {
DatabaseURL string `help:"the database connection string to use" default:"postgres://"`
MinRemoteSegmentSize memory.Size `default:"1240" help:"minimum remote segment size"`
MaxInlineSegmentSize memory.Size `default:"4KiB" help:"maximum inline segment size"`
MaxSegmentSize memory.Size `default:"64MiB" help:"maximum segment size"`
MaxMetadataSize memory.Size `default:"2KiB" help:"maximum segment metadata size"`
MaxCommitInterval time.Duration `default:"48h" help:"maximum time allowed to pass between creating and committing a segment"`
Overlay bool `default:"true" help:"toggle flag if overlay is enabled"`
RS RSConfig `releaseDefault:"29/35/80/110-256B" devDefault:"4/6/8/10-256B" help:"redundancy scheme configuration in the format k/m/o/n-sharesize"`
Loop LoopConfig `help:"loop configuration"`
RateLimiter RateLimiterConfig `help:"rate limiter configuration"`
ProjectLimits ProjectLimitConfig `help:"project limit configuration"`
PieceDeletion piecedeletion.Config `help:"piece deletion configuration"`
ObjectDeletion objectdeletion.Config `help:"object deletion configuration"`
}
// PointerDB stores pointers.
//
// architecture: Database
type PointerDB interface {
// MigrateToLatest migrates to latest schema version.
MigrateToLatest(ctx context.Context) error
storage.KeyValueStore
}
// OpenStore returns database for storing pointer data.
func OpenStore(ctx context.Context, logger *zap.Logger, dbURLString string, app string) (db PointerDB, err error) {
_, source, implementation, err := dbutil.SplitConnStr(dbURLString)
if err != nil {
return nil, err
}
switch implementation {
case dbutil.Postgres:
db, err = postgreskv.Open(ctx, source, app)
case dbutil.Cockroach:
db, err = cockroachkv.Open(ctx, source, app)
default:
err = Error.New("unsupported db implementation: %s", dbURLString)
}
if err != nil {
return nil, err
}
logger.Debug("Connected to:", zap.String("db source", source))
return db, nil
}
// PointerDBMetabase this is wrapper struct that translates pointerDB to metabase.
// Use only for testing purposes.
type PointerDBMetabase struct {
metainfo *Service
}
// NewPointerDBMetabase creates new NewPointerDBMetabase instance.
func NewPointerDBMetabase(service *Service) *PointerDBMetabase {
return &PointerDBMetabase{
metainfo: service,
}
}
// TestingAllCommittedObjects gets all committed objects from bucket. Use only for testing purposes.
func (m *PointerDBMetabase) TestingAllCommittedObjects(ctx context.Context, projectID uuid.UUID, bucketName string) (objects []metabase.ObjectEntry, err error) {
location, err := CreatePath(ctx, projectID, -1, []byte(bucketName), []byte{})
if err != nil {
return nil, err
}
items, _, err := m.metainfo.List(ctx, location.Encode(), "", true, -1, meta.All)
if err != nil {
return nil, err
}
entries := make([]metabase.ObjectEntry, len(items))
for i, item := range items {
entries[i] = metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey(item.Path),
}
}
return entries, nil
}
// TestingAllObjectSegments gets all segments for given object. Use only for testing purposes.
func (m *PointerDBMetabase) TestingAllObjectSegments(ctx context.Context, objectLocation metabase.ObjectLocation) (segments []metabase.Segment, err error) {
location, err := CreatePath(ctx, objectLocation.ProjectID, -1, []byte(objectLocation.BucketName), []byte(objectLocation.ObjectKey))
if err != nil {
return nil, err
}
pointer, err := m.metainfo.Get(ctx, location.Encode())
if err != nil {
return nil, err
}
streamMeta := &pb.StreamMeta{}
err = pb.Unmarshal(pointer.Metadata, streamMeta)
if err != nil {
return nil, err
}
segments = make([]metabase.Segment, 0)
for i := int64(0); i < streamMeta.NumberOfSegments-1; i++ {
location.Index = i
_, err = m.metainfo.Get(ctx, location.Encode())
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
continue
}
return nil, err
}
segments = append(segments, metabase.Segment{
Position: metabase.SegmentPosition{
Index: uint32(i),
},
})
}
segments = append(segments, metabase.Segment{
Position: metabase.SegmentPosition{
Index: uint32(streamMeta.NumberOfSegments - 1),
},
})
return segments, nil
}