storj/pkg/metainfo/kvmetainfo/buckets.go

238 lines
7.9 KiB
Go
Raw Normal View History

2019-01-24 20:15:10 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package kvmetainfo
import (
"bytes"
"context"
"strconv"
"time"
2019-06-20 22:50:13 +01:00
"github.com/zeebo/errs"
"storj.io/storj/pkg/encryption"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storage/meta"
"storj.io/storj/pkg/storage/objects"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
)
// CreateBucket creates a new bucket or updates and existing bucket with the specified information
func (db *Project) CreateBucket(ctx context.Context, bucketName string, info *storj.Bucket) (bucketInfo storj.Bucket, err error) {
defer mon.Task()(&ctx)(&err)
if bucketName == "" {
2018-11-12 13:23:19 +00:00
return storj.Bucket{}, storj.ErrNoBucket.New("")
}
if info == nil {
info = &storj.Bucket{PathCipher: storj.AESGCM}
}
if info.EncryptionParameters.CipherSuite == storj.EncUnspecified {
info.EncryptionParameters.CipherSuite = storj.EncAESGCM
}
if info.EncryptionParameters.BlockSize == 0 {
info.EncryptionParameters.BlockSize = db.encryptedBlockSize
}
if info.RedundancyScheme.Algorithm == storj.InvalidRedundancyAlgorithm {
info.RedundancyScheme.Algorithm = storj.ReedSolomon
}
if info.RedundancyScheme.RequiredShares == 0 {
info.RedundancyScheme.RequiredShares = int16(db.redundancy.RequiredCount())
}
if info.RedundancyScheme.RepairShares == 0 {
info.RedundancyScheme.RepairShares = int16(db.redundancy.RepairThreshold())
}
if info.RedundancyScheme.OptimalShares == 0 {
info.RedundancyScheme.OptimalShares = int16(db.redundancy.OptimalThreshold())
}
if info.RedundancyScheme.TotalShares == 0 {
info.RedundancyScheme.TotalShares = int16(db.redundancy.TotalCount())
}
if info.RedundancyScheme.ShareSize == 0 {
info.RedundancyScheme.ShareSize = int32(db.redundancy.ErasureShareSize())
}
if info.SegmentsSize == 0 {
info.SegmentsSize = db.segmentsSize
}
if err := validateBlockSize(info.RedundancyScheme, info.EncryptionParameters.BlockSize); err != nil {
return bucketInfo, err
}
if info.PathCipher < storj.Unencrypted || info.PathCipher > storj.SecretBox {
return storj.Bucket{}, encryption.ErrInvalidConfig.New("encryption type %d is not supported", info.PathCipher)
}
r := bytes.NewReader(nil)
userMeta := map[string]string{
"attribution-to": info.Attribution,
"path-enc-type": strconv.Itoa(int(info.PathCipher)),
"default-seg-size": strconv.FormatInt(info.SegmentsSize, 10),
"default-enc-type": strconv.Itoa(int(info.EncryptionParameters.CipherSuite.ToCipher())),
"default-enc-blksz": strconv.FormatInt(int64(info.EncryptionParameters.BlockSize), 10),
"default-rs-algo": strconv.Itoa(int(info.RedundancyScheme.Algorithm)),
"default-rs-sharsz": strconv.FormatInt(int64(info.RedundancyScheme.ShareSize), 10),
"default-rs-reqd": strconv.Itoa(int(info.RedundancyScheme.RequiredShares)),
"default-rs-repair": strconv.Itoa(int(info.RedundancyScheme.RepairShares)),
"default-rs-optim": strconv.Itoa(int(info.RedundancyScheme.OptimalShares)),
"default-rs-total": strconv.Itoa(int(info.RedundancyScheme.TotalShares)),
}
var exp time.Time
m, err := db.buckets.Put(ctx, bucketName, r, pb.SerializableMeta{UserDefined: userMeta}, exp)
if err != nil {
return storj.Bucket{}, err
}
rv := *info
rv.Name = bucketName
rv.Created = m.Modified
return rv, nil
}
// validateBlockSize confirms the encryption block size aligns with stripe size.
// Stripes contain encrypted data therefore we want the stripe boundaries to match
// with the encryption block size boundaries. We also want stripes to be small for
// audits, but encryption can be a bit larger. All told, block size should be an integer
// multiple of stripe size.
func validateBlockSize(redundancyScheme storj.RedundancyScheme, blockSize int32) error {
stripeSize := redundancyScheme.StripeSize()
if blockSize%stripeSize != 0 {
2019-06-20 22:50:13 +01:00
return errs.New("encryption BlockSize (%d) must be a multiple of RS ShareSize (%d) * RS RequiredShares (%d)",
blockSize, redundancyScheme.ShareSize, redundancyScheme.RequiredShares,
)
}
return nil
}
// DeleteBucket deletes bucket
func (db *Project) DeleteBucket(ctx context.Context, bucketName string) (err error) {
defer mon.Task()(&ctx)(&err)
if bucketName == "" {
2018-11-12 13:23:19 +00:00
return storj.ErrNoBucket.New("")
}
err = db.buckets.Delete(ctx, bucketName)
if storage.ErrKeyNotFound.Has(err) {
err = storj.ErrBucketNotFound.Wrap(err)
}
return err
}
// GetBucket gets bucket information
func (db *Project) GetBucket(ctx context.Context, bucketName string) (bucketInfo storj.Bucket, err error) {
defer mon.Task()(&ctx)(&err)
if bucketName == "" {
2018-11-12 13:23:19 +00:00
return storj.Bucket{}, storj.ErrNoBucket.New("")
}
objMeta, err := db.buckets.Meta(ctx, bucketName)
if err != nil {
if storage.ErrKeyNotFound.Has(err) {
err = storj.ErrBucketNotFound.Wrap(err)
}
return storj.Bucket{}, err
}
return bucketFromMeta(ctx, bucketName, objMeta)
}
// ListBuckets lists buckets
func (db *Project) ListBuckets(ctx context.Context, options storj.BucketListOptions) (list storj.BucketList, err error) {
defer mon.Task()(&ctx)(&err)
var startAfter, endBefore string
switch options.Direction {
case storj.Before:
// before lists backwards from cursor, without cursor
endBefore = options.Cursor
case storj.Backward:
// backward lists backwards from cursor, including cursor
endBefore = keyAfter(options.Cursor)
case storj.Forward:
// forward lists forwards from cursor, including cursor
startAfter = keyBefore(options.Cursor)
case storj.After:
// after lists forwards from cursor, without cursor
startAfter = options.Cursor
default:
return storj.BucketList{}, errClass.New("invalid direction %d", options.Direction)
}
2018-11-12 13:23:19 +00:00
// TODO: remove this hack-fix of specifying the last key
if options.Cursor == "" && (options.Direction == storj.Before || options.Direction == storj.Backward) {
endBefore = "\x7f\x7f\x7f\x7f\x7f\x7f\x7f"
}
objItems, more, err := db.buckets.List(ctx, "", startAfter, endBefore, false, options.Limit, meta.Modified)
if err != nil {
return storj.BucketList{}, err
}
list = storj.BucketList{
More: more,
Items: make([]storj.Bucket, 0, len(objItems)),
}
for _, itm := range objItems {
if itm.IsPrefix {
continue
}
m, err := bucketFromMeta(ctx, itm.Path, itm.Meta)
if err != nil {
return storj.BucketList{}, err
}
list.Items = append(list.Items, m)
}
return list, nil
}
// bucketFromMeta builds a storj.Bucket named bucketName from stored object
// metadata.
//
// Created is taken from m.Modified and Attribution from the "attribution-to"
// entry. The remaining settings are decoded from base-10 integers in
// m.UserDefined; a missing or empty entry leaves the corresponding field at
// its default. Decoding stops at the first entry that fails to parse, and the
// bucket filled in so far is returned together with the error.
func bucketFromMeta(ctx context.Context, bucketName string, m objects.Meta) (out storj.Bucket, err error) {
	defer mon.Task()(&ctx)(&err)

	out.Name = bucketName
	out.Created = m.Modified
	out.Attribution = m.UserDefined["attribution-to"]

	// backwards compatibility for old buckets
	out.PathCipher = storj.AESGCM
	out.EncryptionParameters.CipherSuite = storj.EncUnspecified

	enc := &out.EncryptionParameters
	red := &out.RedundancyScheme

	// Each entry names a metadata key, the bit size the value must fit in,
	// and how to store the parsed value. They are processed in order.
	settings := []struct {
		key     string
		bitSize int
		assign  func(v int64)
	}{
		{"path-enc-type", 16, func(v int64) { out.PathCipher = storj.Cipher(v) }},
		{"default-seg-size", 64, func(v int64) { out.SegmentsSize = v }},
		{"default-enc-type", 32, func(v int64) { enc.CipherSuite = storj.Cipher(v).ToCipherSuite() }},
		{"default-enc-blksz", 32, func(v int64) { enc.BlockSize = int32(v) }},
		{"default-rs-algo", 32, func(v int64) { red.Algorithm = storj.RedundancyAlgorithm(v) }},
		{"default-rs-sharsz", 32, func(v int64) { red.ShareSize = int32(v) }},
		{"default-rs-reqd", 16, func(v int64) { red.RequiredShares = int16(v) }},
		{"default-rs-repair", 16, func(v int64) { red.RepairShares = int16(v) }},
		{"default-rs-optim", 16, func(v int64) { red.OptimalShares = int16(v) }},
		{"default-rs-total", 16, func(v int64) { red.TotalShares = int16(v) }},
	}

	for _, s := range settings {
		raw := m.UserDefined[s.key]
		if raw == "" {
			continue
		}

		parsed, parseErr := strconv.ParseInt(raw, 10, s.bitSize)
		if parseErr != nil {
			return out, errs.New("invalid metadata field for %s: %v", s.key, parseErr)
		}
		s.assign(parsed)
	}

	return out, nil
}